Hi all, I'll illustrate my approach with an example, as it is definitely unorthodox. Here's some sample code. It works for me... I hope there are no (obvious) flaws!
//myStream should be a stream of objects associated to a timestamp. The idea is to create a Flink app that
//sends each object to Kafka with the ability of also sending an extra end-of-stream message after all events
//for the same associated timestamp have been sent
final SingleOutputStreamOperator<Tuple2<Long, Object>> myStream = null; //replace with actual stream
final String KAFKA_SINK_TOPIC = "OUTPUT_TOPIC";
final SinkFunction<Tuple2<Long, Object>> singleObjectKafkaSink = null; //replace with...
//needs to take care of mapping a timestamp into an end-of-stream marker record
final KeyedSerializationSchema<Tuple2<Long, Integer>> serializationSchema = null; //replace with...
final Properties producerProperties = null; //replace with...
final int kafkaProducerPoolSize = 6; //for example...

//sinks every event of a window
myStream.addSink(singleObjectKafkaSink);

//sends one end-of-stream message per Kafka partition
myStream
    .map(item -> item.f0) //keep only the timestamp
    .keyBy(item -> 1) //forces every event to go to the same task
    .process(new KeyedProcessFunction<Integer, Long, Tuple2<Long, Integer>>() {

        private KafkaProducer<String, Object> kafkaProducer;

        @Override
        public void open(Configuration parameters) throws Exception {
            kafkaProducer = new KafkaProducer<>(producerProperties);
        }

        @Override
        public void close() throws Exception {
            if (kafkaProducer != null) {
                kafkaProducer.close();
            }
        }

        @Override
        public void processElement(Long timestamp, Context ctx, Collector<Tuple2<Long, Integer>> out) throws Exception {
            //timer coalescing avoids firing more than once per timestamp
            ctx.timerService().registerEventTimeTimer(timestamp);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Long, Integer>> out) throws Exception {
            //Flink guarantees the operator will only fire after the normal sink has finished producing messages
            //send one event per partition downstream
            kafkaProducer.partitionsFor(KAFKA_SINK_TOPIC).forEach(partitionInfo ->
                out.collect(new Tuple2<>(timestamp, partitionInfo.partition())));
        }
    })
    .addSink(new FlinkKafkaProducer<Tuple2<Long, Integer>>(
        KAFKA_SINK_TOPIC,
        serializationSchema,
        producerProperties,
        Optional.of(new FlinkKafkaPartitioner<Tuple2<Long, Integer>>() {
            @Override
            public int partition(Tuple2<Long, Integer> record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                return record.f1; //send to the partition it is destined for
            }
        }),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        kafkaProducerPoolSize));
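For illustration only, here is a rough sketch of how a downstream consumer could use those per-partition end-of-stream markers. This is an illustrative addition, not part of the original mail: isEndOfStreamMarker, markerTimestamp and handleWindowResult are hypothetical helpers standing in for whatever format serializationSchema actually writes.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final Properties consumerProperties = null; //replace with actual consumer config
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList("OUTPUT_TOPIC"));

//per partition: the latest window timestamp whose end-of-stream marker has been seen
final Map<TopicPartition, Long> completedUpTo = new HashMap<>();

while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, byte[]> record : records) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        if (isEndOfStreamMarker(record)) { //hypothetical helper: detects the marker written by serializationSchema
            //all window results for this timestamp on this partition have now been read
            completedUpTo.merge(tp, markerTimestamp(record), Math::max); //hypothetical helper: decodes the timestamp
        } else {
            handleWindowResult(record); //hypothetical helper: normal processing of a window result
        }
    }
}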
On Mon, Sep 2, 2019 at 5:07 PM Padarn Wilson <pad...@gmail.com> wrote:
>
> Hi Fabian,
>
> > but each partition may only be written by a single task
>
> Sorry, I think I misunderstand something here then: if I have a topic with one partition, but multiple sink tasks (or parallelism > 1), this means the data must all be shuffled to the single task writing that partition?
>
> Padarn
>
> On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Padarn,
>>
>> Regarding your throughput concerns: a sink task may write to multiple partitions, but each partition may only be written by a single task.
>>
>> @Eduardo: Thanks for sharing your approach! I'm not sure if I understood it correctly, but I think the approach does not guarantee that all results of a window are emitted before the end-of-window marker is written. Since the sink operator and the single-task operator are separate operators, the output records might get stuck (or be buffered) in one of the sink tasks, and the single task would still emit an end-of-window marker record because it doesn't know about the sink task.
>>
>> Best,
>> Fabian
>>
>> On Thu, Aug 29, 2019 at 6:42 PM Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'll chip in with an approach I'm trying at the moment that seems to work, and I say seems because I'm only running this on a personal project.
>>>
>>> Personally, I don't have anything against end-of-window markers per partition; Padarn, you seem not to prefer this option as it overloads the meaning of the output payload. My approach is equally valid when producing watermarks/end-of-window markers on a side output though.
>>>
>>> The main problem of both approaches is knowing when the window has finished across all partitions without having to wait for the start of the next window.
>>>
>>> I've taken the approach of sending all output messages of the window to (1) the sink but also (2) a single-task operator. The single-task operator registers an event-time timer at the time of the end of the window. You have the confidence of the task's timer triggering only once, at the right time, because all the post-window watermarks go through to the same task. At that point I make the task send an end-of-window marker to every partition. I don't need to send the count because Kafka messages are ordered. AND IF you prefer not to overload the semantics of your original Kafka topic, you can post the message to a separate location of your choice.
>>>
>>> While this does mean that the end-of-window marker message only gets sent once the window has finished across all substreams (as opposed to per stream), it does mean you don't need to wait for the next window to start, AND the watermark gap between substreams should never grow that much anyway.
>>>
>>> This approach should be particularly useful when the number of partitions or the keying mechanism is different between the input and output topics.
>>>
>>> Hopefully that doesn't sound like a terrible idea.
>>>
>>> eduardo
>>>
>>> On Wed, 28 Aug 2019, 02:54 Padarn Wilson <pad...@gmail.com> wrote:
>>>>
>>>> Hi again Fabian,
>>>>
>>>> Thanks for pointing this out to me. In my case there is no need for keyed writing - but I do wonder if having each Kafka task write only to a single partition would significantly affect performance.
>>>>
>>>> Actually, now that I think about it, the approach of just waiting for the first records of the next window is also subject to the problem you mention above: a producer lagging behind the rest could end up with a partition containing elements out of 'window order'.
>>>>
>>>> I was also thinking this problem is very similar to that of checkpoint barriers. I intend to dig into the details of the exactly-once Kafka sink for some inspiration.
>>>>
>>>> Padarn
>>>>
>>>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> Yes, this is quite tricky.
>>>>> The "problem" with watermarks is that you need to consider how you write to Kafka.
>>>>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is written by multiple producers), you need to broadcast the watermarks to each partition, i.e., each partition would receive watermarks from each parallel sink task. So in order to reason about the current watermark of a partition, you need to observe them and take the minimum WM across all current sink task WMs.
>>>>> Things become much easier if each partition is only written by a single task, but this also means that data is not key-partitioned in Kafka. In that case, the sink task only needs to write a WM message to each of its assigned partitions.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
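To make the "minimum WM across all current sink task WMs" idea above concrete, here is a minimal consumer-side sketch, not from the thread. It assumes each parallel sink subtask periodically writes a watermark message carrying its subtask index and watermark into every partition it produces to; all class and method names here are illustrative.

import java.util.HashMap;
import java.util.Map;

public class PartitionWatermarkTracker {

    //latest watermark seen from each parallel sink subtask, for a single Kafka partition
    private final Map<Integer, Long> watermarkPerSinkTask = new HashMap<>();
    private final int expectedSinkParallelism;

    public PartitionWatermarkTracker(int expectedSinkParallelism) {
        this.expectedSinkParallelism = expectedSinkParallelism;
    }

    //record a watermark message read from the partition
    public void onWatermarkMessage(int sinkSubtaskIndex, long watermarkMillis) {
        watermarkPerSinkTask.merge(sinkSubtaskIndex, watermarkMillis, Math::max);
    }

    //the partition's effective watermark: the minimum over all sink subtasks;
    //until every subtask has reported at least once, nothing can be considered complete
    public long currentWatermark() {
        if (watermarkPerSinkTask.size() < expectedSinkParallelism) {
            return Long.MIN_VALUE;
        }
        return watermarkPerSinkTask.values().stream().mapToLong(Long::longValue).min().getAsLong();
    }

    //a window ending at windowEnd is fully readable once the partition watermark has passed it
    public boolean isWindowComplete(long windowEnd) {
        return currentWatermark() >= windowEnd;
    }
}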
>>>>> On Sat, Aug 17, 2019 at 5:48 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Fabian, thanks for your input.
>>>>>>
>>>>>> Exactly. Actually my first instinct was to see if it was possible to publish the watermarks somehow - my initial idea was to insert regular watermark messages into each partition of the stream, but exposing this seemed quite troublesome.
>>>>>>
>>>>>> > In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result when the watermark passes to a side output.
>>>>>> > All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>>>
>>>>>> I understand the idea here (and exactly-once semantics are probably fine for my use case), but counting events seems a bit fragile. I'm not totally confident the consumer can guarantee it won't read duplicates (it's a Golang Kafka library that seems to have some quirks).
>>>>>>
>>>>>> I think ideally each partition of the Kafka topic would have some regular information about watermarks. Perhaps the Kafka producer can be modified to support this.
>>>>>>
>>>>>> Padarn
>>>>>>
>>>>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Padarn,
>>>>>>>
>>>>>>> What you describe is essentially publishing Flink's watermarks to an outside system.
>>>>>>> Flink processes time windows by waiting for a watermark that's past the window end time. When it receives such a WM, it processes and emits all ended windows and forwards the watermark.
>>>>>>> When a sink receives a WM for, say, 12:45:15, you know that all window results up until 12:45:00 have been emitted.
>>>>>>> Hence, the watermark tells you about the completeness of data.
>>>>>>>
>>>>>>> However, using this information is not so easy, mostly because of the failure semantics. Things become much easier if you produce to Kafka with exactly-once semantics.
>>>>>>>
>>>>>>> In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result when the watermark passes to a side output.
>>>>>>> All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>>>>
>>>>>>> For at-least-once output, it's much harder because you'll have duplicates in the output after a job recovers.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> I think you have two options to let the consuming app know about the progress.
>>>>>>> You can either
>>>>>>>
>>>>>>> The ProcessFunction could count per window end timestamp how many records passed and forward that information via a side output. You could then
>>>>>>>
>>>>>>> Essentially, you'd like to publish Flink's watermarks to an outside system (possibly via Kafka).
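A minimal sketch of the ProcessFunction-plus-side-output idea described above, not from the thread. It assumes window results arrive as Tuple2<windowEndTimestamp, result>; all class and field names are illustrative and state handling is deliberately simplified.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class WindowResultCounter extends ProcessFunction<Tuple2<Long, Object>, Tuple2<Long, Object>> {

    //side output carrying (window end timestamp, partial count) pairs
    public static final OutputTag<Tuple2<Long, Long>> COUNTS =
            new OutputTag<Tuple2<Long, Long>>("window-result-counts") {};

    //window end timestamp -> number of results this parallel instance has seen so far
    //NOTE: plain field, not checkpointed state, so counts are lost on failure
    private transient Map<Long, Long> pendingCounts;

    @Override
    public void open(Configuration parameters) {
        pendingCounts = new HashMap<>();
    }

    @Override
    public void processElement(Tuple2<Long, Object> value, Context ctx, Collector<Tuple2<Long, Object>> out) {
        pendingCounts.merge(value.f0, 1L, Long::sum);
        out.collect(value); //pass the window result on to the sink unchanged

        //a non-keyed ProcessFunction cannot register timers, so flush against the current watermark instead:
        //emit the partial count for every window end the watermark has already passed
        long watermark = ctx.timerService().currentWatermark();
        Iterator<Map.Entry<Long, Long>> it = pendingCounts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getKey() <= watermark) {
                ctx.output(COUNTS, new Tuple2<>(entry.getKey(), entry.getValue()));
                it.remove();
            }
        }
    }
}

Each parallel instance emits a partial count per window end; a parallelism-1 task downstream would sum the partial counts per window end before publishing them. A production version would keep pendingCounts in checkpointed state and flush it when the watermark advances even if no further elements arrive (for example by keying the stream and using timers, as in the code at the top of this thread).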
>>>>>>> On Mon, Aug 12, 2019 at 2:33 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hello Users,
>>>>>>>>
>>>>>>>> I have a question that is perhaps not best solved within Flink: it has to do with notifying a downstream application that a Flink window has completed.
>>>>>>>>
>>>>>>>> The (simplified) scenario is this:
>>>>>>>> - We have a Flink job that consumes from Kafka, does some preprocessing, and then has a sliding window of 10 minutes and a slide time of 1 minute.
>>>>>>>> - The number of keys in each slide is not fixed.
>>>>>>>> - The output of the window is then output to Kafka, which is read by a downstream application.
>>>>>>>>
>>>>>>>> What I want to achieve is that the downstream application can somehow know when it has read all of the data for a single window, without waiting for the next window to arrive.
>>>>>>>>
>>>>>>>> Some options I've considered:
>>>>>>>> - Producing a second window over the window results that counts the output size, which can then be used by the downstream application to see when it has received the same number: This seems fragile, as it relies on there being no loss or duplication of data. It's also an extra window and Kafka stream, which is a tad messy.
>>>>>>>> - Somehow adding an 'end of window' element to each partition of the Kafka topic which can be read by the consumer: This seems a bit messy because it mixes different types of events into the same Kafka stream, and there is no really simple way to do this in Flink.
>>>>>>>> - Packaging the whole window output into a single message and making this the unit of transaction: This is possible, but the message would then be quite large (at least tens of MB), as the volume of this stream is quite large.
>>>>>>>> - Assuming that if the consumer has no elements to read, or if the next window has started to be read, then it has read the whole window: This seems reasonable, and if it weren't for the fact that my consumer on the application end is a bit inflexible right now, it is probably the solution I would use.
>>>>>>>>
>>>>>>>> Any further/better ideas?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Padarn