Related to issue Related to issue
29766: Retail Operations Buffer: store all transactions in operations table before processing
Again read the complete import entry, much simpler interface and the previous oom seemed to be related to not closing the scrollableresults
--- a/src/org/openbravo/service/importprocess/ImportEntryManager.java Sat Jun 06 14:51:26 2015 +0000
+++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java Sat Jun 06 19:47:51 2015 +0200
@@ -22,7 +22,6 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -45,9 +44,6 @@
import org.hibernate.Query;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
-import org.openbravo.base.model.Entity;
-import org.openbravo.base.model.ModelProvider;
-import org.openbravo.base.model.Property;
import org.openbravo.base.provider.OBProvider;
import org.openbravo.dal.core.OBContext;
import org.openbravo.dal.core.SessionHandler;
@@ -328,7 +324,7 @@
managerThread.doNotify();
}
- private void handleImportEntry(ImportEntryInformation importEntry) {
+ private void handleImportEntry(ImportEntry importEntry) {
try {
ImportEntryProcessor entryProcessor = getImportEntryProcessor(importEntry.getTypeofdata());
@@ -458,14 +454,6 @@
Thread.currentThread().setName("Import Entry Manager Main");
- // see ImportEntryInformation javadoc, we don't want to read the
- // large json/error info columns, but want to have all the other
- // normal columns, these are computed and stored in the 2 variables.
- final List<Property> importEntryProperties = ImportEntryInformation
- .getImportEntryProperties();
- final String importEntrySelectClause = ImportEntryInformation
- .getQuerySelectClause(importEntryProperties);
-
// don't start right away at startup, give the system time to
// really start
log.debug("Started, first sleep " + manager.initialWaitTime);
@@ -506,10 +494,10 @@
// don't block eachother with the limited batch size
// being read
for (String typeOfData : typesOfData) {
- final String importEntryQryStr = "select " + importEntrySelectClause + " from "
- + ImportEntry.ENTITY_NAME + " where " + ImportEntry.PROPERTY_TYPEOFDATA + "='"
- + typeOfData + "' and " + ImportEntry.PROPERTY_IMPORTSTATUS
- + "='Initial' order by " + ImportEntry.PROPERTY_CREATIONDATE;
+ final String importEntryQryStr = "from " + ImportEntry.ENTITY_NAME + " where "
+ + ImportEntry.PROPERTY_TYPEOFDATA + "='" + typeOfData + "' and "
+ + ImportEntry.PROPERTY_IMPORTSTATUS + "='Initial' order by "
+ + ImportEntry.PROPERTY_CREATIONDATE;
final Query entriesQry = OBDal.getInstance().getSession()
.createQuery(importEntryQryStr);
@@ -521,16 +509,14 @@
try {
while (entries.next()) {
entryCount++;
- final Object[] values = (Object[]) entries.get();
- final ImportEntryInformation importEntryInformation = new ImportEntryInformation(
- importEntryProperties, values);
+ final ImportEntry entry = (ImportEntry) entries.get(0);
try {
- manager.handleImportEntry(importEntryInformation);
+ manager.handleImportEntry(entry);
} catch (Throwable t) {
// ImportEntryProcessors are custom implementations which can cause
// errors, so always catch them to prevent other import entries
// from not getting processed
- manager.setImportEntryError(importEntryInformation.getId(), t);
+ manager.setImportEntryError(entry.getId(), t);
OBDal.getInstance().flush();
}
}
@@ -654,109 +640,4 @@
return thread;
}
}
-
- /**
- * A representation of the import entry with only the main fields. This is used when reading the
- * import entries in the main loop. In this main thread/loop we don't want to read the large
- * json/error info blobs therefore only the properties which are not soo large are read. For the
- * foreign key properties only the id of the referenced record are read. So for example the
- * {@link ImportEntryInformation} has a getOrgId method but not a getOrganization method.
- *
- * The {@link ImportEntryInformation} has getters for the common properties defined in core for
- * any properties/columns added by modules use the getValue method with the property name you are
- * looking for.
- *
- * The property names can be found in the {@link ImportEntry} class in the static property name
- * strings, for example {@link ImportEntry#PROPERTY_ORGANIZATION}.
- */
- public static class ImportEntryInformation {
- private Map<String, Object> values;
-
- private static List<Property> getImportEntryProperties() {
- final Entity importEntryEntity = ModelProvider.getInstance().getEntity(ImportEntry.class);
- final List<Property> result = new ArrayList<Property>();
- for (Property p : importEntryEntity.getProperties()) {
- if (p.isOneToMany() || p.isInactive()) {
- continue;
- }
- // skip the big ones!
- if (p.getFieldLength() > 10000 || ImportEntry.PROPERTY_JSONINFO.equals(p.getName())
- || ImportEntry.PROPERTY_ERRORINFO.equals(p.getName())) {
- continue;
- }
- result.add(p);
- }
- return result;
- }
-
- private static String getQuerySelectClause(List<Property> props) {
- final StringBuilder sb = new StringBuilder();
- for (Property p : props) {
- if (sb.length() > 0) {
- sb.append(", ");
- }
- sb.append(p.getName());
- if (!p.isPrimitive()) {
- sb.append(".id");
- }
- }
- return sb.toString();
- }
-
- private ImportEntryInformation(List<Property> props, Object[] objectValues) {
- int i = 0;
- values = new HashMap<String, Object>();
- for (Property p : props) {
- values.put(p.getName(), objectValues[i++]);
- }
- }
-
- public String getOrgId() {
- return (String) values.get(ImportEntry.PROPERTY_ORGANIZATION);
- }
-
- public String getClientId() {
- return (String) values.get(ImportEntry.PROPERTY_CLIENT);
- }
-
- public Date getCreated() {
- return (Date) values.get(ImportEntry.PROPERTY_CREATIONDATE);
- }
-
- public Date getUpdated() {
- return (Date) values.get(ImportEntry.PROPERTY_UPDATED);
- }
-
- public String getCreatedBy() {
- return (String) values.get(ImportEntry.PROPERTY_CREATEDBY);
- }
-
- public String getUpdatedBy() {
- return (String) values.get(ImportEntry.PROPERTY_UPDATEDBY);
- }
-
- public String getImportStatus() {
- return (String) values.get(ImportEntry.PROPERTY_IMPORTSTATUS);
- }
-
- public String getTypeofdata() {
- return (String) values.get(ImportEntry.PROPERTY_TYPEOFDATA);
- }
-
- public String getId() {
- return (String) values.get(ImportEntry.PROPERTY_ID);
- }
-
- public String getRoleId() {
- return (String) values.get(ImportEntry.PROPERTY_ROLE);
- }
-
- public Object getValue(String field) {
- return values.get(field);
- }
-
- public String toString() {
- return values.toString();
- }
- }
}
--- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Sat Jun 06 14:51:26 2015 +0000
+++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Sat Jun 06 19:47:51 2015 +0200
@@ -34,11 +34,11 @@
import org.apache.log4j.Logger;
import org.openbravo.base.secureApp.VariablesSecureApp;
import org.openbravo.client.kernel.RequestContext;
+import org.openbravo.dal.core.DalUtil;
import org.openbravo.dal.core.OBContext;
import org.openbravo.dal.core.SessionHandler;
import org.openbravo.dal.service.OBDal;
import org.openbravo.model.common.enterprise.Organization;
-import org.openbravo.service.importprocess.ImportEntryManager.ImportEntryInformation;
/**
* The {@link ImportEntryProcessor} is responsible for importing/processing {@link ImportEntry}
@@ -50,9 +50,9 @@
*
* It is important that a specific ImportEntry is assigned to the right processing thread to prevent
* for example deadlocks in the database. To make this possible a concept of
- * {@link #getProcessSelectionKey(ImportEntryInformation)} is used. The process selection key is a
- * unique key derived from the {@link ImportEntry} which can be used to create/identify the thread
- * which should process the {@link ImportEntry}. If no such thread exists a new
+ * {@link #getProcessSelectionKey(ImportEntry)} is used. The process selection key is a unique key
+ * derived from the {@link ImportEntry} which can be used to create/identify the thread which should
+ * process the {@link ImportEntry}. If no such thread exists a new
* {@link ImportEntryProcessRunnable} is created. The exact type of
* {@link ImportEntryProcessRunnable} is determined by the extending subclass through the
* {@link #createImportEntryProcessRunnable()} method.
@@ -60,19 +60,19 @@
* For example if ImportEntry records of the same organization should be processed after each other
* (so not in parallel) to prevent DB deadlocks, this means that the records of the same
* organization should be assigned to the same thread object. So that they are indeed processed
- * sequential and not in parallel. The {@link #getProcessSelectionKey(ImportEntryInformation)}
- * should in this case return the {@link Organization#getId()} so that
- * {@link ImportEntryProcessRunnable} are keyed/registered using the organization. Other
- * {@link ImportEntry} records of the same organization are then processed by the same thread,
- * always sequential, not parallel, preventing DB deadlocks.
+ * sequential and not in parallel. The {@link #getProcessSelectionKey(ImportEntry)} should in this
+ * case return the {@link Organization#getId()} so that {@link ImportEntryProcessRunnable} are
+ * keyed/registered using the organization. Other {@link ImportEntry} records of the same
+ * organization are then processed by the same thread, always sequential, not parallel, preventing
+ * DB deadlocks.
*
* The {@link ImportEntryManager} passes new {@link ImportEntry} records to the the
- * {@link ImportEntryProcessor} by calling its {@link #handleImportEntry(ImportEntryInformation)}.
- * The {@link ImportEntryProcessor} then can decide how to handle this {@link ImportEntry}, create a
- * new thread or assign it to an existing thread (which is busy processing previous entries). This
- * is all done in this generic class. An implementing subclass needs to implement the
- * {@link #getProcessSelectionKey(ImportEntryInformation)} method. This method determines which/how
- * the correct {@link ImportEntryProcessRunnable} is chosen.
+ * {@link ImportEntryProcessor} by calling its {@link #handleImportEntry(ImportEntry)}. The
+ * {@link ImportEntryProcessor} then can decide how to handle this {@link ImportEntry}, create a new
+ * thread or assign it to an existing thread (which is busy processing previous entries). This is
+ * all done in this generic class. An implementing subclass needs to implement the
+ * {@link #getProcessSelectionKey(ImportEntry)} method. This method determines which/how the correct
+ * {@link ImportEntryProcessRunnable} is chosen.
*
* The default/base implementation of the {@link ImportEntryProcessRunnable} provides standard
* features related to caching of {@link OBContext}, error handling and transaction handling.
@@ -136,7 +136,7 @@
* implementation should check if the {@link ImportEntry} was possibly already handled and ignore
* it then.
*/
- public void handleImportEntry(ImportEntryInformation importEntry) {
+ public void handleImportEntry(ImportEntry importEntry) {
if (!canHandleImportEntry(importEntry)) {
return;
@@ -152,7 +152,7 @@
// synchronized to handle the case that a thread tries to deregister
// itself at the same time
- protected synchronized void assignEntryToThread(String key, ImportEntryInformation importEntry) {
+ protected synchronized void assignEntryToThread(String key, ImportEntry importEntry) {
// runnables is a concurrent hashmap
ImportEntryProcessRunnable runnable = runnables.get(key);
@@ -220,14 +220,14 @@
* {@link ImportEntryManager} thread and then offered again to this {@link ImportEntryProcessor}
* to be processed.
*/
- protected abstract boolean canHandleImportEntry(ImportEntryInformation importEntryInformation);
+ protected abstract boolean canHandleImportEntry(ImportEntry importEntryInformation);
/**
* Based on the {@link ImportEntry} returns a key which uniquely identifies the thread which
* should process this {@link ImportEntry}. Can be used to place import entries which block/use
* the same records in the same import thread, in this way preventing DB (dead)locks.
*/
- protected abstract String getProcessSelectionKey(ImportEntryInformation importEntry);
+ protected abstract String getProcessSelectionKey(ImportEntry importEntry);
/**
* The default implementation of the ImportEntryProcessRunnable. It performs the following
@@ -433,7 +433,7 @@
}
// is called by the processor in the main EntityManagerThread
- private void addEntry(ImportEntryInformation importEntry) {
+ private void addEntry(ImportEntry importEntry) {
// ignore the entry, queue is too large
// prevents memory problems
@@ -469,12 +469,12 @@
final String clientId;
final String roleId;
- QueuedEntry(ImportEntryInformation importEntry) {
+ QueuedEntry(ImportEntry importEntry) {
importEntryId = importEntry.getId();
- userId = importEntry.getCreatedBy();
- orgId = importEntry.getOrgId();
- roleId = importEntry.getRoleId();
- clientId = importEntry.getClientId();
+ userId = (String) DalUtil.getId(importEntry.getCreatedBy());
+ orgId = (String) DalUtil.getId(importEntry.getOrganization());
+ roleId = (String) DalUtil.getId(importEntry.getRole());
+ clientId = (String) DalUtil.getId(importEntry.getClient());
}
}
}