Related to issue Related to issue 29766: Retail Operations Buffer: store all transactions in operations table before processing
authorMartin Taal <martin.taal@openbravo.com>
Fri, 05 Jun 2015 09:04:54 +0200
changeset 26861 39e73c518b14
parent 26860 0e4099b33747
child 26862 aec312b262e2
Related to issue Related to issue 29766: Retail Operations Buffer: store all transactions in operations table before processing
Only read all non-clob columns from c_import_entry to prevent oracle driver reserving large memory chunks. Still read all other
properties as they can be used by derived classes to decide to handle or not to handle a specific import entry.
src/org/openbravo/service/importprocess/ImportEntryManager.java
src/org/openbravo/service/importprocess/ImportEntryProcessor.java
--- a/src/org/openbravo/service/importprocess/ImportEntryManager.java	Thu Jun 04 23:31:13 2015 +0000
+++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java	Fri Jun 05 09:04:54 2015 +0200
@@ -22,6 +22,7 @@
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -44,11 +45,13 @@
 import org.hibernate.Query;
 import org.hibernate.ScrollMode;
 import org.hibernate.ScrollableResults;
+import org.openbravo.base.model.Entity;
+import org.openbravo.base.model.ModelProvider;
+import org.openbravo.base.model.Property;
 import org.openbravo.base.provider.OBProvider;
 import org.openbravo.dal.core.OBContext;
 import org.openbravo.dal.core.SessionHandler;
 import org.openbravo.dal.service.OBDal;
