Fixes issue 31446: Import entry process can have a stopped but not deregistered thread: tickets remain in the queue
authorMartin Taal <martin.taal@openbravo.com>
Sun, 15 Nov 2015 12:49:47 +0100
changeset 28105 12dfd7922c07
parent 28104 b2793ba35559
child 28106 3af928301873
Fixes issue 31446: Import entry process can have a stopped but not deregistered thread: tickets remain in the queue
Added logging statements, add wait in ImportEntryManager loop, also make sure that the wait time computation does not round, add a catch block in ImportEntryProcessorRunnable,
moved the inner loop to a separate method for readability
src/org/openbravo/service/importprocess/ImportEntryManager.java
src/org/openbravo/service/importprocess/ImportEntryProcessor.java
--- a/src/org/openbravo/service/importprocess/ImportEntryManager.java	Wed Nov 11 20:01:21 2015 +0100
+++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java	Sun Nov 15 12:49:47 2015 +0100
@@ -45,6 +45,7 @@
 import org.hibernate.ScrollableResults;
 import org.openbravo.base.exception.OBException;
 import org.openbravo.base.provider.OBProvider;
+import org.openbravo.base.session.OBPropertiesProvider;
 import org.openbravo.dal.core.OBContext;
 import org.openbravo.dal.core.SessionHandler;
 import org.openbravo.dal.service.OBDal;
@@ -175,6 +176,7 @@
       return;
     }
     threadsStarted = true;
+
     log.debug("Starting Import Entry Framework");
 
     // same as fixed threadpool, will only stop accepting new tasks (throw an exception)
