+dev

On Wed 27 Mar, 2019, 9:47 PM rahul patwari, <rahulpatwari8...@gmail.com>
wrote:

> Hi,
> I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster -
> 1.7.2.
>
> I have this flow in my pipeline:
> KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindows with
> gapDuration=1 Minute, allowedLateness=3 Minutes, accumulatingFiredPanes,
> default trigger)  -->  BeamSQL(GroupBy query)  -->  Window.remerge()  -->
> Enrichment  -->  KafkaSink
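>
> For reference, the windowing step above is configured with something like
> the following sketch (assuming a PCollection<Row> named "rows"; the
> variable names are illustrative, not the actual code):

```java
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

// Session windows with a 1-minute gap, 3 minutes of allowed lateness,
// accumulating fired panes, and the default trigger (left unset).
PCollection<Row> windowed = rows.apply(
    Window.<Row>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardMinutes(3))
        .accumulatingFiredPanes());
```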
>
> I am generating data such that the first two records belong to two
> different sessions. Then, before the first session expires, I generate a
> third record whose timestamp falls between the first two, so that the two
> sessions merge into a single session.
>
> For example, these are the sample input and output obtained when I ran
> the same pipeline on the DirectRunner.
>
> Sample Input:
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
>
> Sample Output:
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
> 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
> 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27
> 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
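>
> To make the merge concrete, here is a small self-contained simulation of
> session-window merging (plain Java, independent of Beam; the
> interval-merging logic mirrors what Sessions windowing does conceptually,
> using the three sample timestamps above):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SessionMergeDemo {

    /** Merges per-record proto-windows [t, t + gap) the way session windowing does. */
    static List<long[]> mergeSessions(List<Long> timestamps, long gapSeconds) {
        List<long[]> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new long[] {t, t + gapSeconds});
        }
        windows.sort(Comparator.comparingLong(w -> w[0]));
        List<long[]> merged = new ArrayList<>();
        for (long[] w : windows) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && last[1] > w[0]) {   // overlapping half-open intervals
                last[1] = Math.max(last[1], w[1]);  // extend the current session
            } else {
                merged.add(w);                      // start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Seconds-of-day for the sample timestamps 15:27:44, 15:28:51, 15:28:26.
        long t1 = 15 * 3600 + 27 * 60 + 44;  // first record
        long t2 = 15 * 3600 + 28 * 60 + 51;  // second record, more than 1 minute later
        long t3 = 15 * 3600 + 28 * 60 + 26;  // third record, bridges the gap

        // First two records: the 1-minute gap is exceeded, so two separate sessions.
        System.out.println(mergeSessions(List.of(t1, t2), 60).size());  // 2

        // Adding the third record merges everything into one session spanning
        // [15:27:44, 15:29:51), matching the sample output above.
        List<long[]> all = mergeSessions(List.of(t1, t2, t3), 60);
        System.out.println(all.size());                        // 1
        System.out.println(all.get(0)[1] - all.get(0)[0]);     // 127 (seconds)
    }
}
```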
>
> Here, "NumberOfRecords" is the count, "WST" is the Avro field name that
> holds the session window's start time, and "WET" similarly holds its end
> time. "WST" and "WET" are filled in after remerging, by a ParDo (the
> Enrichment stage of the pipeline).
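>
> Roughly, the Enrichment ParDo reads the merged window bounds like this
> (a sketch; the class name and field wiring are mine, but accessing the
> window via a BoundedWindow parameter and IntervalWindow.start()/end() is
> standard Beam API):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

// After Window.remerge(), each element is re-associated with the merged
// session window, whose bounds a downstream DoFn can read.
class EnrichWithWindowTimes extends DoFn<Row, Row> {
  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    IntervalWindow iw = (IntervalWindow) window;  // session windows are IntervalWindows
    Instant wst = iw.start();  // formatted into the "WST" field
    Instant wet = iw.end();    // formatted into the "WET" field
    // ... build the enriched Row from c.element(), wst, wet and c.output(...) it
  }
}
```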
>
> The pipeline ran successfully on the DirectRunner. But on the FlinkRunner,
> I get this exception when the third record arrives:
>
> 2019-03-27 15:31:00,442 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
> -> (Window.Into()/Window.Assign.out ->
> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
> ->
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
> by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,
> DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default
> key/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
> ProducerRecord/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
> (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
> 2019-03-27 15:33:25,427 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
> ->
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> ->
> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) ->
> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->
> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
> ->
> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default
> key/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
> ProducerRecord/Map/ParMultiDo(Anonymous) ->
> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
> (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
> org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
>         at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         at
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>         at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>         at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>         at
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>         at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>         at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>         at
> org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
>         at
> org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
>         at
> org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
>         at
> org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
>         at
> org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
>         at
> org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
>         at
> org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
>         at
> org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>         at
> org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>         at
> org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>         at
> org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>         at
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>
> Is this a known issue with the FlinkRunner? Are session windows with
> allowed lateness still @experimental in the FlinkRunner?
>
> I have also tried with Runner - beam-runners-flink_2.11 and a Flink
> cluster on 1.5.3, and hit the same exception.
>
> I have also tried generating the data with lateness of 0, and everything
> works as expected. So merging the windows of records that belong to the
> same session does not, by itself, seem to be the problem.
>
> Thanks,
> Rahul
>
