Thanks for your reply.

I want to join two stream A and stream B. Items in stream A come in first
then I keep them in memory cache, as join key and item, then serval
minutes later the items in stream B come in then the join work is
performed. The timestamp of the latest expired item in memory cache is the
safe rollback timestamp, I can resume source A from that timestamp when I
restart.

It's not very percise, maybe lost same items or send same items twice, but
seems useful to me in my situation. But if job restart, both source A and
source B resume from last consumed offset, it will make the absense of
serval minutes join result, which is unacceptable.

The topo I consider is like

source A -> parser --shuffle--> join -> sink
source B -> parser ...(parallel)      |--->timestampcalculator

Memory cache aside in join operator, the join operator will broadcast the
timestamp of latest expired cache item to the timestampcalculator. Then
timestampcalculator will use them to calculate a safe rollback timestamp (a
moving minimum) that source A can resume from that timestamp, source B will
also restart from that timestamp. I will add a bloomfilter in sink's state
to avoid duplicate items.

So I want to let timestampcalculator operator and source A are located in
one TM, then I can send this timestamp from timestampcalculator to source A
by static variable.

Hope I make my problem clear with my poor English, it seems a little
tricky. But I think it's the only way to do two streams join and avoid to
store very huge state.



Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 下午2:58写道:

> I still haven't fully understood. Do you mean you can't infer the
> timestamp in source A because it depends on some internal field of source B?
>
> How is that actually working in a parallel setting? Which timestamp is
> used in the different instances of a source?
>
> Say, we have task A1 which is the first subtask of source A and task B2 as
> the second subtask of source B. How would you like them to be located? How
> does that correlate to the third subtask of the join (let's call it J3).
>
> Remember that through the shuffling before the join there is no clear
> correlation between any subtask of A or B to J...
>
> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your help!
>>
>> Now the timestamps already go with the items in streaming. My streaming
>> pipeline is like this:
>>
>> source -> parser --shuffle--> join -> sink
>>
>> Streaming A and streaming B go through this pipeline, I keep logs in
>> streaming A in memory cache (linkedHashmap) in join operator, then all logs
>> in streaming B tries to lookup up the cache and perform the actual join
>> work.
>>
>> I try to use the timestamp of the lastest expire item in memory as a safe
>> rollback timestamp, if I restart job, the source should use this timestamp
>> as start offset. The safe rollback timestamp is calucated in join operator,
>> but I want to use it in source. So the simplest way to pass this
>> information from join operator to source is use static variable, which
>> require source operator and join operator always locate in same TM process.
>>
>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 上午3:33写道:
>>
>>> Hi Si-li,
>>>
>>> couldn't you also add the timestamp as a state to the source? So the
>>> time would store the timestamp of the last emitted record.
>>> It's nearly identical to your solution but would fit the recovery model
>>> of Flink much better.
>>> If you want to go further back to account for the records that have been
>>> actually processed in the join, you could also replay the data from <last
>>> timestamp> - <some offset>.
>>>
>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks, I'll try it.
>>>>
>>>> Matthias Pohl <matth...@ververica.com> 于2020年11月14日周六 上午12:53写道:
>>>>
>>>>> Hi Si-li,
>>>>> trying to answer your initial question: Theoretically, you could try
>>>>> using the co-location constraints to achieve this. But keep in mind that
>>>>> this might lead to multiple Join operators running in the same JVM 
>>>>> reducing
>>>>> the amount of memory each operator can utilize.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> It's a streaming job. The join operator is doing join work, such as
>>>>>> join. The join state is too large so I don't want to keep the state using
>>>>>> the mechanism that Flink provided, and also I don't need very precise 
>>>>>> join.
>>>>>> So I prefer to let the join operator to calculate a backward timestamp as
>>>>>> state, if the cluster restarts, the consumer can use 
>>>>>> setStartFromTimestamp
>>>>>> to start from that timestamp.
>>>>>>
>>>>>> Now my problem is, consumer can't read the state that join operator
>>>>>> written, so I need a way to need small message (64bit long) from 
>>>>>> downstream
>>>>>> to upstream. Redis may be a solution, but add external  dependency is a
>>>>>> secondary option if I can pass this message through memory.
>>>>>>
>>>>>>
>>>>>> Chesnay Schepler <ches...@apache.org> 于2020年11月6日周五 上午7:06写道:
>>>>>>
>>>>>>> It would be good if you could elaborate a bit more on your use-case.
>>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>>> talking about? Why are you thinking of using a static variable, instead 
>>>>>>> of
>>>>>>> just treating this message as part of the data(set/stream)?
>>>>>>>
>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>
>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is send
>>>>>>> some messages from downstream operators to upstream operators, which I
>>>>>>> consider use static variable.
>>>>>>>
>>>>>>> But it makes me have to make sure in one taskmanager process it
>>>>>>> always has these two operators, can I use CoLocationGroup to solve this
>>>>>>> problem? Or can anyone give me an example to demostrate the usage
>>>>>>> of CoLocationGroup ?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Best regards

Sili Liu

Reply via email to