Related to issue 35832: ImportSampleData now supports multithreading
author Augusto Mauch <augusto.mauch@openbravo.com>
Wed, 26 Apr 2017 17:47:03 +0200
changeset 943 c6fa7da9db9b
parent 942 312e5d89376c
child 944 bfdec7f711d3
Related to issue 35832: ImportSampleData now supports multithreading

The ImportSampleData class now supports defining the number of threads that will be used to import the files. This is done by setting the max.threads parameter of the ant task that ends up invoking ImportSampleData, for instance:

ant install.source -Dmax.threads=1

The number of threads is resolved by the Platform's default implementation, so if the max.threads parameter is not provided, nAvailableCPUs/2 threads will be used.
src/org/openbravo/ddlutils/task/ImportSampledata.java
--- a/src/org/openbravo/ddlutils/task/ImportSampledata.java	Thu May 25 14:45:07 2017 +0200
+++ b/src/org/openbravo/ddlutils/task/ImportSampledata.java	Wed Apr 26 17:47:03 2017 +0200
@@ -24,6 +24,9 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.beanutils.DynaBean;
 import org.apache.commons.dbcp.BasicDataSource;
@@ -35,6 +38,7 @@
 import org.apache.ddlutils.model.Database;
 import org.apache.ddlutils.model.Table;
 import org.apache.ddlutils.platform.postgresql.PostgreSqlDatabaseDataIO;
+import org.apache.log4j.Logger;
 import org.apache.tools.ant.BuildException;
 import org.openbravo.ddlutils.util.DBSMOBUtil;
 import org.openbravo.ddlutils.util.OBDatasetTable;
@@ -52,6 +56,7 @@
   private static final String POSTGRE_RDBMS = "POSTGRE";
 
   private boolean executeModuleScripts = true;
+  private int threads = 0; // passed to platform.setMaxThreads(); 0 presumably means "use default"
   private String rdbms;
 
   public ImportSampledata() {
@@ -67,6 +72,7 @@
     final Platform platform = PlatformFactory.createNewPlatformInstance(ds);
     // default value defined for a column should be used on missing data
     platform.setOverrideDefaultValueOnMissingData(false);
+    platform.setMaxThreads(threads);
 
     try {
 
@@ -147,19 +153,23 @@
           getLog().debug("Number of files read: " + files.size());
 
           getLog().info("Inserting data into the database...");
-
+          ExecutorService es = createExecutorService("Sampledata import", platform);
           for (int i = 0; i < files.size(); i++) {
             File f = files.get(i);
-            getLog().debug("Importing data from file: " + files.get(i).getName());
-            if (f.getName().endsWith(".xml")) {
-              importXmlFile(f, platform, db);
-            } else if (f.getName().endsWith(".copy")) {
-              if (POSTGRE_RDBMS.equals(rdbms)) {
-                importPgCopyFile(f, platform);
-              } else {
-                getLog().warn("File " + f.getName() + " cannot be imported in Oracle");
-              }
-            }
+            getLog().debug("Queueing data import from file: " + files.get(i).getName());
+            ImportRunner ir = new ImportRunner(getLog(), platform, db, f, rdbms);
+            es.execute(ir);
+          }
+          es.shutdown();
+          try {
+            // Wait until all the tables have been imported, or until 24 hours have passed
+            if (!es.awaitTermination(24, TimeUnit.HOURS)) {
+              throw new RuntimeException("Didn't finish in timeout");
+            }
+          } catch (InterruptedException e) {
+            // Restore the interrupt flag and keep the cause so the failure can be diagnosed
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting for the sample data import", e);
           }
         }
       }
@@ -299,20 +309,94 @@
     this.executeModuleScripts = executeModuleScripts;
   }
 