-import org.openbravo.dal.service.OBQuery;
 import org.openbravo.model.ad.access.Role;
 
 /**
@@ -325,21 +328,19 @@
     managerThread.doNotify();
   }
 
-  private void handleImportEntry(ImportEntry importEntry) {
+  private void handleImportEntry(ImportEntryInformation importEntry) {
 
     try {
       ImportEntryProcessor entryProcessor = getImportEntryProcessor(importEntry.getTypeofdata());
       if (entryProcessor == null) {
-        log.warn("No import entry processor defined for type of data "
-            + importEntry.getTypeofdata() + " with json " + importEntry.getJsonInfo()
-            + " imported on " + importEntry.getImported() + " by " + importEntry.getCreatedBy());
+        log.warn("No import entry processor defined for type of data " + importEntry);
       } else {
         entryProcessor.handleImportEntry(importEntry);
       }
     } catch (Throwable t) {
       log.error(
-          "Error while saving import message " + importEntry + " " + importEntry.getJsonInfo()
-              + "  message: " + t.getMessage(), t);
+          "Error while saving import message " + importEntry + " " + "  message: " + t.getMessage(),
+          t);
       setImportEntryErrorIndependent(importEntry.getId(), t);
     }
   }
@@ -457,6 +458,14 @@
 
       Thread.currentThread().setName("Import Entry Manager Main");
 
+      // see ImportEntryInformation javadoc, we don't want to read the
+      // large json/error info columns, but want to have all the other
+      // normal columns, these are computed and stored in the 2 variables.
+      final List<Property> importEntryProperties = ImportEntryInformation
+          .getImportEntryProperties();
+      final String importEntrySelectClause = ImportEntryInformation
+          .getQuerySelectClause(importEntryProperties);
+
       // don't start right away at startup, give the system time to
       // really start
       log.debug("Started, first sleep " + manager.initialWaitTime);
@@ -466,6 +475,7 @@
       }
       log.debug("Run loop started");
       try {
+        List<String> typesOfData = null;
         while (true) {
           // obcontext cleared or wrong obcontext, repair
           if (OBContext.getOBContext() == null
@@ -482,7 +492,6 @@
               continue;
             }
 
-            List<String> typesOfData = null;
             if (typesOfData == null) {
               typesOfData = ImportProcessUtils.getOrderedTypesOfData();
             }
@@ -497,30 +506,30 @@
               // don't block eachother with the limited batch size
               // being read
               for (String typeOfData : typesOfData) {
-                OBQuery<ImportEntry> entriesQry = OBDal.getInstance().createQuery(
-                    ImportEntry.class,
-                    ImportEntry.PROPERTY_TYPEOFDATA + "='" + typeOfData + "' and "
-                        + ImportEntry.PROPERTY_IMPORTSTATUS + "='Initial' order by "
-                        + ImportEntry.PROPERTY_CREATIONDATE);
-                entriesQry.setFilterOnReadableClients(false);
-                entriesQry.setFilterOnReadableOrganization(false);
-                entriesQry.setMaxResult(manager.importBatchSize);
+                final String importEntryQryStr = "select " + importEntrySelectClause + " from "
+                    + ImportEntry.ENTITY_NAME + " where " + ImportEntry.PROPERTY_TYPEOFDATA + "='"
+                    + typeOfData + "' and " + ImportEntry.PROPERTY_IMPORTSTATUS
+                    + "='Initial' order by " + ImportEntry.PROPERTY_CREATIONDATE;
+
+                final Query entriesQry = OBDal.getInstance().getSession()
+                    .createQuery(importEntryQryStr);
+                entriesQry.setMaxResults(manager.importBatchSize);
 
                 final ScrollableResults entries = entriesQry.scroll(ScrollMode.FORWARD_ONLY);
                 while (entries.next()) {
                   entryCount++;
-                  final ImportEntry importEntry = (ImportEntry) entries.get()[0];
+                  final Object[] values = (Object[]) entries.get();
+                  final ImportEntryInformation importEntryInformation = new ImportEntryInformation(
+                      importEntryProperties, values);
                   try {
-                    manager.handleImportEntry(importEntry);
+                    manager.handleImportEntry(importEntryInformation);
                   } catch (Throwable t) {
                     // ImportEntryProcessors are custom implementations which can cause
                     // errors, so always catch them to prevent other import entries
                     // from not getting processed
-                    manager.setImportEntryError(importEntry.getId(), t);
+                    manager.setImportEntryError(importEntryInformation.getId(), t);
                     OBDal.getInstance().flush();
                   }
-                  // get rid of it to keep the session small
-                  OBDal.getInstance().getSession().evict(importEntry);
                 }
               }
 
@@ -638,6 +647,110 @@
       thread.setDaemon(true);
       return thread;
     }
+  }
 
+  /**
+   * A representation of the import entry with only the main fields. This is used when reading the
+   * import entries in the main loop. In this main thread/loop we don't want to read the large
+   * json/error info blobs therefore only the properties which are not soo large are read. For the
+   * foreign key properties only the id of the referenced record are read. So for example the
+   * {@link ImportEntryInformation} has a getOrgId method but not a getOrganization method.
+   * 
+   * The {@link ImportEntryInformation} has getters for the common properties defined in core for
+   * any properties/columns added by modules use the getValue method with the property name you are
+   * looking for.
+   * 
+   * The property names can be found in the {@link ImportEntry} class in the static property name
+   * strings, for example {@link ImportEntry#PROPERTY_ORGANIZATION}.
+   */
+  public static class ImportEntryInformation {
+    private Map<String, Object> values;
+
+    private static List<Property> getImportEntryProperties() {
+      final Entity importEntryEntity = ModelProvider.getInstance().getEntity(ImportEntry.class);
+      final List<Property> result = new ArrayList<Property>();
+      for (Property p : importEntryEntity.getProperties()) {
+        if (p.isOneToMany() || p.isInactive()) {
+          continue;
+        }
+        // skip the big ones!
+        if (p.getFieldLength() > 10000 || ImportEntry.PROPERTY_JSONINFO.equals(p.getName())
+            || ImportEntry.PROPERTY_ERRORINFO.equals(p.getName())) {
+          continue;
+        }
+        result.add(p);
+      }
+      return result;
+    }
+
+    private static String getQuerySelectClause(List<Property> props) {
+      final StringBuilder sb = new StringBuilder();
+      for (Property p : props) {
+        if (sb.length() > 0) {
+          sb.append(", ");
+        }
+        sb.append(p.getName());
+        if (!p.isPrimitive()) {
+          sb.append(".id");
+        }
+      }
+      return sb.toString();
+    }
+
+    private ImportEntryInformation(List<Property> props, Object[] objectValues) {
+      int i = 0;
+      values = new HashMap<String, Object>();
+      for (Property p : props) {
+        values.put(p.getName(), objectValues[i++]);
+      }
+    }
+
+    public String getOrgId() {
+      return (String) values.get(ImportEntry.PROPERTY_ORGANIZATION);
+    }
+
+    public String getClientId() {
+      return (String) values.get(ImportEntry.PROPERTY_CLIENT);
+    }
+
+    public Date getCreated() {
+      return (Date) values.get(ImportEntry.PROPERTY_CREATIONDATE);
+    }
+
+    public Date getUpdated() {
+      return (Date) values.get(ImportEntry.PROPERTY_UPDATED);
+    }
+
+    public String getCreatedBy() {
+      return (String) values.get(ImportEntry.PROPERTY_CREATEDBY);
+    }
+
+    public String getUpdatedBy() {
+      return (String) values.get(ImportEntry.PROPERTY_UPDATEDBY);
+    }
+
+    public String getImportStatus() {
+      return (String) values.get(ImportEntry.PROPERTY_IMPORTSTATUS);
+    }
+
+    public String getTypeofdata() {
+      return (String) values.get(ImportEntry.PROPERTY_TYPEOFDATA);
+    }
+
+    public String getId() {
+      return (String) values.get(ImportEntry.PROPERTY_ID);
+    }
+
+    public String getRoleId() {
+      return (String) values.get(ImportEntry.PROPERTY_ROLE);
+    }
+
+    public Object getValue(String field) {
+      return values.get(field);
+    }
+
+    public String toString() {
+      return values.toString();
+    }
   }
 }
--- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java	Thu Jun 04 23:31:13 2015 +0000
+++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java	Fri Jun 05 09:04:54 2015 +0200
@@ -34,11 +34,11 @@
 import org.apache.log4j.Logger;
 import org.openbravo.base.secureApp.VariablesSecureApp;
 import org.openbravo.client.kernel.RequestContext;
-import org.openbravo.dal.core.DalUtil;
 import org.openbravo.dal.core.OBContext;
 import org.openbravo.dal.core.SessionHandler;
 import org.openbravo.dal.service.OBDal;
 import org.openbravo.model.common.enterprise.Organization;
+import org.openbravo.service.importprocess.ImportEntryManager.ImportEntryInformation;
 
 /**
  * The {@link ImportEntryProcessor} is responsible for importing/processing {@link ImportEntry}
@@ -50,9 +50,9 @@
  * 
  * It is important that a specific ImportEntry is assigned to the right processing thread to prevent
  * for example deadlocks in the database. To make this possible a concept of
- * {@link #getProcessSelectionKey(ImportEntry)} is used. The process selection key is a unique key
- * derived from the {@link ImportEntry} which can be used to create/identify the thread which should
- * process the {@link ImportEntry}. If no such thread exists a new
+ * {@link #getProcessSelectionKey(ImportEntryInformation)} is used. The process selection key is a
+ * unique key derived from the {@link ImportEntry} which can be used to create/identify the thread
+ * which should process the {@link ImportEntry}. If no such thread exists a new
  * {@link ImportEntryProcessRunnable} is created. The exact type of
  * {@link ImportEntryProcessRunnable} is determined by the extending subclass through the
  * {@link #createImportEntryProcessRunnable()} method.
@@ -60,19 +60,19 @@
  * For example if ImportEntry records of the same organization should be processed after each other
  * (so not in parallel) to prevent DB deadlocks, this means that the records of the same
  * organization should be assigned to the same thread object. So that they are indeed processed
- * sequential and not in parallel. The {@link #getProcessSelectionKey(ImportEntry)} should in this
- * case return the {@link Organization#getId()} so that {@link ImportEntryProcessRunnable} are
- * keyed/registered using the organization. Other {@link ImportEntry} records of the same
- * organization are then processed by the same thread, always sequential, not parallel, preventing
- * DB deadlocks.
+ * sequential and not in parallel. The {@link #getProcessSelectionKey(ImportEntryInformation)}
+ * should in this case return the {@link Organization#getId()} so that
+ * {@link ImportEntryProcessRunnable} are keyed/registered using the organization. Other
+ * {@link ImportEntry} records of the same organization are then processed by the same thread,
+ * always sequential, not parallel, preventing DB deadlocks.
  * 
  * The {@link ImportEntryManager} passes new {@link ImportEntry} records to the the
- * {@link ImportEntryProcessor} by calling its {@link #handleImportEntry(ImportEntry)}. The
- * {@link ImportEntryProcessor} then can decide how to handle this {@link ImportEntry}, create a new
- * thread or assign it to an existing thread (which is busy processing previous entries). This is
- * all done in this generic class. An implementing subclass needs to implement the
- * {@link #getProcessSelectionKey(ImportEntry)} method. This method determines which/how the correct
- * {@link ImportEntryProcessRunnable} is chosen.
+ * {@link ImportEntryProcessor} by calling its {@link #handleImportEntry(ImportEntryInformation)}.
+ * The {@link ImportEntryProcessor} then can decide how to handle this {@link ImportEntry}, create a
+ * new thread or assign it to an existing thread (which is busy processing previous entries). This
+ * is all done in this generic class. An implementing subclass needs to implement the
+ * {@link #getProcessSelectionKey(ImportEntryInformation)} method. This method determines which/how
+ * the correct {@link ImportEntryProcessRunnable} is chosen.
  * 
  * The default/base implementation of the {@link ImportEntryProcessRunnable} provides standard
  * features related to caching of {@link OBContext}, error handling and transaction handling.
@@ -136,7 +136,7 @@
    * implementation should check if the {@link ImportEntry} was possibly already handled and ignore
    * it then.
    */