@@ -223,10 +225,11 @@
    */
   public void shutdown() {
     log.debug("Shutting down Import Entry Framework");
-    
+
     isShutDown = true;
 
     executorService.shutdownNow();
+
     for (ImportEntryProcessor importEntryProcessor : importEntryProcessors.values()) {
       importEntryProcessor.shutdown();
     }
@@ -466,6 +469,8 @@
 
       Thread.currentThread().setName("Import Entry Manager Main");
 
+      boolean isTest = OBPropertiesProvider.getInstance().getBooleanProperty("test.environment");
+
       // don't start right away at startup, give the system time to
       // really start
       log.debug("Started, first sleep " + manager.initialWaitTime);
@@ -566,8 +571,17 @@
               // a next batch of entries
               try {
                 // wait one second per 30 records, somewhat arbitrary
-                // but high enough for most cases
-                Thread.sleep(Math.max(2000, 1000 * (entryCount / 30)));
+                // but high enough for most cases, also always wait 300 millis additional to
+                // start up threads etc.
+                // note computation of timing ensures that int rounding is done on 1000* entrycount
+                if (isTest) {
+                  // in case of test don't wait minimal 2 seconds
+                  Thread.sleep(300 + ((1000 * entryCount) / 30));
+                } else {
+                  log.debug("Entries have been processed, wait a shorter time, and try again to capture new entries which have been added");
+                  // wait minimal 2 seconds or based on entry count
+                  Thread.sleep(Math.max(2000, 300 + ((1000 * entryCount) / 30)));
+                }
               } catch (Exception ignored) {
               }
             } else {
--- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java	Wed Nov 11 20:01:21 2015 +0100
+++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java	Sun Nov 15 12:49:47 2015 +0100
@@ -162,11 +162,8 @@
     // as runnable can already be in a queue of the executorservice
     // waiting to be processed, but not yet started
     if (runnable != null) {
-      // there is runnable which can handle this ImportEntry
-      if (log.isDebugEnabled()) {
-        log.debug("Adding entry to runnable with key " + key);
-      }
-      // give it to the runnable
+      // give it to the runnable, the addEntry checks if the import entry
+      // is not already being handled, if so it is skipped
       runnable.addEntry(importEntry);
 
       // done
@@ -198,14 +195,19 @@
    * Is synchronized to be handle the case that deregistering happens while also an entry was added.
    * If an entry was added false is returned and the thread continues.
    */
-  private synchronized boolean deregisterProcessThread(ImportEntryProcessRunnable runnable) {
+  private synchronized boolean tryDeregisterProcessThread(ImportEntryProcessRunnable runnable) {
     if (!runnable.getImportEntryQueue().isEmpty()) {
+      log.debug("Not deregistering process thread as new entries have been added to it");
       // a new entry was entered while we tried to deregister
       return false;
     }
+    doDeregisterProcessThread(runnable);
+    return true;
+  }
+
+  private synchronized void doDeregisterProcessThread(ImportEntryProcessRunnable runnable) {
     log.debug("Removing runnable " + runnable.getKey());
     runnables.remove(runnable.getKey());
-    return true;
   }
 
   /**
@@ -268,125 +270,140 @@
     // when the garbagecollector runs
     private Map<String, OBContext> cachedOBContexts = new HashMap<String, OBContext>();
 
+    public ImportEntryProcessRunnable() {
+      logger = Logger.getLogger(this.getClass());
+    }
+
     @Override
     public void run() {
+      try {
+        while (true) {
+          try {
+            doRunCycle();
+          } catch (Throwable logIt) {
+            // prevent the loop from exiting, only log the exception
+            // normally low level errors end up here
+            ImportProcessUtils.logError(logger, logIt);
+          } finally {
 
-      logger = Logger.getLogger(this.getClass());
-      while (true) {
-        try {
-          int cnt = 0;
-          long totalT = 0;
-          QueuedEntry queuedImportEntry;
-          while ((queuedImportEntry = importEntries.poll()) != null) {
+            // bit rough but ensures that the connection is released/closed
             try {
-              final long t0 = System.currentTimeMillis();
+              OBDal.getInstance().rollbackAndClose();
+            } catch (Exception ignored) {
+            }
 
-              // set the same obcontext as was being used for the original
-              // entry
-              setOBContext(queuedImportEntry);
+            try {
+              if (TriggerHandler.getInstance().isDisabled()) {
+                TriggerHandler.getInstance().enable();
+              }
+              OBDal.getInstance().commitAndClose();
+            } catch (Exception ignored) {
+            }
 
-              OBContext.setAdminMode(true);
-              ImportEntry localImportEntry;
-              try {
-                // reload the importEntry
-                localImportEntry = OBDal.getInstance().get(ImportEntry.class,
-                    queuedImportEntry.importEntryId);
+            logger.debug("Trying to deregister process " + key);
 
-                // check if already processed, if so skip it
-                if (localImportEntry == null
-                    || !"Initial".equals(localImportEntry.getImportStatus())) {
-                  continue;
-                }
-              } finally {
-                OBContext.restorePreviousMode();
-              }
-
-              // not changed, process
-              final String typeOfData = localImportEntry.getTypeofdata();
-
-              if (logger.isDebugEnabled()) {
-                logger.debug("Processing entry " + localImportEntry.getIdentifier() + " "
-                    + typeOfData);
-              }
-
-              processEntry(localImportEntry);
-
-              if (logger.isDebugEnabled()) {
-                logger.debug("Finished Processing entry " + localImportEntry.getIdentifier() + " "
-                    + typeOfData);
-              }
-
-              // don't use the import entry anymore, touching methods on it
-              // may re-open a session
-              localImportEntry = null;
-
-              // processed so can be removed
-              importEntryIds.remove(queuedImportEntry.importEntryId);
-
-              // keep some stats
-              cnt++;
-              final long timeForEntry = (System.currentTimeMillis() - t0);
-              totalT += timeForEntry;
-              importEntryManager.reportStats(typeOfData, timeForEntry);
-              if ((cnt % 100) == 0 && logger.isDebugEnabled()) {
-                logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in "
-                    + totalT + " millis, " + (totalT / cnt)
-                    + " per import entry, current queue size: " + importEntries.size());
-              }
-
-              if (TriggerHandler.getInstance().isDisabled()) {
-                logger
-                    .error("Triggers disabled at end of processing an entry, this is a coding error, "
-                        + "call TriggerHandler.enable in your code. Triggers are enabled again for now!");
-                TriggerHandler.getInstance().enable();
-                OBDal.getInstance().commitAndClose();
-              }
-
-              // the import entry processEntry calls should not leave an open active session
-              if (SessionHandler.isSessionHandlerPresent()) {
-                // change to warning if the code in the subclasses really works correctly
-                logger
-                    .warn("Session handler present after processing import entry, this indicates that the processing code "
-                        + "does not correctly clean/close the session after its last actions. This should be fixed.");
-                OBDal.getInstance().commitAndClose();
-              }
-
-            } catch (Throwable t) {
-              ImportProcessUtils.logError(logger, t);
-
-              // bit rough but ensures that the connection is released/closed
-              try {
-                OBDal.getInstance().rollbackAndClose();
-              } catch (Exception ignored) {
-              }
-              try {
-                if (TriggerHandler.getInstance().isDisabled()) {
-                  TriggerHandler.getInstance().enable();
-                }
-                OBDal.getInstance().commitAndClose();
-              } catch (Exception ignored) {
-              }
-
-              // store the error
-              importEntryManager.setImportEntryErrorIndependent(queuedImportEntry.importEntryId, t);
-            } finally {
-              cleanUpThreadForNextCycle();
+            // no more entries and deregistered, if so go away
+            if (importEntryProcessor.tryDeregisterProcessThread(this)) {
+              logger.debug("All entries processed, exiting thread");
+              importEntryIds.clear();
+              cachedOBContexts.clear();
+              return;
             }
           }
+        }
+      } finally {
+        // always deregister at this point to be sure that we are not re-used
+        logger.debug("Loop finished removing runnable " + getKey());
+        importEntryProcessor.doDeregisterProcessThread(this);
+      }
+    }
+
+    protected void doRunCycle() {
+      int cnt = 0;
+      long totalT = 0;
+      QueuedEntry queuedImportEntry;
+      while ((queuedImportEntry = importEntries.poll()) != null) {
+        try {
+          final long t0 = System.currentTimeMillis();
+
+          // set the same obcontext as was being used for the original
+          // entry
+          setOBContext(queuedImportEntry);
+
+          OBContext.setAdminMode(true);
+          ImportEntry localImportEntry;
+          try {
+            // reload the importEntry
+            localImportEntry = OBDal.getInstance().get(ImportEntry.class,
+                queuedImportEntry.importEntryId);
+
+            // check if already processed, if so skip it
+            if (localImportEntry == null || !"Initial".equals(localImportEntry.getImportStatus())) {
+              logger
+                  .debug("Entry already processed skipping it " + queuedImportEntry.importEntryId);
+              continue;
+            }
+          } finally {
+            OBContext.restorePreviousMode();
+          }
+
+          // not changed, process
+          final String typeOfData = localImportEntry.getTypeofdata();
+
           if (logger.isDebugEnabled()) {
+            logger.debug("Processing entry " + localImportEntry.getIdentifier() + " " + typeOfData);
+          }
+
+          processEntry(localImportEntry);
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("Finished Processing entry " + localImportEntry.getIdentifier() + " "
+                + typeOfData);
+          }
+
+          // don't use the import entry anymore, touching methods on it
+          // may re-open a session
+          localImportEntry = null;
+
+          // processed so can be removed
+          importEntryIds.remove(queuedImportEntry.importEntryId);
+
+          // keep some stats
+          cnt++;
+          final long timeForEntry = (System.currentTimeMillis() - t0);
+          totalT += timeForEntry;
+          importEntryManager.reportStats(typeOfData, timeForEntry);
+          if ((cnt % 100) == 0 && logger.isDebugEnabled()) {
             logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in " + totalT
                 + " millis, " + (totalT / cnt) + " per import entry, current queue size: "
                 + importEntries.size());
+          }
 
+          if (TriggerHandler.getInstance().isDisabled()) {
+            logger
+                .error("Triggers disabled at end of processing an entry, this is a coding error, "
+                    + "call TriggerHandler.enable in your code. Triggers are enabled again for now!");
+            TriggerHandler.getInstance().enable();
+            OBDal.getInstance().commitAndClose();
           }
-        } finally {
+
+          // the import entry processEntry calls should not leave an open active session
+          if (SessionHandler.isSessionHandlerPresent()) {
+            // change to warning if the code in the subclasses really works correctly
+            logger
+                .warn("Session handler present after processing import entry, this indicates that the processing code "
+                    + "does not correctly clean/close the session after its last actions. This should be fixed.");
+            OBDal.getInstance().commitAndClose();
+          }
+
+        } catch (Throwable t) {
+          ImportProcessUtils.logError(logger, t);
 
           // bit rough but ensures that the connection is released/closed
           try {
             OBDal.getInstance().rollbackAndClose();
           } catch (Exception ignored) {
           }
-
           try {
             if (TriggerHandler.getInstance().isDisabled()) {
               TriggerHandler.getInstance().enable();
@@ -395,15 +412,21 @@
           } catch (Exception ignored) {
           }
 
-          logger.debug("Trying to deregister process " + key);
+          // store the error
+          try {
+            importEntryManager.setImportEntryErrorIndependent(queuedImportEntry.importEntryId, t);
+          } catch (Throwable ignore) {
+            ImportProcessUtils.logError(logger, ignore);
+          }
+        } finally {
+          cleanUpThreadForNextCycle();
+        }
+      }
+      if (logger.isDebugEnabled() & cnt > 0) {
+        logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in " + totalT
+            + " millis, " + (totalT / cnt) + " per import entry, current queue size: "
+            + importEntries.size());
 
-          // no more entries and deregistered
-          if (importEntryProcessor.deregisterProcessThread(this)) {
-            importEntryIds.clear();
-            cachedOBContexts.clear();
-            return;
-          }
-        }
       }
     }
 
@@ -484,10 +507,14 @@
       }
 
       if (!importEntryIds.contains(importEntry.getId())) {
+        logger.debug("Adding entry to runnable with key " + key);
+
         importEntryIds.add(importEntry.getId());
         // cache a queued entry as it has a much lower mem foot print than the import
         // entry itself
         importEntries.add(new QueuedEntry(importEntry));
+      } else {
+        logger.debug("Not adding entry, it is already in the list of ids " + importEntry.getId());
       }
     }
 
@@ -512,7 +539,12 @@
         importEntryId = importEntry.getId();
         userId = (String) DalUtil.getId(importEntry.getCreatedBy());
         orgId = (String) DalUtil.getId(importEntry.getOrganization());
-        roleId = (String) DalUtil.getId(importEntry.getRole());
+        if (importEntry.getRole() != null) {
+          roleId = (String) DalUtil.getId(importEntry.getRole());
+        } else {
+          // will use the default role of the user
+          roleId = null;
+        }
         clientId = (String) DalUtil.getId(importEntry.getClient());
       }
     }