Thanks for your reply!

Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator
would have parallelism 1 in the topology, and all A_i would resume from the
same timestamp, but a minor difference in resume timestamps between the A_i
sources is also acceptable. So multiple T operators are fine for me here. But
the prerequisite for this topology to work is that I can make sure T and A
always reside in the same TM.

The problem is that both stream A and stream B are very large: 200k ~ 300k
messages per second in each stream, at 1 KB ~ 2 KB per message (after
compression), and I have to keep the whole message in the cache. So it's hard
to fit into Flink state.



Arvid Heise <ar...@ververica.com> wrote on Sat, Nov 21, 2020 at 3:35 AM:

> Your topology is definitely interesting and makes sense to me on a high
> level. The main question remaining is the parallelism. I'm assuming you run
> your pipeline with parallelism p and both source A and timestampcalculator
> T are run with parallelism p. You want to create a situation where for each
> A_i, there is a T_i that runs in the same slot. Am I right?
>
> If so, then as you have noticed, there is currently no way to express
> that in Flink on a high level. One more idea before trying to solve it in a
> hacky way: How large is B? Could you use a broadcast to avoid the shuffle on A?
> I'm thinking of creating a pipeline A->J(side input B)->T, because then
> it's easy to produce an operator chain, where everything even runs within
> the same thread.
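
A rough sketch of what that broadcast variant could look like in the
DataStream API, assuming B really is small enough to be replicated to every
parallel instance. All type names (EventA, EventB, Joined), the getJoinKey()
accessor and the join() helper are placeholders, not anything from this
thread:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    MapStateDescriptor<String, EventB> bDescriptor =
            new MapStateDescriptor<>("sideInputB", String.class, EventB.class);

    // replicate parsed B to every subtask instead of shuffling A
    BroadcastStream<EventB> broadcastB = parsedB.broadcast(bDescriptor);

    DataStream<Joined> joined = parsedA
            .connect(broadcastB)
            .process(new BroadcastProcessFunction<EventA, EventB, Joined>() {

                @Override
                public void processElement(EventA a, ReadOnlyContext ctx,
                                            Collector<Joined> out) throws Exception {
                    // look up the broadcast "side input" B by join key
                    EventB b = ctx.getBroadcastState(bDescriptor).get(a.getJoinKey());
                    if (b != null) {
                        out.collect(join(a, b));   // placeholder join logic
                    }
                }

                @Override
                public void processBroadcastElement(EventB b, Context ctx,
                                                    Collector<Joined> out) throws Exception {
                    ctx.getBroadcastState(bDescriptor).put(b.getJoinKey(), b);
                }
            });

Because A is never keyed or shuffled here, source A -> parser -> join -> T can
stay in one operator chain (and thus one thread), which is the situation
described above.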
>
> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your reply.
>>
>> I want to join stream A and stream B. Items in stream A come in first, and
>> I keep them in a memory cache as (join key, item); several minutes later the
>> items in stream B come in and the actual join work is performed. The
>> timestamp of the latest expired item in the memory cache is the safe
>> rollback timestamp, and I can resume source A from that timestamp when I
>> restart.
>>
>> It's not very precise (it may lose some items or send some items twice),
>> but that seems acceptable in my situation. However, if the job restarts and
>> both source A and source B resume from the last consumed offset, several
>> minutes of join results will be missing, which is unacceptable.
>>
>> The topo I consider is like
>>
>> source A -> parser --shuffle--> join -> sink
>> source B -> parser ...(parallel)      |--->timestampcalculator
>>
>> The memory cache lives inside the join operator, and the join operator
>> broadcasts the timestamp of the latest expired cache item to the
>> timestampcalculator. The timestampcalculator then uses these timestamps to
>> calculate a safe rollback timestamp (a moving minimum); source A can resume
>> from that timestamp, and source B will also restart from it. I will add a
>> bloom filter to the sink's state to avoid duplicate items.
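
To make the "moving minimum" concrete, here is one possible sketch of the
timestampcalculator, assuming each join subtask forwards
(subtaskIndex, latestExpiredTimestamp) pairs to it; the class and field names
are made up, and the map is deliberately kept in plain memory since the
calculation does not need to be exact:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Emits the current safe rollback timestamp: the minimum of the latest
    // expired-item timestamps reported by the join subtasks seen so far.
    public class TimestampCalculator extends ProcessFunction<Tuple2<Integer, Long>, Long> {

        private final Map<Integer, Long> latestExpiryPerSubtask = new HashMap<>();

        @Override
        public void processElement(Tuple2<Integer, Long> report, Context ctx,
                                   Collector<Long> out) {
            latestExpiryPerSubtask.put(report.f0, report.f1);
            out.collect(Collections.min(latestExpiryPerSubtask.values()));
        }
    }

With a single global minimum in mind, it would run as
reports.process(new TimestampCalculator()).setParallelism(1).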
>>
>> So I want the timestampcalculator operator and source A to be located in
>> the same TM; then I can send this timestamp from the timestampcalculator to
>> source A via a static variable.
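
A minimal sketch of that static-variable handover, assuming the
timestampcalculator subtask and the source A subtask really do end up in the
same TaskManager JVM (otherwise the written value is simply never seen), which
is exactly the co-location question of this thread. The class name is made up:

    import java.util.concurrent.atomic.AtomicLong;

    // In-process "mailbox" shared between the timestampcalculator and source A.
    public final class SafeRollbackTimestamp {

        private static final AtomicLong SAFE_TS = new AtomicLong(Long.MIN_VALUE);

        private SafeRollbackTimestamp() {}

        // called by the timestampcalculator; keeps the value monotonically non-decreasing
        public static void update(long candidate) {
            SAFE_TS.accumulateAndGet(candidate, Math::max);
        }

        // read by source A when it needs a resume position
        public static long get() {
            return SAFE_TS.get();
        }
    }

The timestampcalculator would call SafeRollbackTimestamp.update(ts) whenever
it recomputes the moving minimum, and source A would call
SafeRollbackTimestamp.get() when deciding where to resume.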
>>
>> I hope I have made my problem clear despite my poor English; it seems a
>> little tricky. But I think it's the only way to join the two streams while
>> avoiding a very large state.
>>
>>
>>
>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 2:58 PM:
>>
>>> I still haven't fully understood. Do you mean you can't infer the
>>> timestamp in source A because it depends on some internal field of source B?
>>>
>>> How is that actually working in a parallel setting? Which timestamp is
>>> used in the different instances of a source?
>>>
>>> Say, we have task A1 which is the first subtask of source A and task B2
>>> as the second subtask of source B. How would you like them to be located?
>>> How does that correlate to the third subtask of the join (let's call it J3)?
>>>
>>> Remember that through the shuffling before the join there is no clear
>>> correlation between any subtask of A or B and J...
>>>
>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your help!
>>>>
>>>> The timestamps already travel with the items in the streams. My streaming
>>>> pipeline looks like this:
>>>>
>>>> source -> parser --shuffle--> join -> sink
>>>>
>>>> Stream A and stream B both go through this pipeline. I keep the logs from
>>>> stream A in a memory cache (a LinkedHashMap) in the join operator, and
>>>> every log in stream B then looks up the cache and performs the actual
>>>> join work.
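
For illustration, the expiry bookkeeping could hang off the LinkedHashMap
itself, e.g. via removeEldestEntry. The eviction criterion below is size-based
just to keep the sketch short (the real cache presumably expires by age), and
EventA and its getTimestamp() are placeholders:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Insertion-ordered cache for stream A inside the join operator. Whenever the
    // eldest entry is evicted ("expires"), remember its timestamp; that value feeds
    // the safe rollback timestamp calculation.
    public class JoinCache extends LinkedHashMap<String, EventA> {

        private static final int MAX_ENTRIES = 1_000_000;   // made-up capacity

        private long latestExpiredTimestamp = Long.MIN_VALUE;

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, EventA> eldest) {
            if (size() > MAX_ENTRIES) {
                latestExpiredTimestamp = eldest.getValue().getTimestamp();
                return true;   // evict the expired item
            }
            return false;
        }

        public long latestExpiredTimestamp() {
            return latestExpiredTimestamp;
        }
    }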
>>>>
>>>> I try to use the timestamp of the latest expired item in memory as a safe
>>>> rollback timestamp: if I restart the job, the sources should use this
>>>> timestamp as the start offset. The safe rollback timestamp is calculated
>>>> in the join operator, but I want to use it in the source. So the simplest
>>>> way to pass this information from the join operator to the source is a
>>>> static variable, which requires the source operator and the join operator
>>>> to always be located in the same TM process.
>>>>
>>>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 3:33 AM:
>>>>
>>>>> Hi Si-li,
>>>>>
>>>>> couldn't you also add the timestamp as state to the source? The source
>>>>> would then store the timestamp of the last emitted record.
>>>>> It's nearly identical to your solution but would fit the recovery
>>>>> model of Flink much better.
>>>>> If you want to go further back to account for the records that have
>>>>> been actually processed in the join, you could also replay the data from
>>>>> <last timestamp> - <some offset>.
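
A rough sketch of such a timestamp-tracking source, assuming operator state is
used to checkpoint the last emitted timestamp; the Event type and the
fetchNextStartingFrom() call are placeholders, and on restore one could
subtract an offset from the restored timestamp, as suggested above:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    public class TimestampTrackingSource extends RichSourceFunction<Event>
            implements CheckpointedFunction {

        private transient ListState<Long> checkpointedTimestamp;
        private volatile long lastEmittedTimestamp = Long.MIN_VALUE;
        private volatile boolean running = true;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedTimestamp = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("lastEmittedTimestamp", Long.class));
            for (Long ts : checkpointedTimestamp.get()) {
                lastEmittedTimestamp = ts;   // restore; optionally rewind by some offset here
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedTimestamp.clear();
            checkpointedTimestamp.add(lastEmittedTimestamp);
        }

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while (running) {
                Event event = fetchNextStartingFrom(lastEmittedTimestamp);   // placeholder
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(event);
                    lastEmittedTimestamp = event.getTimestamp();
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }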
>>>>>
>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, I'll try it.
>>>>>>
>>>>>> Matthias Pohl <matth...@ververica.com> wrote on Sat, Nov 14, 2020 at 12:53 AM:
>>>>>>
>>>>>>> Hi Si-li,
>>>>>>> trying to answer your initial question: Theoretically, you could try
>>>>>>> using the co-location constraints to achieve this. But keep in mind
>>>>>>> that this might lead to multiple join operators running in the same
>>>>>>> JVM, reducing the amount of memory each operator can utilize.
>>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
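
For reference, the only hook for such a co-location constraint that seems
reachable from the DataStream API is the transformation-level key sketched
below. It is an internal API (mainly used for iterations), so it is
unsupported and may change between Flink versions; the guarantee it gives is
that subtasks with the same index of all operators in one co-location group
run in the same slot. The stream variable names are placeholders:

    // both operators need the same parallelism and must stay in the same slot sharing group
    sourceAStream.getTransformation().setCoLocationGroupKey("source-a-with-t");
    timestampCalculatorStream.getTransformation().setCoLocationGroupKey("source-a-with-t");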
>>>>>>>
>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for your reply.
>>>>>>>>
>>>>>>>> It's a streaming job. The join operator does the actual join work.
>>>>>>>> The join state is too large, so I don't want to keep it using the
>>>>>>>> state mechanism that Flink provides, and I also don't need a very
>>>>>>>> precise join. So I prefer to let the join operator calculate a
>>>>>>>> backward timestamp as state; if the cluster restarts, the consumer
>>>>>>>> can use setStartFromTimestamp to start from that timestamp.
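
The resume itself is straightforward if the sources are Kafka consumers,
since FlinkKafkaConsumer has setStartFromTimestamp; where safeRollbackTs comes
from is the open question of this thread. Topic name, schema and properties
below are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder

    FlinkKafkaConsumer<EventA> consumerA =
            new FlinkKafkaConsumer<>("topic-a", new EventASchema(), kafkaProps);
    consumerA.setStartFromTimestamp(safeRollbackTs);   // resume from the safe rollback point

    DataStream<EventA> sourceA = env.addSource(consumerA);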
>>>>>>>>
>>>>>>>> Now my problem is that the consumer can't read the state that the
>>>>>>>> join operator has written, so I need a way to send a small message (a
>>>>>>>> 64-bit long) from downstream to upstream. Redis may be a solution,
>>>>>>>> but adding an external dependency is only a secondary option if I can
>>>>>>>> pass this message through memory.
>>>>>>>>
>>>>>>>>
>>>>>>>> Chesnay Schepler <ches...@apache.org> wrote on Fri, Nov 6, 2020 at 7:06 AM:
>>>>>>>>
>>>>>>>>> It would be good if you could elaborate a bit more on your
>>>>>>>>> use-case.
>>>>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>>>>> talking about? Why are you thinking of using a static variable, 
>>>>>>>>> instead of
>>>>>>>>> just treating this message as part of the data(set/stream)?
>>>>>>>>>
>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>
>>>>>>>>> Currently I use Flink 1.9.1. What I actually want to do is send some
>>>>>>>>> messages from downstream operators to upstream operators, which I am
>>>>>>>>> considering doing via a static variable.
>>>>>>>>>
>>>>>>>>> But that requires me to make sure that one TaskManager process always
>>>>>>>>> hosts these two operators. Can I use CoLocationGroup to solve this
>>>>>>>>> problem? Or can anyone give me an example demonstrating the usage of
>>>>>>>>> CoLocationGroup?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>>
>>>
>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>
>
>


-- 
Best regards

Sili Liu
