Hello Matthias,

I am still using ProcessingTimeSessionWindows. But it turns out I was wrong: I had tested a couple of times and it did not seem to work, but now it does, with both watermark strategies removed.
My apologies.

Regards,
Hans-Peter

This is the code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(Integer.parseInt(envMaxParallelism));
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(Integer.parseInt(envEnableCheckpointing));

Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, inputBrokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, inputGroupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
kafkaProps.setProperty("ssl.truststore.type", inputTrustStoreType);
kafkaProps.setProperty("ssl.truststore.password", inputTrustStorePassword);
kafkaProps.setProperty("ssl.truststore.location", inputTrustStoreLocation);
kafkaProps.setProperty("security.protocol", inputSecurityProtocol);
kafkaProps.setProperty("ssl.enabled.protocols", inputSslEnabledProtocols);

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

/* Create the datastream without a watermark strategy */
DataStream<ObjectNode> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

/* Split the ObjectNode into a Tuple4 */
DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

DataStream<String> tuple4DsWmKeyedbytr = tuple4ds
        .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
            @Override
            public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                return value.f2;
            }
        })
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
        .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
        .process(new MyProcessWindowFunction());

Properties sinkkafkaProps = new Properties();
sinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("ssl.enabled.protocols", outputSslEnabledProtocols);

KafkaSink<String> kSink = KafkaSink.<String>builder()
        .setBootstrapServers(outputBrokers)
        .setKafkaProducerConfig(sinkkafkaProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(kafkaOutputTopic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// Sink to the Kafka topic
tuple4DsWmKeyedbytr.sinkTo(kSink);
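For comparison, a minimal sketch of the event-time variant discussed in the thread below (it reuses the variables above; outOfOrderness is assumed to be configured as in the earlier version of the job quoted further down):

WatermarkStrategy<Tuple4<Long, Long, String, String>> eventTimeWm = WatermarkStrategy
        .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withTimestampAssigner((element, recordTimestamp) -> element.f0);  // handling_time

tuple4ds.assignTimestampsAndWatermarks(eventTimeWm)
        .keyBy(value -> value.f2)
        .window(EventTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
        .process(new MyProcessWindowFunction())
        .sinkTo(kSink);

Unlike the processing-time version, this only emits once the watermark passes the end of a session, which is exactly the behaviour discussed below.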
// Splits the ObjectNode into a Tuple4
private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
    @Override
    public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
        // handling_time is intentionally emitted twice: one copy is used for the
        // watermark strategy, the other for the elapsed-time calculation
        out.collect(new Tuple4<>(
                json.get("value").get("handling_time").asLong(),
                json.get("value").get("handling_time").asLong(),
                json.get("value").get("transaction_id").asText(),
                json.get("value").get("original_event").toPrettyString()));
    }
}

// Orders the events that belong to the same transaction by handling time
public static class SortEventsHandlingTime implements Comparator<Tuple4<Long, Long, String, String>> {
    public int compare(Tuple4<Long, Long, String, String> o1, Tuple4<Long, Long, String, String> o2) {
        // f0 is already a Long, so it can be compared directly
        return Long.compare(o1.f0, o2.f0);
    }
}

// Sorts the events and calculates the elapsed times
static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Tuple4<Long, Long, String, String>> input, Collector<String> out) throws JsonProcessingException {
        Long elapsed = 0L;
        Long pHandlingTime = 0L;
        Long cumulativeElapsed = 0L;
        List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
        input.forEach(inputList::add);
        inputList.sort(new SortEventsHandlingTime());
        ObjectMapper mapper = new ObjectMapper();
        for (Tuple4<Long, Long, String, String> in : inputList) {
            if (pHandlingTime.equals(0L)) {
                elapsed = 0L;
            } else {
                elapsed = in.f0 - pHandlingTime;
            }
            cumulativeElapsed = cumulativeElapsed + elapsed;
            pHandlingTime = in.f0;
            JsonNode originalEvent = mapper.readTree(in.f3);
            // Cast to ObjectNode so the enriched fields can be set
            ObjectNode o = (ObjectNode) originalEvent.get("Message").get("endpoints").get(0).get("endpoint_handlers").get(0);
            o.put("handling_time", in.f0.toString());
            o.put("elapsed_time", elapsed.toString());
            o.put("cumulative_elapsed_time", cumulativeElapsed.toString());
            out.collect(originalEvent.toString());
        }
    }
}
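A quick sanity check of the arithmetic in MyProcessWindowFunction, with hypothetical values: if one transaction's window contains events with handling times 1000, 1250 and 1600 ms, they are emitted in that order with elapsed_time 0, 250 and 350, and cumulative_elapsed_time 0, 250 and 600.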
On Tue, 29 Mar 2022 at 15:23, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hello Hans-Peter,
>
> I'm a little confused about which version of your code you are testing against:
>
> - ProcessingTimeSessionWindows or EventTimeSessionWindows?
> - Did you keep the withIdleness()?
>
> As said before:
>
> - For ProcessingTimeSessionWindows, watermarks play no role.
> - If you keep withIdleness(), then the respective sparse DataStream is event-time-less most of the time, i.e. no triggers fire to close a session window.
> - withIdleness() only makes sense if you merge/union/connect multiple DataStreams where at least one stream has its watermarks updated regularly (i.e. it is not withIdleness()); a sketch follows below this message.
> - This is not your case: your DAG is linear, no unions nor connects.
> - In event-time mode processing time plays no role; watermarks alone represent the progress of model (event) time and hence the triggering of windows.
> - In order to trigger a (session) window at time A, the window operator needs to receive a watermark of at least time A.
> - The next catch regards partitioning:
>   - Your first watermark strategy, kafkaWmstrategy, generates per-Kafka-partition watermarks.
>   - A keyBy() reshuffles these partitions onto the number of subtasks according to the hash of the key.
>   - This results in a per-subtask calculation of the *lowest* watermark of all Kafka partitions that happen to be processed by that subtask.
>   - I.e. if a single Kafka partition makes no watermark progress, the subtask watermark makes no progress.
>   - This surfaces with sparse data, as in your case.
> - Your second watermark strategy, wmStrategy, makes things worse because:
>   - it discards the correct watermarks of the first watermark strategy
>   - and replaces them with something arbitrary (at this point it is hard to guess the correct max lateness, which is a mixture of the events from multiple Kafka partitions).
>
> Conclusion:
>
> The only way to make the event-time session windows work for you in a timely manner is to make sure that watermarks on all involved partitions make progress, i.e. that new events arrive on all partitions in a regular manner.
>
> Hope this helps
>
> Thias
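A minimal sketch of the union case Matthias describes above (the Event POJO with a long timestamp field and the two sources are hypothetical; the thread's actual DAG is linear, as he notes):

// Hypothetical: one regularly-updating stream and one sparse stream.
// withIdleness() on the sparse stream lets the union's watermark advance
// on the busy stream alone while the sparse stream produces nothing.
WatermarkStrategy<Event> busyWm = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.timestamp);

WatermarkStrategy<Event> sparseWm = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofMinutes(1))  // declared idle after 1 minute without events
        .withTimestampAssigner((event, ts) -> event.timestamp);

DataStream<Event> busy = env.fromSource(busySource, busyWm, "busy source");
DataStream<Event> sparse = env.fromSource(sparseSource, sparseWm, "sparse source");

// The downstream watermark is the minimum over the non-idle inputs only
DataStream<Event> merged = busy.union(sparse);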
> *From:* HG <hanspeter.sl...@gmail.com>
> *Sent:* Tuesday, March 29, 2022 1:07 PM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Watermarks event time vs processing time
>
> Hello Matthias,
>
> When I remove all the watermark strategies it does not process anything.
> For example, when I use WatermarkStrategy.noWatermarks() instead of the one I build, nothing seems to happen at all.
>
> Also when I skip the part where I add wmStrategy to create tuple4dswm:
>
> DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
> nothing is processed.
>
> Regards Hans-Peter
>
> On Wed, 16 Mar 2022 at 15:52, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> > Hi Hanspeter,
> >
> > Let me relate some hints that might help you get the concepts clearer.
> >
> > From your description I make the following assumptions where you are not specific enough (please confirm or correct in your answer):
> >
> > a. You store incoming events in state per transaction_id, to be sorted/aggregated (min/max time) by event time later on.
> > b. So far you used a session window to determine the point in time when to emit the stored/enriched/sorted events.
> > c. Watermarks are generated with bounded out-of-orderness.
> > d. You use session windows with a specific gap.
> > e. In your experiment you only ever send 1000 events and then stop producing incoming events.
> >
> > Now to your questions:
> >
> > - For processing-time session windows, watermarks play no role whatsoever: you simply assume that you have seen all events belonging to a single transaction id if the last such event for that transaction id was processed sessionWindowGap milliseconds ago.
> > - Therefore you see all enriched incoming events at the latest sessionWindowGap ms after the last incoming event (plus some latency).
> > - In event-time mode, and respectively event-time session windows, the situation is exactly the same, only that processing time plays no role.
> > - A watermark means (ideally) that no event older than the watermark time ever follows the watermark (which itself is a meta-event that flows with the proper events on the same channels).
> > - In order for a session gap window to forward the enriched events, the window operator needs to receive a watermark that is sessionWindowGap milliseconds beyond the latest incoming event (in terms of the respective event time).
> > - In order to generate a new watermark that triggers this last session window, the watermark generator needs to encounter an (any) event that has a timestamp of (<latest event in session window> + outOfOrderness + sessionWindowGap + 1ms); a worked example follows below this message.
> > - Remember, the watermark generator never generates watermarks based on processing time, but only based on the timestamps it has seen in events actually encountered.
> > - Coming back to your idleness configuration: it only means that the incoming stream becomes idle == timeless after a while, i.e. watermarks won't make progress from this stream, and it tells all downstream operators so.
> > - An idleness specification is only useful if a respective operator has another source of valid watermarks (i.e. after a union of two streams, one active/one idle). This is not your case.
> >
> > I hope this clarifies your situation.
> >
> > Cheers
> >
> > Thias
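To make that trigger condition concrete, with hypothetical numbers: take outOfOrderness = 5 s and sessionWindowGap = 10 s, and suppose the latest event in a session carries handling_time t = 100,000 ms. The window only fires once the operator sees a watermark past the window end at 110,000 ms. Since bounded-out-of-orderness watermarks trail the highest timestamp seen by the out-of-orderness bound, some event with a timestamp of at least 100,000 + 5,000 + 10,000 + 1 = 115,001 ms has to arrive first. If a test run of 1000 events simply stops, no such event ever comes and the last session windows never close.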
> > *From:* HG <hanspeter.sl...@gmail.com>
> > *Sent:* Wednesday, 16 March 2022 10:06
> > *To:* user <user@flink.apache.org>
> > *Subject:* Watermarks event time vs processing time
> >
> > Hi,
> >
> > I read from a Kafka topic events that are in JSON format.
> > These events contain a handling time (aka event time) in epoch milliseconds, a transaction_id and a large nested JSON structure.
> > I need to group the events by transaction_id, order them by handling time and calculate the differences in handling time.
> > The events are updated with this calculated elapsed time and pushed further.
> > So all events that go in should come out with the elapsed time added.
> >
> > For testing I use events that are old (so the handling time is nowhere near the wall-clock time).
> > Initially I used EventTimeSessionWindows, but somehow the processing did not run as expected:
> > when I pushed 1000 events, eventually only 800 or so would appear at the output.
> > This was resolved by switching to ProcessingTimeSessionWindows.
> > My thought was then that I could remove the watermark strategies with their timestamp assigners (handling time) for the Kafka input stream and the data stream.
> > However this was not the case.
> >
> > Can anyone enlighten me as to why the watermark strategies are still needed?
> >
> > Below the code:
> >
> > KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
> >         .setProperties(kafkaProps)
> >         .setProperty("ssl.truststore.type", trustStoreType)
> >         .setProperty("ssl.truststore.password", trustStorePassword)
> >         .setProperty("ssl.truststore.location", trustStoreLocation)
> >         .setProperty("security.protocol", securityProtocol)
> >         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
> >         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
> >         .setGroupId(inputGroupId)
> >         .setClientIdPrefix(clientId)
> >         .setTopics(kafkaInputTopic)
> >         .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
> >         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> >         .build();
> >
> > /* A watermark is needed to prevent duplicates! */
> > WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
> >         .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
> >         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
> >         .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
> >             @Override
> >             public long extractTimestamp(ObjectNode element, long eventTime) {
> >                 return element.get("value").get("handling_time").asLong();
> >             }
> >         });
> >
> > /* Use the watermark strategy to create a datastream */
> > DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");
> >
> > /* Split the ObjectNode into a Tuple4 */
> > DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());
> >
> > WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
> >         .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
> >         .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
> >         .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
> >             @Override
> >             public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
> >                 return element.f0;
> >             }
> >         });
> >
> > DataStream<Tuple4<Long, Long, String, String>> tuple4dswm = tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
> >
> > DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
> >         .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
> >             @Override
> >             public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
> >                 return value.f2;
> >             }
> >         })
> >         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
> >         .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
> >         .process(new MyProcessWindowFunction());
> >
> > KafkaSink<String> kSink = KafkaSink.<String>builder()
> >         .setBootstrapServers(outputBrokers)
> >         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> >                 .setTopic(kafkaOutputTopic)
> >                 .setValueSerializationSchema(new SimpleStringSchema())
> >                 .build()
> >         )
> >         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> >         .build();
> >
> > // Sink to the Kafka topic
> > tuple4DsWmKeyedbytr.sinkTo(kSink);