If you need a specific output order, then merge the three streams, key by a constant (like 1), and run the result into a KeyedProcessFunction.

That function can buffer out-of-order records, and set up a timer that fires when it receives the MAX_WATERMARK (which indicates that all input streams have finished) so that it can flush any pending data from state in the proper order. You'd have a ListState<PojoClass> for the reduced data, and a ValueState<PojoClass> for the footer. Once you get the header, you can flush all buffered reduced data and stop buffering. When the timer fires, you flush the footer.
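Something along these lines - a rough, untested sketch, where the isHeader()/isFooter()/getData() accessors on PojoClass are placeholders I'm assuming you can add (or derive from whatever flag/field your POJO actually carries):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class OrderedEmitFunction extends KeyedProcessFunction<String, PojoClass, String> {

    private transient ValueState<Boolean> headerSeen;
    private transient ListState<PojoClass> bufferedRows;
    private transient ValueState<PojoClass> footer;

    @Override
    public void open(Configuration parameters) {
        headerSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("headerSeen", Boolean.class));
        bufferedRows = getRuntimeContext().getListState(
                new ListStateDescriptor<>("bufferedRows", PojoClass.class));
        footer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("footer", PojoClass.class));
    }

    @Override
    public void processElement(PojoClass value, Context ctx, Collector<String> out) throws Exception {
        // Registering at Long.MAX_VALUE means the timer fires when the
        // MAX_WATERMARK arrives, i.e. when all of the (bounded) inputs are done.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

        if (value.isHeader()) {
            headerSeen.update(true);
            out.collect(value.getData());
            // The header is out, so everything buffered so far can follow it.
            for (PojoClass row : bufferedRows.get()) {
                out.collect(row.getData());
            }
            bufferedRows.clear();
        } else if (value.isFooter()) {
            footer.update(value);
        } else if (Boolean.TRUE.equals(headerSeen.value())) {
            // Header already emitted, so no need to buffer any more rows.
            out.collect(value.getData());
        } else {
            bufferedRows.add(value);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // All inputs are finished: drain anything still buffered (in case the
        // header never showed up), then emit the footer last.
        for (PojoClass row : bufferedRows.get()) {
            out.collect(row.getData());
        }
        bufferedRows.clear();

        PojoClass pendingFooter = footer.value();
        if (pendingFooter != null) {
            out.collect(pendingFooter.getData());
        }
    }
}

And then wire it up with the three streams you already have, e.g.

    headerStream.union(dataStream, footerStream)
            .keyBy(value -> "all")
            .process(new OrderedEmitFunction())
            // ... then on to your text file sink, with parallelism 1.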
— Ken

> On Aug 10, 2023, at 10:14 PM, Muazim Wani <muazim1...@gmail.com> wrote:
> 
> Thank you so much for taking the time to provide me with such a detailed response. Your assistance has been incredibly helpful in clarifying my understanding!
> Let me provide you with the exact scenario; I think there might be some misunderstanding. All the streams are bounded and parallelism is set to 1. I am writing to one file only. So the main use case is to provide support for dynamic headers and footers with aggregated values.
> E.g. if my input is
> 
> Static Header
> JohnDoe, 12
> Alice, 21
> 
> my dynamic header is "Dynamic Header" and my dynamic footer is "Dynamic footer". These are on top of the static headers that are already present in the (bounded) DataStream. The output would be like
> 
> Dynamic Header 33
> Static Header
> JohnDoe, 12
> Alice, 21
> Dynamic footer
> 
> In this particular case I am writing to one file only, and I have set my parallelism to 1. I have one input DataStream, and on top of that I have one dynamic header stream and one dynamic footer stream (which contain some dynamic params such as an aggregated value over some fields, e.g. salary). So I am left with three transformed streams in the sink operator: 1) the dynamic header stream with the aggregated value, 2) the input DataStream, and 3) the dynamic footer stream with the aggregated value.
> 
> I could have used a String for both the dynamic headers and footers and emitted the headers in the open() method and the footers in the close() method of TextOutputFormat; that would have solved my use case. But since the aggregated value comes back as a DataStream (with only one value, i.e. the final sum - currently I am using a reduce function), I am adding the headers to that DataStream, and similarly for the footers. Now I am not able to merge them while maintaining the order.
> 
> Below is my implementation of the reduce function:
> 
> public DataStream<T> sum(
>     SumFunction<T> reduceFunction, DataStream<PojoClass> stream) {
> 
>   DataStream<T> inputRecordTransformedDataStream =
>       stream.map(this::transform)
>           .returns((TypeInformation<T>) Types.GENERIC(Number.class));
> 
>   return inputRecordTransformedDataStream
>       .keyBy(value -> "key")
>       .reduce(reduceFunction);
> }
> 
> Below I am adding my headers to my sum stream:
> 
> public static DataStream<PojoClass> getAggregatedStream(
>     String header, DataStream<String> sinkStream) {
> 
>   return sinkStream
>       .keyBy(key -> "value")
>       .flatMap(
>           (FlatMapFunction<String, PojoClass>)
>               (value, out) -> out.collect(PojoClass.builder().data(header + value).build()))
>       .returns(PojoClass.class);
> }
> 
> headerStream is a bounded DataStream with the dynamic headers and the aggregated value:
> 
> DataStream<PojoClass> headerStream = addHeaderRows(sinkStream);
> 
> DataStream<PojoClass> footerStream = addFooterRows(finalStream);
> 
> DataStream<PojoClass> sinkStream;
> 
> Thanks a lot for your time and the advice.
> Muazim Wani
> 
> 
> On Fri, 11 Aug 2023 at 07:45, Hang Ruan <ruanhang1...@gmail.com> wrote:
>> ps: Forgot the link: Hybrid Source[1]
>> 
>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>> 
>> Hang Ruan <ruanhang1...@gmail.com> 于2023年8月11日周五 10:14写道:
>>> Hi, Muazim.
>>> 
>>> I think the Hybrid Source[1] may be helpful for your case.
>>> 
>>> Best,
>>> Hang
>>> 
>>> Ken Krugler <kkrugler_li...@transpac.com> 于2023年8月11日周五 04:18写道:
>>>> As (almost) always, the devil is in the details.
>>>> 
>>>> You haven't said, but I'm assuming you're writing out multiple files, each with a different schema, as otherwise you could just leverage the existing Flink support for CSV.
>>>> 
>>>> So then you could combine the header/footer streams (adding a flag for header vs. footer), connect that to the row data stream, and then use a KeyedCoProcessFunction (I'm assuming you can key by something that identifies which schema). You'd buffer the row data & footer (separately) in state. You would also need to set up a timer that fires at the max watermark, to flush out the pending rows/footer once all of the input data has been processed.
>>>> 
>>>> After that function you could configure the sink to bucket by the target schema.
>>>> 
>>>> — Ken
>>>> 
>>>> 
>>>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>>> 
>>>>> Thanks for the response!
>>>>> I have a specific use case where I am writing to a TextFile sink. I have a bounded stream of header data and need to merge it with another bounded stream. While writing the data to a text file, the header data should be written before the original data (from the other bounded stream). And at the end I have another stream of footers, for which I would repeat the same process.
>>>>> I tried keeping an identifier for all three streams, and based on these identifiers I added the data to three different ListStates using KeyedProcessFunctions. For the headers I emitted the value directly, but the main data and footers I stored in state. The issue is that outside the KeyedProcessFunction I am not able to emit the data in order.
>>>>> Is there any way I can achieve the use case of ordering the DataStreams? I also tried union, but it seems to interleave the data from both streams arbitrarily.
>>>>> Thanks and regards
>>>>> 
>>>>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, <kkrugler_li...@transpac.com> wrote:
>>>>>> Hi Muazim,
>>>>>> 
>>>>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>>>> 
>>>>>> So in your example below, this means stream 2 would NEVER be emitted, because stream 1 would never end (there is no time at which you know for sure that stream 1 is done).
>>>>>> 
>>>>>> And this in turn means stream 2 would be buffered forever in state, thus growing unbounded.
>>>>>> 
>>>>>> I would suggest elaborating on your use case.
>>>>>> 
>>>>>> Regards,
>>>>>> 
>>>>>> — Ken
>>>>>> 
>>>>>> 
>>>>>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Team,
>>>>>>> I have a use case where I have two streams and want to join them in a stateful manner.
>>>>>>> E.g. the data of stream 1 should be emitted before stream 2.
>>>>>>> I tried to store the data in ListState in a KeyedProcessFunction, but I am not able to access the state outside processElement().
>>>>>>> Is there any way I could achieve this?
>>>>>>> Thanks and regards
>>>>>>> 
>>>>>> 
>>>>>> --------------------------
>>>>>> Ken Krugler
>>>>>> http://www.scaleunlimited.com
>>>>>> Custom big data solutions
>>>>>> Flink, Pinot, Solr, Elasticsearch
>>>>>> 
>>>>>> 
>>>> 
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
>>>> 
>>>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch