Your topology is definitely interesting and makes sense to me on a high
level. The main remaining question is the parallelism. I'm assuming you run
your pipeline with parallelism p and that both source A and the
timestampcalculator T run with parallelism p. You want to create a situation
where for each A_i there is a T_i running in the same slot. Am I right?

If so, then as you have noticed, there is currently no way to express that
in Flink on a high level. One more idea before trying to solve it in a hacky
way: how large is B? Could you use a broadcast to avoid the shuffle on A?
I'm thinking of a pipeline A->J(side input B)->T, because then it's easy to
produce an operator chain, where everything even runs within the same
thread.
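
For illustration, a rough, untested sketch of that broadcast variant. A, B,
and Joined stand for your record types, getKey() for your join key accessor,
and sourceA/sourceB for the parsed streams; this is only meant as one
possible shape, not the definitive implementation:

MapStateDescriptor<String, B> bStateDesc = new MapStateDescriptor<>(
        "b-side", Types.STRING, TypeInformation.of(B.class));

BroadcastStream<B> broadcastB = sourceB.broadcast(bStateDesc);

DataStream<Joined> joined = sourceA
        .connect(broadcastB)
        .process(new BroadcastProcessFunction<A, B, Joined>() {
            @Override
            public void processElement(A a, ReadOnlyContext ctx, Collector<Joined> out) throws Exception {
                // look up the broadcast copy of B; no shuffle on A is needed
                B b = ctx.getBroadcastState(bStateDesc).get(a.getKey());
                if (b != null) {
                    out.collect(new Joined(a, b));
                }
            }

            @Override
            public void processBroadcastElement(B b, Context ctx, Collector<Joined> out) throws Exception {
                // every parallel instance receives the complete B side
                ctx.getBroadcastState(bStateDesc).put(b.getKey(), b);
            }
        });

// T would then follow as a simple map/process on "joined"; the forward edge
// keeps J and T together in the same slot.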

On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:

> Thanks for your reply.
>
> I want to join stream A and stream B. Items in stream A come in first and I
> keep them in a memory cache as (join key, item); several minutes later the
> items in stream B come in and the join work is performed. The timestamp of
> the latest expired item in the memory cache is the safe rollback timestamp,
> and I can resume source A from that timestamp when I restart.
>
> It's not very precise, it may lose some items or send some items twice, but
> it seems useful in my situation. However, if the job restarts and both
> source A and source B resume from their last consumed offsets, several
> minutes of join results will be missing, which is unacceptable.
>
> The topology I'm considering is like this:
>
> source A -> parser --shuffle--> join -> sink
> source B -> parser ...(parallel)      |--->timestampcalculator
>
> The memory cache lives inside the join operator, and the join operator
> broadcasts the timestamp of the latest expired cache item to the
> timestampcalculator. The timestampcalculator then uses these timestamps to
> calculate a safe rollback timestamp (a moving minimum) so that source A can
> resume from that timestamp, and source B will also restart from it. I will
> add a bloom filter to the sink's state to avoid duplicate items.
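
(For illustration only, a rough sketch of such a moving-minimum calculator.
That each join subtask emits a Tuple2 of its subtask index and its latest
expired timestamp is an assumption about the wiring, not something the
thread prescribes:)

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Keeps the newest expiry timestamp per join subtask and forwards the minimum,
// i.e. the safe rollback timestamp both sources could resume from.
public class TimestampCalculator extends RichFlatMapFunction<Tuple2<Integer, Long>, Long> {

    private final Map<Integer, Long> latestExpiredPerJoinSubtask = new HashMap<>();

    @Override
    public void flatMap(Tuple2<Integer, Long> expired, Collector<Long> out) {
        latestExpiredPerJoinSubtask.put(expired.f0, expired.f1);
        out.collect(Collections.min(latestExpiredPerJoinSubtask.values()));
    }
}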
>
> So I want the timestampcalculator operator and source A to be located in
> the same TM, so that I can send this timestamp from the timestampcalculator
> to source A via a static variable.
>
> I hope I have made my problem clear; it seems a little tricky. But I think
> it's the only way to join the two streams while avoiding very large state.
>
>
>
> On Fri, Nov 20, 2020 at 2:58 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> I still haven't fully understood. Do you mean you can't infer the
>> timestamp in source A because it depends on some internal field of source B?
>>
>> How is that actually working in a parallel setting? Which timestamp is
>> used in the different instances of a source?
>>
>> Say we have task A1, the first subtask of source A, and task B2, the second
>> subtask of source B. How would you like them to be located? And how does
>> that correlate to the third subtask of the join (let's call it J3)?
>>
>> Remember that because of the shuffle before the join there is no clear
>> correlation between any subtask of A or B and J...
>>
>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Thanks for your help!
>>>
>>> Now the timestamps already travel with the items in the stream. My
>>> streaming pipeline is like this:
>>>
>>> source -> parser --shuffle--> join -> sink
>>>
>>> Stream A and stream B both go through this pipeline. I keep the records
>>> from stream A in a memory cache (a LinkedHashMap) in the join operator,
>>> and all records from stream B look up the cache and perform the actual
>>> join work.
>>>
>>> I try to use the timestamp of the latest expired item in the memory cache
>>> as a safe rollback timestamp: if I restart the job, the source should use
>>> this timestamp as the start offset. The safe rollback timestamp is
>>> calculated in the join operator, but I want to use it in the source. The
>>> simplest way to pass this information from the join operator to the source
>>> is a static variable, which requires that the source operator and the join
>>> operator are always located in the same TM process.
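
(For illustration, the static-variable handover could look roughly like the
hypothetical holder below. It only works if the join subtask and the source
subtask really share one TaskManager JVM and user-code classloader, which is
exactly the co-location question of this thread:)

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder shared between the join operator and the source via a
// static field; only visible within one JVM / user-code classloader.
public final class RollbackTimestampHolder {

    private static final AtomicLong SAFE_ROLLBACK_TS = new AtomicLong(Long.MIN_VALUE);

    private RollbackTimestampHolder() {}

    // called by the join operator whenever a cache entry expires
    public static void update(long expiredTimestamp) {
        SAFE_ROLLBACK_TS.set(expiredTimestamp);
    }

    // read by the source when deciding where to resume after a restart
    public static long get() {
        return SAFE_ROLLBACK_TS.get();
    }
}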
>>>
>>> On Fri, Nov 20, 2020 at 3:33 AM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Si-li,
>>>>
>>>> couldn't you also add the timestamp as state to the source? So the
>>>> source would store the timestamp of the last emitted record.
>>>> It's nearly identical to your solution but would fit Flink's recovery
>>>> model much better.
>>>> If you want to go further back to account for the records that have
>>>> actually been processed in the join, you could also replay the data from
>>>> <last timestamp> - <some offset>.
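
(A rough, untested sketch of that idea; Event, its timestamp accessor, and
readNextEvent() are placeholders for the actual source logic:)

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Keeps the timestamp of the last emitted record in operator state so that it
// is restored on recovery and can be used as the (re)start position.
public class TimestampTrackingSource extends RichSourceFunction<Event>
        implements CheckpointedFunction {

    private transient ListState<Long> lastTimestampState;
    private long lastEmittedTimestamp = Long.MIN_VALUE;
    private volatile boolean running = true;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        lastTimestampState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("last-emitted-ts", Long.class));
        for (Long ts : lastTimestampState.get()) {
            lastEmittedTimestamp = Math.max(lastEmittedTimestamp, ts);
        }
        // on restore, seek the external system to lastEmittedTimestamp - someOffset
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        lastTimestampState.clear();
        lastTimestampState.add(lastEmittedTimestamp);
    }

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            Event event = readNextEvent(); // placeholder for the real read logic
            synchronized (ctx.getCheckpointLock()) {
                lastEmittedTimestamp = event.getTimestamp();
                ctx.collect(event);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private Event readNextEvent() {
        throw new UnsupportedOperationException("placeholder");
    }
}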
>>>>
>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Thanks, I'll try it.
>>>>>
>>>>> On Sat, Nov 14, 2020 at 12:53 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>>
>>>>>> Hi Si-li,
>>>>>> trying to answer your initial question: theoretically, you could try
>>>>>> using the co-location constraints to achieve this. But keep in mind that
>>>>>> this might lead to multiple join operators running in the same JVM,
>>>>>> reducing the amount of memory each operator can utilize.
>>>>>>
>>>>>> Best,
>>>>>> Matthias
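
(For what it's worth, an untested sketch of that co-location idea. The
co-location key sits on the underlying transformation, which to my knowledge
is an internal, undocumented API that may change between releases, and both
operators need the same parallelism. sourceA and timestampCalculator stand
for the DataStreams from the topology above:)

// internal API, use with care
sourceA.getTransformation().setCoLocationGroupKey("rollback-group");
timestampCalculator.getTransformation().setCoLocationGroupKey("rollback-group");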
>>>>>>
>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your reply.
>>>>>>>
>>>>>>> It's a streaming job, and the join operator performs the actual join
>>>>>>> work. The join state is too large, so I don't want to keep it using the
>>>>>>> state mechanism that Flink provides, and I also don't need a very
>>>>>>> precise join. So I prefer to let the join operator calculate a backward
>>>>>>> timestamp as state; if the cluster restarts, the consumer can use
>>>>>>> setStartFromTimestamp to start from that timestamp.
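
(For illustration, assuming a Kafka source; the topic, schema, properties,
and the safeRollbackTimestamp variable are placeholders:)

FlinkKafkaConsumer<String> consumerA =
        new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), kafkaProps);
// safeRollbackTimestamp is the backward timestamp calculated by the join operator
consumerA.setStartFromTimestamp(safeRollbackTimestamp);
DataStream<String> sourceA = env.addSource(consumerA);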
>>>>>>>
>>>>>>> Now my problem is that the consumer can't read the state that the join
>>>>>>> operator has written, so I need a way to send a small message (a 64-bit
>>>>>>> long) from downstream to upstream. Redis may be a solution, but adding
>>>>>>> an external dependency is only a secondary option if I can pass this
>>>>>>> message through memory.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 6, 2020 at 7:06 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> It would be good if you could elaborate a bit more on your use-case.
>>>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>>>> talking about? Why are you thinking of using a static variable,
>>>>>>>> instead of just treating this message as part of the data(set/stream)?
>>>>>>>>
>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>
>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is send
>>>>>>>> some messages from downstream operators to upstream operators, which I
>>>>>>>> am considering doing via a static variable.
>>>>>>>>
>>>>>>>> But that means I have to make sure that one TaskManager process always
>>>>>>>> hosts these two operators. Can I use CoLocationGroup to solve this
>>>>>>>> problem? Or can anyone give me an example demonstrating the usage of
>>>>>>>> CoLocationGroup?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>>
>>
>
>
> --
> Best regards
>
> Sili Liu
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
