Hi,

I read events in JSON format from a Kafka topic. These events contain a
handling time (aka event time) in epoch milliseconds, a transaction_id
and a large nested JSON structure. I need to group the events by
transaction_id, order them by handling time, and calculate the
differences between consecutive handling times. The events are updated
with this calculated elapsed time and pushed on, so every event that
goes in should come out with the elapsed time added.
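
To illustrate the intent (the field names are taken from the code
below, the values are made up), an input event looks roughly like

    {"handling_time": 1622547800123, "transaction_id": "tx-1",
     "original_event": { ... large nested structure ... }}

and each output event is the original_event enriched with
handling_time, elapsed_time and cumulative_elapsed_time fields.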

For testing I use old events, so the handling time is nowhere near the
wall clock time. Initially I used EventTimeSessionWindows, but the
processing did not run as expected: when I pushed 1000 events, only
about 800 would eventually appear at the output. This was resolved by
switching to ProcessingTimeSessionWindows. My thought was then that I
could remove the watermark strategies (with their handling-time
timestamp assigners) from both the Kafka source and the data stream.
However, this was not the case.
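
For reference, the event-time variant differed only in the window
assigner; it looked roughly like this (a sketch, not the exact original
code):

            .window(EventTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))

And by removing the watermark strategies I mean (again roughly) passing
WatermarkStrategy.noWatermarks() to env.fromSource() and dropping the
assignTimestampsAndWatermarks() call on the tuple stream.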

Can anyone enlighten me as to why the watermark strategies are still needed?

Below is the code:

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setProperties(kafkaProps)
                .setProperty("ssl.truststore.type", trustStoreType)
                .setProperty("ssl.truststore.password", trustStorePassword)
                .setProperty("ssl.truststore.location", trustStoreLocation)
                .setProperty("security.protocol", securityProtocol)
                .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
                .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
                .setGroupId(inputGroupId)
                .setClientIdPrefix(clientId)
                .setTopics(kafkaInputTopic)
                .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();

        /* A watermark is needed to prevent duplicates! */
        WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
                .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
                    @Override
                    public long extractTimestamp(ObjectNode element, long eventTime) {
                        // Use the handling_time from the event payload as the event time
                        return element.get("value").get("handling_time").asLong();
                    }
                });

        /* Use the watermark strategy to create a datastream */
        DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");

        /* Split the ObjectNode into a Tuple4 */
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

        WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
                .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
                    @Override
                    public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
                        // f0 holds the handling_time set by the Splitter
                        return element.f0;
                    }
                });

        DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);

        DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
            .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
                @Override
                public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                    // Key the stream by transaction_id (f2)
                    return value.f2;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
            .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
            .process(new MyProcessWindowFunction());


        KafkaSink<String> kSink = KafkaSink.<String>builder()
                .setBootstrapServers(outputBrokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaOutputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink to the Kafka topic
        tuple4DsWmKeyedbytr.sinkTo(kSink);

        env.execute("Cag Events");
    }

    // Splits the ObjectNode into a Tuple4
    private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
        @Override
        public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
            // handling_time is intentionally stored twice: one copy is used for the
            // watermark strategy, the other for the calculation of the elapsed time
            out.collect(new Tuple4<>(
                    json.get("value").get("handling_time").asLong(),
                    json.get("value").get("handling_time").asLong(),
                    json.get("value").get("transaction_id").asText(),
                    json.get("value").get("original_event").toPrettyString()));
        }
    }

    // Comparator to sort the events that belong to the same transaction
    public static class SortEventsHandlingTime implements Comparator<Tuple4<Long, Long, String, String>> {

        // Compares two Tuple4 objects by their handling time (f0)
        @Override
        public int compare(Tuple4<Long, Long, String, String> o1, Tuple4<Long, Long, String, String> o2) {
            return Long.compare(o1.f0, o2.f0);
        }
    }

    // Sorts the events of a session window and calculates the elapsed times
    static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) throws JsonProcessingException {
            Long elapsed           = 0L;
            Long pHandlingTime     = 0L;
            Long cumulativeElapsed = 0L;

            // Collect the window's events and sort them by handling time
            List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
            input.forEach(inputList::add);
            inputList.sort(new SortEventsHandlingTime());

            ObjectMapper mapper = new ObjectMapper();

            for (Tuple4<Long, Long, String, String> in : inputList) {

                // Elapsed time is the difference with the previous event's handling time
                if (pHandlingTime.equals(0L)) {
                    elapsed = 0L;
                } else {
                    elapsed = in.f0 - pHandlingTime;
                }
                cumulativeElapsed = cumulativeElapsed + elapsed;
                pHandlingTime = in.f0;

                JsonNode originalEvent = mapper.readTree(in.f3);

                // Cast to ObjectNode so the calculated fields can be added
                ObjectNode o = (ObjectNode) originalEvent.get("Message").get("endpoints").get(0).get("endpoint_handlers").get(0);
                o.put("handling_time", in.f0.toString());
                o.put("elapsed_time", elapsed.toString());
                o.put("cumulative_elapsed_time", cumulativeElapsed.toString());

                out.collect(originalEvent.toString());
            }
        }
    }
