Hi, I am having a problem getting watermarks right. The setup: I have a Flink job which reads from a Kafka topic, uses Protobuf deserialization, applies a sliding event-time window (size 120 seconds, slide 30 seconds), sums up the values and finally returns the result.
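To make the windowing concrete: as far as I understand it, with a 120-second size and a 30-second slide, each event should land in 120 / 30 = 4 overlapping windows. The sketch below is plain Java with no Flink dependency. SlidingWindowSketch and windowStartsFor are names made up for illustration, and it assumes a window offset of 0, so treat it as a rough model of the arithmetic rather than Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the job: plain arithmetic only.
class SlidingWindowSketch {

    // Returns the start timestamps (in ms) of every sliding window that
    // contains timestampMs, assuming a window offset of 0.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window containing the timestamp
        // (the double modulo keeps this correct for negative timestamps).
        long lastStart = timestampMs - ((timestampMs % slideMs) + slideMs) % slideMs;
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 120_000L;
        long slideMs = 30_000L;
        // Print each window as a half-open interval [start, end).
        for (long start : windowStartsFor(100_000L, sizeMs, slideMs)) {
            System.out.println("[" + start + ", " + (start + sizeMs) + ")");
        }
    }
}
```

For an event at 100000 ms this yields four windows, starting at 90000, 60000, 30000 and 0, so I would expect each of those windows to produce output once event time has moved past its end.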
The code is pasted below. The problem is that nothing ever reaches the sink. The timestamp extractor is invoked whenever an event arrives, but past that point neither the process function nor the sink is invoked, even though I am pumping events regularly. I am not able to figure out how to debug this issue. Please help.

import java.util.Properties;

import com.google.protobuf.Timestamp; // assumed: getEventTime() returns a protobuf Timestamp
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        Properties kafkaConsumerProps = new Properties();
        kafkaConsumerProps.setProperty("bootstrap.servers", "{bootstrap_servers}");
        kafkaConsumerProps.setProperty("group.id", "{group_id}");

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(100);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setMaxParallelism(5);
        env.setParallelism(5);

        // Kafka source: topic "auth", Protobuf deserialization via EventiSchema
        SingleOutputStreamOperator<Eventi.Event> texStream = env
                .addSource(new FlinkKafkaConsumer011<>("auth", new EventiSchema(), kafkaConsumerProps))
                .setParallelism(5)
                .setMaxParallelism(5);

        // Sliding window: size 120 seconds, slide 30 seconds
        SlidingEventTimeWindows window =
                SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));

        texStream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Eventi.Event>() {
                    @Override
                    public long extractAscendingTimestamp(Eventi.Event element) {
                        // Event time is in seconds; Flink expects milliseconds
                        return element.getEventTime().getSeconds() * 1000;
                    }
                })
                // The stream is keyed by the event time itself
                .keyBy(Eventi.Event::getEventTime)
                .window(window)
                .process(new ProcessWindowFunction<Eventi.Event, Object, Timestamp, TimeWindow>() {
                    @Override
                    public void process(Timestamp timestamp, Context context,
                                        Iterable<Eventi.Event> elements,
                                        Collector<Object> out) throws Exception {
                        // Counts the elements in the window
                        int sum = 0;
                        for (Eventi.Event element : elements) {
                            sum++;
                        }
                        out.collect(sum);
                    }
                })
                .print();

        env.execute();
    }
}