Hi all,

I'll illustrate my approach with an example as it is definitely
unorthodox. Here's some sample code. It works for me...I hope there
are no (obvious) flaws!

//myStream should be a stream of objects associated to a timestamp.
the idea is to create a Flink app that
//sends each object to kafka with the ability of also sending an extra
end-of-stream message after all events
//for the same associated timestamp have been sent
final SingleOutputStreamOperator<Tuple2<Long, Object>> myStream =
null; //replace with actual stream
final String KAFKA_SINK_TOPIC = "OUTPUT_TOPIC";
final SinkFunction<Tuple2<Long, Object>> singleObjectKafkaSink = null;
//replace with...
//needs to take care of mapping a timestamp into an end-of-stream marker record
final KeyedSerializationSchema serializationSchema = null; //replace with...
final Properties producerProperties = null; //replace with...
final int kafkaProducerPoolSize = 6; //for example...

//sinks every event of a window
myStream.addSink(singleObjectKafkaSink);

//sends one end-of-stream message per kafka partition
myStream
        .map(item -> item._1()) //keep only the time
        .keyBy(item -> 1) //forces every event to go to the same task
        .process(new KeyedProcessFunction<Integer, Long, Tuple2<Long,
Integer>>() {

            private KafkaProducer<String, Object> kafkaProducer;

            @Override
            public void open(Configuration parameters) throws Exception {
                kafkaProducer = new KafkaProducer<>(producerProperties);
            }

            @Override
            public void close() throws Exception {
                if (kafkaProducer != null) {
                    kafkaProducer.close();
                }
            }

            @Override
            public void processElement(Long timestamp, Context ctx,
Collector<Tuple2<Long, Integer>> out) throws Exception {
                //timer coalescing avoids firing more than once per timestamp
                ctx.timerService().registerEventTimeTimer(timestamp);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Tuple2<Long, Integer>> out) throws Exception {
                //Flink guarantees operator will only fire after
normal sink has finished producing messages
                //send one event per partition downstream

kafkaProducer.partitionsFor("OUTPUT_TOPIC").forEach(partitionInfo ->
out.collect(new Tuple2<>(timestamp, partitionInfo.partition())));
            }
        }).addSink(
        new FlinkKafkaProducer<Tuple2<Long,
Integer>>(KAFKA_SINK_TOPIC, serializationSchema, producerProperties,
Optional.of(new FlinkKafkaPartitioner<Tuple2<Long, Integer>>() {
            @Override
            public int partition(Tuple2<Long, Integer> record, byte[]
key, byte[] value, String targetTopic, int[] partitions) {
                return record._2(); //send to the partition it was
designed to be sent
            }
        }), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducerPoolSize));


