Related to issue
31410: Import entry process can have a stopped but not deregistered thread: tickets remain in the queue
Added logging statements, add wait in ImportEntryManager loop, also make sure that the wait time computation does not round, add a catch block in ImportEntryProcessorRunnable, moved the inner loop to a separate method for readability
--- a/src/org/openbravo/service/importprocess/ImportEntryManager.java Mon Nov 09 12:58:54 2015 +0100
+++ b/src/org/openbravo/service/importprocess/ImportEntryManager.java Tue Nov 10 09:05:49 2015 +0100
@@ -572,13 +572,16 @@
// a next batch of entries
try {
// wait one second per 30 records, somewhat arbitrary
- // but high enough for most cases
+ // but high enough for most cases, also always wait 300 millis additional to
+ // start up threads etc.
+ // note computation of timing ensures that int rounding is done on 1000* entrycount
if (isTest) {
// in case of test don't wait minimal 2 seconds
- Thread.sleep(1000 * (entryCount / 30));
+ Thread.sleep(300 + ((1000 * entryCount) / 30));
} else {
+ log.debug("Entries have been processed, wait a shorter time, and try again to capture new entries which have been added");
// wait minimal 2 seconds or based on entry count
- Thread.sleep(Math.max(2000, 1000 * (entryCount / 30)));
+ Thread.sleep(Math.max(2000, 300 + ((1000 * entryCount) / 30)));
}
} catch (Exception ignored) {
}
--- a/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Mon Nov 09 12:58:54 2015 +0100
+++ b/src/org/openbravo/service/importprocess/ImportEntryProcessor.java Tue Nov 10 09:05:49 2015 +0100
@@ -195,14 +195,19 @@
* Is synchronized to be handle the case that deregistering happens while also an entry was added.
* If an entry was added false is returned and the thread continues.
*/
- private synchronized boolean deregisterProcessThread(ImportEntryProcessRunnable runnable) {
+ private synchronized boolean tryDeregisterProcessThread(ImportEntryProcessRunnable runnable) {
if (!runnable.getImportEntryQueue().isEmpty()) {
+ log.debug("Not deregistering process thread as new entries have been added to it");
// a new entry was entered while we tried to deregister
return false;
}
+ doDeregisterProcessThread(runnable);
+ return true;
+ }
+
+ private synchronized void doDeregisterProcessThread(ImportEntryProcessRunnable runnable) {
log.debug("Removing runnable " + runnable.getKey());
runnables.remove(runnable.getKey());
- return true;
}
/**
@@ -271,121 +276,134 @@
@Override
public void run() {
- while (true) {
- try {
- int cnt = 0;
- long totalT = 0;
- QueuedEntry queuedImportEntry;
- while ((queuedImportEntry = importEntries.poll()) != null) {
+ try {
+ while (true) {
+ try {
+ doRunCycle();
+ } catch (Throwable logIt) {
+ // prevent the loop from exiting, only log the exception
+ // normally low level errors end up here
+ ImportProcessUtils.logError(logger, logIt);
+ } finally {
+
+ // bit rough but ensures that the connection is released/closed
try {
- final long t0 = System.currentTimeMillis();
+ OBDal.getInstance().rollbackAndClose();
+ } catch (Exception ignored) {
+ }
- // set the same obcontext as was being used for the original
- // entry
- setOBContext(queuedImportEntry);
+ try {
+ if (TriggerHandler.getInstance().isDisabled()) {
+ TriggerHandler.getInstance().enable();
+ }
+ OBDal.getInstance().commitAndClose();
+ } catch (Exception ignored) {
+ }
- OBContext.setAdminMode(true);
- ImportEntry localImportEntry;
- try {
- // reload the importEntry
- localImportEntry = OBDal.getInstance().get(ImportEntry.class,
- queuedImportEntry.importEntryId);
+ logger.debug("Trying to deregister process " + key);
- // check if already processed, if so skip it
- if (localImportEntry == null
- || !"Initial".equals(localImportEntry.getImportStatus())) {
- continue;
- }
- } finally {
- OBContext.restorePreviousMode();
- }
-
- // not changed, process
- final String typeOfData = localImportEntry.getTypeofdata();
-
- if (logger.isDebugEnabled()) {
- logger.debug("Processing entry " + localImportEntry.getIdentifier() + " "
- + typeOfData);
- }
-
- processEntry(localImportEntry);
-
- if (logger.isDebugEnabled()) {
- logger.debug("Finished Processing entry " + localImportEntry.getIdentifier() + " "
- + typeOfData);
- }
-
- // don't use the import entry anymore, touching methods on it
- // may re-open a session
- localImportEntry = null;
-
- // processed so can be removed
- importEntryIds.remove(queuedImportEntry.importEntryId);
-
- // keep some stats
- cnt++;
- final long timeForEntry = (System.currentTimeMillis() - t0);
- totalT += timeForEntry;
- importEntryManager.reportStats(typeOfData, timeForEntry);
- if ((cnt % 100) == 0 && logger.isDebugEnabled()) {
- logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in "
- + totalT + " millis, " + (totalT / cnt)
- + " per import entry, current queue size: " + importEntries.size());
- }
-
- if (TriggerHandler.getInstance().isDisabled()) {
- logger
- .error("Triggers disabled at end of processing an entry, this is a coding error, "
- + "call TriggerHandler.enable in your code. Triggers are enabled again for now!");
- TriggerHandler.getInstance().enable();
- OBDal.getInstance().commitAndClose();
- }
-
- // the import entry processEntry calls should not leave an open active session
- if (SessionHandler.isSessionHandlerPresent()) {
- // change to warning if the code in the subclasses really works correctly
- logger
- .warn("Session handler present after processing import entry, this indicates that the processing code "
- + "does not correctly clean/close the session after its last actions. This should be fixed.");
- OBDal.getInstance().commitAndClose();
- }
-
- } catch (Throwable t) {
- ImportProcessUtils.logError(logger, t);
-
- // bit rough but ensures that the connection is released/closed
- try {
- OBDal.getInstance().rollbackAndClose();
- } catch (Exception ignored) {
- }
- try {
- if (TriggerHandler.getInstance().isDisabled()) {
- TriggerHandler.getInstance().enable();
- }
- OBDal.getInstance().commitAndClose();
- } catch (Exception ignored) {
- }
-
- // store the error
- importEntryManager.setImportEntryErrorIndependent(queuedImportEntry.importEntryId, t);
- } finally {
- cleanUpThreadForNextCycle();
+ // no more entries and deregistered, if so go away
+ if (importEntryProcessor.tryDeregisterProcessThread(this)) {
+ logger.debug("All entries processed, exiting thread");
+ importEntryIds.clear();
+ cachedOBContexts.clear();
+ return;
}
}
+ }
+ } finally {
+ // always deregister at this point to be sure that we are not re-used
+ logger.debug("Loop finished removing runnable " + getKey());
+ importEntryProcessor.doDeregisterProcessThread(this);
+ }
+ }
+
+ protected void doRunCycle() {
+ int cnt = 0;
+ long totalT = 0;
+ QueuedEntry queuedImportEntry;
+ while ((queuedImportEntry = importEntries.poll()) != null) {
+ try {
+ final long t0 = System.currentTimeMillis();
+
+ // set the same obcontext as was being used for the original
+ // entry
+ setOBContext(queuedImportEntry);
+
+ OBContext.setAdminMode(true);
+ ImportEntry localImportEntry;
+ try {
+ // reload the importEntry
+ localImportEntry = OBDal.getInstance().get(ImportEntry.class,
+ queuedImportEntry.importEntryId);
+
+ // check if already processed, if so skip it
+ if (localImportEntry == null || !"Initial".equals(localImportEntry.getImportStatus())) {
+ logger
+ .debug("Entry already processed skipping it " + queuedImportEntry.importEntryId);
+ continue;
+ }
+ } finally {
+ OBContext.restorePreviousMode();
+ }
+
+ // not changed, process
+ final String typeOfData = localImportEntry.getTypeofdata();
+
if (logger.isDebugEnabled()) {
+ logger.debug("Processing entry " + localImportEntry.getIdentifier() + " " + typeOfData);
+ }
+
+ processEntry(localImportEntry);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Finished Processing entry " + localImportEntry.getIdentifier() + " "
+ + typeOfData);
+ }
+
+ // don't use the import entry anymore, touching methods on it
+ // may re-open a session
+ localImportEntry = null;
+
+ // processed so can be removed
+ importEntryIds.remove(queuedImportEntry.importEntryId);
+
+ // keep some stats
+ cnt++;
+ final long timeForEntry = (System.currentTimeMillis() - t0);
+ totalT += timeForEntry;
+ importEntryManager.reportStats(typeOfData, timeForEntry);
+ if ((cnt % 100) == 0 && logger.isDebugEnabled()) {
logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in " + totalT
+ " millis, " + (totalT / cnt) + " per import entry, current queue size: "
+ importEntries.size());
+ }
+ if (TriggerHandler.getInstance().isDisabled()) {
+ logger
+ .error("Triggers disabled at end of processing an entry, this is a coding error, "
+ + "call TriggerHandler.enable in your code. Triggers are enabled again for now!");
+ TriggerHandler.getInstance().enable();
+ OBDal.getInstance().commitAndClose();
}
- } finally {
+
+ // the import entry processEntry calls should not leave an open active session
+ if (SessionHandler.isSessionHandlerPresent()) {
+ // change to warning if the code in the subclasses really works correctly
+ logger
+ .warn("Session handler present after processing import entry, this indicates that the processing code "
+ + "does not correctly clean/close the session after its last actions. This should be fixed.");
+ OBDal.getInstance().commitAndClose();
+ }
+
+ } catch (Throwable t) {
+ ImportProcessUtils.logError(logger, t);
// bit rough but ensures that the connection is released/closed
try {
OBDal.getInstance().rollbackAndClose();
} catch (Exception ignored) {
}
-
try {
if (TriggerHandler.getInstance().isDisabled()) {
TriggerHandler.getInstance().enable();
@@ -394,15 +412,21 @@
} catch (Exception ignored) {
}
- logger.debug("Trying to deregister process " + key);
+ // store the error
+ try {
+ importEntryManager.setImportEntryErrorIndependent(queuedImportEntry.importEntryId, t);
+ } catch (Throwable ignore) {
+ ImportProcessUtils.logError(logger, ignore);
+ }
+ } finally {
+ cleanUpThreadForNextCycle();
+ }
+ }
+ if (logger.isDebugEnabled() & cnt > 0) {
+ logger.debug("Runnable: " + key + ", processed " + cnt + " import entries in " + totalT
+ + " millis, " + (totalT / cnt) + " per import entry, current queue size: "
+ + importEntries.size());
- // no more entries and deregistered
- if (importEntryProcessor.deregisterProcessThread(this)) {
- importEntryIds.clear();
- cachedOBContexts.clear();
- return;
- }
- }
}
}
@@ -489,6 +513,8 @@
// cache a queued entry as it has a much lower mem foot print than the import
// entry itself
importEntries.add(new QueuedEntry(importEntry));
+ } else {
+ logger.debug("Not adding entry, it is already in the list of ids " + importEntry.getId());
}
}