Hi,
We are building sessionization: we read clickstream hits from Kafka and
generate sessions from them. The code uses Apache Beam and runs on the Samza
runner. We have a PCollection<KV<String, SegmentEvent>> where the key is the
user id and the value is a clickstream hit. We group by user id and compute
sessions.

We are using the following windowing strategy:

// userEvents: PCollection<KV<String, SegmentEvent>>, keyed by user id
userEvents.apply("UserSessions",
    Window.<KV<String, SegmentEvent>>into(
            Sessions.withGapDuration(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(60))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(200)));
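To make the trigger above concrete, here is a rough model in plain Java (no
Beam dependency) of when
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(60s))
would fire for a single user and a single window. It is a sketch under stated
assumptions, not Beam's trigger state machine. It assumes arrival times are
processing times in milliseconds, sorted ascending. It also ignores the
end-of-window and garbage-collection firings. The class and method names are
made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Beam code) of
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// for one key and one window with discarding panes.
// Ignores end-of-window and garbage-collection firings.
public class TriggerSketch {

    /** Processing times (ms) at which panes fire, given sorted arrival times (ms). */
    static List<Long> fireTimes(long[] arrivalsMs, long delayMs) {
        List<Long> fires = new ArrayList<>();
        Long pendingFire = null; // timer set by the first element of the current pane
        for (long t : arrivalsMs) {
            if (pendingFire != null && t >= pendingFire) {
                // The delay elapsed before this element arrived: the pane was emitted.
                fires.add(pendingFire);
                pendingFire = null;
            }
            if (pendingFire == null) {
                // First element of a new pane starts a fresh processing-time delay.
                pendingFire = t + delayMs;
            }
        }
        if (pendingFire != null) {
            fires.add(pendingFire); // the last pane fires after its delay
        }
        return fires;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 10_000L, 59_000L, 61_000L, 200_000L};
        // Prints [60000, 121000, 260000]
        System.out.println(fireTimes(arrivals, 60_000L));
    }
}
```

Every firing re-arms a processing-time timer for the window. That per-window
timer is what has to be moved when windows merge.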

However, the events arrive out of order, so we extract the timestamp from each
hit and assign it as the element's event timestamp, so that the hit lands in
the correct session. We use WithTimestamps.of() for this.
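With event timestamps assigned this way, an out-of-order hit can do more than
join a session. It can also change existing sessions. The plain-Java sketch
below is a simplified model, not Beam's Sessions implementation; the Window
class and sessions() method are invented for illustration. It gives each hit a
proto-window [t, t + gap) and merges overlapping windows. A late hit at minute
20 bridges sessions [0, 30) and [40, 70) into a single [0, 70). This is the
kind of merge that appears at the bottom of the stack trace below
(MergingActiveWindowSet.merge -> ... -> AfterDelayFromFirstElementStateMachine.onMerge).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of session windows for a single key.
// Not Beam's Sessions/merging code; for illustration only.
public class SessionMergeSketch {

    static final class Window {
        final long start; // inclusive
        final long end;   // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Assigns [t, t + gap) to every event time and merges overlapping windows. */
    static List<Window> sessions(long[] eventTimes, long gap) {
        List<Window> proto = new ArrayList<>();
        for (long t : eventTimes) {
            proto.add(new Window(t, t + gap));
        }
        proto.sort((a, b) -> Long.compare(a.start, b.start));

        List<Window> merged = new ArrayList<>();
        Window cur = null;
        for (Window w : proto) {
            if (cur != null && w.start < cur.end) {
                // Overlap: extend the current session.
                cur = new Window(cur.start, Math.max(cur.end, w.end));
            } else {
                if (cur != null) {
                    merged.add(cur);
                }
                cur = w;
            }
        }
        if (cur != null) {
            merged.add(cur);
        }
        return merged;
    }

    public static void main(String[] args) {
        long gap = 30; // minutes, matching Duration.standardMinutes(30)
        System.out.println(sessions(new long[] {0, 40}, gap));     // [[0, 30), [40, 70)]
        System.out.println(sessions(new long[] {0, 40, 20}, gap)); // [[0, 70)]
    }
}
```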

We store intermediate state in Kafka topics. We are getting an
IllegalStateException: "Duplicate key ... registration for the same timer".
After trying different scenarios, we found that it happens when events arrive
out of order. For a given user, a hit arrives. Later a hit with an earlier
timestamp arrives, and at that point the exception is thrown. All of these
events are written to an intermediate Kafka topic (the partition_by-gbk topic
in the stack trace below). The exception is thrown while consuming from that
topic. The first out-of-order event is written to this topic, and immediately
afterwards processing fails with the duplicate-key timer error.

Stack trace:

ERROR o.a.b.r.samza.SamzaPipelineResult - org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
	at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
	at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
	at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
	at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
	at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
	... 5 common frames omitted
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
	at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:96)
	at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
	at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
	at java.util.Collections$SingletonList.forEach(Collections.java:4822)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
	at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
	at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
	... 7 common frames omitted
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:84)
	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
	at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
	at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:179)
	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$1(DoFnRunnerWithMetrics.java:55)
	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$4(DoFnRunnerWithMetrics.java:80)
	at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:78)
	at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
	at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
	at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
	at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
	at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
	at java.lang.Iterable.forEach(Iterable.java:75)
	at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
	at java.util.Collections$SingletonList.forEach(Collections.java:4822)
	at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
	at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
	at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
	at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
	at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
	at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
	at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
	at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
	at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
	at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
	at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
	at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
	at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
	at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
	at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
	at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
	at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
	at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
	at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
	at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
	at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
	at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
	at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
	at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
	at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
	at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
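Read from the bottom, the innermost cause is raised at a specific point. During
the window merge, the trigger's onMerge (AfterDelayFromFirstElementStateMachine)
sets a processing-time timer again. Samza's EpochTimeScheduler.setTimer then
fails a checkState on a key it already holds. The sketch below is a
hypothetical model of that check only. The TimerRegistrySketch class and the
string key layout are invented for illustration and are not Samza's or Beam's
code. It contrasts a strict register(), which throws on a duplicate key, with a
reset() that first removes any pending timer for the key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-key timer scheduler; NOT Samza's EpochTimeScheduler.
// It only illustrates the check behind "Duplicate key ... registration for the same timer".
public class TimerRegistrySketch<K> {

    private final Map<K, Long> scheduled = new HashMap<>();

    // Strict registration: rejects a key that already has a pending timer.
    void register(K key, long fireAtMs) {
        if (scheduled.containsKey(key)) {
            throw new IllegalStateException(
                "Duplicate key " + key + " registration for the same timer");
        }
        scheduled.put(key, fireAtMs);
    }

    // Idempotent alternative: drop any pending timer for the key, then register.
    void reset(K key, long fireAtMs) {
        scheduled.remove(key);
        register(key, fireAtMs);
    }

    Long pending(K key) {
        return scheduled.get(key);
    }

    public static void main(String[] args) {
        TimerRegistrySketch<String> registry = new TimerRegistrySketch<>();
        String key = "user-1/[0, 70)/processing-delay"; // hypothetical key layout
        registry.register(key, 60_000L);
        try {
            // A window merge asks for the same processing-time timer again.
            registry.register(key, 60_000L);
        } catch (IllegalStateException e) {
            System.out.println("strict register failed: " + e.getMessage());
        }
        registry.reset(key, 60_000L);
        System.out.println("reset ok, pending at " + registry.pending(key));
    }
}
```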



Does anyone have suggestions, or has anyone run into a similar issue before?

Thanks,
Sandeep
