Hi Si-li Liu,

if you want to run T with a parallelism of 1, then your parallelism of A
should be limited by the total number of slots on your TM. Otherwise you
would have some A_i which are not running on a machine with T.

For the approach with the colocation constraint, you can take a look at
Transformation.setCoLocationGroupKey() [1]. Using this API one can define
operators whose sub tasks need to run on the same machine (e.g. A_i runs
together with B_i on the same machine, even in the same slot). However,
this is pretty much an internal feature which might change in future
versions.

What I did not fully understand is what should happen if your TM dies.
Wouldn't then the information of T be lost and the sources would start from
offset 0 again? According to your explanation, this should be intolerable
given the business requirements.

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426

Cheers,
Till

On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise <ar...@ververica.com> wrote:

> If you would prefer to have T with parallelism 1, one complete alternative
> solution would be to leave the timestamp in the state of T and extract the
> timestamp from the savepoint/checkpoint upon start of the application using
> the state processor API [1]. Unfortunately, it may be a bit hacky when you
> do a normal recovery as there is not a single entrypoint (if you start new
> you could just extract that timestamp from main()). Of course, you could
> also store the information in an external storage but that would also make
> the architecture more complicated.
>
> Let's see if anyone has an idea on the co-location topic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote:
>
>> Thanks for your reply!
>>
>> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator
>> should have 1 parallism in topo, also all A_i can start from the same
>> timestamp, but some minor difference of resume timestamp in different A_i
>> source is also acceptable. So I think multiple T operator is also ok to me
>> here. But the prerequisite of this topo can work is I can make sure T and A
>> always reside same TM.
>>
>> The problem here both stream A and stream B is very huge. 200k ~ 300k
>> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after
>> compressed) per messages, and I have to keep the whole message in cache. So
>> it's hard to fit into Flink state.
>>
>>
>>
>> Arvid Heise <ar...@ververica.com> 于2020年11月21日周六 上午3:35写道:
>>
>>> Your topology is definitively interesting and makes sense to me on a
>>> high level. The main question remaining is the parallelism. I'm assuming
>>> you run your pipeline with parallelism p and both source A and
>>> timestampcalculator T are run with parallelism p. You want to create a
>>> situation where for A_i, there is an T_i which run in the same slot. Am I
>>> right?
>>>
>>> If so, then as you have noticed that there is currently no way to
>>> express that in Flink on a high level. One more idea before trying to solve
>>> it in a hacky way: How large is B? Could use a broadcast to avoid the
>>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
>>> because then it's easy to produce an operator chain, where everything even
>>> runs within the same thread.
>>>
>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your reply.
>>>>
>>>> I want to join two stream A and stream B. Items in stream A come in
>>>> first then I keep them in memory cache, as join key and item, then serval
>>>> minutes later the items in stream B come in then the join work is
>>>> performed. The timestamp of the latest expired item in memory cache is the
>>>> safe rollback timestamp, I can resume source A from that timestamp when I
>>>> restart.
>>>>
>>>> It's not very percise, maybe lost same items or send same items twice,
>>>> but seems useful to me in my situation. But if job restart, both source A
>>>> and source B resume from last consumed offset, it will make the absense of
>>>> serval minutes join result, which is unacceptable.
>>>>
>>>> The topo I consider is like
>>>>
>>>> source A -> parser --shuffle--> join -> sink
>>>> source B -> parser ...(parallel)      |--->timestampcalculator
>>>>
>>>> Memory cache aside in join operator, the join operator will broadcast
>>>> the timestamp of latest expired cache item to the timestampcalculator. Then
>>>> timestampcalculator will use them to calculate a safe rollback timestamp (a
>>>> moving minimum) that source A can resume from that timestamp, source B will
>>>> also restart from that timestamp. I will add a bloomfilter in sink's state
>>>> to avoid duplicate items.
>>>>
>>>> So I want to let timestampcalculator operator and source A are located
>>>> in one TM, then I can send this timestamp from timestampcalculator to
>>>> source A by static variable.
>>>>
>>>> Hope I make my problem clear with my poor English, it seems a little
>>>> tricky. But I think it's the only way to do two streams join and avoid to
>>>> store very huge state.
>>>>
>>>>
>>>>
>>>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 下午2:58写道:
>>>>
>>>>> I still haven't fully understood. Do you mean you can't infer the
>>>>> timestamp in source A because it depends on some internal field of source 
>>>>> B?
>>>>>
>>>>> How is that actually working in a parallel setting? Which timestamp is
>>>>> used in the different instances of a source?
>>>>>
>>>>> Say, we have task A1 which is the first subtask of source A and task
>>>>> B2 as the second subtask of source B. How would you like them to be
>>>>> located? How does that correlate to the third subtask of the join (let's
>>>>> call it J3).
>>>>>
>>>>> Remember that through the shuffling before the join there is no clear
>>>>> correlation between any subtask of A or B to J...
>>>>>
>>>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your help!
>>>>>>
>>>>>> Now the timestamps already go with the items in streaming. My
>>>>>> streaming pipeline is like this:
>>>>>>
>>>>>> source -> parser --shuffle--> join -> sink
>>>>>>
>>>>>> Streaming A and streaming B go through this pipeline, I keep logs in
>>>>>> streaming A in memory cache (linkedHashmap) in join operator, then all 
>>>>>> logs
>>>>>> in streaming B tries to lookup up the cache and perform the actual join
>>>>>> work.
>>>>>>
>>>>>> I try to use the timestamp of the lastest expire item in memory as a
>>>>>> safe rollback timestamp, if I restart job, the source should use this
>>>>>> timestamp as start offset. The safe rollback timestamp is calucated in 
>>>>>> join
>>>>>> operator, but I want to use it in source. So the simplest way to pass 
>>>>>> this
>>>>>> information from join operator to source is use static variable, which
>>>>>> require source operator and join operator always locate in same TM 
>>>>>> process.
>>>>>>
>>>>>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 上午3:33写道:
>>>>>>
>>>>>>> Hi Si-li,
>>>>>>>
>>>>>>> couldn't you also add the timestamp as a state to the source? So the
>>>>>>> time would store the timestamp of the last emitted record.
>>>>>>> It's nearly identical to your solution but would fit the recovery
>>>>>>> model of Flink much better.
>>>>>>> If you want to go further back to account for the records that have
>>>>>>> been actually processed in the join, you could also replay the data from
>>>>>>> <last timestamp> - <some offset>.
>>>>>>>
>>>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, I'll try it.
>>>>>>>>
>>>>>>>> Matthias Pohl <matth...@ververica.com> 于2020年11月14日周六 上午12:53写道:
>>>>>>>>
>>>>>>>>> Hi Si-li,
>>>>>>>>> trying to answer your initial question: Theoretically, you could
>>>>>>>>> try using the co-location constraints to achieve this. But keep in 
>>>>>>>>> mind
>>>>>>>>> that this might lead to multiple Join operators running in the same 
>>>>>>>>> JVM
>>>>>>>>> reducing the amount of memory each operator can utilize.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Matthias
>>>>>>>>>
>>>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>
>>>>>>>>>> It's a streaming job. The join operator is doing join work, such
>>>>>>>>>> as join. The join state is too large so I don't want to keep the 
>>>>>>>>>> state
>>>>>>>>>> using the mechanism that Flink provided, and also I don't need very 
>>>>>>>>>> precise
>>>>>>>>>> join. So I prefer to let the join operator to calculate a backward
>>>>>>>>>> timestamp as state, if the cluster restarts, the consumer can
>>>>>>>>>> use setStartFromTimestamp to start from that timestamp.
>>>>>>>>>>
>>>>>>>>>> Now my problem is, consumer can't read the state that join
>>>>>>>>>> operator written, so I need a way to need small message (64bit long) 
>>>>>>>>>> from
>>>>>>>>>> downstream to upstream. Redis may be a solution, but add external
>>>>>>>>>> dependency is a secondary option if I can pass this message through 
>>>>>>>>>> memory.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Chesnay Schepler <ches...@apache.org> 于2020年11月6日周五 上午7:06写道:
>>>>>>>>>>
>>>>>>>>>>> It would be good if you could elaborate a bit more on your
>>>>>>>>>>> use-case.
>>>>>>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>>>>>>> talking about? Why are you thinking of using a static variable, 
>>>>>>>>>>> instead of
>>>>>>>>>>> just treating this message as part of the data(set/stream)?
>>>>>>>>>>>
>>>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>>>
>>>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is
>>>>>>>>>>> send some messages from downstream operators to upstream operators, 
>>>>>>>>>>> which I
>>>>>>>>>>> consider use static variable.
>>>>>>>>>>>
>>>>>>>>>>> But it makes me have to make sure in one taskmanager process it
>>>>>>>>>>> always has these two operators, can I use CoLocationGroup to solve 
>>>>>>>>>>> this
>>>>>>>>>>> problem? Or can anyone give me an example to demostrate the usage
>>>>>>>>>>> of CoLocationGroup ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> --
>>>>>>>>>>> Best regards
>>>>>>>>>>>
>>>>>>>>>>> Sili Liu
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards
>>>>>>>>>>
>>>>>>>>>> Sili Liu
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>
>>>>>>> <https://www.ververica.com/>
>>>>>>>
>>>>>>> Follow us @VervericaData
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>> Conference
>>>>>>>
>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>
>>>>>>> --
>>>>>>> Ververica GmbH
>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>>>> Ji (Toni) Cheng
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards
>>>>
>>>> Sili Liu
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>>
>> --
>> Best regards
>>
>> Sili Liu
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Reply via email to