Hi,

We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and an enrichment (rules) stream from another topic. We extend AssignerWithPeriodicWatermarks to assign watermarks, extract the timestamp from each event, and handle idle source partitions. The auto-watermark interval is set to 5000 ms. The timestamp extractor looks like this:

    @Override
    public long extractTimestamp(Raw event, long previousElementTimestamp) {
        // Record the wall-clock time of the latest element.
        lastRecordProcessingTime = System.currentTimeMillis();
        // Parse the event timestamp, drop any fractional part, then scale by 1,000 to get milliseconds.
        long eventTime = Double.valueOf(event.getTimestamp().toString()).longValue();
        long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

In the second step, the rules are joined to the events in a KeyedProcessFunction. We have observed that, at times when the job starts consuming from the beginning of the event source stream, the timestamp read in the KeyedProcessFunction via context.timestamp() is null and the code throws an NPE. This happens only intermittently, for some records. The same event is processed fine in another environment, so the event itself is being parsed correctly.

What could be the issue? Does anyone have an idea? As far as we can tell, the timestamp could only be null if the timestamp extractor returned null.

Thanks.
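
To make the two pieces above concrete, here is a minimal, Flink-free sketch (an illustration only, not our production code). It assumes the event timestamp is expressed in epoch seconds, possibly with a fractional part, and it models the value read from context.timestamp() as a boxed Long, which is how that method's return type is declared. The class name TimestampSketch and both method names are hypothetical.

```java
// Hypothetical helper class; none of these names come from our job.
final class TimestampSketch {

    // Converts an epoch-seconds value such as "1612345678.9" to epoch milliseconds.
    // Assumption: the fractional part is dropped before scaling, as in our extractor.
    static long toEpochMillis(Object rawTimestamp) {
        long epochSeconds = (long) Double.parseDouble(rawTimestamp.toString());
        return Math.multiplyExact(epochSeconds, 1_000L);
    }

    // Reads a boxed Long without blindly auto-unboxing it, so a missing
    // timestamp yields the fallback instead of a NullPointerException.
    static long timestampOrFallback(Long contextTimestamp, long fallback) {
        return contextTimestamp != null ? contextTimestamp : fallback;
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1612345678.9"));
        System.out.println(timestampOrFallback(null, -1L));
    }
}
```

In the real job, the first argument of timestampOrFallback would be the value returned by context.timestamp(); the fallback value of -1 is only a placeholder for whatever handling fits the pipeline.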