On Mon, Sep 2, 2019 at 5:07 PM Padarn Wilson <pad...@gmail.com> wrote:
>
> Hi Fabian,
>
> > but each partition may only be written by a single task
>
> Sorry I think I misunderstand something here then: If I have a topic with one 
> partition, but multiple sink tasks (or parallelism > 1).. this means the data 
> must all be shuffled to the single task writing that partition?
>
> Padarn
>
> On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Padarn,
>>
>> Regarding your throughput concerns: A sink task may write to multiple 
>> partitions, but each partition may only be written by a single task.
>>
>> @Eduardo: Thanks for sharing your approach! Not sure if I understood it 
>> correctly, but I think that the approach does not guarantee that all results 
>> of a window are emitted before the end-of-window marker is written.
>> Since the sink operator and the single-task-operator are separate operators, 
>> the output records might get stuck (or be bufffered) in one of the sink 
>> tasks and the single-task would still emit an end-of-window marker record 
>> because it doesn't know about the sink task.
>>
>> Best,
>> Fabian
>>
>> Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor 
>> <eduardo.winpe...@gmail.com>:
>>>
>>> Hi,
>>>
>>> I'll chip in with an approach I'm trying at the moment that seems to work, 
>>> and I say seems because I'm only running this on a personal project.
>>>
>>> Personally, I don't have anything against end-of-message markers per 
>>> partition, Padarn you seem to not prefer this option as it overloads the 
>>> meaning of the output payload. My approach is equally valid when producing 
>>> watermarks/end-of-message markers on a side output though.
>>>
>>> The main problem of both approaches is knowing when the window has finished 
>>> across all partitions without having to wait for the start of the next 
>>> window.
>>>
>>> I've taken the approach of sending all output messages of the window to 1. 
>>> the sink but also 2. a single task operator. The single task operator 
>>> registers an event time based timer at the time of the end of the window. 
>>> You have the confidence of the task's timer triggering only once at the 
>>> right time because all the post-window watermarks go through to the same 
>>> task. At that point I make the task send an end-of-message marker to every 
>>> partition. I don't need to send the count because Kafka messages are 
>>> ordered. AND IF you prefer to not overload the semantic of your original 
>>> Kafka topic you can post the message to a separate location of your choice.
>>>
>>> While this does mean that the end of marker message only gets sent through 
>>> once the window has finished across all substreams (as opposed to per 
>>> stream), it does mean you don't need to wait for the next window to start 
>>> AND the watermark gap between substreams should never grow that much anyway.
>>>
>>> This approach should be particularly useful when the number of partitions 
>>> or keying mechanism is different between the input and output topics.
>>>
>>> Hopefully that doesn't sound like a terrible idea.
>>>
>>> eduardo
>>>
>>>
>>>
>>>
>>> On Wed, 28 Aug 2019, 02:54 Padarn Wilson, <pad...@gmail.com> wrote:
>>>>
>>>> Hi again Fabian,
>>>>
>>>> Thanks for pointing this out to me. In my case there is no need for keyed 
>>>> writing - but I do wonder if having each kafka task write only to a single 
>>>> partition would significantly affect performance.
>>>>
>>>> Actually now that I think about it, the approach to just wait for the 
>>>> first records of the next window is also subject to the problem you 
>>>> mention above: a producer lagging behind the rest could end up with a 
>>>> partition containing element out of ‘window order’.
>>>>
>>>> I was also thinking this problem is very similar to that of checkpoint 
>>>> barriers. I intended to dig into the details of the exactly once Kafka 
>>>> sink for some inspiration.
>>>>
>>>> Padarn
>>>>
>>>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> Yes, this is quite tricky.
>>>>> The "problem" with watermarks is that you need to consider how you write 
>>>>> to Kafka.
>>>>> If your Kafka sink writes to keyed Kafka stream (each Kafka partition is 
>>>>> written by multiple producers), you need to broadcast the watermarks to 
>>>>> each partition, i.e., each partition would receive watermarks from each 
>>>>> parallel sink task. So in order to reason about the current watermark of 
>>>>> a partition, you need to observe them and take the minimum WM across all 
>>>>> current sink task WMs.
>>>>> Things become much easier, if each partition is only written by a single 
>>>>> task but this also means that data is not key-partitioned in Kafka.
>>>>> In that case, the sink task only needs to write a WM message to each of 
>>>>> its assigned partitions.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>>
>>>>> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson 
>>>>> <pad...@gmail.com>:
>>>>>>
>>>>>> Hi Fabian, thanks for your input
>>>>>>
>>>>>> Exactly. Actually my first instinct was to see if it was possible to 
>>>>>> publish the watermarks somehow - my initial idea was to insert regular 
>>>>>> watermark messages into each partition of the stream, but exposing this 
>>>>>> seemed quite troublesome.
>>>>>>
>>>>>> > In that case, you could have a ProcessFunction that is chained before 
>>>>>> > the sink and which counts the window results per time slice and emits 
>>>>>> > the result when the watermark passes to a side output.
>>>>>> All side output messages are collected by a single task and can be 
>>>>>> published to a Kafka topic or even be made available via Queryable State.
>>>>>>
>>>>>> I understand the idea here (and exactly once semantics are probably fine 
>>>>>> for my use case), but counting events seems a bit fragile. I'm not 
>>>>>> totally confident the consumer can guarantee it won't read duplicates 
>>>>>> (its a golang kafka library that seems to have some quirks).
>>>>>>
>>>>>> I think ideally each partition of the kafka topic would have some 
>>>>>> regular information about watermarks. Perhaps the kafka producer can be 
>>>>>> modified to support this.
>>>>>>
>>>>>> Padarn
>>>>>>
>>>>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Padarn,
>>>>>>>
>>>>>>> What you describe is essentially publishing Flink's watermarks to an 
>>>>>>> outside system.
>>>>>>> Flink processes time windows, by waiting for a watermark that's past 
>>>>>>> the window end time. When it receives such a WM it processes and emits 
>>>>>>> all ended windows and forwards the watermark.
>>>>>>> When a sink received a WM for say 12:45:15, you know that all window 
>>>>>>> results with until 12:45:00 have been emitted.
>>>>>>> Hence, the watermark tells you about the completeness of data.
>>>>>>>
>>>>>>> However, using this information is not so easy, mostly because of the 
>>>>>>> failure semantics.
>>>>>>> Things become much easier if you produce to Kafka with exactly-once 
>>>>>>> semantics.
>>>>>>>
>>>>>>> In that case, you could have a ProcessFunction that is chained before 
>>>>>>> the sink and which counts the window results per time slice and emits 
>>>>>>> the result when the watermark passes to a side output.
>>>>>>> All side output messages are collected by a single task and can be 
>>>>>>> published to a Kafka topic or even be made available via Queryable 
>>>>>>> State.
>>>>>>>
>>>>>>> For at-least once output, it's much harder because you'll have 
>>>>>>> duplicates in the output after a job recovered.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> I think you have two options to let the consuming app know about the 
>>>>>>> progress.
>>>>>>> You can either
>>>>>>>
>>>>>>> The ProcessFunction could count per window end timestamp how many 
>>>>>>> records passed and forward that information via a side output.
>>>>>>> You could then
>>>>>>>
>>>>>>>
>>>>>>> Essentially, you'd like to publish Flink's watermarks to an outside 
>>>>>>> system (possibly via Kafka).
>>>>>>>
>>>>>>>
>>>>>>> Am Mo., 12. Aug. 2019 um 14:33 Uhr schrieb Padarn Wilson 
>>>>>>> <pad...@gmail.com>:
>>>>>>>>
>>>>>>>> Hello Users,
>>>>>>>>
>>>>>>>> I have a question that is perhaps not best solved within Flink: It has 
>>>>>>>> to do with notifying a downstream application that a Flink window has 
>>>>>>>> completed.
>>>>>>>>
>>>>>>>> The (simplified) scenario is this:
>>>>>>>> - We have a Flink job that consumes from Kafka, does some 
>>>>>>>> preprocessing, and then has a sliding window of 10 minutes and slide 
>>>>>>>> time of 1 minute.
>>>>>>>> - The number of keys in each slide is not fixed
>>>>>>>> - The output of the window is then output to Kafka, which is read by a 
>>>>>>>> downstream application.
>>>>>>>>
>>>>>>>> What I want to achieve is that the downstream application can someone 
>>>>>>>> know when it has read all of the data for a single window, without 
>>>>>>>> waiting for the next window to arrive.
>>>>>>>>
>>>>>>>> Some options I've considered:
>>>>>>>> - Producing a second window over the window results that counts the 
>>>>>>>> output size, which can then be used by the downstream application to 
>>>>>>>> see when it has received the same number: This seems fragile, as there 
>>>>>>>> it relies on there being no loss or duplication of data. Its also an 
>>>>>>>> extra window and Kafka stream which is a tad messy.
>>>>>>>> - Somehow adding an 'end of window' element to each partitions of the 
>>>>>>>> Kafka topic which can be read by the consumer: This seems a bit messy 
>>>>>>>> because it mixes different types of events into the same Kafka stream, 
>>>>>>>> and there is no really simple way to do this in Flink
>>>>>>>> - Package the whole window output into a single message and make this 
>>>>>>>> the unit of transaction: This is possible, but the message would be 
>>>>>>>> quite large then (at least 10s of mb), as the volume of this stream is 
>>>>>>>> quite large.
>>>>>>>> - Assume that if the consumer has no elements to read, or if the next 
>>>>>>>> window has started to be read, then it has read the whole window: This 
>>>>>>>> seems reasonable, and if it wasn't for the fact that my consumer on 
>>>>>>>> the application end was a bit inflexible right now, it is probably the 
>>>>>>>> solution I would use.
>>>>>>>>
>>>>>>>> Any further/better ideas?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Padarn

Reply via email to