Hi Rahul,

Thanks for providing the detailed report. This looks like a bug rather than a limitation of the Flink Runner. We have integration tests for session windows with the Flink Runner, but they seem to have missed this issue.

Let me investigate and get back to you. Tracking issue: https://jira.apache.org/jira/browse/BEAM-6929

Thanks,
Max

On 28.03.19 03:01, rahul patwari wrote:
+dev

On Wed 27 Mar, 2019, 9:47 PM rahul patwari, <rahulpatwari8...@gmail.com> wrote:

    Hi,
    I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink
    Cluster - 1.7.2.

    I have this flow in my pipeline:
    KafkaSource(withCreateTime()) --> ApplyWindow(SessionWindow with
    gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes,
    default trigger) --> BeamSQL(GroupBy query) --> Window.remerge() -->
    Enrichment --> KafkaSink

    I am generating the data so that the first two records belong to
    two different sessions. The third record is generated before the
    first session expires, with a timestamp chosen so that the two
    sessions merge into a single session.
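A minimal plain-Java sketch of the merge described above, assuming session windows behave like half-open intervals [t, t + gap) that are merged whenever they overlap (windows that merely touch are not merged here, which is an assumption about that edge case). `SessionMergeSketch` and `addElement` are hypothetical names; there is no Beam dependency, and watermarks, triggers and lateness are ignored. It is fed the three timestamps from the sample input that follows.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SessionMergeSketch {
    // Matches the "Timestamp" strings in the sample input, e.g. "2019-03-27 15-27-44".
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");
    // Session gap used in the pipeline (gapDuration = 1 minute).
    static final Duration GAP = Duration.ofMinutes(1);

    // A half-open interval [start, end), standing in for a session window.
    static final class Window {
        final LocalDateTime start;
        final LocalDateTime end;

        Window(LocalDateTime start, LocalDateTime end) {
            this.start = start;
            this.end = end;
        }

        boolean overlaps(Window o) {
            return start.isBefore(o.end) && o.start.isBefore(end);
        }

        // Smallest window covering both this window and o.
        Window span(Window o) {
            LocalDateTime s = start.isBefore(o.start) ? start : o.start;
            LocalDateTime e = end.isAfter(o.end) ? end : o.end;
            return new Window(s, e);
        }

        @Override
        public String toString() {
            return "[" + FMT.format(start) + ", " + FMT.format(end) + ")";
        }
    }

    // Gives the element the proto-window [ts, ts + GAP) and merges it with every
    // active window it overlaps; the active windows are assumed to be disjoint.
    static List<Window> addElement(List<Window> active, String timestamp) {
        LocalDateTime ts = LocalDateTime.parse(timestamp, FMT);
        Window merged = new Window(ts, ts.plus(GAP));
        List<Window> result = new ArrayList<>();
        for (Window w : active) {
            if (w.overlaps(merged)) {
                merged = merged.span(w);
            } else {
                result.add(w);
            }
        }
        result.add(merged);
        result.sort(Comparator.comparing((Window x) -> x.start));
        return result;
    }

    public static void main(String[] args) {
        List<Window> sessions = new ArrayList<>();
        String[] timestamps = {"2019-03-27 15-27-44", "2019-03-27 15-28-51", "2019-03-27 15-28-26"};
        for (String ts : timestamps) {
            sessions = addElement(sessions, ts);
            System.out.println(ts + " -> " + sessions);
        }
    }
}
```

After the second record this model holds two separate sessions; after the third it holds a single window from 15-27-44 to 15-29-51, which is the WST/WET pair of the merged record in the DirectRunner output.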

    For example, these are the sample input and output I obtained when
    I ran the same pipeline on the DirectRunner.

    Sample Input:
    {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
    {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
    {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}

    Sample Output:
    {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
    {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
    {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}

    Here "NumberOfRecords" is the count and "WST" is the Avro field name
    holding the start time of the session window. Similarly, "WET" holds
    the end time of the session window. I obtain "WST" and "WET" after
    remerging, by applying a ParDo (the Enrichment stage of the pipeline).
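A minimal sketch of what such an Enrichment step might do, written in plain Java so it runs without Beam. In Beam, a DoFn's @ProcessElement method can take a `BoundedWindow` parameter, and after session windowing that is an `IntervalWindow` whose `start()` and `end()` give the bounds; here the bounds are passed in as epoch milliseconds instead. `WindowEnrichmentSketch.enrich` is a hypothetical helper and the UTC zone is an assumption. In the sample output, WET is the exclusive window end (first record: 15-27-44 plus the 1-minute gap gives 15-28-44), not the window's max timestamp.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

final class WindowEnrichmentSketch {
    // Same layout as the timestamps in the sample records; UTC is an assumption.
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss").withZone(ZoneOffset.UTC);

    // Hypothetical helper: copies the aggregated row and adds WST/WET from the window
    // bounds given as epoch millis (in Beam, e.g. IntervalWindow.start().getMillis()).
    static Map<String, Object> enrich(Map<String, Object> row, long startMillis, long endMillis) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.put("WST", FMT.format(Instant.ofEpochMilli(startMillis)));
        out.put("WET", FMT.format(Instant.ofEpochMilli(endMillis)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("Col1", "str1");
        row.put("NumberOfRecords", 3L);
        long start = Instant.parse("2019-03-27T15:27:44Z").toEpochMilli();
        long end = Instant.parse("2019-03-27T15:29:51Z").toEpochMilli();
        System.out.println(enrich(row, start, end));
    }
}
```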

    The program ran successfully on the DirectRunner. On the FlinkRunner,
    however, I get this exception when the third record arrives:

    2019-03-27 15:31:00,442 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph        -
    Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
    DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow) ->
    (Window.Into()/Window.Assign.out ->
    DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous) ->
    DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,
    DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) ->
    DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) ->
    DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) ->
    DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
    (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
    2019-03-27 15:33:25,427 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph        -
    DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey ->
    DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous) ->
    DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous) ->
    DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) ->
    DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->
    DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo) ->
    DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) ->
    DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) ->
    DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) ->
    DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
    (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
    org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
        at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
        at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
        at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
        at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
        at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
        at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
        at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
        at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
        at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
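The innermost frames show the failure inside `Combine$BinaryCombineLongFn.mergeAccumulators`, reached while `NonEmptyPanes` merges per-window combining state during a window merge. The sketch below is a simplified, hypothetical model of such a merge over `long[]` accumulators (it is not Beam's code, and `NullAccumulatorSketch` is an invented name). It only illustrates that a single `null` accumulator among the windows being merged is enough to raise a `NullPointerException`; whether the Flink runner actually hands a missing state value to the merge is an assumption the trace alone does not confirm.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class NullAccumulatorSketch {
    // Simplified model of merging sum accumulators stored as one-element long arrays:
    // the first accumulator is reused and every other accumulator's slot 0 is added to it.
    static long[] mergeAccumulators(Iterable<long[]> accumulators) {
        Iterator<long[]> iter = accumulators.iterator();
        if (!iter.hasNext()) {
            return new long[] {0L}; // identity for a sum
        }
        long[] running = iter.next();
        while (iter.hasNext()) {
            // Dereferencing a null accumulator here throws NullPointerException.
            running[0] = running[0] + iter.next()[0];
        }
        return running;
    }

    public static void main(String[] args) {
        System.out.println(mergeAccumulators(Arrays.asList(new long[] {1L}, new long[] {2L}))[0]);
        // Hypothetical case: the state for one of the merged windows comes back as null.
        List<long[]> withMissing = Arrays.asList(new long[] {1L}, null);
        try {
            mergeAccumulators(withMissing);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while merging accumulators");
        }
    }
}
```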

    Is this a known issue with the FlinkRunner? Are session windows with
    allowed lateness @Experimental in the FlinkRunner?

    I have also tried the runner beam-runners-flink_2.11 with a Flink
    1.5.3 cluster and hit the same exception.

    I have also tried generating data with the allowed lateness set to 0,
    and everything works as expected. There seems to be no problem
    merging the windows of records that belong to the same session.
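For comparison with the lateness = 0 run: as far as I understand Beam's `LateDataUtils.garbageCollectionTime`, per-window state is kept until the input watermark passes the window's max timestamp (for an `IntervalWindow`, its end minus 1 ms) plus the allowed lateness. The plain-Java sketch below (hypothetical `GcTimeSketch`, event times as `LocalDateTime`) computes that bound for the first sample session [15-27-44, 15-28-44) with 3 minutes and with 0 lateness. It only shows how much longer an earlier session's state can stay alive, and so remain eligible for merging, when lateness is 3 minutes; it does not model the watermark and does not establish the cause of the exception.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class GcTimeSketch {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss.SSS");

    // Max timestamp of a half-open window [start, end) is end - 1 ms; the window's state
    // is assumed to be retained until the watermark passes maxTimestamp + allowedLateness.
    static LocalDateTime gcTime(LocalDateTime windowEnd, Duration allowedLateness) {
        LocalDateTime maxTimestamp = windowEnd.minus(Duration.ofMillis(1));
        return maxTimestamp.plus(allowedLateness);
    }

    public static void main(String[] args) {
        // End of the first sample session [15-27-44, 15-28-44).
        LocalDateTime end = LocalDateTime.of(2019, 3, 27, 15, 28, 44);
        System.out.println("lateness 3 min: " + FMT.format(gcTime(end, Duration.ofMinutes(3))));
        System.out.println("lateness 0:     " + FMT.format(gcTime(end, Duration.ZERO)));
    }
}
```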

    Thanks,
    Rahul
