Hi,

As you noticed, Flink currently does not put Source-X and Throttler-X (for some X) in the same task slot (TaskManager). In the low-level execution system there are two connection patterns: ALL_TO_ALL and POINTWISE. Flink will only schedule Source-X and Throttler-X on the same slot when the POINTWISE pattern is used. At the API level, the POINTWISE pattern is used only when there is a simple "forward" connection between operations in the streaming API; for all other partitioning schemes ALL_TO_ALL is used, with a custom (API-level) partitioner.
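For illustration, roughly (an untested sketch; source, taskA, taskB, idPartitioner, and keySelector are placeholders for the pieces of your pipeline):

    // simple forward connection: POINTWISE pattern, each upstream subtask
    // is wired only to its downstream partner, so they can be colocated
    source.process(taskA)
          .process(taskB);

    // custom partitioner: ALL_TO_ALL pattern, every upstream subtask is
    // wired to every downstream subtask, so the scheduler is free to put
    // Source-X and Throttler-Y in different slots
    source.process(taskA)
          .partitionCustom(idPartitioner, keySelector)
          .process(taskB);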
It might be possible to change this, I'll get back to you once I've investigated more.

Best,
Aljoscha

> On 29. Sep 2017, at 00:05, Yunus Olgun <yunol...@gmail.com> wrote:
>
> Hi Kostas, Aljoscha,
>
> To answer Kostas's concern, the algorithm works this way:
>
> Let's say we have two sources, Source-0 and Source-1. Source-0 is slow and Source-1 is fast. The sources read from Kafka at different paces. The threshold is 10 time units.
>
> 1st cycle: Source-0 sends records with timestamps 1,2 and emits watermark 2. Throttler-0 has WM 2.
>            Source-1 sends records with timestamps 1,2,3 and emits watermark 3. Throttler-1 also has WM 2, since the operator watermark is the minimum over all inputs.
> ...
> 10th cycle: Source-0 sends records with timestamps 19,20 and emits watermark 20. Throttler-0 has WM 20.
>             Source-1 sends records with timestamps 28,29,30 and emits watermark 30. Throttler-1 also has WM 20.
>
> 11th cycle: Source-0 sends records with timestamps 21,22 and emits watermark 22. Throttler-0 has WM 22.
>             Source-1 sends records with timestamps 31,32,33 and emits watermark 33. Since Throttler-1 has a WM of 20 at the beginning of the cycle, it will start sleeping a very short amount of time for each incoming record. This eventually causes backpressure on Source-1, and only Source-1. Source-1 starts to poll less frequently from Kafka.
>
> For this algorithm to work, each Throttler should receive records from only one source; otherwise backpressure will be applied to both sources. I achieve that using a custom partitioner and index ids: everything that comes from Source-n goes to Throttler-n. Since it is a custom partitioner, watermarks get broadcast to all throttlers.
>
> The problem is that I thought Source-0 and Throttler-0 would be colocated in the same taskmanager. Unfortunately this is not the case. Source-0 and Throttler-1 can end up in TM-0, and Source-1 and Throttler-0 in TM-1. This causes a network shuffle and one more data serialization/deserialization. I want to avoid that if possible, since the stream is big.
>
> Regards,
>
>> On 28. Sep 2017, at 23:03, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> To quickly make Kostas' intuition concrete: it's currently not possible to have watermarks broadcast but the data locally forwarded. The reason is that watermarks and data travel in the same channels, so if the watermark needs to be broadcast there needs to be an n-to-m (in this case m == n) connection pattern between the operations (tasks).
>>
>> I think your algorithm should work if you take the correct difference, i.e. throttle when timestamp - "global watermark" > threshold. The inverted diff would be "global watermark" - timestamp. I think you're already doing the correct thing, just wanted to clarify for others who might be reading.
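>>
>> In code, the check could look something like this (an untested sketch; the Throttler name and threshold are placeholders, and it assumes the operator's current watermark is your broadcast "global" watermark):
>>
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> public class Throttler<T> extends ProcessFunction<T, T> {
>>
>>     private final long threshold;
>>
>>     public Throttler(long threshold) {
>>         this.threshold = threshold;
>>     }
>>
>>     @Override
>>     public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
>>         // the current watermark is the minimum over all input channels,
>>         // i.e. the broadcast "global" watermark in this setup
>>         long globalWatermark = ctx.timerService().currentWatermark();
>>         Long timestamp = ctx.timestamp();
>>         if (timestamp != null
>>                 && globalWatermark > Long.MIN_VALUE
>>                 && timestamp - globalWatermark > threshold) {
>>             Thread.sleep(1); // builds back-pressure towards this subtask's source
>>         }
>>         out.collect(value);
>>     }
>> }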
>>
>> Did you check on which TaskManagers the taskA and taskB operators run? I think they should still be running on the same TM if resources permit.
>>
>> Best,
>> Aljoscha
>>
>>> On 28. Sep 2017, at 10:25, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>> Hi Yunus,
>>>
>>> I see. Currently I am not sure that you can simply broadcast the watermark only, without having a shuffle.
>>>
>>> One thing to notice, though: I am not sure your algorithm solves the problem you encounter.
>>>
>>> Your algorithm seems to prioritize the stream with the elements with the smallest timestamps, rather than throttling fast streams so that slow ones can catch up.
>>>
>>> Example: a task reading a Kafka partition that has elements with timestamps 1,2,3 will emit watermark 3 (assuming an ascending-timestamps watermark extractor), while another task that reads another partition with elements with timestamps 5,6,7 will emit watermark 7. With your algorithm, if I get it right, you will throttle the second partition/task while allowing the first one to advance, although both read at the same pace (e.g. 3 elements per unit of time).
>>>
>>> I will think a bit more on the solution.
>>>
>>> The sketches I can come up with all introduce some latency, e.g.: measure the progress in taskA and send it to a side output tagged with a taskId, then broadcast the side output to a downstream operator (taskB), something like a co-process function that receives the original stream and the side output; this is the one that checks if "my task" is slow.
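>>>
>>> Very roughly, and untested (Event, TaskA, progressTag, and THRESHOLD are all placeholders, and I use reported event-time progress as the measure):
>>>
>>> final OutputTag<Tuple2<Integer, Long>> progressTag =
>>>         new OutputTag<Tuple2<Integer, Long>>("progress") {};
>>>
>>> // taskA periodically emits (subtaskIndex, local progress) to the side output
>>> SingleOutputStreamOperator<Event> main = source.process(new TaskA(progressTag));
>>>
>>> // broadcast the reports so that every downstream subtask sees all of them
>>> DataStream<Tuple2<Integer, Long>> progress =
>>>         main.getSideOutput(progressTag).broadcast();
>>>
>>> main.connect(progress)
>>>     .process(new CoProcessFunction<Event, Tuple2<Integer, Long>, Event>() {
>>>
>>>         private final Map<Integer, Long> lastReport = new HashMap<>();
>>>
>>>         @Override
>>>         public void processElement2(Tuple2<Integer, Long> report, Context ctx, Collector<Event> out) {
>>>             lastReport.put(report.f0, report.f1); // latest progress per taskA subtask
>>>         }
>>>
>>>         @Override
>>>         public void processElement1(Event value, Context ctx, Collector<Event> out) throws Exception {
>>>             long slowest = lastReport.values().stream().min(Long::compare).orElse(Long.MIN_VALUE);
>>>             Long timestamp = ctx.timestamp();
>>>             // if "my" stream is too far ahead of the slowest task, slow down
>>>             if (timestamp != null && slowest > Long.MIN_VALUE && timestamp - slowest > THRESHOLD) {
>>>                 Thread.sleep(1);
>>>             }
>>>             out.collect(value);
>>>         }
>>>     });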
>>>
>>> As I said, I will think on it a bit more,
>>> Kostas
>>>
>>>> On Sep 27, 2017, at 6:32 PM, Yunus Olgun <yunol...@gmail.com> wrote:
>>>>
>>>> Hi Kostas,
>>>>
>>>> Yes, you have summarized it well. I want to only forward the data to the next local operator, but broadcast the watermark through the cluster.
>>>>
>>>> - I can't set the parallelism of taskB to 1. The stream is too big for that. Also, the data is ordered within each partition. I don't want to change that order.
>>>>
>>>> - I don't need a KeyedStream. Also, taskA and taskB will always have the same parallelism as each other, but this parallelism can be increased in the future.
>>>>
>>>> The use case is: the source is Kafka. At our peak hours, or when we want to run the streaming job with old data from Kafka, the same thing always happens, even in trivial jobs: some consumers consume faster than others. They produce too much data downstream, but the watermark advances slowly, at the speed of the slowest consumer. This extra data gets piled up at downstream operators. When the downstream operator is an aggregation, it is ok. But when it is an in-Flink join, the state size gets too big, checkpoints take much longer, and overall the job becomes slower or fails. It also affects other jobs on the cluster.
>>>>
>>>> So basically I want to implement a throttler. It compares the timestamp of a record with the global watermark. If the difference is larger than a constant threshold, it starts sleeping 1 ms for each incoming record. This way, fast operators wait for the slowest one.
>>>>
>>>> The only problem is that this solution comes at the cost of one network shuffle and data serialization/deserialization. Since the stream is large, I want to avoid the network shuffle at the least.
>>>>
>>>> I thought operator instances within a taskmanager would get the same indexId, but apparently this is not the case.
>>>>
>>>> Thanks,
>>>>
>>>>> On 27. Sep 2017, at 17:16, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>>
>>>>> Hi Yunus,
>>>>>
>>>>> I am not sure if I understand the question correctly.
>>>>>
>>>>> Am I correct to assume that you want the following?
>>>>>
>>>>> ———————————> time
>>>>>
>>>>>         ProcessA                ProcessB
>>>>>
>>>>> Task1:  W(3) E(1) E(2) E(5)     W(3) W(7) E(1) E(2) E(5)
>>>>>
>>>>> Task2:  W(7) E(3) E(10) E(6)    W(3) W(7) E(3) E(10) E(6)
>>>>>
>>>>> In the above, elements flow from left to right, W() stands for watermark, and E() stands for element.
>>>>> In other words, between Process(TaskA) and Process(TaskB) you want to only forward the elements, but broadcast the watermarks, right?
>>>>>
>>>>> If this is the case, a trivial solution would be to set the parallelism of TaskB to 1, so that all elements go through the same node.
>>>>>
>>>>> One other solution is what you did, BUT by using a custom partitioner you cannot use keyed state in your process function B, because the stream is no longer keyed.
>>>>>
>>>>> A similar approach to what you did, but without the limitation above, is that in the first process function (TaskA) you can append the taskId to the elements themselves and then do a keyBy(taskId) between the first and the second process function.
>>>>>
>>>>> These are the solutions that I can come up with, assuming that you want to do what I described.
>>>>>
>>>>> But in general, could you please describe your use case a bit more?
>>>>> This way we may figure out another approach to achieve your goal.
>>>>> In fact, I am not sure you gain anything by broadcasting the watermark, other than re-implementing (to some extent) Flink's windowing mechanism.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunol...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a simple streaming job such as:
>>>>>>
>>>>>> source.process(taskA)
>>>>>>       .process(taskB)
>>>>>>
>>>>>> I want taskB to access the minimum watermark of all parallel taskA instances, but the data is ordered and should not be shuffled. The ForwardPartitioner uses the watermark of only one predecessor, so I have used a custom partitioner:
>>>>>>
>>>>>> source.process(taskA)
>>>>>>       .map(AssignPartitionID)
>>>>>>       .partitionCustom(IdPartitioner)
>>>>>>       .map(StripPartitionID)
>>>>>>       .process(taskB)
>>>>>>
>>>>>> In the AssignPartitionID function, I attach getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. In IdPartitioner, I return this partitionId.
>>>>>>
>>>>>> This solved the main requirement, but I have another concern now:
>>>>>>
>>>>>> Network shuffle: I don't need a network shuffle. I thought that within a taskmanager the indexId of taskA subtasks would be the same as the indexId of taskB subtasks. Unfortunately, they are not. Is there a way to make partitionCustom distribute data like the ForwardPartitioner, to the next local operator?
>>>>>>
>>>>>> As far as I know, this still requires object serialization/deserialization, since the operators can't be chained anymore. Is there a way to get the minimum watermark from upstream operators without a network shuffle and object serialization/deserialization?
>>>>>>
>>>>>> Regards,
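>>>>>>
>>>>>> PS: For reference, the two helpers look roughly like this (simplified and untested; the modulo in IdPartitioner is only defensive, since the id is already a valid partition index):
>>>>>>
>>>>>> // tag each record with the subtask index of the taskA instance that produced it
>>>>>> class AssignPartitionID<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
>>>>>>     @Override
>>>>>>     public Tuple2<Integer, T> map(T value) {
>>>>>>         return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> // send every record to the downstream subtask with the same index
>>>>>> class IdPartitioner implements Partitioner<Integer> {
>>>>>>     @Override
>>>>>>     public int partition(Integer id, int numPartitions) {
>>>>>>         return id % numPartitions;
>>>>>>     }
>>>>>> }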