If you need a specific output order, then merge the three streams, key by a 
constant (like 1), and run that into a KeyedProcessFunction.

That function can buffer out-of-order records, and set up a timer to fire when 
it gets a MAX_WATERMARK (which indicates that all streams are finished) so that 
it can flush any pending data from state, in the proper order.

You’d have a ListState<PojoClass> for the reduced data, and a 
ValueState<PojoClass> for the footer.

Once you get the header, you can flush all reduced data, and no longer buffer 
it.

When timer fires, you can flush the footer.

— Ken



> On Aug 10, 2023, at 10:14 PM, Muazim Wani <muazim1...@gmail.com> wrote:
> 
> Thank you so much for taking the time to provide me with such a detailed 
> response. Your assistance has been incredibly helpful in clarifying my 
> understanding! 
> Let me provide you with the exact scenario ,  I think there might be some 
> misunderstanding. All the streams are bounded and parallelism is set to 1. I 
> am  writing to 1 file only. So the main use case Is to provide support for 
> dynamic Headers and footers with Aggregated values. 
> e.g if my input is
> 
> Static Header
> JohnDoe, 12
> Alice, 21
> 
> My dynamic header is "Dynamic Header" and dynamic Footer is "Dynamic footer". 
> These headers are on top of static headers which are already present in the 
> DataStream(bounded). The output would be like
> 
> Dynamic Header 33
> Static Header
> JohnDoe, 12
> Alice, 21
> Dynamic footer
> 
> In this particular case I am writing to one file only. I have set my 
> parallelism to 1. I have 1 InputDataStream on top of that I have one dynamic 
> header and footer stream (which contains some dynamic params such as 
> aggregated value on some fields e.g salary etc) .Now I am left with three 
> transformed streams in Sink Operator. i.e dynamic HeaderStream with 
> aggregated Value 2) input DataStream 3) dynamic Footer stream with aggregated 
> Value. 
> 
> I could have used String for both Dynamic Headers and footers and emitted the 
> headers in open() method and footers in close() method of TextOutputFormat, 
> That would have solved my useCase. But as I get a DataStream(with only 1 
> value i.e final sum) back from Aggregated Values (Currently I am using reduce 
> function). I am adding headers to that DataStream only and similarly for 
> footers. Now I am not able to merge them while maintaining the order. 
> 
> Below I have provided my implementation of reduce function
> public DataStream<T> sum(
>     SumFunction<T> reduceFunction, DataStream<PojoClass> stream) {
> 
>   DataStream<T> inputRecordTransformedDataStream =
>       stream.map(this::transform).returns((TypeInformation<T>) 
> Types.GENERIC(Number.class));
> 
>   return inputRecordTransformedDataStream
>       .keyBy(value -> "key")
>       .reduce(reduceFunction);
> }
> 
> 
> Below I am adding my Headers to my Sum Stream
> 
> public static DataStream<PojoClass> getAggregatedStream(
>     String header, DataStream<String> sinkStream) {
> 
>   return sinkStream
>       .keyBy(key -> "value")
>       .flatMap(
>           (FlatMapFunction<String, PojoClass>)
>               (value, out) -> out.collect(PojoClass.builder().data(header + 
> value).build()))
>       .returns(PojoClass.class);
> }
> 
> Add HeaderStream is a Bounded DataStream with Dynamic Headers and Aggregated 
> value.
> DataStream<PojoClass> headerStream = addHeaderRows(sinkStream);
> 
> DataStream<PojoClass> footerStream = addFooterRows(finalStream);
> 
> DataStream<PojoClass> sinkStream;
> 
> Thanks a lot for your time and the advice.
> Muazim Wani
> 
> 
> On Fri, 11 Aug 2023 at 07:45, Hang Ruan <ruanhang1...@gmail.com 
> <mailto:ruanhang1...@gmail.com>> wrote:
>> ps: Forget the link: Hybrid Source[1]
>> 
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>> 
>> Hang Ruan <ruanhang1...@gmail.com <mailto:ruanhang1...@gmail.com>> 
>> 于2023年8月11日周五 10:14写道:
>>> Hi, Muazim.
>>> 
>>> I think the Hybird Source[1] may be helpful for your case.
>>> 
>>> Best,
>>> Hang
>>> 
>>> Ken Krugler <kkrugler_li...@transpac.com 
>>> <mailto:kkrugler_li...@transpac.com>> 于2023年8月11日周五 04:18写道:
>>>> As (almost) always, the devil is in the details.
>>>> 
>>>> You haven’t said, but I’m assuming you’re writing out multiple files, each 
>>>> with a different schema, as otherwise you could just leverage the existing 
>>>> Flink support for CSV.
>>>> 
>>>> So then you could combine the header/footer streams (adding a flag for 
>>>> header vs. footer), and connect that to the row data stream, then use a 
>>>> KeyedCoProcessFunction (I’m assuming you can key by something that 
>>>> identifies which schema). You’d buffer the row data & footer (separately 
>>>> in state). You would also need to set up a timer to fire at the max 
>>>> watermark, to flush out pending rows/footer when all of the input data has 
>>>> been processed.
>>>> 
>>>> After that function you could configure the sink to bucket by the target 
>>>> schema.
>>>> 
>>>> — Ken
>>>> 
>>>> 
>>>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani <muazim1...@gmail.com 
>>>>> <mailto:muazim1...@gmail.com>> wrote:
>>>>> 
>>>>> Thanks for the response!
>>>>> I have a specific use case where I am writing to a TextFile sink. I have 
>>>>> a Bounded stream of header data and need  to merge it with another 
>>>>> bounded stream. While writing the data to a text file the header data 
>>>>> should be written before the original data(from another bounded stream). 
>>>>> And also at last I have another stream of footers where I would repeat 
>>>>> the same process.
>>>>> I tried keeping an identifier for all three streams and based on these 
>>>>> identifiers I added the data in three different ListState using 
>>>>> KeyedProcess functions. So for headers I directly emitted the value but 
>>>>> for main data and footers if I store it in a state . The issue is Outside 
>>>>> KeyedProcess I am not able to emit the data in order.
>>>>> Is there any way I can achieve the use case of orderding the dataStreams 
>>>>> . I also tried with union but it seems it adds data arbitrarily in both 
>>>>> streams .
>>>>> Thanks and regards
>>>>> 
>>>>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, <kkrugler_li...@transpac.com 
>>>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>>>> Hi Muazim,
>>>>>> 
>>>>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>>>> 
>>>>>> So in your example below, this means stream 2 would NEVER be emitted, 
>>>>>> because stream 1 would never end (there is no time at which you know for 
>>>>>> sure that stream 1 is done).
>>>>>> 
>>>>>> And this in turn means stream 2 would be buffered forever in state, thus 
>>>>>> growing unbounded.
>>>>>> 
>>>>>> I would suggest elaborating on your use case.
>>>>>> 
>>>>>> Regards,
>>>>>> 
>>>>>> — Ken
>>>>>> 
>>>>>> 
>>>>>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <muazim1...@gmail.com 
>>>>>>> <mailto:muazim1...@gmail.com>> wrote:
>>>>>>> 
>>>>>>> Hi Team,
>>>>>>> I have a use case where I have two streams and want to join them in 
>>>>>>> stateful manner . 
>>>>>>> E.g data of stream 1 should be emitted before stream2.
>>>>>>> I tried to store the data in ListState in KeyedProcessFunction but I am 
>>>>>>> not able to access state  outside proccessElement().
>>>>>>> Is there any way I could achieve this?
>>>>>>> Thanks and regards
>>>>>>> 
>>>>>> 
>>>>>> --------------------------
>>>>>> Ken Krugler
>>>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>>>> Custom big data solutions
>>>>>> Flink, Pinot, Solr, Elasticsearch
>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
>>>> 
>>>> 
>>>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



Reply via email to