Thank you so much for taking the time to provide such a detailed response.
Your assistance has been incredibly helpful in clarifying my
understanding!
Let me describe the exact scenario, as I think there might be some
misunderstanding. All the streams are bounded and parallelism is set to 1.
I am writing to one file only. *So the main use case is to provide support
for dynamic headers and footers with aggregated values.*
E.g., if my input is

Static Header
JohnDoe, 12
Alice, 21

My dynamic header is "Dynamic Header" and my dynamic footer is "Dynamic
footer". *These headers go on top of the static headers that are already
present in the bounded DataStream.* The output would look like this (33 is
the aggregated value, 12 + 21):

Dynamic Header 33
Static Header
JohnDoe, 12
Alice, 21
Dynamic footer
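
To make the expected output concrete, here is a minimal plain-Java sketch of the transformation (no Flink involved). The class and method names are hypothetical, and it assumes the first input line is the static header and every other line has the form "name, number".

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Flink classes are involved.
class DynamicHeaderFooterExample {

    // Builds the output lines: dynamic header plus sum, the untouched input, then the footer.
    static List<String> render(String dynamicHeader, String dynamicFooter, List<String> input) {
        long sum = 0;
        // Assumption: input is non-empty, line 0 is the static header,
        // and data rows look like "name, number".
        for (String line : input.subList(1, input.size())) {
            sum += Long.parseLong(line.split(",")[1].trim());
        }
        List<String> out = new ArrayList<>();
        out.add(dynamicHeader + " " + sum);
        out.addAll(input);
        out.add(dynamicFooter);
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("Static Header", "JohnDoe, 12", "Alice, 21");
        render("Dynamic Header", "Dynamic footer", input).forEach(System.out::println);
    }
}
```

The point of the sketch is that the header line depends on a value (the sum) that is only known after every data row has been read, while it has to be written before all of them.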

In this particular case I am writing to one file only, with parallelism set
to 1. I have one input DataStream, and on top of that I have one dynamic
header stream and one dynamic footer stream (*which contain some dynamic
params, such as an aggregated value over some field, e.g. salary*). *So I am
left with three transformed streams at the sink operator: 1) the dynamic
header stream with the aggregated value, 2) the input DataStream, and 3) the
dynamic footer stream with the aggregated value.*

I could have used a String for both the dynamic header and footer and
emitted the header in the open() method and the footer in the close() method
of TextOutputFormat; that would have solved my use case. *But the
aggregation (currently a reduce function) gives me back a DataStream (with
only one value, i.e. the final sum).* I add the header to that DataStream,
and similarly for the footer. Now I am not able to merge them while
maintaining the order.
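
The open()/close() idea can be sketched in plain Java as follows. HeaderFooterWriter is a hypothetical class that only mimics an open/writeRecord/close lifecycle; it is not Flink's TextOutputFormat and does not extend it.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical writer that mimics an open/writeRecord/close lifecycle.
// It is NOT Flink's TextOutputFormat; it only illustrates the idea.
class HeaderFooterWriter {
    private final PrintWriter target;
    private final String header;
    private final String footer;

    // Both strings must already be known when the writer is constructed.
    HeaderFooterWriter(PrintWriter target, String header, String footer) {
        this.target = target;
        this.header = header;
        this.footer = footer;
    }

    // Called once before any record: emit the header line.
    void open() {
        target.write(header + "\n");
    }

    // Called once per record, in arrival order.
    void writeRecord(String record) {
        target.write(record + "\n");
    }

    // Called once after the last record: emit the footer line.
    void close() {
        target.write(footer + "\n");
        target.flush();
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        HeaderFooterWriter writer =
                new HeaderFooterWriter(new PrintWriter(buffer), "Dynamic Header 33", "Dynamic footer");
        writer.open();
        writer.writeRecord("Static Header");
        writer.writeRecord("JohnDoe, 12");
        writer.writeRecord("Alice, 21");
        writer.close();
        System.out.print(buffer);
    }
}
```

This only works when the header text is a plain String available before open() runs, which is exactly what is missing when the aggregated value arrives as a DataStream.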

Below is my implementation of the reduce function:

    public DataStream<T> sum(
            SumFunction<T> reduceFunction, DataStream<PojoClass> stream) {

        DataStream<T> inputRecordTransformedDataStream =
                stream.map(this::transform)
                        .returns((TypeInformation<T>) Types.GENERIC(Number.class));

        // A constant key sends every record to the same reduce instance.
        return inputRecordTransformedDataStream
                .keyBy(value -> "key")
                .reduce(reduceFunction);
    }
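
For context, here is a minimal plain-Java sketch of what a reduce over a single constant key computes. The names are hypothetical and no Flink API is used. Flink's execution-mode documentation describes a keyed reduce as rolling in STREAMING mode (one updated result per input record) and as emitting only the final result in BATCH mode; the two helper methods mirror those two behaviours.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration of a reduce over one constant key; not Flink API.
class ConstantKeyReduceSketch {

    // One running result per input element (rolling behaviour).
    static <T> List<T> rolling(List<T> input, BinaryOperator<T> reducer) {
        List<T> results = new ArrayList<>();
        T accumulator = null;
        for (T value : input) {
            // The first element becomes the initial accumulator.
            accumulator = (accumulator == null) ? value : reducer.apply(accumulator, value);
            results.add(accumulator);
        }
        return results;
    }

    // Only the last running result (final-result behaviour); null for empty input.
    static <T> T finalOnly(List<T> input, BinaryOperator<T> reducer) {
        List<T> results = rolling(input, reducer);
        return results.isEmpty() ? null : results.get(results.size() - 1);
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(12, 21);
        System.out.println(rolling(values, Integer::sum));
        System.out.println(finalOnly(values, Integer::sum));
    }
}
```

Which of the two behaviours a job shows depends on its execution mode, so a stream "with only one value" corresponds to the final-result case.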


Below I add my header to the sum stream:

    public static DataStream<PojoClass> getAggregatedStream(
            String header, DataStream<String> sinkStream) {

        return sinkStream
                .keyBy(key -> "value")
                .flatMap(
                        (FlatMapFunction<String, PojoClass>)
                                (value, out) ->
                                        // Prefix the aggregated value with the header text.
                                        out.collect(PojoClass.builder().data(header + value).build()))
                .returns(PojoClass.class);
    }

headerStream is a bounded DataStream with the dynamic header and the
aggregated value:

    DataStream<PojoClass> headerStream = addHeaderRows(sinkStream);

    DataStream<PojoClass> footerStream = addFooterRows(finalStream);

    DataStream<PojoClass> sinkStream;
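
To spell out what "merging while maintaining the order" means here, the following is a minimal plain-Java sketch, not a Flink implementation. Tagged, tag and mergeInOrder are hypothetical names: each record carries the index of the section it belongs to (0 = dynamic header, 1 = input data, 2 = dynamic footer) plus its position inside that section, so the intended order can be restored whatever order the records arrive in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration of the desired ordering; not Flink API.
class OrderedMergeSketch {

    // Hypothetical wrapper: section 0 = dynamic header, 1 = input data, 2 = dynamic footer.
    static final class Tagged {
        final int section;
        final long position;
        final String line;

        Tagged(int section, long position, String line) {
            this.section = section;
            this.position = position;
            this.line = line;
        }
    }

    // Tags every line of one section with the section index and its position.
    static List<Tagged> tag(int section, List<String> lines) {
        List<Tagged> tagged = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            tagged.add(new Tagged(section, i, lines.get(i)));
        }
        return tagged;
    }

    // Restores the intended order: by section first, then by position within the section.
    static List<String> mergeInOrder(List<Tagged> arrived) {
        return arrived.stream()
                .sorted(Comparator.comparingInt((Tagged t) -> t.section)
                        .thenComparingLong(t -> t.position))
                .map(t -> t.line)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Tagged> arrived = new ArrayList<>();
        arrived.addAll(tag(0, List.of("Dynamic Header 33")));
        arrived.addAll(tag(1, List.of("Static Header", "JohnDoe, 12", "Alice, 21")));
        arrived.addAll(tag(2, List.of("Dynamic footer")));
        Collections.shuffle(arrived); // simulate an arbitrary arrival order
        mergeInOrder(arrived).forEach(System.out::println);
    }
}
```

The sort needs every record to be available, so this only makes sense for bounded inputs such as the three streams described above.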

Thanks a lot for your time and the advice.
Muazim Wani


On Fri, 11 Aug 2023 at 07:45, Hang Ruan <ruanhang1...@gmail.com> wrote:

> PS: I forgot the link: Hybrid Source[1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/hybridsource/
>
> On Fri, 11 Aug 2023 at 10:14, Hang Ruan <ruanhang1...@gmail.com> wrote:
>
>> Hi, Muazim.
>>
>> I think the Hybrid Source[1] may be helpful for your case.
>>
>> Best,
>> Hang
>>
>> On Fri, 11 Aug 2023 at 04:18, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>>> As (almost) always, the devil is in the details.
>>>
>>> You haven’t said, but I’m assuming you’re writing out multiple files,
>>> each with a different schema, as otherwise you could just leverage the
>>> existing Flink support for CSV.
>>>
>>> So then you could combine the header/footer streams (adding a flag for
>>> header vs. footer), and connect that to the row data stream, then use a
>>> KeyedCoProcessFunction (I’m assuming you can key by something that
>>> identifies which schema). You’d buffer the row data & footer (separately in
>>> state). You would also need to set up a timer to fire at the max watermark,
>>> to flush out pending rows/footer when all of the input data has been
>>> processed.
>>>
>>> After that function you could configure the sink to bucket by the target
>>> schema.
>>>
>>> — Ken
>>>
>>>
>>> On Aug 10, 2023, at 10:41 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>
>>> Thanks for the response!
>>> I have a specific use case where I am writing to a TextFile sink. I have
>>> a bounded stream of header data and need to merge it with another bounded
>>> stream. While writing the data to a text file, the header data should be
>>> written before the original data (from the other bounded stream). Finally,
>>> I have another stream of footers, for which I would repeat the same
>>> process.
>>> I tried keeping an identifier for all three streams, and based on these
>>> identifiers I added the data to three different ListStates using
>>> KeyedProcessFunctions. For headers I emitted the value directly, but the
>>> main data and footers I stored in state. The issue is that outside the
>>> KeyedProcessFunction I am not able to emit the data in order.
>>> Is there any way I can achieve this ordering of the DataStreams? I also
>>> tried union, but it seems to interleave the data from the streams
>>> arbitrarily.
>>> Thanks and regards
>>>
>>> On Thu, 10 Aug, 2023, 10:59 pm Ken Krugler, <kkrugler_li...@transpac.com>
>>> wrote:
>>>
>>>> Hi Muazim,
>>>>
>>>> In Flink, a stream of data (unless bounded) is assumed to never end.
>>>>
>>>> So in your example below, this means stream 2 would NEVER be emitted,
>>>> because stream 1 would never end (there is no time at which you know for
>>>> sure that stream 1 is done).
>>>>
>>>> And this in turn means stream 2 would be buffered forever in state,
>>>> thus growing unbounded.
>>>>
>>>> I would suggest elaborating on your use case.
>>>>
>>>> Regards,
>>>>
>>>> — Ken
>>>>
>>>>
>>>> On Aug 10, 2023, at 10:11 AM, Muazim Wani <muazim1...@gmail.com> wrote:
>>>>
>>>> Hi Team,
>>>> I have a use case where I have two streams and want to join them in a
>>>> stateful manner.
>>>> E.g., data of stream 1 should be emitted before stream 2.
>>>> I tried to store the data in ListState in a KeyedProcessFunction, but I
>>>> am not able to access the state outside processElement().
>>>> Is there any way I could achieve this?
>>>> Thanks and regards
>>>>
>>>>
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
>>>>
>>>>
>>>>
>>>>
