Hello Matthias,

I am still using ProcessingTimeSessionWindows.
But it turns out I was wrong.
I tested a couple of times and it did not seem to work.
But now it does, with both watermark strategies removed.

My apologies.
Regards, Hans-Peter

This is the code:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(Integer.parseInt(envMaxParallelism));
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // enableCheckpointing() takes the checkpoint interval in milliseconds
        env.enableCheckpointing(Integer.parseInt(envEnableCheckpointing));


        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, inputBrokers);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, inputGroupId);
        kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        kafkaProps.setProperty("ssl.truststore.type", inputTrustStoreType);
        kafkaProps.setProperty("ssl.truststore.password", inputTrustStorePassword);
        kafkaProps.setProperty("ssl.truststore.location", inputTrustStoreLocation);
        kafkaProps.setProperty("security.protocol", inputSecurityProtocol);
        kafkaProps.setProperty("ssl.enabled.protocols", inputSslEnabledProtocols);

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setProperties(kafkaProps)
                .setGroupId(inputGroupId)
                .setClientIdPrefix(clientId)
                .setTopics(kafkaInputTopic)
                .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();


        /* No watermark strategy is needed for processing-time session windows */
        DataStream<ObjectNode> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        /* Split the ObjectNode into a Tuple4 */
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());

        DataStream<String> tuple4DsWmKeyedbytr = tuple4ds
            .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
                @Override
                public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
                    return value.f2;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
            // allowedLateness() only has an effect on event-time windows
            .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
            .process(new MyProcessWindowFunction());



        Properties sinkkafkaProps = new Properties();
        sinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outputBrokers);
        sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
        sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
        sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
        sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
        sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
        sinkkafkaProps.setProperty("ssl.enabled.protocols", outputSslEnabledProtocols);


        KafkaSink<String> kSink = KafkaSink.<String>builder()
                .setBootstrapServers(outputBrokers)
                .setKafkaProducerConfig(sinkkafkaProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(kafkaOutputTopic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Sink to the Kafka topic
        tuple4DsWmKeyedbytr.sinkTo(kSink);


    // Splits the ObjectNode into a Tuple4
    private static class Splitter implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
        @Override
        public void flatMap(ObjectNode json, Collector<Tuple4<Long, Long, String, String>> out) throws Exception {
            // handling_time is emitted twice intentionally: one copy is used for
            // the watermark strategy, the other for the elapsed-time calculation
            out.collect(new Tuple4<>(
                    json.get("value").get("handling_time").asLong(),
                    json.get("value").get("handling_time").asLong(),
                    json.get("value").get("transaction_id").asText(),
                    json.get("value").get("original_event").toPrettyString()));
        }
    }

    // Comparator to sort the events that belong to the same transaction
    public static class SortEventsHandlingTime implements Comparator<Tuple4<Long, Long, String, String>> {

        // Compare two Tuple4 objects by their first field (handling_time);
        // Long.compare already returns a negative, zero or positive int,
        // so its result can be returned directly
        public int compare(Tuple4<Long, Long, String, String> o1,
                Tuple4<Long, Long, String, String> o2) {
            return Long.compare(o1.f0, o2.f0);
        }
    }

    // Sorts the events of one session window and calculates the elapsed times
    static class MyProcessWindowFunction extends
            ProcessWindowFunction<Tuple4<Long, Long, String, String>, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context,
                Iterable<Tuple4<Long, Long, String, String>> input,
                Collector<String> out) throws JsonProcessingException {
            Long elapsed           = 0L;
            Long pHandlingTime     = 0L;
            Long cumulativeElapsed = 0L;

            List<Tuple4<Long, Long, String, String>> inputList = new ArrayList<>();
            input.forEach(inputList::add);
            inputList.sort(new SortEventsHandlingTime());

            ObjectMapper mapper = new ObjectMapper();

            for (Tuple4<Long, Long, String, String> in : inputList) {
                // Elapsed time relative to the previous event; 0 for the first event
                if (pHandlingTime.equals(0L)) {
                    elapsed = 0L;
                } else {
                    elapsed = in.f0 - pHandlingTime;
                }
                cumulativeElapsed = cumulativeElapsed + elapsed;
                pHandlingTime = in.f0;

                JsonNode originalEvent = mapper.readTree(in.f3);

                // Enrich the first endpoint handler of the original event
                ObjectNode o = (ObjectNode) originalEvent
                        .get("Message").get("endpoints").get(0)
                        .get("endpoint_handlers").get(0);
                o.put("handling_time", in.f0.toString());
                o.put("elapsed_time", elapsed.toString());
                o.put("cumulative_elapsed_time", cumulativeElapsed.toString());

                out.collect(originalEvent.toString());
            }
        }
    }
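
For reference, a minimal sketch (untested) of the event-time variant discussed
in the thread below, with a single watermark strategy assigned at the Kafka
source and no second strategy after the flatMap; names reuse the code above,
and per the discussion this only fires promptly if all Kafka partitions
receive events regularly:

        WatermarkStrategy<ObjectNode> sourceWm = WatermarkStrategy
                .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withTimestampAssigner((element, recordTimestamp) ->
                        element.get("value").get("handling_time").asLong());

        DataStream<String> eventTimeResult = env
                .fromSource(source, sourceWm, "Kafka Source")
                .flatMap(new Splitter())
                .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple4<Long, Long, String, String> value) {
                        return value.f2;
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
                .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
                .process(new MyProcessWindowFunction());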


On Tue, Mar 29, 2022 at 15:23 Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hello Hans-Peter,
>
>
>
> I’m a little confused which version of your code you are testing against:
>
>    - ProcessingTimeSessionWindows or EventTimeSessionWindows?
>    - Did you keep the withIdleness()?
>
>
>
> As said before:
>
>    - for ProcessingTimeSessionWindows, watermarks play no role
>    - if you keep withIdleness(), the respective sparse DataStream is
>    event-time-less most of the time, i.e. no triggers fire to close a session
>    window
>    - withIdleness() only makes sense if you merge/union/connect multiple
>    DataStreams where at least one stream has its watermarks updated regularly
>    (i.e. it is not idle)
>       - this is not your case, your DAG is linear, no union nor connects
>    - in event-time mode processing time plays no role; watermarks
>    exclusively drive the progress of model (event) time and hence the
>    triggering of windows
>    - in order to trigger a (session-)window at time A the window operator
>    needs to receive a watermark of at least time A
>    - the next catch regards partitioning
>       - your first watermark strategy kafkaWmstrategy generates
>       per-Kafka-partition watermarks
>       - a keyBy() reshuffles these partitions onto the number of subtasks
>       according to the hash of the key
>       - this results in a per-subtask calculation of the *lowest*
>       watermark of all Kafka partitions that happen to be processed by that
>       subtask (illustrated in the sketch after this list)
>       - i.e. if a single Kafka partition makes no watermark progress, the
>       subtask watermark makes no progress
>       - this surfaces with sparse data, as in your case
>    - your second watermark strategy wmStrategy makes things worse because
>       - it discards the correct watermarks of the first watermark strategy
>       - and replaces them with something arbitrary (at this point it is
>       hard to guess the correct max lateness for a mixture of events from
>       multiple Kafka partitions)
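
A minimal sketch, with made-up values, of the per-subtask watermark
calculation described above (the combined watermark is the minimum over all
partition watermarks a subtask reads; uses java.util.Arrays):

        // Three Kafka partitions end up on the same subtask; one partition
        // without watermark progress pins the whole subtask watermark.
        long[] partitionWatermarks = { 170_000L, 168_500L, Long.MIN_VALUE /* no progress */ };
        long subtaskWatermark = Arrays.stream(partitionWatermarks).min().getAsLong();
        // subtaskWatermark == Long.MIN_VALUE -> no session window ever fires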
>
>
>
> Conclusion:
>
> The only way to make the event-time session windows work for you in a
> timely manner is to make sure the watermarks on all involved partitions make
> progress, i.e. new events arrive on all partitions in a regular manner (one
> possible way is sketched below).
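
A hypothetical way to guarantee this while testing is a small keepalive
producer that writes a dummy event to every partition at a fixed interval.
The topic, payload shape, interval and serializer settings below are
assumptions, not from this thread, and exception handling is omitted; the
__heartbeat__ transaction_id would need to be filtered out in the job, e.g.
in the Splitter:

        Properties hbProps = new Properties();
        hbProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, inputBrokers);
        hbProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        hbProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(hbProps)) {
            while (true) {
                for (PartitionInfo p : producer.partitionsFor(kafkaInputTopic)) {
                    // Dummy event in the same JSON shape the job expects,
                    // carrying the current time as handling_time
                    String heartbeat = "{\"value\":{\"handling_time\":" + System.currentTimeMillis()
                            + ",\"transaction_id\":\"__heartbeat__\",\"original_event\":{}}}";
                    producer.send(new ProducerRecord<>(kafkaInputTopic, p.partition(), null, heartbeat));
                }
                Thread.sleep(5_000L); // assumed interval; keep it well below the session gap
            }
        }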
>
>
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
> *From:* HG <hanspeter.sl...@gmail.com>
> *Sent:* Tuesday, March 29, 2022 1:07 PM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Watermarks event time vs processing time
>
>
>
>
>
>
> Hello Matthias,
>
>
>
> When I remove all the watermark strategies it does not process anything.
>
> For example, when I use WatermarkStrategy.noWatermarks() instead of the
> one I build, nothing seems to happen at all.
>
>
>
>  Also, when I skip the part where I add wmStrategy to create tuple4dswm:
>  DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
>          tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
>
>
> Nothing is processed.
>
>
>
> Regards Hans-Peter
>
>
>
> On Wed, Mar 16, 2022 at 15:52 Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Hanspeter,
>
>
>
> Let me offer some hints that might help you get the concepts clearer.
>
>
>
> From your description I make the following assumptions where you are not
> specific enough (please confirm or correct in your answer):
>
> a.       You store incoming events in state per transaction_id, to be
> sorted/aggregated (min/max time) by event time later on
>
> b.       So far you used a session window to determine the point in time
> when to emit the stored/enriched/sorted events
>
> c.        Watermarks are generated with bounded out of orderness
>
> d.       You use session windows with a specific gap
>
> e.       In your experiment you only ever send 1000 events and then stop
> producing incoming events
>
>
>
> Now to your questions:
>
>    - For processing-time session windows, watermarks play no role
>    whatsoever; you simply assume that you have seen all events belonging to a
>    single transaction id if the last such event for a specific transaction id
>    was processed sessionWindowGap milliseconds ago
>    - Therefore you see all enriched incoming events at the latest
>    sessionWindowGap ms after the last incoming event (+ some latency)
>    - In event-time mode and resp. event-time session windows the situation
>    is exactly the same, except that processing time plays no role
>    - A watermark means (ideally) that no event older than the watermark
>    time ever follows the watermark (which itself is a meta-event that flows
>    with the proper events on the same channels)
>    - In order for a session gap window to forward the enriched events, the
>    window operator needs to receive a watermark that is sessionWindowGap
>    milliseconds beyond the latest incoming event (in terms of the respective
>    event time)
>    - The watermark generator, in order to generate a new watermark that
>    triggers this last session window, needs to encounter an (any) event
>    that has a timestamp of (<latest event in session window> + outOfOrderness
>    + sessionWindowGap + 1ms); see the worked example after this list
>    - Remember, the watermark generator never generates watermarks based
>    on processing time, but only based on the timestamps it has seen in events
>    actually encountered
>    - Coming back to your idleness configuration: it only means that the
>    incoming stream becomes idle == timeless after a while … i.e. watermarks
>    won't make progress from this stream, and it tells all downstream operators so
>    - An idleness specification is only useful if the respective operator has
>    another source of valid watermarks (i.e. after a union of two streams, one
>    active/one idle …). This is not your case
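
A worked example, with made-up numbers, of the two watermark bullets above
(the exact ±1 ms depends on the watermark generator implementation):

        long lastEventTs  = 100_000L; // event time of the last event in the session
        long sessionGapMs =  30_000L; // sessionWindowGap
        long outOfOrderMs =   5_000L; // outOfOrderness

        // The window can only fire once the operator receives a watermark at
        // or beyond the end of the session window:
        long requiredWatermark = lastEventTs + sessionGapMs;          // 130_000
        // Bounded-out-of-orderness watermarks trail the largest timestamp seen
        // by outOfOrderMs, so an event with roughly this timestamp must still
        // be encountered before the window fires:
        long requiredEventTs = requiredWatermark + outOfOrderMs + 1;  // 135_001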
>
>
>
> I hope this clarifies your situation.
>
>
>
> Cheers
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* HG <hanspeter.sl...@gmail.com>
> *Sent:* Wednesday, March 16, 2022 10:06
> *To:* user <user@flink.apache.org>
> *Subject:* Watermarks event time vs processing time
>
>
>
>
>
>
> Hi,
>
>
>
> I read events in JSON format from a Kafka topic.
>
> These events contain a handling time (aka event time) in epoch
> milliseconds, a transaction_id and a large nested JSON structure.
>
> I need to group the events by transaction_id, order them by handling time
> and calculate the differences in handling time.
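(For example, with made-up numbers: three events of one transaction with
handling times 100, 130 and 180 come out with elapsed times 0, 30 and 50, and
cumulative elapsed times 0, 30 and 80.)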
>
> The events are updated with this calculated elapsed time and pushed
> further.
>
> So all events that go in should come out with the elapsed time added.
>
>
>
> For testing I use events that are old (so the handling time is nowhere near
> the wall clock time).
>
> Initially I used EventTimeSessionWindows but somehow the processing did
> not run as expected.
>
> When I pushed 1000 events, eventually only 800 or so would appear at the
> output. This was resolved by switching to ProcessingTimeSessionWindows.
> My thought was then that I could remove the watermark strategies with
> their timestamp assigners (handling time) for the Kafka input
> stream and the data stream.
>
> However, this was not the case.
>
>
>
> Can anyone enlighten me as to why the watermark strategies are still
> needed?
>
>
>
> Below the code
>
>
>
>         KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>                 .setProperties(kafkaProps)
>                 .setProperty("ssl.truststore.type", trustStoreType)
>                 .setProperty("ssl.truststore.password", trustStorePassword)
>                 .setProperty("ssl.truststore.location", trustStoreLocation)
>                 .setProperty("security.protocol", securityProtocol)
>                 .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>                 .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>                 .setGroupId(inputGroupId)
>                 .setClientIdPrefix(clientId)
>                 .setTopics(kafkaInputTopic)
>                 .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
>                 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>                 .build();
>
>         /* A watermark is needed to prevent duplicates! */
>         WatermarkStrategy<ObjectNode> kafkaWmstrategy = WatermarkStrategy
>                 .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>                 .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>                 .withTimestampAssigner(new SerializableTimestampAssigner<ObjectNode>() {
>                     @Override
>                     public long extractTimestamp(ObjectNode element, long eventTime) {
>                         return element.get("value").get("handling_time").asLong();
>                     }
>                 });
>
>         /* Use the watermark strategy to create a datastream */
>         DataStream<ObjectNode> ds = env.fromSource(source, kafkaWmstrategy, "Kafka Source");
>
>         /* Split the ObjectNode into a Tuple4 */
>         DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());
>
>         WatermarkStrategy<Tuple4<Long, Long, String, String>> wmStrategy = WatermarkStrategy
>                 .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>                 .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>                 .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
>                     @Override
>                     public long extractTimestamp(Tuple4<Long, Long, String, String> element, long eventTime) {
>                         return element.f0;
>                     }
>                 });
>
>         DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
>                 tuple4ds.assignTimestampsAndWatermarks(wmStrategy);
>
>         DataStream<String> tuple4DsWmKeyedbytr = tuple4dswm
>             .keyBy(new KeySelector<Tuple4<Long, Long, String, String>, String>() {
>                 @Override
>                 public String getKey(Tuple4<Long, Long, String, String> value) throws Exception {
>                     return value.f2;
>                 }
>             })
>             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(Integer.parseInt(sessionWindowGap))))
>             .allowedLateness(Time.seconds(Integer.parseInt(sessionAllowedLateness)))
>             .process(new MyProcessWindowFunction());
>
>
>         KafkaSink<String> kSink = KafkaSink.<String>builder()
>                 .setBootstrapServers(outputBrokers)
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic(kafkaOutputTopic)
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build()
>                 )
>                 .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>                 .build();
>
>         // Sink to the Kafka topic
>         tuple4DsWmKeyedbytr.sinkTo(kSink);
>
