Hi, I read events in JSON format from a Kafka topic. Each event contains a handling time (aka event time) in epoch milliseconds, a transaction_id and a large nested JSON structure. I need to group the events by transaction_id, order each group by handling time and calculate the differences in handling time between consecutive events. Each event is then updated with this calculated elapsed time and pushed on, so every event that goes in should come out with the elapsed time added.
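For context, the relevant fields of an incoming event look roughly like this (simplified and with made-up values; the real original_event is a large nested structure):

{
  "handling_time": 1621845600000,
  "transaction_id": "tx-1",
  "original_event": { "Message": { "endpoints": [ { "endpoint_handlers": [ {} ] } ] } }
}

The job should emit original_event with handling_time, elapsed_time and cumulative_elapsed_time added to the first endpoint handler (see MyProcessWindowFunction below).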
For testing I use old events, so the handling time is nowhere near the wall-clock time. Initially I used EventTimeSessionWindows, but somehow the processing did not run as expected: when I pushed 1000 events, only about 800 would eventually appear at the output. This was resolved by switching to ProcessingTimeSessionWindows. My thought was that I could then also remove the watermark strategies (with their handling-time timestamp assigners) for the Kafka input stream and the data stream. However, this was not the case. Can anyone enlighten me as to why the watermark strategies are still needed?
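To be concrete, this is roughly the no-watermark variant I expected to work (a sketch only; source, Splitter, MyProcessWindowFunction, the sink and the configuration values are the same as in the full job below):

// Sketch: no custom watermark strategy on the source ...
DataStream<ObjectNode> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

// ... and no assignTimestampsAndWatermarks() on the tuple stream either
DataStream<String> result = ds
        .flatMap(new Splitter())
        .keyBy(t -> t.f2)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
        .process(new MyProcessWindowFunction());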
Below is the full job as it currently runs, with the watermark strategies still in place:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

/* A watermark is needed to prevent duplicates! */
WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
        .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
            @Override
            public long extractTimestamp(ObjectNode element, long recordTimestamp) {
                return element.get("value").get("handling_time").asLong();
            }
        });

/* Use the watermark strategy to create a data stream */
DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");

/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
        .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
            @Override
            public long extractTimestamp(Tuple4<Long, Long, String, String> element, long recordTimestamp) {
                return element.f0;
            }
        });

DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
        .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
            @Override
            public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                return value.f2;
            }
        })
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
        .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
        .process(new MyProcessWindowFunction());

KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// Sink to the output Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);

env.execute("Cag Events");
}

// Splits the ObjectNode into a Tuple4
private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
    @Override
    public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
        // handling_time is emitted twice on purpose: the first copy feeds the timestamp
        // assigner of the watermark strategy, the second is used to calculate the elapsed time
        out.collect(new Tuple4<>(
                json.get("value").get("handling_time").asLong(),
                json.get("value").get("handling_time").asLong(),
                json.get("value").get("transaction_id").asText(),
                json.get("value").get("original_event").toPrettyString()));
    }
}

// Orders events that belong to the same transaction by handling time
public static class SortEventsHandlingTime implements Comparator<Tuple4<Long, Long, String, String>> {
    @Override
    public int compare(Tuple4<Long, Long, String, String> o1, Tuple4<Long, Long, String, String> o2) {
        return Long.compare(o1.f0, o2.f0);
    }
}

// Sorts the events of one transaction and calculates the elapsed times
static class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context,
                        Iterable<Tuple4<Long, Long, String, String>> input,
                        Collector<String> out) throws JsonProcessingException {
        long elapsed;
        long pHandlingTime = 0L;
        long cumulativeElapsed = 0L;

        List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
        input.forEach(inputList::add);
        inputList.sort(new SortEventsHandlingTime());

        ObjectMapper mapper = new ObjectMapper();
        for (Tuple4<Long, Long, String, String> in : inputList) {
            long handlingTime = in.f0;
            elapsed = (pHandlingTime == 0L) ? 0L : handlingTime - pHandlingTime;
            cumulativeElapsed += elapsed;
            pHandlingTime = handlingTime;

            JsonNode originalEvent = mapper.readTree(in.f3);
            // Add the timing fields to the first endpoint handler of the original event
            ObjectNode o = (ObjectNode) originalEvent.get("Message").get("endpoints").get(0).get("endpoint_handlers").get(0);
            o.put("handling_time", Long.toString(handlingTime));
            o.put("elapsed_time", Long.toString(elapsed));
            o.put("cumulative_elapsed_time", Long.toString(cumulativeElapsed));
            out.collect(originalEvent.toString());
        }
    }
}