Hi Sweta,

The output timestamp seems reasonable to me. I'm guessing you're concerned
about the watermarks you're seeing, is that correct?

final Instant min = Instant.ofEpochMilli(Long.MIN_VALUE);
final Instant max = Instant.ofEpochMilli(Long.MAX_VALUE);
System.out.printf("Min: %s, Max: %s%n", min, max);

This will print out the following:

Min: -292275055-05-16T16:47:04.192Z, Max: +292278994-08-17T07:12:55.807Z

As you can see, the value you're seeing seems to be Long.MIN_VALUE (except
you've lost the minus sign somewhere). My best guess would then be that
either the watermark hasn't progressed at all (it may not have been emitted
yet) or your input records have incorrect timestamps (can you check
those?).
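If it helps, here's a small plain-Java sketch (no Flink dependencies) of
what I mean. Flink keeps a watermark at Long.MIN_VALUE until one is
actually emitted, and if the log line was produced via the legacy
java.util.Date / SimpleDateFormat path (an assumption on my side about
your logging), the minus sign gets swallowed, because that API represents
negative years with a BC era instead of a sign:

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.TimeZone;

public class MinWatermarkDemo {
    public static void main(String[] args) {
        // Flink initializes watermarks to Long.MIN_VALUE until the first one is emitted.
        long uninitializedWatermark = Long.MIN_VALUE;

        // java.time keeps the sign: -292275055-05-16T16:47:04.192Z
        System.out.println(Instant.ofEpochMilli(uninitializedWatermark));

        // The legacy Date/SimpleDateFormat path models negative years as era BC,
        // so the formatted year has no leading minus sign, which would explain
        // the 292269055-... value in your log.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(fmt.format(new Date(uninitializedWatermark)));
    }
}
```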

Best,
D.

On Thu, Dec 2, 2021 at 4:12 PM Sweta Kalakuntla <skalakun...@bandwidth.com>
wrote:

> Hi,
>
> I am using a broadcast pattern for publishing rules and aggregating the
> data(https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html).
> My use case is similar and also the code.
> One thing I wanted to do is capture late events, if any, and send them to
> a sink. But when there are events already on the Kafka topic that weren't
> consumed, and I start the app after a couple of hours, I see the output
> timestamps are messed up.
>
> timestamp: 2021-12-02T04:48:20.324+0000, watermark:
> 292269055-12-02T16:47:04.192+0000, timeService.watermark:
> 292269055-12-02T16:47:04.192+0000
>
> I have watermark strategy set on KafkaSource as:
>
> WatermarkStrategy<Record> wmStrategy = WatermarkStrategy
>         .<CDRRecord>forBoundedOutOfOrderness(Duration.ofSeconds(config.get(OUT_OF_ORDERNESS)))
>         .withTimestampAssigner((cdrRecord, timestamp) -> record.getEventTime());
> return env.addSource(recordSource.assignTimestampsAndWatermarks(wmStrategy))
>         .name("records Source")
>         .setParallelism(config.get(SOURCE_PARALLELISM));
>
> Please let me know if you need any more information.
>
> Thanks,
> Sweta
>