sborya commented on a change in pull request #938: SAMZA-1531: Support run.id in standalone for batch processing. URL: https://github.com/apache/samza/pull/938#discussion_r264474066
########## File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ########## @@ -280,4 +376,68 @@ private void setApplicationFinalStatus() { } } } + + + /** + * Defines a specific implementation of {@link DistributedDataStateListener} for local {@link DistributedDataAccess} + */ + private final class LocalDistributedDataStateListener implements DistributedDataStateListener { + + /** + * upon reconnect check if global runid differs from local runid + */ + @Override + public void handleReconnect() { + if(coordinationUtils == null || runIdLock == null || runIdAccess == null ) { + LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); + stopProcessingAndShutDown(); + return; + } + try { + // acquire lock to write or read run.id + DistributedReadWriteLock.AccessType lockAccess = runIdLock.lock(LOCK_TIMEOUT, LOCK_TIMEOUT_UNIT); + if(lockAccess != DistributedReadWriteLock.AccessType.NONE) { + String globalRunId = (String) runIdAccess.readData(RUNID_PATH, new LocalDistributedDataWatcher()); + runIdLock.unlock(); + if(runId != globalRunId) { + LOG.warn("Stopping processor {} and shutting down as local runid {} differs from global runid {} after session reconnect.", uid, runId, + globalRunId); + stopProcessingAndShutDown(); + } + } else { + LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); + stopProcessingAndShutDown(); + } + } catch (TimeoutException e) { + LOG.warn("Stopping processor {} and shutting down due to failure reading global runid after reconnect", uid); + stopProcessingAndShutDown(); + } + } + + @Override + public void handleReconnectFailedError() { + LOG.warn("Stopping processor {} and shutting down due to failure to reconnect", uid); + stopProcessingAndShutDown(); + } + } + + /** + * Defines a specific implementation of {@link DistributedDataWatcher} for local {@link DistributedDataAccess} + */ + private final 
class LocalDistributedDataWatcher implements DistributedDataWatcher { + @Override + public void handleDataChange(Object newData) { + if(runId != (String) newData) { Review comment: you should use equals() to compare the strings ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services