Thanks for your reply! Seems state processor api can solve my problem, the state written by T operator's checkpoint can be read by main function when job restart. My question is, when streaming job restarts due to some reason, does the main function will also rerun again?
Arvid Heise <ar...@ververica.com> 于2020年11月23日周一 下午6:00写道: > If you would prefer to have T with parallelism 1, one complete alternative > solution would be to leave the timestamp in the state of T and extract the > timestamp from the savepoint/checkpoint upon start of the application using > the state processor API [1]. Unfortunately, it may be a bit hacky when you > do a normal recovery as there is not a single entrypoint (if you start new > you could just extract that timestamp from main()). Of course, you could > also store the information in an external storage but that would also make > the architecture more complicated. > > Let's see if anyone has an idea on the co-location topic. > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html > > On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote: > >> Thanks for your reply! >> >> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator >> should have 1 parallism in topo, also all A_i can start from the same >> timestamp, but some minor difference of resume timestamp in different A_i >> source is also acceptable. So I think multiple T operator is also ok to me >> here. But the prerequisite of this topo can work is I can make sure T and A >> always reside same TM. >> >> The problem here both stream A and stream B is very huge. 200k ~ 300k >> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after >> compressed) per messages, and I have to keep the whole message in cache. So >> it's hard to fit into Flink state. >> >> >> >> Arvid Heise <ar...@ververica.com> 于2020年11月21日周六 上午3:35写道: >> >>> Your topology is definitively interesting and makes sense to me on a >>> high level. The main question remaining is the parallelism. I'm assuming >>> you run your pipeline with parallelism p and both source A and >>> timestampcalculator T are run with parallelism p. You want to create a >>> situation where for A_i, there is an T_i which run in the same slot. Am I >>> right? >>> >>> If so, then as you have noticed that there is currently no way to >>> express that in Flink on a high level. One more idea before trying to solve >>> it in a hacky way: How large is B? Could use a broadcast to avoid the >>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T, >>> because then it's easy to produce an operator chain, where everything even >>> runs within the same thread. >>> >>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote: >>> >>>> Thanks for your reply. >>>> >>>> I want to join two stream A and stream B. Items in stream A come in >>>> first then I keep them in memory cache, as join key and item, then serval >>>> minutes later the items in stream B come in then the join work is >>>> performed. The timestamp of the latest expired item in memory cache is the >>>> safe rollback timestamp, I can resume source A from that timestamp when I >>>> restart. >>>> >>>> It's not very percise, maybe lost same items or send same items twice, >>>> but seems useful to me in my situation. But if job restart, both source A >>>> and source B resume from last consumed offset, it will make the absense of >>>> serval minutes join result, which is unacceptable. >>>> >>>> The topo I consider is like >>>> >>>> source A -> parser --shuffle--> join -> sink >>>> source B -> parser ...(parallel) |--->timestampcalculator >>>> >>>> Memory cache aside in join operator, the join operator will broadcast >>>> the timestamp of latest expired cache item to the timestampcalculator. Then >>>> timestampcalculator will use them to calculate a safe rollback timestamp (a >>>> moving minimum) that source A can resume from that timestamp, source B will >>>> also restart from that timestamp. I will add a bloomfilter in sink's state >>>> to avoid duplicate items. >>>> >>>> So I want to let timestampcalculator operator and source A are located >>>> in one TM, then I can send this timestamp from timestampcalculator to >>>> source A by static variable. >>>> >>>> Hope I make my problem clear with my poor English, it seems a little >>>> tricky. But I think it's the only way to do two streams join and avoid to >>>> store very huge state. >>>> >>>> >>>> >>>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 下午2:58写道: >>>> >>>>> I still haven't fully understood. Do you mean you can't infer the >>>>> timestamp in source A because it depends on some internal field of source >>>>> B? >>>>> >>>>> How is that actually working in a parallel setting? Which timestamp is >>>>> used in the different instances of a source? >>>>> >>>>> Say, we have task A1 which is the first subtask of source A and task >>>>> B2 as the second subtask of source B. How would you like them to be >>>>> located? How does that correlate to the third subtask of the join (let's >>>>> call it J3). >>>>> >>>>> Remember that through the shuffling before the join there is no clear >>>>> correlation between any subtask of A or B to J... >>>>> >>>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote: >>>>> >>>>>> Thanks for your help! >>>>>> >>>>>> Now the timestamps already go with the items in streaming. My >>>>>> streaming pipeline is like this: >>>>>> >>>>>> source -> parser --shuffle--> join -> sink >>>>>> >>>>>> Streaming A and streaming B go through this pipeline, I keep logs in >>>>>> streaming A in memory cache (linkedHashmap) in join operator, then all >>>>>> logs >>>>>> in streaming B tries to lookup up the cache and perform the actual join >>>>>> work. >>>>>> >>>>>> I try to use the timestamp of the lastest expire item in memory as a >>>>>> safe rollback timestamp, if I restart job, the source should use this >>>>>> timestamp as start offset. The safe rollback timestamp is calucated in >>>>>> join >>>>>> operator, but I want to use it in source. So the simplest way to pass >>>>>> this >>>>>> information from join operator to source is use static variable, which >>>>>> require source operator and join operator always locate in same TM >>>>>> process. >>>>>> >>>>>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 上午3:33写道: >>>>>> >>>>>>> Hi Si-li, >>>>>>> >>>>>>> couldn't you also add the timestamp as a state to the source? So the >>>>>>> time would store the timestamp of the last emitted record. >>>>>>> It's nearly identical to your solution but would fit the recovery >>>>>>> model of Flink much better. >>>>>>> If you want to go further back to account for the records that have >>>>>>> been actually processed in the join, you could also replay the data from >>>>>>> <last timestamp> - <some offset>. >>>>>>> >>>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote: >>>>>>> >>>>>>>> Thanks, I'll try it. >>>>>>>> >>>>>>>> Matthias Pohl <matth...@ververica.com> 于2020年11月14日周六 上午12:53写道: >>>>>>>> >>>>>>>>> Hi Si-li, >>>>>>>>> trying to answer your initial question: Theoretically, you could >>>>>>>>> try using the co-location constraints to achieve this. But keep in >>>>>>>>> mind >>>>>>>>> that this might lead to multiple Join operators running in the same >>>>>>>>> JVM >>>>>>>>> reducing the amount of memory each operator can utilize. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Matthias >>>>>>>>> >>>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Thanks for your reply. >>>>>>>>>> >>>>>>>>>> It's a streaming job. The join operator is doing join work, such >>>>>>>>>> as join. The join state is too large so I don't want to keep the >>>>>>>>>> state >>>>>>>>>> using the mechanism that Flink provided, and also I don't need very >>>>>>>>>> precise >>>>>>>>>> join. So I prefer to let the join operator to calculate a backward >>>>>>>>>> timestamp as state, if the cluster restarts, the consumer can >>>>>>>>>> use setStartFromTimestamp to start from that timestamp. >>>>>>>>>> >>>>>>>>>> Now my problem is, consumer can't read the state that join >>>>>>>>>> operator written, so I need a way to need small message (64bit long) >>>>>>>>>> from >>>>>>>>>> downstream to upstream. Redis may be a solution, but add external >>>>>>>>>> dependency is a secondary option if I can pass this message through >>>>>>>>>> memory. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Chesnay Schepler <ches...@apache.org> 于2020年11月6日周五 上午7:06写道: >>>>>>>>>> >>>>>>>>>>> It would be good if you could elaborate a bit more on your >>>>>>>>>>> use-case. >>>>>>>>>>> Are you using batch or streaming? What kind of "message" are we >>>>>>>>>>> talking about? Why are you thinking of using a static variable, >>>>>>>>>>> instead of >>>>>>>>>>> just treating this message as part of the data(set/stream)? >>>>>>>>>>> >>>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote: >>>>>>>>>>> >>>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is >>>>>>>>>>> send some messages from downstream operators to upstream operators, >>>>>>>>>>> which I >>>>>>>>>>> consider use static variable. >>>>>>>>>>> >>>>>>>>>>> But it makes me have to make sure in one taskmanager process it >>>>>>>>>>> always has these two operators, can I use CoLocationGroup to solve >>>>>>>>>>> this >>>>>>>>>>> problem? Or can anyone give me an example to demostrate the usage >>>>>>>>>>> of CoLocationGroup ? >>>>>>>>>>> >>>>>>>>>>> Thanks! >>>>>>>>>>> -- >>>>>>>>>>> Best regards >>>>>>>>>>> >>>>>>>>>>> Sili Liu >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Best regards >>>>>>>>>> >>>>>>>>>> Sili Liu >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Best regards >>>>>>>> >>>>>>>> Sili Liu >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> >>>>>>> Arvid Heise | Senior Java Developer >>>>>>> >>>>>>> <https://www.ververica.com/> >>>>>>> >>>>>>> Follow us @VervericaData >>>>>>> >>>>>>> -- >>>>>>> >>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>>>>>> Conference >>>>>>> >>>>>>> Stream Processing | Event Driven | Real Time >>>>>>> >>>>>>> -- >>>>>>> >>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>>>>>> >>>>>>> -- >>>>>>> Ververica GmbH >>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, >>>>>>> Ji (Toni) Cheng >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Best regards >>>>>> >>>>>> Sili Liu >>>>>> >>>>> >>>>> >>>>> -- >>>>> >>>>> Arvid Heise | Senior Java Developer >>>>> >>>>> <https://www.ververica.com/> >>>>> >>>>> Follow us @VervericaData >>>>> >>>>> -- >>>>> >>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>>>> Conference >>>>> >>>>> Stream Processing | Event Driven | Real Time >>>>> >>>>> -- >>>>> >>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>>>> >>>>> -- >>>>> Ververica GmbH >>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, >>>>> Ji (Toni) Cheng >>>>> >>>> >>>> >>>> -- >>>> Best regards >>>> >>>> Sili Liu >>>> >>> >>> >>> -- >>> >>> Arvid Heise | Senior Java Developer >>> >>> <https://www.ververica.com/> >>> >>> Follow us @VervericaData >>> >>> -- >>> >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> Conference >>> >>> Stream Processing | Event Driven | Real Time >>> >>> -- >>> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> >>> -- >>> Ververica GmbH >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>> (Toni) Cheng >>> >> >> >> -- >> Best regards >> >> Sili Liu >> > > > -- > > Arvid Heise | Senior Java Developer > > <https://www.ververica.com/> > > Follow us @VervericaData > > -- > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > (Toni) Cheng > -- Best regards Sili Liu