Hi Si-li Liu,

if you want to run T with a parallelism of 1, then the parallelism of A is effectively limited by the number of slots on a single TM. Otherwise you would have some A_i that are not running on the same machine as T.

For the approach with the co-location constraint, you can take a look at Transformation.setCoLocationGroupKey() [1]. Using this API one can define operators whose subtasks need to run on the same machine (e.g. A_i runs together with B_i on the same machine, even in the same slot). However, this is pretty much an internal feature which might change in future versions.
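Just to make this concrete, here is a minimal sketch of how the key could be set through the DataStream API. The job below is only a dummy (an identity filter standing in for T, a rebalance standing in for your shuffle); the relevant calls are getTransformation() and setCoLocationGroupKey(), and since this is an internal API they might look slightly different in your Flink version:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real source A (e.g. a Kafka consumer).
        DataStream<String> sourceA = env.fromElements("a1", "a2", "a3").name("source-A");

        // Stand-in for the timestamp calculator T; the rebalance simulates the
        // shuffle that normally sits between A and T, so the two are not chained.
        SingleOutputStreamOperator<String> calculatorT = sourceA
                .rebalance()
                .filter(value -> true)
                .name("timestamp-calculator")
                // fromElements is a non-parallel source; in the real job A and T
                // would run with the same, higher parallelism.
                .setParallelism(1);

        // Internal API: subtasks of transformations that share the same
        // co-location group key are scheduled into the same slot (A_i with T_i).
        sourceA.getTransformation().setCoLocationGroupKey("source-a-with-t");
        calculatorT.getTransformation().setCoLocationGroupKey("source-a-with-t");

        calculatorT.print();
        env.execute("co-location sketch");
    }
}

If I remember correctly, the two operators also need to stay in the same slot sharing group for the constraint to take effect.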
What I did not fully understand is what happens if your TM dies. Wouldn't the information in T then be lost, so that the sources would start from offset 0 again? According to your explanation, this would be intolerable given the business requirements.

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426

Cheers,
Till

On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise <ar...@ververica.com> wrote:

> If you would prefer to have T with parallelism 1, one complete alternative solution would be to leave the timestamp in the state of T and extract the timestamp from the savepoint/checkpoint upon start of the application using the state processor API [1]. Unfortunately, it may be a bit hacky when you do a normal recovery, as there is not a single entry point (if you start fresh, you could just extract that timestamp from main()). Of course, you could also store the information in external storage, but that would also make the architecture more complicated.
>
> Let's see if anyone has an idea on the co-location topic.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your reply!
>>
>> Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator should have parallelism 1 in the topology, and all A_i should start from the same timestamp, but a minor difference in the resume timestamp between different A_i sources is also acceptable. So multiple T operators are also OK for me here. But the prerequisite for this topology to work is that I can make sure T and A always reside on the same TM.
>>
>> The problem here is that both stream A and stream B are very large: 200k~300k messages per second in each stream, with 1~2 KB per message (after compression), and I have to keep the whole message in the cache. So it's hard to fit into Flink state.
>>
>> On Sat, Nov 21, 2020 at 3:35 AM, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Your topology is definitely interesting and makes sense to me on a high level. The main question remaining is the parallelism. I'm assuming you run your pipeline with parallelism p and both source A and timestampcalculator T run with parallelism p. You want to create a situation where for each A_i there is a T_i running in the same slot. Am I right?
>>>
>>> If so, then as you have noticed, there is currently no way to express that in Flink on a high level. One more idea before trying to solve it in a hacky way: How large is B? Could you use a broadcast to avoid the shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T, because then it's easy to produce an operator chain, where everything even runs within the same thread.
>>>
>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your reply.
>>>>
>>>> I want to join two streams, stream A and stream B.
>>>> Items in stream A come in first, and I keep them in a memory cache, keyed by the join key with the item as the value; several minutes later the corresponding items in stream B come in and the actual join work is performed. The timestamp of the latest expired item in the memory cache is the safe rollback timestamp, and I can resume source A from that timestamp when I restart.
>>>>
>>>> It's not very precise, and it may lose some items or send some items twice, but that seems acceptable in my situation. However, if the job restarts and both source A and source B resume from the last consumed offset, several minutes of join results will be missing, which is unacceptable.
>>>>
>>>> The topology I have in mind looks like:
>>>>
>>>> source A -> parser --shuffle--> join -> sink
>>>> source B -> parser ...(parallel) |--->timestampcalculator
>>>>
>>>> Besides keeping the memory cache, the join operator will broadcast the timestamp of the latest expired cache item to the timestampcalculator. The timestampcalculator will then use these timestamps to calculate a safe rollback timestamp (a moving minimum); source A can resume from that timestamp, and source B will also restart from it. I will add a Bloom filter to the sink's state to avoid duplicate items.
>>>>
>>>> So I want the timestampcalculator operator and source A to be located in one TM, so that I can send this timestamp from the timestampcalculator to source A through a static variable.
>>>>
>>>> I hope I have made my problem clear despite my poor English; it seems a little tricky. But I think it's the only way to join the two streams without storing a very large state.
>>>>
>>>> On Fri, Nov 20, 2020 at 2:58 PM, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> I still haven't fully understood. Do you mean you can't infer the timestamp in source A because it depends on some internal field of source B?
>>>>>
>>>>> How is that actually working in a parallel setting? Which timestamp is used in the different instances of a source?
>>>>>
>>>>> Say, we have task A1, which is the first subtask of source A, and task B2 as the second subtask of source B. How would you like them to be located? How does that correlate to the third subtask of the join (let's call it J3)?
>>>>>
>>>>> Remember that through the shuffling before the join there is no clear correlation between any subtask of A or B and J...
>>>>>
>>>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your help!
>>>>>>
>>>>>> The timestamps already travel with the items in the stream. My streaming pipeline is like this:
>>>>>>
>>>>>> source -> parser --shuffle--> join -> sink
>>>>>>
>>>>>> Stream A and stream B both go through this pipeline. I keep the logs from stream A in a memory cache (a LinkedHashMap) in the join operator, and all logs from stream B then look up the cache and perform the actual join work.
>>>>>>
>>>>>> I try to use the timestamp of the latest expired item in memory as a safe rollback timestamp: if I restart the job, the source should use this timestamp as the start offset. The safe rollback timestamp is calculated in the join operator, but I want to use it in the source. So the simplest way to pass this information from the join operator to the source is a static variable, which requires that the source operator and the join operator always reside in the same TM process.
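To make the static-variable handoff described above concrete, here is a minimal sketch of such a JVM-local holder. The class and method names are made up for illustration, and this only works if the writing and reading subtasks really end up in the same TM process, which is what the co-location constraint at the top of this thread is meant to guarantee:

import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical JVM-local holder for the safe rollback timestamp. */
public final class SafeRollbackTimestampHolder {

    // -1 means "no safe rollback timestamp published yet".
    private static final AtomicLong SAFE_TIMESTAMP = new AtomicLong(-1L);

    private SafeRollbackTimestampHolder() {
    }

    /** Written by the timestampcalculator subtask running in this TM. */
    public static void publish(long timestamp) {
        SAFE_TIMESTAMP.set(timestamp);
    }

    /** Read by the source A subtask running in the same TM. */
    public static long read() {
        return SAFE_TIMESTAMP.get();
    }
}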
>>>>>> On Fri, Nov 20, 2020 at 3:33 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi Si-li,
>>>>>>>
>>>>>>> couldn't you also add the timestamp as a state to the source? So the source would store the timestamp of the last emitted record. It's nearly identical to your solution but would fit the recovery model of Flink much better.
>>>>>>>
>>>>>>> If you want to go further back, to account for which records have actually been processed in the join, you could also replay the data from <last timestamp> - <some offset>.
>>>>>>>
>>>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, I'll try it.
>>>>>>>>
>>>>>>>> On Sat, Nov 14, 2020 at 12:53 AM, Matthias Pohl <matth...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Si-li,
>>>>>>>>> trying to answer your initial question: Theoretically, you could try using the co-location constraints to achieve this. But keep in mind that this might lead to multiple join operators running in the same JVM, reducing the amount of memory each operator can utilize.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Matthias
>>>>>>>>>
>>>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>
>>>>>>>>>> It's a streaming job, and the join operator is doing the join work. The join state is too large, so I don't want to keep the state using the mechanism that Flink provides, and I also don't need a very precise join. So I prefer to let the join operator calculate a backward timestamp as state; if the cluster restarts, the consumer can use setStartFromTimestamp to start from that timestamp.
>>>>>>>>>>
>>>>>>>>>> Now my problem is that the consumer can't read the state that the join operator has written, so I need a way to send a small message (a 64-bit long) from downstream to upstream. Redis may be a solution, but adding an external dependency is only a secondary option if I can pass this message through memory.
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 6, 2020 at 7:06 AM, Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> It would be good if you could elaborate a bit more on your use case. Are you using batch or streaming? What kind of "message" are we talking about? Why are you thinking of using a static variable, instead of just treating this message as part of the data(set/stream)?
>>>>>>>>>>>
>>>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>>>
>>>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is send some messages from downstream operators to upstream operators, which I am considering doing with a static variable.
>>>>>>>>>>>
>>>>>>>>>>> But this requires me to make sure that these two operators always run in the same TaskManager process. Can I use CoLocationGroup to solve this problem? Or can anyone give me an example that demonstrates the usage of CoLocationGroup?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> --
>>>>>>>>>>> Best regards
>>>>>>>>>>> Sili Liu
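Finally, here is a minimal sketch of the alternative Arvid suggests above: keep the last emitted timestamp as operator state in the source so that it survives a restore, instead of passing it around through a static variable. The source below is only a dummy and every name in it is made up; the point is just the CheckpointedFunction plumbing:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Dummy source that remembers the timestamp of the last emitted record in checkpointed state. */
public class TimestampTrackingSource implements SourceFunction<Long>, CheckpointedFunction {

    private volatile boolean running = true;

    private transient ListState<Long> checkpointedTimestamp;
    private long lastEmittedTimestamp = Long.MIN_VALUE;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Stand-in for reading the next record and its timestamp (e.g. from Kafka).
            long timestamp = System.currentTimeMillis();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(timestamp);
                lastEmittedTimestamp = timestamp;
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedTimestamp.clear();
        checkpointedTimestamp.add(lastEmittedTimestamp);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedTimestamp = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("last-emitted-timestamp", Long.class));
        // On restore, pick up where the last checkpoint left off
        // (optionally minus some offset to re-cover in-flight records).
        for (Long restored : checkpointedTimestamp.get()) {
            lastEmittedTimestamp = restored;
        }
    }
}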