As (almost) always, the devil is in the details.

You haven’t said, but I’m assuming you’re writing out multiple files, each with 
a different schema, as otherwise you could just leverage the existing Flink 
support for CSV.

So then you could combine the header and footer streams into one (adding a 
flag for header vs. footer), connect that to the row data stream, and then use 
a KeyedCoProcessFunction (I’m assuming you can key by something that identifies 
which schema a record belongs to). Headers can be emitted as soon as they 
arrive; you’d buffer the row data and the footer in separate state. You would 
also need to register an event-time timer for the max watermark, which a 
bounded source emits once all of its input has been processed, so that the 
timer flushes the pending rows and then the footer.
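To make the ordering concrete, here is a minimal sketch of the per-key 
buffering logic in plain Java, with no Flink dependency. The class and method 
names (OrderedFileBuffer, onHeaderOrFooter, onRow, onEndOfInput) are made up 
for illustration. In a real job the two lists would be ListState, the two 
input methods would roughly correspond to processElement1()/processElement2() 
of the KeyedCoProcessFunction, and onEndOfInput() to the onTimer() callback 
that fires at the max watermark.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of the logic for ONE key (one target schema/file).
 * This is not Flink code; all names here are hypothetical.
 */
final class OrderedFileBuffer {
    // Stand-ins for two separate ListState instances.
    private final List<String> pendingRows = new ArrayList<>();
    private final List<String> pendingFooters = new ArrayList<>();
    // Stand-in for the Collector: records in the order they are emitted.
    private final List<String> output = new ArrayList<>();

    /** Input 1: the combined header/footer stream, with the header flag. */
    void onHeaderOrFooter(boolean isHeader, String line) {
        if (isHeader) {
            output.add(line);          // headers go out immediately
        } else {
            pendingFooters.add(line);  // footers wait until end of input
        }
    }

    /** Input 2: the row data stream. Rows wait until end of input. */
    void onRow(String line) {
        pendingRows.add(line);
    }

    /** Models the timer at the max watermark: flush rows, then footers. */
    void onEndOfInput() {
        output.addAll(pendingRows);
        pendingRows.clear();
        output.addAll(pendingFooters);
        pendingFooters.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        OrderedFileBuffer buf = new OrderedFileBuffer();
        // Records arrive interleaved, as they may from two connected streams.
        buf.onRow("row-1");
        buf.onHeaderOrFooter(false, "footer");
        buf.onHeaderOrFooter(true, "header");
        buf.onRow("row-2");
        buf.onEndOfInput();
        System.out.println(buf.output());
    }
}
```

The trade-off is that every row for a key sits in state until the input ends, 
which is only workable because the streams are bounded.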

After that function you could configure the sink to bucket by the target schema.

— Ken


> On Aug 10, 2023, at 10:41 AM, Muazim Wani <muazim1...@gmail.com> wrote:
> 
> Thanks for the response!
> I have a specific use case where I am writing to a text file sink. I have a 
> bounded stream of header data and need to merge it with another bounded 
> stream. When writing to the text file, the header data should be written 
> before the main data (from the other bounded stream). Finally, I have a 
> third stream of footers, which should be written last in the same way.
> I tried tagging all three streams with an identifier and, based on these 
> identifiers, stored the data in three different ListStates using a 
> KeyedProcessFunction. For headers I emit the value directly, but the main 
> data and footers I store in state. The issue is that outside the 
> KeyedProcessFunction I am not able to emit the data in order.
> Is there any way I can achieve this ordering of the DataStreams? I also 
> tried union, but it seems to interleave the data from the streams 
> arbitrarily.
> Thanks and regards
> 
> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, <kkrugler_li...@transpac.com> 
> wrote:
>> Hi Muazim,
>> 
>> In Flink, a stream of data (unless bounded) is assumed to never end.
>> 
>> So in your example below, this means stream 2 would NEVER be emitted, 
>> because stream 1 would never end (there is no time at which you know for 
>> sure that stream 1 is done).
>> 
>> And this in turn means stream 2 would be buffered forever in state, thus 
>> growing unbounded.
>> 
>> I would suggest elaborating on your use case.
>> 
>> Regards,
>> 
>> — Ken
>> 
>> 
>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>> 
>>> Hi Team,
>>> I have a use case where I have two streams and want to join them in a 
>>> stateful manner. 
>>> E.g., data from stream 1 should be emitted before stream 2.
>>> I tried to store the data in ListState in a KeyedProcessFunction, but I am 
>>> not able to access the state outside processElement().
>>> Is there any way I could achieve this?
>>> Thanks and regards
>>> 
>> 
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
>> 
>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


