Hi Sahith,

It isn't immediately obvious to me what the error might be, though I was able to sift through the stack trace and find areas of the codebase that it touches (https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java#L412).
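
In case it helps while we dig in: going by the stack trace, the exception is raised from MergingActiveWindowSet.checkInvariants, and as far as I can tell the message refers to a check that no state address window is recorded under more than one active (merged) window. Below is a simplified, standalone sketch of that kind of check. It is not Beam's actual code; the class name, the String-keyed map, and the shortened window labels are made up for illustration, and I have not verified which path in a custom WindowFn would lead to this state.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the invariant; not Beam's actual class. */
class StateAddressInvariantSketch {

  /**
   * Throws if any state address window is listed under more than one
   * active window. Windows are plain strings here (an assumption made
   * to keep the sketch free of Beam dependencies).
   */
  static void checkInvariants(Map<String, Set<String>> activeToStateAddress) {
    Set<String> seen = new HashSet<>();
    for (Map.Entry<String, Set<String>> entry : activeToStateAddress.entrySet()) {
      for (String stateAddressWindow : entry.getValue()) {
        // Set.add returns false when the window was already recorded
        // under a previously visited active window.
        if (!seen.add(stateAddressWindow)) {
          throw new IllegalStateException(
              stateAddressWindow + " is in more than one state address window set");
        }
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Set<String>> windows = new LinkedHashMap<>();
    windows.put("[21:00..21:45)", new HashSet<>(Set.of("[21:00..21:30)")));
    windows.put("[21:28..21:58)", new HashSet<>(Set.of("[21:28..21:58)")));
    checkInvariants(windows); // fine: every state address window appears once

    // Record the same state address window under a second active window.
    windows.get("[21:00..21:45)").add("[21:28..21:58)");
    try {
      checkInvariants(windows);
    } catch (IllegalStateException e) {
      // prints "[21:28..21:58) is in more than one state address window set"
      System.out.println(e.getMessage());
    }
  }
}
```

If that reading is right, a unit test that feeds the custom WindowFn overlapping windows for a single key and inspects the merge results might be one way to try to reproduce this outside Dataflow.
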

Perhaps we can schedule a call to look into the code and learn more about what might be going wrong? Alternatively, you could file a GCP Support ticket; that would give us access to the Dataflow job so we can look for more evidence of what might be going wrong.

Thanks,

Svetak Sundhar
Technical Solutions Engineer, Data
svetaksund...@google.com

On Fri, Mar 17, 2023 at 1:02 PM Sahith Nallapareddy via dev <dev@beam.apache.org> wrote:

> Hello,
>
> We are working on writing a custom windowing function. The functionality
> is similar to the one described in this book
> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
> (the bounded session and per key fixed window is what we are trying).
> However, we are not sure what is wrong with our implementation as we run
> into this error in dataflow: Error message from worker:
> java.lang.IllegalStateException:
> [2023-03-14T21:28:46.639Z..2023-03-14T21:58:46.639Z) is in more than one
> state address window set
>
> Can anyone explain what this error means and how we can reproduce it? We
> have tests set up and the tests pass fine; this only appears in Dataflow.
>
> Full stack trace:
>
> java.lang.IllegalStateException: [2023-03-14T21:28:54.817Z..2023-03-14T21:43:54.817Z) is in more than one state address window set
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:335)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:89)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:385)
>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:98)
>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
>     at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
>
> Thanks,
>
> Sahith