Thanks for your reply! Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator would have parallelism 1 in the topology, and all A_i would start from the same timestamp, but a minor difference in the resume timestamp between different A_i sources is also acceptable. So multiple T operators are also OK for me here. But the prerequisite for this topology to work is that I can make sure T and A always reside in the same TM.
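Roughly what I would like to express is the sketch below (untested; setCoLocationGroupKey is an internal API that Flink uses for iterations, so it may change between versions, and all operator names and functions here are placeholders for our real Kafka source and timestamp calculator):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder for the real Kafka source of stream A.
        DataStreamSource<String> sourceA = env.fromElements("a1", "a2", "a3");
        sourceA.name("source-A");

        // Placeholder for the timestampcalculator T; the real one would consume the
        // expired-cache timestamps broadcast by the join operator.
        SingleOutputStreamOperator<Long> t = sourceA
                .map(new MapFunction<String, Long>() {
                    @Override
                    public Long map(String value) {
                        return System.currentTimeMillis();
                    }
                })
                .name("timestamp-calculator")
                // Co-located subtasks are paired by index, so A and T need the same
                // parallelism (here 1, because fromElements is a non-parallel source).
                .setParallelism(1)
                // Mimic the real job, where T is fed by the join and is not chained to A.
                .disableChaining();

        // Internal API: transformations that share a co-location group key get their
        // i-th subtasks scheduled into the same slot, and therefore the same TM JVM.
        // Both operators also have to stay in the same slot sharing group, which is
        // the case here because neither one overrides the default group.
        sourceA.getTransformation().setCoLocationGroupKey("a-and-t");
        t.getTransformation().setCoLocationGroupKey("a-and-t");

        t.print();
        env.execute("co-location sketch");
    }
}

If this kind of co-location holds, T_i could hand the safe rollback timestamp to A_i through a static field (for example a static AtomicLong), since both subtasks would live in the same JVM.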
The problem here is that both stream A and stream B are very large: 200k~300k messages per second in each stream, at 1k~2k bytes per message (after compression), and I have to keep the whole message in the cache. So it's hard to fit into Flink state.

Arvid Heise <ar...@ververica.com> wrote on Sat, Nov 21, 2020 at 3:35 AM:

> Your topology is definitely interesting and makes sense to me on a high level. The main question remaining is the parallelism. I'm assuming you run your pipeline with parallelism p and both source A and timestampcalculator T are run with parallelism p. You want to create a situation where for each A_i there is a T_i which runs in the same slot. Am I right?
>
> If so, then as you have noticed, there is currently no way to express that in Flink on a high level. One more idea before trying to solve it in a hacky way: how large is B? Could you use a broadcast to avoid the shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T, because then it's easy to produce an operator chain, where everything even runs within the same thread.
>
> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your reply.
>>
>> I want to join stream A and stream B. Items in stream A come in first, and I keep them in a memory cache as join key and item; several minutes later the items in stream B come in and the join work is performed. The timestamp of the latest expired item in the memory cache is the safe rollback timestamp; I can resume source A from that timestamp when I restart.
>>
>> It's not very precise and may lose some items or send some items twice, but it seems useful to me in my situation. However, if the job restarts and both source A and source B resume from the last consumed offset, several minutes of join results will be missing, which is unacceptable.
>>
>> The topology I have in mind looks like this:
>>
>> source A -> parser --shuffle--> join -> sink
>> source B -> parser ...(parallel)     |---> timestampcalculator
>>
>> The memory cache lives in the join operator, and the join operator broadcasts the timestamp of the latest expired cache item to the timestampcalculator. The timestampcalculator then uses these timestamps to calculate a safe rollback timestamp (a moving minimum); source A can resume from that timestamp, and source B will also restart from it. I will add a bloomfilter to the sink's state to avoid duplicate items.
>>
>> So I want the timestampcalculator operator and source A to be located in one TM, so that I can send this timestamp from the timestampcalculator to source A via a static variable.
>>
>> I hope I have made my problem clear with my poor English; it seems a little tricky. But I think it's the only way to join the two streams while avoiding a very huge state.
>>
>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 2:58 PM:
>>
>>> I still haven't fully understood. Do you mean you can't infer the timestamp in source A because it depends on some internal field of source B?
>>>
>>> How is that actually working in a parallel setting? Which timestamp is used in the different instances of a source?
>>>
>>> Say we have task A1, which is the first subtask of source A, and task B2 as the second subtask of source B. How would you like them to be located? How does that correlate to the third subtask of the join (let's call it J3)?
>>>
>>> Remember that through the shuffling before the join there is no clear correlation between any subtask of A or B to J...
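(A note on the broadcast idea quoted above: I understand the A->J(side input B)->T variant roughly as in the untested sketch below, where all types, topics and the join logic are placeholders. Unfortunately B is just as large as A in our case, so I don't think we can broadcast it.)

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholders for the parsed streams A and B.
        DataStream<String> streamA = env.fromElements("a-1", "a-2");
        DataStream<String> streamB = env.fromElements("b-1", "b-2");

        // B is broadcast to every parallel instance of the join operator, so stream A
        // never has to be shuffled and each A_i can be joined locally.
        final MapStateDescriptor<String, String> bSideDescriptor = new MapStateDescriptor<>(
                "b-side", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<String> broadcastB = streamB.broadcast(bSideDescriptor);

        DataStream<String> joined = streamA
                .connect(broadcastB)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String a, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Dummy join: look the A element up in the broadcast side.
                        String match = ctx.getBroadcastState(bSideDescriptor).get(a);
                        out.collect(match != null ? a + "+" + match : a);
                    }

                    @Override
                    public void processBroadcastElement(String b, Context ctx, Collector<String> out) throws Exception {
                        // Keep the B side keyed by the join key (placeholder: the element itself).
                        ctx.getBroadcastState(bSideDescriptor).put(b, b);
                    }
                })
                .name("broadcast-join");

        joined.print();
        env.execute("broadcast join sketch");
    }
}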
>>>
>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your help!
>>>>
>>>> The timestamps already travel with the items in the streams. My streaming pipeline looks like this:
>>>>
>>>> source -> parser --shuffle--> join -> sink
>>>>
>>>> Stream A and stream B both go through this pipeline. I keep the logs from stream A in a memory cache (a LinkedHashMap) in the join operator, and all logs from stream B try to look them up in the cache and perform the actual join work.
>>>>
>>>> I try to use the timestamp of the latest expired item in memory as a safe rollback timestamp: if I restart the job, the source should use this timestamp as the start offset. The safe rollback timestamp is calculated in the join operator, but I want to use it in the source. So the simplest way to pass this information from the join operator to the source is a static variable, which requires that the source operator and the join operator always live in the same TM process.
>>>>
>>>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 3:33 AM:
>>>>
>>>>> Hi Si-li,
>>>>>
>>>>> couldn't you also add the timestamp as state to the source? So the source would store the timestamp of the last emitted record. It's nearly identical to your solution but would fit the recovery model of Flink much better. If you want to go further back to account for the records that have actually been processed in the join, you could also replay the data from <last timestamp> - <some offset>.
>>>>>
>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, I'll try it.
>>>>>>
>>>>>> Matthias Pohl <matth...@ververica.com> wrote on Sat, Nov 14, 2020 at 12:53 AM:
>>>>>>
>>>>>>> Hi Si-li,
>>>>>>> trying to answer your initial question: theoretically, you could try using the co-location constraints to achieve this. But keep in mind that this might lead to multiple join operators running in the same JVM, reducing the amount of memory each operator can utilize.
>>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
>>>>>>>
>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for your reply.
>>>>>>>>
>>>>>>>> It's a streaming job, and the join operator is doing the actual join work. The join state is too large, so I don't want to keep it using the state mechanism that Flink provides, and I also don't need a very precise join. So I prefer to let the join operator calculate a backward timestamp as state; if the cluster restarts, the consumer can use setStartFromTimestamp to start from that timestamp.
>>>>>>>>
>>>>>>>> Now my problem is that the consumer can't read the state that the join operator has written, so I need a way to send a small message (a 64-bit long) from downstream to upstream. Redis may be a solution, but adding an external dependency is only a secondary option if I can pass this message through memory.
>>>>>>>>
>>>>>>>> Chesnay Schepler <ches...@apache.org> wrote on Fri, Nov 6, 2020 at 7:06 AM:
>>>>>>>>
>>>>>>>>> It would be good if you could elaborate a bit more on your use-case. Are you using batch or streaming? What kind of "message" are we talking about? Why are you thinking of using a static variable, instead of just treating this message as part of the data(set/stream)?
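(A note on the suggestion above to keep the timestamp as source state: I read it roughly as the untested sketch below. SafeRewindSource is a made-up stand-in for our Kafka consumer, and the rewind window is a placeholder; the real source would call setStartFromTimestamp with the restored value instead of generating records.)

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * Made-up stand-in for the real Kafka source of stream A: it keeps the timestamp of
 * the last emitted record in operator state, and on recovery resumes from
 * (last timestamp - REWIND_MS) to cover records that were cached in the join but not
 * yet expired when the checkpoint was taken.
 */
public class SafeRewindSource extends RichSourceFunction<Long> implements CheckpointedFunction {

    private static final long REWIND_MS = 10 * 60 * 1000L; // placeholder rewind window

    private transient ListState<Long> lastEmittedTimestampState;
    private long lastEmittedTimestamp = Long.MIN_VALUE;
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long resumeFrom = lastEmittedTimestamp == Long.MIN_VALUE
                ? System.currentTimeMillis()
                : lastEmittedTimestamp - REWIND_MS;
        // The real source would do something like consumer.setStartFromTimestamp(resumeFrom).
        long current = resumeFrom;
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(current);
                lastEmittedTimestamp = current;
            }
            current += 1000;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        lastEmittedTimestampState.clear();
        lastEmittedTimestampState.add(lastEmittedTimestamp);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        lastEmittedTimestampState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("last-emitted-timestamp", Long.class));
        for (Long ts : lastEmittedTimestampState.get()) {
            lastEmittedTimestamp = ts;
        }
    }
}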
>>>>>>>>>
>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>
>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is to send some messages from downstream operators to upstream operators, which I am considering doing via a static variable.
>>>>>>>>>
>>>>>>>>> But that means I have to make sure that one TaskManager process always hosts these two operators. Can I use CoLocationGroup to solve this problem? Or can anyone give me an example that demonstrates the usage of CoLocationGroup?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu

--
Best regards

Sili Liu