+dev

On Wed 27 Mar, 2019, 9:47 PM rahul patwari, <rahulpatwari8...@gmail.com> wrote:
> Hi,
>
> I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster - 1.7.2.
>
> I have this flow in my pipeline:
> KafkaSource(withCreateTime()) --> ApplyWindow(SessionWindow with gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default trigger) --> BeamSQL(GroupBy query) --> Window.remerge() --> Enrichment --> KafkaSink
>
> I am generating data so that the first two records belong to two different sessions. The third record is generated before the first session expires, with a timestamp chosen so that the two sessions are merged into a single session.
>
> For example, these are the sample input and output obtained when I ran the same pipeline in DirectRunner.
>
> Sample Input:
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
>
> Sample Output:
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
>
> Here "NumberOfRecords" is the count, and "WST" is the Avro field name that holds the window start time of the session window. Similarly, "WET" holds the window end time of the session window. I get "WST" and "WET" after remerging and applying a ParDo (the Enrichment stage of the pipeline).
>
> The program ran successfully in DirectRunner.
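To make the expected output above easier to check, here is a minimal, JDK-only sketch of the session-merging arithmetic. It is not Beam code: `SessionMergeSketch`, `Session` and `addRecord` are hypothetical names. It assumes, as Beam's `Sessions` windowing does, that each record gets its own window [timestamp, timestamp + gap) and that windows whose half-open intervals overlap are merged; the merged session counts every record it contains, as in the accumulating-mode output. Watermarks, triggers and allowed lateness are deliberately ignored.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {
    // Same layout as the "Timestamp", "WST" and "WET" strings in the sample data.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");

    /** A half-open session window [start, end) plus the number of records assigned to it. */
    static final class Session {
        final LocalDateTime start;
        final LocalDateTime end;
        final long count;

        Session(LocalDateTime start, LocalDateTime end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        /** Two half-open intervals overlap when each one starts before the other ends. */
        boolean overlaps(Session other) {
            return start.isBefore(other.end) && other.start.isBefore(end);
        }

        /** The union of two overlapping sessions, with their record counts summed. */
        Session mergeWith(Session other) {
            LocalDateTime s = start.isBefore(other.start) ? start : other.start;
            LocalDateTime e = end.isAfter(other.end) ? end : other.end;
            return new Session(s, e, count + other.count);
        }

        @Override
        public String toString() {
            return "{WST=" + FMT.format(start) + ", WET=" + FMT.format(end)
                    + ", NumberOfRecords=" + count + "}";
        }
    }

    /**
     * Adds one record (in arrival order): the record gets the window [ts, ts + gap),
     * which absorbs every active session it overlaps. Active sessions never overlap
     * each other, so a single pass over them is enough.
     */
    static List<Session> addRecord(List<Session> active, String timestamp, Duration gap) {
        LocalDateTime ts = LocalDateTime.parse(timestamp, FMT);
        Session merged = new Session(ts, ts.plus(gap), 1);
        List<Session> result = new ArrayList<>();
        for (Session s : active) {
            if (s.overlaps(merged)) {
                merged = merged.mergeWith(s);
            } else {
                result.add(s);
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(1); // gapDuration = 1 minute, as in the pipeline
        List<Session> active = new ArrayList<>();
        String[] arrivals = {"2019-03-27 15-27-44", "2019-03-27 15-28-51", "2019-03-27 15-28-26"};
        for (String ts : arrivals) {
            active = addRecord(active, ts, gap);
            System.out.println("after " + ts + ": " + active);
        }
    }
}
```

After the third record (15-28-26) arrives, its window [15-28-26, 15-29-26) overlaps both earlier sessions, so they collapse into one session with WST 2019-03-27 15-27-44, WET 2019-03-27 15-29-51 and 3 records, which matches the last line of the DirectRunner output. Because the windows are half-open, WET for a single-record session is exactly its start plus one minute.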
> But in FlinkRunner, I am getting this exception when the third record arrives:
>
> 2019-03-27 15:31:00,442 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
> DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map
> -> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
> -> (Window.Into()/Window.Assign.out
> -> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,
> DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
> (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
> 2019-03-27 15:33:25,427 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
> -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc)
> -> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous)
> -> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
> -> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
> (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
> org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
>     at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
>     at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
>     at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
>     at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
>     at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
>     at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>     at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>     at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>
> Is this a known issue with FlinkRunner? Are session windows with allowed lateness @Experimental in FlinkRunner?
>
> I have also tried Runner - beam-runners-flink_2.11, Flink Cluster - 1.5.3 and hit the same exception.
>
> I have also tried generating data with lateness set to 0, and everything works as expected. So there seems to be no problem merging the windows of records that belong to the same session.
>
> Thanks,
> Rahul