Hi Sandeep,

Thank you for reporting this issue. We noticed this problem while testing the upcoming Samza release. You can find more details about the bug here: https://issues.apache.org/jira/browse/SAMZA-2463
We have a fix, and it should be included in our upcoming Samza 1.4 release, which is slated for the end of this month. However, we also need to update the Beam Samza runner to pick up the fix, which should take roughly another two weeks, including testing and verification. We hope to have the fix available by mid-March.

Thanks,
Bharath

On Wed, Feb 19, 2020 at 11:54 AM Kathula, Sandeep <sandeep_kath...@intuit.com.invalid> wrote:

> Hi,
>
> We are trying to build sessionization where we get clickstream hits
> from Kafka and generate sessions from the hits. We are using Apache Beam
> for our code, and it runs on the Samza runner. We have a
> PCollection<String, Event> where the key is the user id and the value is
> a clickstream hit. We are grouping by user id and calculating sessions.
>
> We are using the following windowing strategy:
>
> PCollection.apply("UserSessions", Window.<KV<String, SegmentEvent>>into(
>         Sessions.<SegmentEvent>withGapDuration(Duration.standardMinutes(30)))
>     .triggering(Repeatedly
>         .forever(AfterProcessingTime
>             .pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(60)))
>     )
>     .discardingFiredPanes()
>     .withAllowedLateness(Duration.standardDays(200))
> )
>
> But the events we are getting are out of order. So we take the timestamp
> from the hit and set it as the event timestamp, so that the hit becomes
> part of the correct session. We are using WithTimestamps.of() for that.
>
> We are saving intermediate state in Kafka topics. We are getting a
> "duplicate key registered for the same timer" exception. When I tried
> different scenarios to see when this issue happens, we figured out that it
> happens when events arrive out of order: for a user, when a hit comes and
> later a hit with an earlier timestamp comes, it throws the duplicate key
> timer exception. It is writing all these events into the intermediate
> Kafka topic from which the duplicate key timer exception is being thrown.
> The first out-of-order event is written into this Kafka topic, and the
> very next moment the process fails with the duplicate key timer issue.
>
> Stack trace:
>
> ERROR o.a.b.r.samza.SamzaPipelineResult - org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
> org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
>     at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
>     at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
>     ... 5 common frames omitted
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:96)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
>     at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.Collections$SingletonList.forEach(Collections.java:4822)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>     ... 7 common frames omitted
> Caused by: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:84)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
>     at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:179)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$1(DoFnRunnerWithMetrics.java:55)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$4(DoFnRunnerWithMetrics.java:80)
>     at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:78)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
>     at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
>     at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.Collections$SingletonList.forEach(Collections.java:4822)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
>     at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
>     at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
>     at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
>     at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
>     at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
>     at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
>     at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
>     at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
>     at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
>     at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
>     at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
>     at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>     at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>     at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>
> Does anyone have suggestions, or has anyone experienced a similar issue previously?
>
> Thanks,
> Sandeep
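
For readers following the stack trace: the innermost frames show a precondition check in EpochTimeScheduler.setTimer failing while the trigger's onMerge callback sets a timer during a session-window merge. The sketch below is only a toy model of that failure mode, assuming a registry that refuses to register a key that already has a pending timer. The class ToyTimerRegistry, its methods, and the key string are hypothetical names made up for illustration; this is not Samza's or Beam's code, and it is not necessarily how the fix in SAMZA-2463 works.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a keyed timer registry (hypothetical; not Samza's implementation). */
public class ToyTimerRegistry {
    // Pending timers: key -> time (epoch millis) at which the timer should fire.
    private final Map<String, Long> timers = new HashMap<>();

    /** Strict registration: rejects a key that already has a pending timer. */
    public void setTimer(String key, long fireAtMillis) {
        if (timers.containsKey(key)) {
            throw new IllegalStateException(
                "Duplicate key " + key + " registration for the same timer");
        }
        timers.put(key, fireAtMillis);
    }

    /** Tolerant registration: drops any pending timer for the key, then registers. */
    public void resetTimer(String key, long fireAtMillis) {
        timers.remove(key);
        setTimer(key, fireAtMillis);
    }

    /** Returns the pending fire time for the key, or null if none is registered. */
    public Long pendingTime(String key) {
        return timers.get(key);
    }

    public static void main(String[] args) {
        ToyTimerRegistry registry = new ToyTimerRegistry();
        String key = "user-42/session-trigger"; // hypothetical timer key

        // First hit for the user registers the processing-time trigger timer.
        registry.setTimer(key, 60_000L);

        // A later merge tries to register a timer under the same key again.
        try {
            registry.setTimer(key, 90_000L);
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }

        // Replacing the pending timer instead of rejecting it avoids the failure.
        registry.resetTimer(key, 90_000L);
        System.out.println("tolerant: timer now fires at " + registry.pendingTime(key));
    }
}
```

In this model the strict path fails on the second registration, which mirrors the symptom in the trace, while the tolerant path simply replaces the pending timer. Whether the actual runner should replace, ignore, or deduplicate the second registration is a design decision for the Samza and Beam maintainers.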