Thanks for your reply. I want to join two streams, stream A and stream B. Items in stream A arrive first, and I keep them in an in-memory cache as (join key, item) pairs; several minutes later the matching items in stream B arrive and the join is performed. The timestamp of the latest expired item in the memory cache is the safe rollback timestamp: when I restart, I can resume source A from that timestamp.
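A minimal plain-Java sketch of the cache described above. It assumes that stream A items arrive in roughly increasing timestamp order and are evicted after a fixed retention window. `ExpiringJoinCache`, `putA`, `lookupB` and `expire` are hypothetical names, not Flink API. The sketch only shows how the timestamp of the latest expired item can be tracked as the safe rollback timestamp.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch (not from the thread): a join-side cache for stream A items.
// Assumes stream A items arrive in roughly increasing timestamp order, so the
// LinkedHashMap's insertion order approximates timestamp order.
class ExpiringJoinCache<K, V> {

    // A cached stream A item together with its event timestamp in milliseconds.
    private static final class Cached<T> {
        final T item;
        final long timestamp;

        Cached(T item, long timestamp) {
            this.item = item;
            this.timestamp = timestamp;
        }
    }

    private final LinkedHashMap<K, Cached<V>> cache = new LinkedHashMap<>();
    private final long retentionMillis;
    // Long.MIN_VALUE means "nothing has expired yet".
    private long safeRollbackTimestamp = Long.MIN_VALUE;

    ExpiringJoinCache(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Caches a stream A item under its join key; a repeated key moves to the newest position.
    void putA(K key, V item, long timestamp) {
        cache.remove(key);
        cache.put(key, new Cached<>(item, timestamp));
    }

    // Returns the cached stream A item for a stream B item's join key, or null if absent.
    V lookupB(K key) {
        Cached<V> hit = cache.get(key);
        return hit == null ? null : hit.item;
    }

    // Evicts entries with timestamp <= now - retentionMillis, oldest first, and
    // remembers the timestamp of the latest evicted entry.
    void expire(long now) {
        long cutoff = now - retentionMillis;
        Iterator<Map.Entry<K, Cached<V>>> it = cache.entrySet().iterator();
        while (it.hasNext()) {
            Cached<V> oldest = it.next().getValue();
            if (oldest.timestamp > cutoff) {
                break; // remaining entries are newer, given the ordering assumption
            }
            safeRollbackTimestamp = Math.max(safeRollbackTimestamp, oldest.timestamp);
            it.remove();
        }
    }

    long safeRollbackTimestamp() {
        return safeRollbackTimestamp;
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        ExpiringJoinCache<String, String> cache = new ExpiringJoinCache<>(5 * 60_000L);
        cache.putA("k1", "a1", 0L);
        cache.putA("k2", "a2", 60_000L);
        cache.putA("k3", "a3", 120_000L);
        System.out.println(cache.lookupB("k2"));           // a2
        cache.expire(6 * 60_000L);                         // evicts k1 and k2
        System.out.println(cache.size());                  // 1
        System.out.println(cache.safeRollbackTimestamp()); // 60000
    }
}
```

Because eviction stops at the first entry that is still inside the window, a late (out-of-order) stream A item can remain cached with a timestamp older than the reported one, so resuming from that timestamp could skip it. That is consistent with the imprecision the author says is acceptable, but it is a property of this sketch, not a guarantee.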
It's not very precise: I may lose some items or send some items twice, but it seems useful in my situation. But if the job restarts and both source A and source B resume from their last consumed offsets, several minutes of join results will be missing (the stream A items that were only held in the memory cache are gone), which is unacceptable. The topology I am considering is:

    source A -> parser --shuffle--> join -> sink
    source B -> parser ...(parallel)  |--->timestampcalculator

The memory cache sits inside the join operator, and the join operator broadcasts the timestamp of its latest expired cache item to the timestampcalculator. The timestampcalculator then uses these timestamps to calculate a safe rollback timestamp (a moving minimum) from which source A can resume; source B will also restart from that timestamp. I will add a Bloom filter to the sink's state to avoid emitting duplicate items. So I want the timestampcalculator operator and source A to be located in the same TM, so that I can pass this timestamp from the timestampcalculator to source A through a static variable.

I hope I made my problem clear despite my poor English; it seems a little tricky. But I think it's the only way to join two streams without storing a very large state.

On Fri, Nov 20, 2020 at 2:58 PM Arvid Heise <ar...@ververica.com> wrote:

> I still haven't fully understood. Do you mean you can't infer the
> timestamp in source A because it depends on some internal field of source B?
>
> How is that actually working in a parallel setting? Which timestamp is
> used in the different instances of a source?
>
> Say, we have task A1 which is the first subtask of source A and task B2 as
> the second subtask of source B. How would you like them to be located? How
> does that correlate to the third subtask of the join (let's call it J3)?
>
> Remember that through the shuffling before the join there is no clear
> correlation between any subtask of A or B to J...
>
> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your help!
>>
>> Now the timestamps already travel with the items in the stream.
>> My streaming pipeline is like this:
>>
>> source -> parser --shuffle--> join -> sink
>>
>> Stream A and stream B both go through this pipeline. I keep the logs of
>> stream A in an in-memory cache (a LinkedHashMap) in the join operator;
>> every log in stream B then looks up the cache and the actual join work is
>> performed.
>>
>> I try to use the timestamp of the latest expired item in memory as a safe
>> rollback timestamp: if I restart the job, the source should use this
>> timestamp as its start offset. The safe rollback timestamp is calculated in
>> the join operator, but I want to use it in the source. So the simplest way
>> to pass this information from the join operator to the source is a static
>> variable, which requires the source operator and the join operator to
>> always be located in the same TM process.
>>
>> On Fri, Nov 20, 2020 at 3:33 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Si-li,
>>>
>>> couldn't you also add the timestamp as a state to the source? So the
>>> source would store the timestamp of the last emitted record.
>>> It's nearly identical to your solution but would fit the recovery model
>>> of Flink much better.
>>> If you want to go further back to account for the records that have been
>>> actually processed in the join, you could also replay the data from
>>> <last timestamp> - <some offset>.
>>>
>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks, I'll try it.
>>>>
>>>> On Sat, Nov 14, 2020 at 12:53 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>
>>>>> Hi Si-li,
>>>>> trying to answer your initial question: Theoretically, you could try
>>>>> using the co-location constraints to achieve this. But keep in mind that
>>>>> this might lead to multiple Join operators running in the same JVM,
>>>>> reducing the amount of memory each operator can utilize.
>>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> It's a streaming job.
>>>>>> The join operator does the actual join work. The join state is too
>>>>>> large, so I don't want to keep it using the mechanism that Flink
>>>>>> provides, and I also don't need a very precise join. So I prefer to let
>>>>>> the join operator calculate a backward timestamp as state; if the
>>>>>> cluster restarts, the consumer can use setStartFromTimestamp to start
>>>>>> from that timestamp.
>>>>>>
>>>>>> Now my problem is that the consumer can't read the state that the join
>>>>>> operator wrote, so I need a way to send a small message (a 64-bit long)
>>>>>> from downstream to upstream. Redis may be a solution, but adding an
>>>>>> external dependency is only a secondary option if I can pass this
>>>>>> message through memory.
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 6, 2020 at 7:06 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>
>>>>>>> It would be good if you could elaborate a bit more on your use-case.
>>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>>> talking about? Why are you thinking of using a static variable, instead
>>>>>>> of just treating this message as part of the data(set/stream)?
>>>>>>>
>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>
>>>>>>> Currently I use Flink 1.9.1. What I actually want to do is send
>>>>>>> some messages from downstream operators to upstream operators, for
>>>>>>> which I am considering a static variable.
>>>>>>>
>>>>>>> But that means I have to make sure these two operators are always in
>>>>>>> the same taskmanager process. Can I use CoLocationGroup to solve this
>>>>>>> problem? Or can anyone give me an example that demonstrates the usage
>>>>>>> of CoLocationGroup?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng

--
Best regards

Sili Liu
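The "timestampcalculator" proposed in the latest message (a moving minimum over the latest-expired timestamps broadcast by the parallel join subtasks) could look roughly like the plain-Java sketch below. It also reflects Arvid's point about parallelism: since the shuffle can route any record of source A to any join subtask, this sketch takes the minimum across all join subtasks rather than using a per-subtask value, which keeps the result conservative for every source subtask under the thread's assumptions. `SafeRollbackCalculator` and its methods are hypothetical and contain no Flink API; how the value travels back to the sources (static variable, Redis, or source state as Arvid suggests) and into the consumer's `setStartFromTimestamp` is left out.

```java
import java.util.Arrays;

// Hypothetical sketch (not from the thread) of the "timestampcalculator":
// each parallel join subtask reports the timestamp of its latest expired cache
// item, and the safe rollback timestamp is the minimum over all subtasks.
class SafeRollbackCalculator {
    private final long[] latestExpired; // one slot per join subtask
    private final boolean[] reported;
    private int reportedCount = 0;

    SafeRollbackCalculator(int joinParallelism) {
        if (joinParallelism <= 0) {
            throw new IllegalArgumentException("joinParallelism must be positive");
        }
        latestExpired = new long[joinParallelism];
        reported = new boolean[joinParallelism];
    }

    // Records a report from one join subtask. A value smaller than the one
    // already stored (e.g. a stale, out-of-order report) is ignored, so each
    // slot only moves forward.
    void report(int subtaskIndex, long latestExpiredTimestamp) {
        if (!reported[subtaskIndex]) {
            reported[subtaskIndex] = true;
            reportedCount++;
            latestExpired[subtaskIndex] = latestExpiredTimestamp;
        } else {
            latestExpired[subtaskIndex] =
                    Math.max(latestExpired[subtaskIndex], latestExpiredTimestamp);
        }
    }

    // Returns the minimum over all subtasks, or Long.MIN_VALUE until every
    // subtask has reported (an unreported subtask may still hold older items).
    long safeRollbackTimestamp() {
        if (reportedCount < latestExpired.length) {
            return Long.MIN_VALUE;
        }
        return Arrays.stream(latestExpired).min().getAsLong();
    }

    public static void main(String[] args) {
        SafeRollbackCalculator calc = new SafeRollbackCalculator(3);
        calc.report(0, 120_000L);
        calc.report(1, 90_000L);
        System.out.println(calc.safeRollbackTimestamp()); // -9223372036854775808 (not all reported)
        calc.report(2, 150_000L);
        System.out.println(calc.safeRollbackTimestamp()); // 90000
        calc.report(1, 130_000L);
        System.out.println(calc.safeRollbackTimestamp()); // 120000
    }
}
```

Returning `Long.MIN_VALUE` until every join subtask has reported is a conservative choice of this sketch; a real job would also have to decide what happens when the join parallelism changes across a restart.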