Thank you so much for taking the time to provide me with such a detailed response. Your assistance has been incredibly helpful in clarifying my understanding! Let me describe the exact scenario, as I think there might be some misunderstanding: all the streams are bounded, parallelism is set to 1, and I am writing to one file only. The main use case is to provide support for dynamic headers and footers with aggregated values.

E.g. if my input is
    Static Header
    JohnDoe, 12
    Alice, 21

my dynamic header is "Dynamic Header" and my dynamic footer is "Dynamic footer". These are on top of the static header, which is already present in the bounded DataStream. The output would be

    Dynamic Header 33
    Static Header
    JohnDoe, 12
    Alice, 21
    Dynamic footer

In this particular case I am writing to one file only, and I have set my parallelism to 1. I have one input DataStream, and on top of that one dynamic header stream and one dynamic footer stream (which contain some dynamic params, such as an aggregated value over some fields, e.g. salary). So I am left with three transformed streams in the sink operator: 1) the dynamic header stream with the aggregated value, 2) the input DataStream, and 3) the dynamic footer stream with the aggregated value.

I could have used plain Strings for both the dynamic headers and footers, emitting the headers in the open() method and the footers in the close() method of TextOutputFormat; that would have solved my use case (see the sketch below).
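Roughly like this (just an illustrative sketch, not tested; HeaderFooterTextOutputFormat is a made-up name, and it assumes the header and footer are plain Strings known before the job starts):

    import java.io.IOException;

    import org.apache.flink.api.java.io.TextOutputFormat;
    import org.apache.flink.core.fs.Path;

    // Writes a fixed header before the first record and a fixed footer
    // after the last one. Only works for constant header/footer strings,
    // not for values computed by the job itself.
    public class HeaderFooterTextOutputFormat extends TextOutputFormat<String> {

        private final String header;
        private final String footer;

        public HeaderFooterTextOutputFormat(Path path, String header, String footer) {
            super(path);
            this.header = header;
            this.footer = footer;
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            super.open(taskNumber, numTasks);
            writeRecord(header); // header goes out before any data rows
        }

        @Override
        public void close() throws IOException {
            writeRecord(footer); // footer goes out after all data rows
            super.close();
        }
    }

The limitation is exactly the one described next: open() runs before the aggregate exists, so a sum computed inside the job cannot be passed in this way.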
But since I get a DataStream (containing only one value, the final sum) back from the aggregation (currently I am using a reduce function), I am adding the headers to that DataStream instead, and similarly for the footers. Now I am not able to merge the three streams while maintaining their order.

Below is my implementation of the reduce function:

    public DataStream<T> sum(SumFunction<T> reduceFunction, DataStream<PojoClass> stream) {
        DataStream<T> inputRecordTransformedDataStream =
            stream.map(this::transform)
                  .returns((TypeInformation<T>) Types.GENERIC(Number.class));
        return inputRecordTransformedDataStream
            .keyBy(value -> "key")
            .reduce(reduceFunction);
    }

Below I am adding my headers to the sum stream:

    public static DataStream<PojoClass> getAggregatedStream(String header, DataStream<String> sinkStream) {
        return sinkStream
            .keyBy(key -> "value")
            .flatMap(
                (FlatMapFunction<String, PojoClass>) (value, out) ->
                    out.collect(PojoClass.builder().data(header + value).build()))
            .returns(PojoClass.class);
    }

Here headerStream is a bounded DataStream with the dynamic headers and the aggregated value:

    DataStream<PojoClass> sinkStream;
    DataStream<PojoClass> headerStream = addHeaderRows(sinkStream);
    DataStream<PojoClass> footerStream = addFooterRows(finalStream);

Thanks a lot for your time and the advice.

Muazim Wani

On Fri, 11 Aug 2023 at 07:45, Hang Ruan <ruanhang1...@gmail.com> wrote:

> ps: Forgot the link: Hybrid Source[1]
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>
> On Fri, 11 Aug 2023 at 10:14, Hang Ruan <ruanhang1...@gmail.com> wrote:
>
>> Hi, Muazim.
>>
>> I think the Hybrid Source[1] may be helpful for your case.
>>
>> Best,
>> Hang
>>
>> On Fri, 11 Aug 2023 at 04:18, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>>> As (almost) always, the devil is in the details.
>>>
>>> You haven't said, but I'm assuming you're writing out multiple files, each with a different schema, as otherwise you could just leverage the existing Flink support for CSV.
>>>
>>> So then you could combine the header/footer streams (adding a flag for header vs. footer) and connect that to the row data stream, then use a KeyedCoProcessFunction (I'm assuming you can key by something that identifies which schema). You'd buffer the row data & footer (separately) in state. You would also need to set up a timer to fire at the max watermark, to flush out the pending rows/footer once all of the input data has been processed.
>>>
>>> After that function you could configure the sink to bucket by the target schema.
>>>
>>> — Ken
>>>
>>>
>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>
>>> Thanks for the response!
>>> I have a specific use case where I am writing to a TextFile sink. I have a bounded stream of header data and need to merge it with another bounded stream. While writing the data to a text file, the header data should be written before the original data (from the other bounded stream). Lastly, I have another stream of footers, for which I would repeat the same process.
>>> I tried keeping an identifier for all three streams, and based on these identifiers I added the data to three different ListStates using KeyedProcessFunctions. For the headers I emitted the values directly, but the main data and the footers I stored in state. The issue is that outside the KeyedProcessFunction I am not able to emit the data in order.
>>> Is there any way I can achieve this use case of ordering the DataStreams? I also tried union, but it seems to interleave the data of both streams arbitrarily.
>>> Thanks and regards
>>>
>>> On Thu, 10 Aug 2023, 10:59 pm Ken Krugler, <kkrugler_li...@transpac.com> wrote:
>>>
>>>> Hi Muazim,
>>>>
>>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>>
>>>> So in your example below, this means stream 2 would NEVER be emitted, because stream 1 would never end (there is no time at which you know for sure that stream 1 is done).
>>>>
>>>> And this in turn means stream 2 would be buffered forever in state, thus growing unbounded.
>>>>
>>>> I would suggest elaborating on your use case.
>>>>
>>>> Regards,
>>>>
>>>> — Ken
>>>>
>>>>
>>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>> I have a use case where I have two streams and want to join them in a stateful manner, e.g. the data of stream 1 should be emitted before stream 2.
>>>> I tried to store the data in ListState in a KeyedProcessFunction, but I am not able to access the state outside processElement().
>>>> Is there any way I could achieve this?
>>>> Thanks and regards
>>>>
>>>>
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
>>>
>>>
>>> --------------------------
>>> Ken Krugler
>>> http://www.scaleunlimited.com
>>> Custom big data solutions
>>> Flink, Pinot, Solr, Elasticsearch
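For reference, Ken's buffer-and-flush suggestion, reduced to the single-file, parallelism-1 case at the top of this thread, might look roughly like the sketch below. It is untested; OrderedEmitFunction and the priority tags are illustrative names, and it assumes each of the three bounded streams has been mapped to Tuple2.of(priority, line) (0 = header, 1 = data, 2 = footer) and the results unioned into one stream.

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Buffers all records, keyed by a constant so everything lands in one
    // task, then emits headers, data rows, and footers in that order once
    // the watermark reaches Long.MAX_VALUE (end of the bounded inputs).
    public class OrderedEmitFunction
            extends KeyedProcessFunction<String, Tuple2<Integer, String>, String> {

        private transient ListState<String> headers;
        private transient ListState<String> rows;
        private transient ListState<String> footers;

        @Override
        public void open(Configuration parameters) {
            headers = getRuntimeContext().getListState(new ListStateDescriptor<>("headers", String.class));
            rows = getRuntimeContext().getListState(new ListStateDescriptor<>("rows", String.class));
            footers = getRuntimeContext().getListState(new ListStateDescriptor<>("footers", String.class));
        }

        @Override
        public void processElement(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
            // Buffer everything; nothing is emitted until all inputs finish.
            switch (value.f0) {
                case 0: headers.add(value.f1); break;
                case 1: rows.add(value.f1); break;
                default: footers.add(value.f1);
            }
            // Bounded sources emit a final Long.MAX_VALUE watermark, so this
            // timer fires exactly once, after the last record was buffered.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            for (String h : headers.get()) out.collect(h);
            for (String r : rows.get()) out.collect(r);
            for (String f : footers.get()) out.collect(f);
        }
    }

The tagged streams would then be wired up along the lines of taggedHeader.union(taggedData, taggedFooter).keyBy(t -> "all").process(new OrderedEmitFunction()) before the file sink; note that the tagging maps need explicit .returns(...) type hints, since lambdas erase the Tuple2 generics.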