Related to issue Related to issue 29766: Retail Operations Buffer: store all transactions in operations table before processing
authorMartin Taal <martin.taal@openbravo.com>
Sat, 06 Jun 2015 19:47:51 +0200
changeset 26866 8c14e3937b60
parent 26865 1d33bf55d2dc
child 26867 25f553479e35
child 28305 f8ac0641f008
Related to issue Related to issue 29766: Retail Operations Buffer: store all transactions in operations table before processing
Again read the complete import entry, much simpler interface and the previous oom seemed to be related to not closing the scrollableresults
src/org/openbravo/service/importprocess/ImportEntryManager.java
src/org/openbravo/service/importprocess/ImportEntryProcessor.java
--- a/src/org/openbravo/service/importprocess/ImportEntryManager.java	Sat Jun 06 14:51:26 2015 +0000
+++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java	Sat Jun 06 19:47:51 2015 +0200
@@ -22,7 +22,6 @@
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -45,9 +44,6 @@
 import org.hibernate.Query;
 import org.hibernate.ScrollMode;
 import org.hibernate.ScrollableResults;
-import org.openbravo.base.model.Entity;
-import org.openbravo.base.model.ModelProvider;
-import org.openbravo.base.model.Property;
 import org.openbravo.base.provider.OBProvider;
 import org.openbravo.dal.core.OBContext;
 import org.openbravo.dal.core.SessionHandler;
@@ -328,7 +324,7 @@
     managerThread.doNotify();
   }
 