-  private void importXmlFile(File file, Platform platform, Database db) {
-    final DatabaseDataIO dbdio = new DatabaseDataIO();
-    dbdio.setEnsureFKOrder(false);
-    DataReader dataReader = null;
-    dbdio.setUseBatchMode(true);
-    dataReader = dbdio.getConfiguredDataReader(platform, db);
-    dataReader.getSink().start();
-    dbdio.writeDataToDatabase(dataReader, file);
-    dataReader.getSink().end();
+  /**
+   * Sets the maximum number of threads used to import the sample data files
+   *
+   * @param threads
+   *          the number of threads; it is handed to the platform through setMaxThreads
+   */
+  public void setThreads(int threads) {
+    this.threads = threads;
   }
 
-  private void importPgCopyFile(File file, Platform platform) {
-    final PostgreSqlDatabaseDataIO dbdio = new PostgreSqlDatabaseDataIO();
-    dbdio.importCopyFile(file, platform);
+  /**
+   * Creates an executor service to execute tasks in parallel. The size of its thread pool is the
+   * maximum number of threads reported by the platform, but never lower than one
+   *
+   * @param name
+   *          The name of the executor. It is only used for logging purposes
+   * @param platform
+   *          The platform that provides the maximum number of threads to be used
+   * @return the executor service
+   */
+  private ExecutorService createExecutorService(String name, Platform platform) {
+    // Executors.newFixedThreadPool rejects sizes lower than 1, so never go below one thread
+    int numThreads = Math.max(1, platform.getMaxThreads());
+    getLog().info("Using " + numThreads + " threads for: " + name);
+    if (numThreads == 1) {
+      getLog().debug("Starting single-threaded ExecutorService for " + name);
+      return Executors.newSingleThreadExecutor();
+    }
+    getLog().debug("Starting ExecutorService with " + numThreads + " threads for " + name);
+    return Executors.newFixedThreadPool(numThreads);
+  }
+
+  /**
+   * Runnable that reads the data from a file and writes it into the database. XML files are
+   * imported on any database, whereas .copy files are only imported on PostgreSQL
+   */
+  private static class ImportRunner implements Runnable {
+    private final Logger log;
+    private final Platform platform;
+    private final Database db;
+    private final File file;
+    private final String rdbms;
+
+    public ImportRunner(Logger log, Platform platform, Database db, File file, String rdbms) {
+      this.log = log;
+      this.platform = platform;
+      this.db = db;
+      this.file = file;
+      this.rdbms = rdbms;
+    }
+
+    @Override
+    public void run() {
+      String fileName = file.getName();
+      log.debug("Importing data from file: " + fileName);
+      try {
+        if (fileName.endsWith(".xml")) {
+          importXmlFile();
+        } else if (fileName.endsWith(".copy")) {
+          if (POSTGRE_RDBMS.equals(rdbms)) {
+            importPgCopyFile();
+          } else {
+            log.warn("File " + fileName + " cannot be imported in Oracle");
+          }
+        }
+      } catch (RuntimeException e) {
+        // A task run through execute() has no caller to report to: log which file failed
+        log.error("Error while importing data from file: " + fileName, e);
+        throw e;
+      }
+    }
+
+    /** Writes the XML file into the database (batch mode on, ensureFKOrder off) */
+    private void importXmlFile() {
+      final DatabaseDataIO dbdio = new DatabaseDataIO();
+      dbdio.setEnsureFKOrder(false);
+      dbdio.setUseBatchMode(true);
+      final DataReader dataReader = dbdio.getConfiguredDataReader(platform, db);
+      dataReader.getSink().start();
+      dbdio.writeDataToDatabase(dataReader, file);
+      dataReader.getSink().end();
+    }
+
+    /** Imports the file through PostgreSqlDatabaseDataIO.importCopyFile */
+    private void importPgCopyFile() {
+      final PostgreSqlDatabaseDataIO dbdio = new PostgreSqlDatabaseDataIO();
+      dbdio.importCopyFile(file, platform);
+    }
   }
 
 }