-  public void handleImportEntry(ImportEntry importEntry) {
+  public void handleImportEntry(ImportEntryInformation importEntry) {
 
     if (!canHandleImportEntry(importEntry)) {
       return;
@@ -152,7 +152,7 @@
 
   // synchronized to handle the case that a thread tries to deregister
   // itself at the same time
-  protected synchronized void assignEntryToThread(String key, ImportEntry importEntry) {
+  protected synchronized void assignEntryToThread(String key, ImportEntryInformation importEntry) {
 
     // runnables is a concurrent hashmap
     ImportEntryProcessRunnable runnable = runnables.get(key);
@@ -220,14 +220,14 @@
    * {@link ImportEntryManager} thread and then offered again to this {@link ImportEntryProcessor}
    * to be processed.
    */
-  protected abstract boolean canHandleImportEntry(ImportEntry importEntry);
+  protected abstract boolean canHandleImportEntry(ImportEntryInformation importEntryInformation);
 
   /**
    * Based on the {@link ImportEntry} returns a key which uniquely identifies the thread which
    * should process this {@link ImportEntry}. Can be used to place import entries which block/use
    * the same records in the same import thread, in this way preventing DB (dead)locks.
    */
-  protected abstract String getProcessSelectionKey(ImportEntry importEntry);
+  protected abstract String getProcessSelectionKey(ImportEntryInformation importEntry);
 
   /**
    * The default implementation of the ImportEntryProcessRunnable. It performs the following
@@ -302,8 +302,13 @@
               }
 
               // not changed, process
+              final String typeOfData = localImportEntry.getTypeofdata();
               processEntry(localImportEntry);
 
+              // don't use the import entry anymore, touching methods on it
+              // may re-open a session
+              localImportEntry = null;
+
               // processed so can be removed
               importEntryIds.remove(queuedImportEntry.importEntryId);
 
@@ -311,7 +316,7 @@
               cnt++;
               final long timeForEntry = (System.currentTimeMillis() - t0);
               totalT += timeForEntry;
-              importEntryManager.reportStats(localImportEntry.getTypeofdata(), timeForEntry);
+              importEntryManager.reportStats(typeOfData, timeForEntry);
               if ((cnt % 100) == 0 && logger.isDebugEnabled()) {
                 logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in "
                     + totalT + " millis, " + (totalT / cnt)
@@ -322,7 +327,7 @@
               if (SessionHandler.isSessionHandlerPresent()) {
                 // change to warning if the code in the subclasses really works correctly
                 logger
-                    .debug("Session handler present after processing import entry, this indicates that the processing code "
+                    .warn("Session handler present after processing import entry, this indicates that the processing code "
                         + "does not correctly clean/close the session after its last actions. This should be fixed.");
                 OBDal.getInstance().commitAndClose();
               }
@@ -428,7 +433,7 @@
     }
 
     // is called by the processor in the main EntityManagerThread
-    private void addEntry(ImportEntry importEntry) {
+    private void addEntry(ImportEntryInformation importEntry) {
 
       // ignore the entry, queue is too large
       // prevents memory problems
@@ -464,13 +469,12 @@
       final String clientId;
       final String roleId;
 
-      QueuedEntry(ImportEntry importEntry) {
+      QueuedEntry(ImportEntryInformation importEntry) {
         importEntryId = importEntry.getId();
-        userId = (String) DalUtil.getId(importEntry.getCreatedBy());
-        orgId = (String) DalUtil.getId(importEntry.getOrganization());
-        roleId = importEntry.getRole() == null ? null : (String) DalUtil.getId(importEntry
-            .getRole());
-        clientId = (String) DalUtil.getId(importEntry.getClient());
+        userId = importEntry.getCreatedBy();
+        orgId = importEntry.getOrgId();
+        roleId = importEntry.getRoleId();
+        clientId = importEntry.getClientId();
       }
     }
   }