Hi,

We are building sessionization: we read clickstream hits from Kafka and generate sessions from them. The pipeline is written with Apache Beam and runs on the Samza runner. We have a PCollection<KV<String, SegmentEvent>> where the key is the user id and the value is a clickstream hit. We group by user id and compute sessions.
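To make the goal concrete, here is roughly the per-user behaviour we expect, written as a minimal plain-Java sketch outside Beam. The names SessionSketch and sessionize are illustrative only and are not part of our pipeline or of Beam. The sketch assumes a hit joins the current session when it is less than the gap after the previous hit in event time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only: not Beam or Samza code.
class SessionSketch {

    /** Splits one user's hit timestamps (epoch millis) into sessions at inactivity gaps. */
    static List<List<Long>> sessionize(List<Long> hitMillis, long gapMillis) {
        List<Long> sorted = new ArrayList<>(hitMillis);
        Collections.sort(sorted); // order by event time, not by arrival order

        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            // Start a new session once the gap since the previous hit reaches gapMillis.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) >= gapMillis) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // Hits at 0 and 45 minutes are more than 30 minutes apart: two sessions.
        System.out.println(sessionize(Arrays.asList(0L, 45 * min), 30 * min).size());
        // A hit at 20 minutes that arrives last bridges them: one session.
        System.out.println(sessionize(Arrays.asList(0L, 45 * min, 20 * min), 30 * min).size());
    }
}
```

The second call is the case we care about: a hit with an earlier timestamp arrives after a later one and joins two sessions that were separate until then.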

We are using the following windowing strategy:

    PCollection.apply("UserSessions",
        Window.<KV<String, SegmentEvent>>into(
                Sessions.<SegmentEvent>withGapDuration(Duration.standardMinutes(30)))
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(60))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(200)));

The events we receive are out of order, so we take the timestamp from each hit and assign it as the event timestamp, using WithTimestamps.of(), so that the hit ends up in the correct session. We save intermediate state in Kafka topics.

We are getting a "Duplicate key ... registration for the same timer" exception. After trying different scenarios, we found that it happens when events arrive out of order: when a hit arrives for a user and a hit with an earlier timestamp arrives afterwards, the exception is thrown. All of these events are written to an intermediate Kafka topic, and the exception is thrown while processing messages from that topic. The first out-of-order event is written to the topic, and immediately afterwards the job fails with the duplicate-key timer exception.

Stack trace:

ERROR o.a.b.r.samza.SamzaPipelineResult - org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
    at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
    ... 5 common frames omitted
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:96)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
    at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 7 common frames omitted
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:84)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:179)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$1(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$4(DoFnRunnerWithMetrics.java:80)
    at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:78)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
    at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
    at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
    at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
    at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
    at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
    at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
    at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
    at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
    at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
    at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
    at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
    at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)

Does anyone have suggestions, or has anyone run into a similar issue before?

Thanks,
Sandeep