-  private void handleImportEntry(ImportEntryInformation importEntry) {
+  private void handleImportEntry(ImportEntry importEntry) {
 
     try {
       ImportEntryProcessor entryProcessor = getImportEntryProcessor(importEntry.getTypeofdata());
@@ -458,14 +454,6 @@
 
       Thread.currentThread().setName("Import Entry Manager Main");
 
-      // see ImportEntryInformation javadoc, we don't want to read the
-      // large json/error info columns, but want to have all the other
-      // normal columns, these are computed and stored in the 2 variables.
-      final List<Property> importEntryProperties = ImportEntryInformation
-          .getImportEntryProperties();
-      final String importEntrySelectClause = ImportEntryInformation
-          .getQuerySelectClause(importEntryProperties);
-
       // don't start right away at startup, give the system time to
       // really start
       log.debug("Started, first sleep " + manager.initialWaitTime);
@@ -506,10 +494,10 @@
               // don't block eachother with the limited batch size
               // being read
               for (String typeOfData : typesOfData) {
-                final String importEntryQryStr = "select " + importEntrySelectClause + " from "
-                    + ImportEntry.ENTITY_NAME + " where " + ImportEntry.PROPERTY_TYPEOFDATA + "='"
-                    + typeOfData + "' and " + ImportEntry.PROPERTY_IMPORTSTATUS
-                    + "='Initial' order by " + ImportEntry.PROPERTY_CREATIONDATE;
+                final String importEntryQryStr = "from " + ImportEntry.ENTITY_NAME + " where "
+                    + ImportEntry.PROPERTY_TYPEOFDATA + "='" + typeOfData + "' and "
+                    + ImportEntry.PROPERTY_IMPORTSTATUS + "='Initial' order by "
+                    + ImportEntry.PROPERTY_CREATIONDATE;
 
                 final Query entriesQry = OBDal.getInstance().getSession()
                     .createQuery(importEntryQryStr);
@@ -521,16 +509,14 @@
                 try {
                   while (entries.next()) {
                     entryCount++;
-                    final Object[] values = (Object[]) entries.get();
-                    final ImportEntryInformation importEntryInformation = new ImportEntryInformation(
-                        importEntryProperties, values);
+                    final ImportEntry entry = (ImportEntry) entries.get(0);
                     try {
-                      manager.handleImportEntry(importEntryInformation);
+                      manager.handleImportEntry(entry);
                     } catch (Throwable t) {
                       // ImportEntryProcessors are custom implementations which can cause
                       // errors, so always catch them to prevent other import entries
                       // from not getting processed
-                      manager.setImportEntryError(importEntryInformation.getId(), t);
+                      manager.setImportEntryError(entry.getId(), t);
                       OBDal.getInstance().flush();
                     }
                   }
@@ -654,109 +640,4 @@
       return thread;
     }
   }
-
-  /**
-   * A representation of the import entry with only the main fields. This is used when reading the
-   * import entries in the main loop. In this main thread/loop we don't want to read the large
-   * json/error info blobs therefore only the properties which are not soo large are read. For the
-   * foreign key properties only the id of the referenced record are read. So for example the
-   * {@link ImportEntryInformation} has a getOrgId method but not a getOrganization method.
-   * 
-   * The {@link ImportEntryInformation} has getters for the common properties defined in core for
-   * any properties/columns added by modules use the getValue method with the property name you are
-   * looking for.
-   * 
-   * The property names can be found in the {@link ImportEntry} class in the static property name
-   * strings, for example {@link ImportEntry#PROPERTY_ORGANIZATION}.
-   */
-  public static class ImportEntryInformation {
-    private Map<String, Object> values;
-
-    private static List<Property> getImportEntryProperties() {
-      final Entity importEntryEntity = ModelProvider.getInstance().getEntity(ImportEntry.class);
-      final List<Property> result = new ArrayList<Property>();
-      for (Property p : importEntryEntity.getProperties()) {
-        if (p.isOneToMany() || p.isInactive()) {
-          continue;
-        }
-        // skip the big ones!
-        if (p.getFieldLength() > 10000 || ImportEntry.PROPERTY_JSONINFO.equals(p.getName())
-            || ImportEntry.PROPERTY_ERRORINFO.equals(p.getName())) {
-          continue;
-        }
-        result.add(p);
-      }
-      return result;
-    }
-
-    private static String getQuerySelectClause(List<Property> props) {
-      final StringBuilder sb = new StringBuilder();
-      for (Property p : props) {
-        if (sb.length() > 0) {
-          sb.append(", ");
-        }
-        sb.append(p.getName());
-        if (!p.isPrimitive()) {
-          sb.append(".id");
-        }
-      }
-      return sb.toString();
-    }
-
-    private ImportEntryInformation(List<Property> props, Object[] objectValues) {
-      int i = 0;
-      values = new HashMap<String, Object>();
-      for (Property p : props) {
-        values.put(p.getName(), objectValues[i++]);
-      }
-    }
-
-    public String getOrgId() {
-      return (String) values.get(ImportEntry.PROPERTY_ORGANIZATION);
-    }
-
-    public String getClientId() {
-      return (String) values.get(ImportEntry.PROPERTY_CLIENT);
-    }
-
-    public Date getCreated() {
-      return (Date) values.get(ImportEntry.PROPERTY_CREATIONDATE);
-    }
-
-    public Date getUpdated() {
-      return (Date) values.get(ImportEntry.PROPERTY_UPDATED);
-    }
-
-    public String getCreatedBy() {
-      return (String) values.get(ImportEntry.PROPERTY_CREATEDBY);
-    }
-
-    public String getUpdatedBy() {
-      return (String) values.get(ImportEntry.PROPERTY_UPDATEDBY);
-    }
-
-    public String getImportStatus() {
-      return (String) values.get(ImportEntry.PROPERTY_IMPORTSTATUS);
-    }
-
-    public String getTypeofdata() {
-      return (String) values.get(ImportEntry.PROPERTY_TYPEOFDATA);
-    }
-
-    public String getId() {
-      return (String) values.get(ImportEntry.PROPERTY_ID);
-    }
-
-    public String getRoleId() {
-      return (String) values.get(ImportEntry.PROPERTY_ROLE);
-    }
-
-    public Object getValue(String field) {
-      return values.get(field);
-    }
-
-    public String toString() {
-      return values.toString();
-    }
-  }
 }
--- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java	Sat Jun 06 14:51:26 2015 +0000
+++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java	Sat Jun 06 19:47:51 2015 +0200
@@ -34,11 +34,11 @@
 import org.apache.log4j.Logger;
 import org.openbravo.base.secureApp.VariablesSecureApp;
 import org.openbravo.client.kernel.RequestContext;
+import org.openbravo.dal.core.DalUtil;
 import org.openbravo.dal.core.OBContext;
 import org.openbravo.dal.core.SessionHandler;
 import org.openbravo.dal.service.OBDal;
 import org.openbravo.model.common.enterprise.Organization;
-import org.openbravo.service.importprocess.ImportEntryManager.ImportEntryInformation;
 
 /**
  * The {@link ImportEntryProcessor} is responsible for importing/processing {@link ImportEntry}
@@ -50,9 +50,9 @@
  * 
  * It is important that a specific ImportEntry is assigned to the right processing thread to prevent
  * for example deadlocks in the database. To make this possible a concept of
- * {@link #getProcessSelectionKey(ImportEntryInformation)} is used. The process selection key is a
- * unique key derived from the {@link ImportEntry} which can be used to create/identify the thread
- * which should process the {@link ImportEntry}. If no such thread exists a new
+ * {@link #getProcessSelectionKey(ImportEntry)} is used. The process selection key is a unique key
+ * derived from the {@link ImportEntry} which can be used to create/identify the thread which should
+ * process the {@link ImportEntry}. If no such thread exists a new
  * {@link ImportEntryProcessRunnable} is created. The exact type of
  * {@link ImportEntryProcessRunnable} is determined by the extending subclass through the
  * {@link #createImportEntryProcessRunnable()} method.
@@ -60,19 +60,19 @@
  * For example if ImportEntry records of the same organization should be processed after each other
  * (so not in parallel) to prevent DB deadlocks, this means that the records of the same
  * organization should be assigned to the same thread object. So that they are indeed processed
- * sequential and not in parallel. The {@link #getProcessSelectionKey(ImportEntryInformation)}
- * should in this case return the {@link Organization#getId()} so that
- * {@link ImportEntryProcessRunnable} are keyed/registered using the organization. Other
- * {@link ImportEntry} records of the same organization are then processed by the same thread,
- * always sequential, not parallel, preventing DB deadlocks.
+ * sequential and not in parallel. The {@link #getProcessSelectionKey(ImportEntry)} should in this
+ * case return the {@link Organization#getId()} so that {@link ImportEntryProcessRunnable} are
+ * keyed/registered using the organization. Other {@link ImportEntry} records of the same
+ * organization are then processed by the same thread, always sequential, not parallel, preventing
+ * DB deadlocks.
  * 
  * The {@link ImportEntryManager} passes new {@link ImportEntry} records to the the
- * {@link ImportEntryProcessor} by calling its {@link #handleImportEntry(ImportEntryInformation)}.
- * The {@link ImportEntryProcessor} then can decide how to handle this {@link ImportEntry}, create a
- * new thread or assign it to an existing thread (which is busy processing previous entries). This
- * is all done in this generic class. An implementing subclass needs to implement the
- * {@link #getProcessSelectionKey(ImportEntryInformation)} method. This method determines which/how
- * the correct {@link ImportEntryProcessRunnable} is chosen.
+ * {@link ImportEntryProcessor} by calling its {@link #handleImportEntry(ImportEntry)}. The
+ * {@link ImportEntryProcessor} then can decide how to handle this {@link ImportEntry}, create a new
+ * thread or assign it to an existing thread (which is busy processing previous entries). This is
+ * all done in this generic class. An implementing subclass needs to implement the
+ * {@link #getProcessSelectionKey(ImportEntry)} method. This method determines which/how the correct
+ * {@link ImportEntryProcessRunnable} is chosen.
  * 
  * The default/base implementation of the {@link ImportEntryProcessRunnable} provides standard
  * features related to caching of {@link OBContext}, error handling and transaction handling.
@@ -136,7 +136,7 @@
    * implementation should check if the {@link ImportEntry} was possibly already handled and ignore
    * it then.
    */
-  public void handleImportEntry(ImportEntryInformation importEntry) {
+  public void handleImportEntry(ImportEntry importEntry) {
 
     if (!canHandleImportEntry(importEntry)) {
       return;
@@ -152,7 +152,7 @@
 
   // synchronized to handle the case that a thread tries to deregister
   // itself at the same time
-  protected synchronized void assignEntryToThread(String key, ImportEntryInformation importEntry) {
+  protected synchronized void assignEntryToThread(String key, ImportEntry importEntry) {
 
     // runnables is a concurrent hashmap
     ImportEntryProcessRunnable runnable = runnables.get(key);
@@ -220,14 +220,14 @@
    * {@link ImportEntryManager} thread and then offered again to this {@link ImportEntryProcessor}
    * to be processed.
    */
-  protected abstract boolean canHandleImportEntry(ImportEntryInformation importEntryInformation);
+  protected abstract boolean canHandleImportEntry(ImportEntry importEntryInformation);
 
   /**
    * Based on the {@link ImportEntry} returns a key which uniquely identifies the thread which
    * should process this {@link ImportEntry}. Can be used to place import entries which block/use
    * the same records in the same import thread, in this way preventing DB (dead)locks.
    */
-  protected abstract String getProcessSelectionKey(ImportEntryInformation importEntry);
+  protected abstract String getProcessSelectionKey(ImportEntry importEntry);
 
   /**
    * The default implementation of the ImportEntryProcessRunnable. It performs the following
@@ -433,7 +433,7 @@
     }
 
     // is called by the processor in the main EntityManagerThread
-    private void addEntry(ImportEntryInformation importEntry) {
+    private void addEntry(ImportEntry importEntry) {
 
       // ignore the entry, queue is too large
       // prevents memory problems
@@ -469,12 +469,12 @@
       final String clientId;
       final String roleId;
 
-      QueuedEntry(ImportEntryInformation importEntry) {
+      QueuedEntry(ImportEntry importEntry) {
         importEntryId = importEntry.getId();
-        userId = importEntry.getCreatedBy();
-        orgId = importEntry.getOrgId();
-        roleId = importEntry.getRoleId();
-        clientId = importEntry.getClientId();
+        userId = (String) DalUtil.getId(importEntry.getCreatedBy());
+        orgId = (String) DalUtil.getId(importEntry.getOrganization());
+        roleId = (String) DalUtil.getId(importEntry.getRole());
+        clientId = (String) DalUtil.getId(importEntry.getClient());
       }
     }
   }