Thank you for the detailed explanation.

On Tue, 29 Aug 2023 at 22:45, Ken Krugler <>

> If you need a specific output order, then merge the three streams, key by
> a constant (like 1), and run that into a KeyedProcessFunction.
> That function can buffer out-of-order records, and set up a timer to fire
> when it gets a MAX_WATERMARK (which indicates that all streams are
> finished) so that it can flush any pending data from state, in the proper
> order.
> You’d have a ListState<PojoClass> for the reduced data, and a
> ValueState<PojoClass> for the footer.
> Once you get the header, you can flush all reduced data, and no longer
> buffer it.
> When timer fires, you can flush the footer.
> — Ken
> On Aug 10, 2023, at 10:14 PM, Muazim Wani <> wrote:
> Thank you so much for taking the time to provide me with such a detailed
> response. Your assistance has been incredibly helpful in clarifying my
> understanding!
> Let me provide you with the exact scenario ,  I think there might be some
> misunderstanding. All the streams are bounded and parallelism is set to 1.
> I am  writing to 1 file only. *So the main use case Is to provide support
> for dynamic Headers and footers with Aggregated values. *
> e.g if my input is
> Static Header
> JohnDoe, 12
> Alice, 21
> My dynamic header is "Dynamic Header" and dynamic Footer is "Dynamic
> footer". *These headers are on top of static headers which are already
> present in the DataStream(bounded).* The output would be like
> Dynamic Header 33
> Static Header
> JohnDoe, 12
> Alice, 21
> Dynamic footer
> In this particular case I am writing to one file only. I have set my
> parallelism to 1. I have 1 InputDataStream on top of that I have one
> dynamic header and footer stream (*which contains some dynamic params
> such as aggregated value on some fields e.g salary etc*) .*Now I am left
> with three transformed streams in Sink Operator. i.e dynamic HeaderStream
> with aggregated Value 2) input DataStream 3) dynamic Footer stream with
> aggregated Value. *
> I could have used String for both Dynamic Headers and footers and emitted
> the headers in open() method and footers in close() method of
> TextOutputFormat, That would have solved my useCase.* But as I get a
> DataStream(with only 1 value i.e final sum) back from Aggregated Values
> (Currently I am using reduce function).* I am adding headers to that
> DataStream only and similarly for footers. Now I am not able to merge them
> while maintaining the order.
> Below I have provided my implementation of reduce function
> public DataStream<T> sum(
> SumFunction<T> reduceFunction, DataStream<PojoClass> stream) {
> DataStream<T> inputRecordTransformedDataStream =
><T>) Types.GENERIC
> (Number.class));
> return inputRecordTransformedDataStream
> .keyBy(value -> "key")
> .reduce(reduceFunction);
> }
> Below I am adding my Headers to my Sum Stream
> public static DataStream<PojoClass> getAggregatedStream(
> String header, DataStream<String> sinkStream) {
> return sinkStream
> .keyBy(key -> "value")
> .flatMap(
> (FlatMapFunction<String, PojoClass>)
> (value, out) -> out.collect(PojoClass.builder().data(header +
> value).build()))
> .returns(PojoClass.class);
> }
> Add HeaderStream is a Bounded DataStream with Dynamic Headers and
> Aggregated value.
> DataStream<PojoClass> headerStream = addHeaderRows(sinkStream);
> DataStream<PojoClass> footerStream = addFooterRows(finalStream);
> DataStream<PojoClass> sinkStream;
> Thanks a lot for your time and the advice.
> Muazim Wani
> On Fri, 11 Aug 2023 at 07:45, Hang Ruan <> wrote:
>> ps: Forget the link: Hybrid Source[1]
>> [1]
>> Hang Ruan <> 于2023年8月11日周五 10:14写道:
>>> Hi, Muazim.
>>> I think the Hybird Source[1] may be helpful for your case.
>>> Best,
>>> Hang
>>> Ken Krugler <> 于2023年8月11日周五 04:18写道:
>>>> As (almost) always, the devil is in the details.
>>>> You haven’t said, but I’m assuming you’re writing out multiple files,
>>>> each with a different schema, as otherwise you could just leverage the
>>>> existing Flink support for CSV.
>>>> So then you could combine the header/footer streams (adding a flag for
>>>> header vs. footer), and connect that to the row data stream, then use a
>>>> KeyedCoProcessFunction (I’m assuming you can key by something that
>>>> identifies which schema). You’d buffer the row data & footer (separately in
>>>> state). You would also need to set up a timer to fire at the max watermark,
>>>> to flush out pending rows/footer when all of the input data has been
>>>> processed.
>>>> After that function you could configure the sink to bucket by the
>>>> target schema.
>>>> — Ken
>>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani <> wrote:
>>>> Thanks for the response!
>>>> I have a specific use case where I am writing to a TextFile sink. I
>>>> have a Bounded stream of header data and need  to merge it with another
>>>> bounded stream. While writing the data to a text file the header data
>>>> should be written before the original data(from another bounded stream).
>>>> And also at last I have another stream of footers where I would repeat the
>>>> same process.
>>>> I tried keeping an identifier for all three streams and based on these
>>>> identifiers I added the data in three different ListState
>>>> using KeyedProcess functions. So for headers I directly emitted the value
>>>> but for main data and footers if I store it in a state . The issue is
>>>> Outside KeyedProcess I am not able to emit the data in order.
>>>> Is there any way I can achieve the use case of orderding the
>>>> dataStreams . I also tried with union but it seems it adds data
>>>> arbitrarily in both streams .
>>>> Thanks and regards
>>>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, <
>>>>> wrote:
>>>>> Hi Muazim,
>>>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>>> So in your example below, this means stream 2 would NEVER be emitted,
>>>>> because stream 1 would never end (there is no time at which you know for
>>>>> sure that stream 1 is done).
>>>>> And this in turn means stream 2 would be buffered forever in state,
>>>>> thus growing unbounded.
>>>>> I would suggest elaborating on your use case.
>>>>> Regards,
>>>>> — Ken
>>>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <>
>>>>> wrote:
>>>>> Hi Team,
>>>>> I have a use case where I have two streams and want to join them in
>>>>> stateful manner .
>>>>> E.g data of stream 1 should be emitted before stream2.
>>>>> I tried to store the data in ListState in KeyedProcessFunction but I
>>>>> am not able to access state  outside proccessElement().
>>>>> Is there any way I could achieve this?
>>>>> Thanks and regards
>>>>> --------------------------
>>>>> Ken Krugler
>>>>> Custom big data solutions
>>>>> Flink, Pinot, Solr, Elasticsearch
>>>> --------------------------
>>>> Ken Krugler
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
> --------------------------
> Ken Krugler
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

Reply via email to