Your topology is definitely interesting and makes sense to me at a high level. The main remaining question is the parallelism. I'm assuming you run your pipeline with parallelism p, and that both source A and the timestampcalculator T also run with parallelism p. You want to create a situation where for each A_i there is a T_i running in the same slot. Am I right?
If so, then as you have noticed, there is currently no way to express that in Flink at a high level. One more idea before trying to solve it in a hacky way: how large is B? Could you use a broadcast to avoid the shuffle on A? I'm thinking of a pipeline A -> J(side input B) -> T, because then it's easy to produce an operator chain where everything even runs within the same thread. (A rough sketch is included at the bottom of this mail, below the quoted thread.)

On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:

> Thanks for your reply.
>
> I want to join two streams, stream A and stream B. Items in stream A come in first and I keep them in a memory cache as (join key, item); several minutes later the items in stream B come in and the join work is performed. The timestamp of the latest expired item in the memory cache is the safe rollback timestamp; I can resume source A from that timestamp when I restart.
>
> It's not very precise, it may lose some items or send some items twice, but that seems useful to me in my situation. However, if the job restarts and both source A and source B resume from the last consumed offset, several minutes of join results will be missing, which is unacceptable.
>
> The topology I'm considering is like:
>
> source A -> parser --shuffle--> join -> sink
> source B -> parser ...(parallel)    |---> timestampcalculator
>
> The memory cache lives in the join operator, and the join operator broadcasts the timestamp of the latest expired cache item to the timestampcalculator. The timestampcalculator then uses these timestamps to calculate a safe rollback timestamp (a moving minimum); source A can resume from that timestamp, and source B will also restart from that timestamp. I will add a bloom filter to the sink's state to avoid duplicate items.
>
> So I want the timestampcalculator operator and source A to be located in one TM, so that I can send this timestamp from the timestampcalculator to source A via a static variable.
>
> I hope I have made my problem clear despite my poor English; it seems a little tricky. But I think it's the only way to join the two streams while avoiding a very huge state.
>
> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 2:58 PM:
>
>> I still haven't fully understood. Do you mean you can't infer the timestamp in source A because it depends on some internal field of source B?
>>
>> How does that actually work in a parallel setting? Which timestamp is used in the different instances of a source?
>>
>> Say we have task A1, which is the first subtask of source A, and task B2 as the second subtask of source B. How would you like them to be located? How does that correlate to the third subtask of the join (let's call it J3)?
>>
>> Remember that because of the shuffling before the join there is no clear correlation between any subtask of A or B and J...
>>
>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Thanks for your help!
>>>
>>> The timestamps already travel with the items in the stream. My streaming pipeline is like this:
>>>
>>> source -> parser --shuffle--> join -> sink
>>>
>>> Stream A and stream B both go through this pipeline. I keep the logs from stream A in a memory cache (a LinkedHashMap) in the join operator, and all logs from stream B then try to look up the cache and perform the actual join work.
>>>
>>> I try to use the timestamp of the latest expired item in memory as a safe rollback timestamp: if I restart the job, the sources should use this timestamp as the start offset. The safe rollback timestamp is calculated in the join operator, but I want to use it in the source.
>>> So the simplest way to pass this information from the join operator to the source is a static variable, which requires that the source operator and the join operator are always located in the same TM process.
>>>
>>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 3:33 AM:
>>>
>>>> Hi Si-li,
>>>>
>>>> couldn't you also add the timestamp as state to the source? The source would then store the timestamp of the last emitted record.
>>>> It's nearly identical to your solution but would fit the recovery model of Flink much better.
>>>> If you want to go further back to account for the records that have actually been processed in the join, you could also replay the data from <last timestamp> - <some offset>.
>>>>
>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Thanks, I'll try it.
>>>>>
>>>>> Matthias Pohl <matth...@ververica.com> wrote on Sat, Nov 14, 2020 at 12:53 AM:
>>>>>
>>>>>> Hi Si-li,
>>>>>> trying to answer your initial question: theoretically, you could try using the co-location constraints to achieve this. But keep in mind that this might lead to multiple join operators running in the same JVM, reducing the amount of memory each operator can utilize.
>>>>>>
>>>>>> Best,
>>>>>> Matthias
>>>>>>
>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your reply.
>>>>>>>
>>>>>>> It's a streaming job, and the join operator is doing the join work. The join state is too large, so I don't want to keep the state using the mechanism that Flink provides, and I also don't need a very precise join. So I prefer to let the join operator calculate a backward timestamp as state; if the cluster restarts, the consumer can use setStartFromTimestamp to start from that timestamp.
>>>>>>>
>>>>>>> Now my problem is that the consumer can't read the state that the join operator has written, so I need a way to send a small message (a 64-bit long) from downstream to upstream. Redis may be a solution, but adding an external dependency is only a secondary option if I can pass this message through memory.
>>>>>>>
>>>>>>> Chesnay Schepler <ches...@apache.org> wrote on Fri, Nov 6, 2020 at 7:06 AM:
>>>>>>>
>>>>>>>> It would be good if you could elaborate a bit more on your use case.
>>>>>>>> Are you using batch or streaming? What kind of "message" are we talking about? Why are you thinking of using a static variable, instead of just treating this message as part of the data(set/stream)?
>>>>>>>>
>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>
>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is send some messages from downstream operators to upstream operators, which I am considering doing via a static variable.
>>>>>>>>
>>>>>>>> But that means I have to make sure that one TaskManager process always hosts both of these operators. Can I use CoLocationGroup to solve this problem? Or can anyone give me an example to demonstrate the usage of CoLocationGroup?
>>>>>>>>
>>>>>>>> Thanks!
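Coming back to the broadcast idea from the top of this mail, here is a minimal, hypothetical sketch of the A -> J(broadcast B) -> T shape. It assumes plain String records, uses socket sources as stand-ins for your real sources, and the join-key extraction is made up; treat it as an illustration of the pattern, not as your actual job:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for source A and source B (Kafka consumers in the real job).
        DataStream<String> streamA = env.socketTextStream("localhost", 9000);
        DataStream<String> streamB = env.socketTextStream("localhost", 9001);

        // B is broadcast, i.e. replicated to every parallel join instance, so A is not shuffled.
        final MapStateDescriptor<String, String> bCache =
                new MapStateDescriptor<>("b-cache", Types.STRING, Types.STRING);
        BroadcastStream<String> broadcastB = streamB.broadcast(bCache);

        DataStream<String> joined = streamA
                .connect(broadcastB)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String a, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Look up the cached B record for this A record's join key.
                        String b = ctx.getBroadcastState(bCache).get(extractKey(a));
                        if (b != null) {
                            out.collect(a + "|" + b);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String b, Context ctx, Collector<String> out)
                            throws Exception {
                        // Cache the broadcast B record; eviction/expiry is omitted in this sketch.
                        ctx.getBroadcastState(bCache).put(extractKey(b), b);
                    }
                });

        // T (the timestampcalculator) as a one-input operator right after the join. With the
        // same parallelism and no re-partitioning it should chain onto the join operator.
        joined
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String joinedRecord) {
                        return joinedRecord; // derive the safe rollback timestamp here
                    }
                })
                .print();

        env.execute("broadcast-join-sketch");
    }

    // Placeholder join-key extraction; your records and key will look different.
    private static String extractKey(String record) {
        return record.split(",", 2)[0];
    }
}

Whether the join actually ends up chained with A and T depends on equal parallelism and the chaining rules of your Flink version, so it is worth checking the resulting job graph in the web UI.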
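And since the quoted thread mentions setStartFromTimestamp and replaying from <last timestamp> - <some offset>: here is a minimal sketch of that part, assuming the sources are Kafka topics read with the FlinkKafkaConsumer available in Flink 1.9. The topic names, properties, and the rollback window are placeholders, and how the job obtains the safe rollback timestamp at startup (e.g. from an external store) is left out:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ResumeFromTimestampSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "join-job");

        // Placeholder: the safe rollback timestamp computed on the previous run, fetched from
        // wherever the job persisted it (external store, file, ...).
        long safeRollbackTs = System.currentTimeMillis() - 10 * 60 * 1000L;
        long extraSlack = 60 * 1000L; // go a bit further back to cover in-flight records

        FlinkKafkaConsumer<String> sourceA =
                new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), props);
        FlinkKafkaConsumer<String> sourceB =
                new FlinkKafkaConsumer<>("topic-b", new SimpleStringSchema(), props);

        // Both sources start reading from the safe rollback timestamp. Note that these start
        // positions only apply when the job is NOT restored from a checkpoint/savepoint that
        // already contains Kafka offsets.
        sourceA.setStartFromTimestamp(safeRollbackTs - extraSlack);
        sourceB.setStartFromTimestamp(safeRollbackTs - extraSlack);

        // parser -> join -> sink (and the bloom-filter deduplication) are omitted here.
        env.addSource(sourceA).print();
        env.addSource(sourceB).print();

        env.execute("resume-from-timestamp-sketch");
    }
}

The downstream deduplication you mentioned (the bloom filter in the sink's state) then has to absorb whatever gets replayed twice.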
--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng