Thank you for the detailed explanation.

On Tue, 29 Aug 2023 at 22:45, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> If you need a specific output order, then merge the three streams, key by
> a constant (like 1), and run that into a KeyedProcessFunction.
>
> That function can buffer out-of-order records, and set up a timer to fire
> when it gets a MAX_WATERMARK (which indicates that all streams are
> finished) so that it can flush any pending data from state, in the proper
> order.
>
> You’d have a ListState<PojoClass> for the reduced data, and a
> ValueState<PojoClass> for the footer.
>
> Once you get the header, you can flush all reduced data, and no longer
> buffer it.
>
> When the timer fires, you can flush the footer.
>
> -- Ken
>
>
> On Aug 10, 2023, at 10:14 PM, Muazim Wani <muazim1...@gmail.com> wrote:
>
> Thank you so much for taking the time to provide me with such a detailed
> response. Your assistance has been incredibly helpful in clarifying my
> understanding!
> Let me provide you with the exact scenario, as I think there might be some
> misunderstanding. All the streams are bounded and parallelism is set to 1.
> I am writing to 1 file only. *So the main use case is to provide support
> for dynamic headers and footers with aggregated values.*
> E.g. if my input is
>
> Static Header
> JohnDoe, 12
> Alice, 21
>
> My dynamic header is "Dynamic Header" and dynamic footer is "Dynamic
> footer". *These headers are on top of static headers which are already
> present in the DataStream (bounded).* The output would be like
>
> Dynamic Header 33
> Static Header
> JohnDoe, 12
> Alice, 21
> Dynamic footer
>
> In this particular case I am writing to one file only. I have set my
> parallelism to 1. I have 1 InputDataStream; on top of that I have one
> dynamic header and footer stream (*which contains some dynamic params
> such as an aggregated value on some fields, e.g. salary etc.*). *Now I am
> left with three transformed streams in the sink operator, i.e. 1) dynamic
> header stream with aggregated value 2) input DataStream 3) dynamic footer
> stream with aggregated value.*
>
> I could have used String for both dynamic headers and footers and emitted
> the headers in the open() method and footers in the close() method of
> TextOutputFormat; that would have solved my use case. *But as I get a
> DataStream (with only 1 value, i.e. the final sum) back from the aggregated
> values (currently I am using a reduce function),* I am adding headers to
> that DataStream only, and similarly for footers. Now I am not able to merge
> them while maintaining the order.
>
> Below I have provided my implementation of the reduce function:
>
> public DataStream<T> sum(
>     SumFunction<T> reduceFunction, DataStream<PojoClass> stream) {
>
>   DataStream<T> inputRecordTransformedDataStream =
>       stream.map(this::transform)
>           .returns((TypeInformation<T>) Types.GENERIC(Number.class));
>
>   return inputRecordTransformedDataStream
>       .keyBy(value -> "key")
>       .reduce(reduceFunction);
> }
>
>
> Below I am adding my headers to my sum stream:
>
> public static DataStream<PojoClass> getAggregatedStream(
>     String header, DataStream<String> sinkStream) {
>
>   return sinkStream
>       .keyBy(key -> "value")
>       .flatMap(
>           (FlatMapFunction<String, PojoClass>)
>               (value, out) ->
>                   out.collect(PojoClass.builder().data(header + value).build()))
>       .returns(PojoClass.class);
> }
>
> headerStream is a bounded DataStream with dynamic headers and the
> aggregated value.
>
> DataStream<PojoClass> headerStream = addHeaderRows(sinkStream);
>
> DataStream<PojoClass> footerStream = addFooterRows(finalStream);
>
> DataStream<PojoClass> sinkStream;
>
> Thanks a lot for your time and the advice.
> Muazim Wani
>
>
> On Fri, 11 Aug 2023 at 07:45, Hang Ruan <ruanhang1...@gmail.com> wrote:
>
>> PS: Forgot the link: Hybrid Source[1]
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>>
>> Hang Ruan <ruanhang1...@gmail.com> wrote on Fri, 11 Aug 2023 at 10:14:
>>
>>> Hi, Muazim.
>>>
>>> I think the Hybrid Source[1] may be helpful for your case.
>>>
>>> Best,
>>> Hang
>>>
>>> Ken Krugler <kkrugler_li...@transpac.com> wrote on Fri, 11 Aug 2023 at 04:18:
>>>
>>>> As (almost) always, the devil is in the details.
>>>>
>>>> You haven’t said, but I’m assuming you’re writing out multiple files,
>>>> each with a different schema, as otherwise you could just leverage the
>>>> existing Flink support for CSV.
>>>>
>>>> So then you could combine the header/footer streams (adding a flag for
>>>> header vs. footer), and connect that to the row data stream, then use a
>>>> KeyedCoProcessFunction (I’m assuming you can key by something that
>>>> identifies which schema). You’d buffer the row data & footer (separately
>>>> in state). You would also need to set up a timer to fire at the max
>>>> watermark, to flush out pending rows/footer when all of the input data
>>>> has been processed.
>>>>
>>>> After that function you could configure the sink to bucket by the
>>>> target schema.
>>>>
>>>> -- Ken
>>>>
>>>>
>>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>>
>>>> Thanks for the response!
>>>> I have a specific use case where I am writing to a TextFile sink. I
>>>> have a bounded stream of header data and need to merge it with another
>>>> bounded stream. While writing the data to a text file, the header data
>>>> should be written before the original data (from another bounded stream).
>>>> And at the end I have another stream of footers, for which I would
>>>> repeat the same process.
>>>> I tried keeping an identifier for all three streams, and based on these
>>>> identifiers I added the data to three different ListStates
>>>> using KeyedProcessFunctions. So for headers I directly emitted the value,
>>>> but for main data and footers I store it in state. The issue is that
>>>> outside the KeyedProcessFunction I am not able to emit the data in order.
>>>> Is there any way I can achieve the use case of ordering the
>>>> DataStreams? I also tried with union, but it seems it adds data
>>>> arbitrarily from both streams.
>>>> Thanks and regards
>>>>
>>>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, <kkrugler_li...@transpac.com> wrote:
>>>>
>>>>> Hi Muazim,
>>>>>
>>>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>>>
>>>>> So in your example below, this means stream 2 would NEVER be emitted,
>>>>> because stream 1 would never end (there is no time at which you know for
>>>>> sure that stream 1 is done).
>>>>>
>>>>> And this in turn means stream 2 would be buffered forever in state,
>>>>> thus growing unbounded.
>>>>>
>>>>> I would suggest elaborating on your use case.
>>>>>
>>>>> Regards,
>>>>>
>>>>> -- Ken
>>>>>
>>>>>
>>>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <muazim1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Team,
>>>>> I have a use case where I have two streams and want to join them in
>>>>> a stateful manner.
>>>>> E.g. data of stream 1 should be emitted before stream 2.
>>>>> I tried to store the data in ListState in a KeyedProcessFunction but I
>>>>> am not able to access state outside processElement().
>>>>> Is there any way I could achieve this?
>>>>> Thanks and regards
>>>>>
>>>>> --------------------------
>>>>> Ken Krugler
>>>>> http://www.scaleunlimited.com
>>>>> Custom big data solutions
>>>>> Flink, Pinot, Solr, Elasticsearch
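
The ordering logic from Ken's 29 Aug reply at the top of this thread (buffer rows until the header arrives, hold the footer until all input is finished) could be sketched roughly as below. This is a plain-Java model of the buffering decisions only and does not use any Flink API. The class name OrderedFlushBuffer and its methods are hypothetical. In a real job the row list would presumably be the ListState<PojoClass>, the footer field the ValueState<PojoClass>, and onEndOfInput() the timer callback that fires at MAX_WATERMARK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the buffering logic; no Flink classes are used.
class OrderedFlushBuffer {
    // Stands in for the ListState that holds rows seen before the header.
    private final List<String> pendingRows = new ArrayList<>();
    // Stands in for the ValueState that holds the footer.
    private String pendingFooter;
    private boolean headerSeen;
    // Stands in for the records emitted downstream to the sink.
    private final List<String> output = new ArrayList<>();

    // Header arrives: emit it, then flush every row buffered so far.
    void onHeader(String header) {
        output.add(header);
        headerSeen = true;
        output.addAll(pendingRows);
        pendingRows.clear();
    }

    // Rows are buffered only until the header has been emitted.
    void onRow(String row) {
        if (headerSeen) {
            output.add(row);
        } else {
            pendingRows.add(row);
        }
    }

    // The footer is always held back until the end of input.
    void onFooter(String footer) {
        pendingFooter = footer;
    }

    // Models the timer firing once all inputs are finished.
    void onEndOfInput() {
        output.addAll(pendingRows); // only non-empty if no header ever arrived
        pendingRows.clear();
        if (pendingFooter != null) {
            output.add(pendingFooter);
            pendingFooter = null;
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        OrderedFlushBuffer buffer = new OrderedFlushBuffer();
        // Records arrive in an arbitrary order, as they might after a union.
        buffer.onFooter("Dynamic footer");
        buffer.onRow("Static Header");
        buffer.onRow("JohnDoe, 12");
        buffer.onHeader("Dynamic Header 33");
        buffer.onRow("Alice, 21");
        buffer.onEndOfInput();
        buffer.output().forEach(System.out::println);
    }
}
```

With the arrival order used in main(), the lines come out in the order of the example file earlier in the thread: dynamic header, static header, the two data rows, then the dynamic footer. Rows keep their relative arrival order, which matches the parallelism-1 setup described above.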