Hi Kostas, Aljoscha,

To answer Kostas’s concern, the algorithm works this way:

Let’s say we have two sources, Source-0 and Source-1. Source-0 is slow and Source-1 is fast; the sources read from Kafka at different paces. The threshold is 10 time units.

1st cycle: Source-0 sends records with timestamps 1, 2 and emits watermark 2. Throttler-0 has WM 2. Source-1 sends records with timestamps 1, 2, 3 and emits watermark 3. Throttler-1 also has WM 2, since each throttler takes the minimum over all broadcast watermarks.

...

10th cycle: Source-0 sends records with timestamps 19, 20 and emits watermark 20. Throttler-0 has WM 20. Source-1 sends records with timestamps 28, 29, 30 and emits watermark 30. Throttler-1 also has WM 20.

11th cycle: Source-0 sends records with timestamps 21, 22 and emits watermark 22. Throttler-0 has WM 22. Source-1 sends records with timestamps 31, 32, 33 and emits watermark 33. Since Throttler-1 has a WM of 20 at the beginning of the cycle, and the incoming timestamps are now more than the threshold ahead of it, it starts sleeping a very short amount of time for each incoming record. This eventually causes backpressure on Source-1, and only on Source-1, so Source-1 starts to poll less frequently from Kafka.
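In code, the throttler is roughly the following (just a sketch; Event stands in for our record type, with timestamps assigned at the source):

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class Throttler extends ProcessFunction<Event, Event> {

        private static final long THRESHOLD = 10L; // time units, as in the example above

        @Override
        public void processElement(Event value, Context ctx, Collector<Event> out)
                throws Exception {
            long ts = ctx.timestamp();                       // the record's timestamp
            long wm = ctx.timerService().currentWatermark(); // min over all broadcast inputs
            // Before the first watermark arrives, currentWatermark() is Long.MIN_VALUE,
            // so the difference wraps to a large negative value and nothing is throttled.
            if (ts - wm > THRESHOLD) {
                // Sleeping blocks the task thread, which backpressures the single
                // upstream source feeding this throttler.
                Thread.sleep(1);
            }
            out.collect(value);
        }
    }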
For this algorithm to work, each throttler should receive records from only one source; otherwise the backpressure would be applied to both sources. I achieve that using a custom partitioner and the subtask indexIds: everything that comes from Source-n goes to Throttler-n. And since it is a custom partitioner, the watermarks get broadcast to all throttlers.
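Concretely, the routing looks roughly like this (again a sketch; the wrapper types are made up, and StripPartitionID just unwraps the tuple again):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Tag each record with the subtask index of the task that produced it.
    public class AssignPartitionID extends RichMapFunction<Event, Tuple2<Integer, Event>> {
        @Override
        public Tuple2<Integer, Event> map(Event value) {
            return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), value);
        }
    }

    // Route each record to the downstream subtask with the same index,
    // so everything from Source-n ends up at Throttler-n.
    public class IdPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            return key;
        }
    }

Wired together:

    source.map(new AssignPartitionID())
          .partitionCustom(new IdPartitioner(), 0) // partition on field f0, the index
          .map(new StripPartitionID())             // unwrap the Tuple2 again
          .process(new Throttler());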
The problem is that I thought Source-0 and Throttler-0 would be colocated in the same taskmanager. Unfortunately, this is not the case: Source-0 and Throttler-1 can end up in TM-0, and Source-1 and Throttler-0 in TM-1. This causes a network shuffle and one more data serialization/deserialization. I want to avoid that if possible, since the stream is big.

Regards,

> On 28. Sep 2017, at 23:03, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> To quickly make Kostas' intuition concrete: it's currently not possible to have watermarks broadcast but the data be locally forwarded. The reason is that watermarks and data travel in the same channels, so if the watermark needs to be broadcast, there needs to be an n-to-m (in this case m == n) connection pattern between the operations (tasks).
>
> I think your algorithm should work if you take the correct difference, i.e. throttle when timestamp - "global watermark" > threshold. The inverted diff would be "global watermark" - timestamp. I think you're already doing the correct thing, just wanted to clarify for others who might be reading.
>
> Did you check on which TaskManagers the taskA and taskB operators run? I think they should still be running on the same TM if resources permit.
>
> Best,
> Aljoscha
>
>> On 28. Sep 2017, at 10:25, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>> Hi Yunus,
>>
>> I see. Currently I am not sure that you can simply broadcast the watermark only, without having a shuffle.
>>
>> But one thing to notice is that I am not sure your algorithm solves the problem you encounter.
>>
>> Your algorithm seems to prioritize the stream with the elements with the smallest timestamps, rather than throttling fast streams so that slow ones can catch up.
>>
>> Example: a task reading a partition from Kafka that has elements with timestamps 1, 2, 3 will emit watermark 3 (assuming an ascending watermark extractor), while another task reading another partition with elements with timestamps 5, 6, 7 will emit watermark 7. With your algorithm, if I get it right, you will throttle the second partition/task while allowing the first one to advance, although both read at the same pace (e.g. 3 elements per unit of time).
>>
>> I will think a bit more on the solution.
>>
>> The sketches I can come up with all introduce some latency, e.g. measuring throughput in taskA and sending it to a side output with a taskID, then broadcasting the side output to a downstream operator which is something like a coprocess function (taskB) that receives the original stream and the side output, and this is the one that checks if “my task” is slow.
>>
>> As I said, I will think on it a bit more,
>> Kostas
>>
>>> On Sep 27, 2017, at 6:32 PM, Yunus Olgun <yunol...@gmail.com> wrote:
>>>
>>> Hi Kostas,
>>>
>>> Yes, you have summarized it well. I want to only forward the data to the next local operator, but broadcast the watermark through the cluster.
>>>
>>> - I can’t set the parallelism of taskB to 1. The stream is too big for that. Also, the data is ordered within each partition; I don’t want to change that order.
>>>
>>> - I don’t need a KeyedStream. Also, taskA and taskB will always have the same parallelism as each other, but this parallelism can be increased in the future.
>>>
>>> The use case is: the source is Kafka. At our peak hours, or when we run the streaming job on old data from Kafka, the same thing always happens, even in trivial jobs: some consumers consume faster than others. They produce too much data downstream, but the watermark advances slowly, at the speed of the slowest consumer. This extra data piles up at downstream operators. When the downstream operator is an aggregation, that is ok; but when it is an in-Flink join, the state size gets too big, checkpoints take much longer, and overall the job becomes slower or fails. It also affects other jobs in the cluster.
>>>
>>> So, basically, I want to implement a throttler. It compares the timestamp of a record with the global watermark. If the difference is larger than a constant threshold, it starts sleeping 1 ms for each incoming record. This way, fast operators wait for the slowest one.
>>>
>>> The only problem is that this solution comes at the cost of one network shuffle and data serialization/deserialization. Since the stream is large, I want to at least avoid the network shuffle.
>>>
>>> I thought operator instances within a taskmanager would get the same indexId, but apparently this is not the case.
>>>
>>> Thanks,
>>>
>>>> On 27. Sep 2017, at 17:16, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>
>>>> Hi Yunus,
>>>>
>>>> I am not sure if I understand the question correctly.
>>>>
>>>> Am I correct to assume that you want the following?
>>>>
>>>> ———————————> time
>>>>
>>>>              ProcessA                ProcessB
>>>>
>>>> Task1:  W(3) E(1) E(2) E(5)     W(3) W(7) E(1) E(2) E(5)
>>>>
>>>> Task2:  W(7) E(3) E(10) E(6)    W(3) W(7) E(3) E(10) E(6)
>>>>
>>>> In the above, elements flow from left to right, W() stands for watermark, and E() stands for element. In other words, between Process(TaskA) and Process(TaskB) you want to only forward the elements, but broadcast the watermarks, right?
>>>>
>>>> If this is the case, a trivial solution would be to set the parallelism of TaskB to 1, so that all elements go through the same node.
>>>>
>>>> One other solution is what you did, BUT by using a custom partitioner you cannot use keyed state in your process function B, because the stream is no longer keyed.
>>>>
>>>> A similar approach to what you did, but without the limitation above, is that in the first process function (TaskA) you can append the taskId to the elements themselves and then do a keyBy(taskId) between the first and the second process function.
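>>>> Roughly, as a sketch with made-up names:
>>>>
>>>>     source.process(new TaskA())      // TaskA attaches getIndexOfThisSubtask() as taskId
>>>>           .keyBy(r -> r.getTaskId()) // hash-partitions by taskId, so the stream is keyed
>>>>           .process(new TaskB());     // keyed state is available in TaskB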
>>>> These are the solutions I can come up with, assuming that you want to do what I described.
>>>>
>>>> But in general, could you please describe your use case a bit more? This way we may figure out another approach to achieve your goal. In fact, I am not sure that you gain anything by broadcasting the watermark, other than re-implementing (to some extent) Flink’s windowing mechanism.
>>>>
>>>> Thanks,
>>>> Kostas
>>>>
>>>>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunol...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have a simple streaming job such as:
>>>>>
>>>>>     source.process(taskA)
>>>>>           .process(taskB)
>>>>>
>>>>> I want taskB to access the minimum watermark of all parallel taskA instances, but the data is ordered and should not be shuffled. The ForwardPartitioner uses the watermark of only one predecessor, so I have used a custom partitioner:
>>>>>
>>>>>     source.process(taskA)
>>>>>           .map(AssignPartitionID)
>>>>>           .partitionCustom(IdPartitioner)
>>>>>           .map(StripPartitionID)
>>>>>           .process(taskB)
>>>>>
>>>>> In the AssignPartitionID function, I attach getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the object. In the IdPartitioner, I return this partitionId.
>>>>>
>>>>> This solved the main requirement, but I have another concern now.
>>>>>
>>>>> Network shuffle: I don’t need a network shuffle. I thought that, within a taskmanager, the indexId of taskA subtasks would be the same as the indexId of taskB subtasks. Unfortunately, they are not. Is there a way to make partitionCustom distribute data like the ForwardPartitioner, to the next local operator?
>>>>>
>>>>> As far as I know, this still requires object serialization/deserialization, since the operators can’t be chained anymore. Is there a way to get the minimum watermark from upstream operators without a network shuffle and object serialization/deserialization?
>>>>>
>>>>> Regards,
>>>>
>>>
>>
>