Forgot to mention: a FixedWindow of duration 1 minute is applied before the
SqlTransform.

On Tue, Sep 10, 2019 at 6:03 PM rahul patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi,
> I am facing this issue too.
> +dev <d...@beam.apache.org>
>
> Here is the pipeline that we are using (a very simple pipeline to
> highlight the issue):
> KafkaSource -> SqlTransform -> KafkaSink
>
> We are reading from a single topic in KafkaSource with a single partition.
>
> Here is the data that we are producing to KafkaSource topic:
> "str1", "2019-09-10 11:36:42"
> "str2", "2019-09-10 11:36:44"
> "str3", "2019-09-10 11:36:45"
>
> The first column is named "strCol".
> The second column, i.e. the timestamp in string format, is used as the
> event timestamp of the element.
> The timestamp is the wall-clock time at which the record was generated.
> After publishing this data to the Kafka topic, we are not publishing any
> more data. The topic is idle after that.
> The timestamps of the records are monotonically increasing.
>
> Sql query: "select strCol FROM PCOLLECTION GROUP BY strCol"
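>
> For reference, a minimal sketch of the windowing + SQL part of the pipeline
> (kafkaRows below stands for the PCollection<Row> read from KafkaSource with
> the element timestamps already assigned from the second column; the names
> are placeholders):
>
>   // kafkaRows: PCollection<Row> with schema {strCol: STRING}; element
>   // timestamps come from the second column of the Kafka record.
>   PCollection<Row> result =
>       kafkaRows
>           .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))))
>           .apply(SqlTransform.query("select strCol FROM PCOLLECTION GROUP BY strCol"));
>   // result is then converted back to strings and written with KafkaIO.write().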
>
> Here is the result from KafkaSink:
> {"strCol":{"string":"str1"}}
> {"strCol":{"string":"str3"}}
> {"strCol":{"string":"str2"}}
>
> The expected result is written to KafkaSink correctly, *but with a delay*.
>
> Here are the logs from Spark Driver:
> ...
> 19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
> highWatermark=2019-09-10T11:43:37.273Z,
> synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
> ...
> 19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
> highWatermark=2019-09-10T11:44:54.238Z,
> synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}
>
> As per the logs,
> when the processing time was 12:12:42, the highWatermark was at 11:43:37,
> a lag of almost 30 minutes. And
> when the processing time was 12:18:53, the highWatermark was at 11:44:54,
> i.e. the watermark advanced by only about 77 seconds while roughly 6 minutes
> of processing time elapsed.
>
> From the above logs, it seems that the watermark is advancing far more
> slowly than the processing time.
>
> Is there an integration test for SparkRunner with unbounded data and
> windowed aggregation?
> Is this a known bug?
>
> Thanks,
> Rahul
>
> On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram <shantachakp...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We have detected an issue with SparkRunner and Watermark.
>>
>> *Pipeline*: Read from two Kafka Sources => Apply fixed window of
>> duration 1 minute to both the PCollections => Apply SqlTransform with query
>> "select c.datetime, c.country ,s.name, s.id from `kafka_source1` as s
>> join `kafka_source2` as c on s.name = c.name" => write the emitted
>> output to Kafka Sink
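>>
>> A minimal sketch of how the two windowed PCollections are fed into
>> SqlTransform (source1Rows and source2Rows below stand for the windowed
>> PCollection<Row>s read from the two topics; the names are placeholders):
>>
>>   PCollection<Row> joined =
>>       PCollectionTuple.of(new TupleTag<Row>("kafka_source1"), source1Rows)
>>           .and(new TupleTag<Row>("kafka_source2"), source2Rows)
>>           .apply(SqlTransform.query(
>>               "select c.datetime, c.country, s.name, s.id "
>>                   + "from `kafka_source1` as s join `kafka_source2` as c "
>>                   + "on s.name = c.name"));
>>   // joined is then written to the Kafka Sink.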
>>
>> We are using the watermark provided by CustomTimestampPolicyWithLimitedDelay:
>> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
>> We have set maxDelay to 0.
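>>
>> Roughly how the policy is plugged into KafkaIO (the broker address, the
>> topic name, and the extractDatetime helper that parses the 'datetime'
>> field into an Instant are placeholders):
>>
>>   KafkaIO.<String, String>read()
>>       .withBootstrapServers("broker:9092")
>>       .withTopic("kafka_source1")
>>       .withKeyDeserializer(StringDeserializer.class)
>>       .withValueDeserializer(StringDeserializer.class)
>>       .withTimestampPolicyFactory(
>>           (tp, previousWatermark) ->
>>               new CustomTimestampPolicyWithLimitedDelay<String, String>(
>>                   rec -> extractDatetime(rec.getKV().getValue()),
>>                   Duration.ZERO,  // maxDelay = 0
>>                   previousWatermark));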
>>
>> As we have applied a fixed window of 1 minute duration and the elements'
>> timestamps are monotonically increasing, we expect the output of the window
>> to be emitted when the current processing time crosses 12-02-00, with a
>> reasonable delay (say 10 seconds). But we are getting the result of the
>> window only after a long delay.
>>
>> In the Spark logs, it seems that the watermark is lagging.
>> Here are the logs:
>> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
>> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
>> highWatermark=2019-09-05T11:57:06.302Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
>> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
>> highWatermark=2019-09-05T11:57:06.686Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
>> 19/09/05 12:02:50 INFO
>> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
>> timestamp: 1567684500500 has completed, watermarks have been updated.
>>
>> As you can see, when the current processing time is 12:02:50, the
>> highWatermark is 11:57:06.
>> As the processing time progresses, the gap between processing time and
>> highWatermark is increasing.
>>
>> We ran the same pipeline with the same data on the FlinkRunner and the
>> DirectRunner and did not see this issue there. On those runners, the
>> watermark stays almost equal to the processing time.
>>
>> Sample Input Data :
>>
>> kafka_source1:
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
>> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>>
>> kafka_source2:
>> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
>> 12-01-26 704060'}
>> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05
>> 12-01-27 712300'}
>> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05
>> 12-01-28 713951'}
>>
>> What could be the issue here?
>>
>> Regards,
>> shanta
>>
>
