Forgot to mention: a FixedWindow of duration 1 minute is applied to the PCollection before SqlTransform.
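For reference, here is a minimal sketch of the pipeline (Beam Java SDK). The class name, bootstrap servers, and topic names are illustrative placeholders, and the event-timestamp extraction from the second column is elided; it shows only the shape: Kafka read, 1-minute fixed window, SqlTransform, Kafka write.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;

public class SqlWindowRepro {
  // Schema for the single-column rows fed to SqlTransform.
  private static final Schema SCHEMA =
      Schema.builder().addStringField("strCol").build();

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("ReadKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("input-topic")               // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply("ToRow",
            MapElements.via(
                new SimpleFunction<KV<String, String>, Row>() {
                  @Override
                  public Row apply(KV<String, String> kv) {
                    // Keep only the first CSV column ("strCol");
                    // event-time handling is elided in this sketch.
                    return Row.withSchema(SCHEMA)
                        .addValue(kv.getValue().split(",")[0].trim())
                        .build();
                  }
                }))
        .setRowSchema(SCHEMA)
        // The 1-minute fixed window, applied before SqlTransform.
        .apply("Window1m",
            Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("Sql",
            SqlTransform.query("select strCol FROM PCOLLECTION GROUP BY strCol"))
        .apply("FormatOutput", ToString.elements())
        .apply("WriteKafka",
            KafkaIO.<Void, String>write()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("output-topic")              // placeholder
                .withValueSerializer(StringSerializer.class)
                .values());

    p.run().waitUntilFinish();
  }
}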
On Tue, Sep 10, 2019 at 6:03 PM rahul patwari <rahulpatwari8...@gmail.com> wrote:
> Hi,
> I am facing this issue too.
> +dev <d...@beam.apache.org>
>
> Here is the pipeline that we are using (a very simple pipeline to
> highlight the issue):
> KafkaSource -> SqlTransform -> KafkaSink
>
> We are reading from a single topic in KafkaSource with a single partition.
>
> Here is the data that we are producing to the KafkaSource topic:
> "str1", "2019-09-10 11:36:42"
> "str2", "2019-09-10 11:36:44"
> "str3", "2019-09-10 11:36:45"
>
> The first column name is "strCol".
> The second column, i.e. the timestamp in string format, is used as the
> timestamp of the element.
> The timestamp is the wall time at which the record was generated.
> After publishing this data to the Kafka topic, we are not publishing any
> more data; the topic is idle after that.
> The timestamps of the records are monotonically increasing.
>
> Sql query: "select strCol FROM PCOLLECTION GROUP BY strCol"
>
> Here is the result from KafkaSink:
> {"strCol":{"string":"str1"}}
> {"strCol":{"string":"str3"}}
> {"strCol":{"string":"str2"}}
>
> The expected result is written to KafkaSink correctly, *but with a delay*.
>
> Here are the logs from the Spark driver:
> ...
> 19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
> highWatermark=2019-09-10T11:43:37.273Z,
> synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
> ...
> 19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
> highWatermark=2019-09-10T11:44:54.238Z,
> synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}
>
> As per the logs:
> when the processing time was 12:12:42, the highWatermark was at 11:43:37,
> almost a 30-minute delay; and
> when the processing time was 12:18:53, the highWatermark was at 11:44:54.
>
> From these logs, it seems that the watermark is advancing very slowly.
>
> Is there an integration test (IT) for SparkRunner with unbounded data and
> windowed aggregation?
> Is this a known bug?
>
> Thanks,
> Rahul
>
> On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram <shantachakp...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We have detected an issue with SparkRunner and the watermark.
>>
>> *Pipeline*: Read from two Kafka sources => apply a fixed window of
>> duration 1 minute to both PCollections => apply SqlTransform with the query
>> "select c.datetime, c.country, s.name, s.id from `kafka_source1` as s
>> join `kafka_source2` as c on s.name = c.name" => write the emitted
>> output to a Kafka sink.
>>
>> We are using the timestamp policy provided in
>> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
>> We have set maxDelay to 0.
>>
>> As we have applied a fixed window of 1 minute duration and the elements'
>> timestamps are monotonically increasing, we expect the output to be
>> emitted once the current processing time crosses 12-02-00, with a
>> reasonable delay (say, 10 seconds). But we are getting the result of the
>> window only after a long delay.
>>
>> In the Spark logs, it seems that the watermark is lagging.
>> Here are the logs:
>> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
>> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
>> highWatermark=2019-09-05T11:57:06.302Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
>> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
>> highWatermark=2019-09-05T11:57:06.686Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
>> 19/09/05 12:02:50 INFO
>> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
>> timestamp: 1567684500500 has completed, watermarks have been updated.
>>
>> As you can see, when the current processing time is 12:02:50, the
>> highWatermark is 11:57:06.
>> As the processing time progresses, the gap between the processing time
>> and the highWatermark increases.
>>
>> We ran the same pipeline with the same data on the Flink runner and the
>> Direct runner and did not see this issue. On those runners, the watermark
>> is almost equal to the processing time.
>>
>> Sample input data:
>>
>> kafka_source1:
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
>> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>>
>> kafka_source2:
>> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
>> 12-01-26 704060'}
>> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05
>> 12-01-27 712300'}
>> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05
>> 12-01-28 713951'}
>>
>> What can be the issue here?
>>
>> Regards,
>> shanta
>
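For completeness, here is a minimal sketch of how the timestamp policy mentioned above can be wired into KafkaIO (Beam Java SDK), with maxDelay = 0 as described. The class name, bootstrap servers, and the extractTimestamp helper are illustrative placeholders; the parsing assumes the sample value format shown above.

import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TimestampPolicySketch {
  // Matches the sample records, e.g. "2019-09-05 12-01-19"
  // (trailing microseconds are dropped before parsing).
  private static final DateTimeFormatter FMT =
      DateTimeFormat.forPattern("yyyy-MM-dd HH-mm-ss");

  public static KafkaIO.Read<String, String> readWithEventTime(String topic) {
    return KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092") // placeholder
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withTimestampPolicyFactory(
            (partition, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<String, String>(
                    record -> extractTimestamp(record.getKV().getValue()),
                    Duration.ZERO, // maxDelay = 0, as described above
                    previousWatermark));
  }

  // Hypothetical extractor: pulls the 'datetime' field out of the
  // sample value, e.g. "{'id': 1, ..., 'datetime': '2019-09-05 12-01-19 481704'}".
  private static Instant extractTimestamp(String value) {
    String datetime =
        value.replaceAll(".*'datetime': '(\\S+ \\S+) \\d+'.*", "$1");
    return FMT.parseDateTime(datetime).toInstant();
  }
}

With maxDelay = 0, CustomTimestampPolicyWithLimitedDelay should advance the watermark to the maximum observed record timestamp, so on an idle partition the watermark would be expected to track event time closely rather than lag by many minutes as in the Spark logs above.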