Hi Kostas, Aljoscha,

To answer Kostas’s concern, the algorithm works this way:

Let’s say we have two sources Source-0 and Source-1. Source-0 is slow and 
Source-1 is fast. Sources read from Kafka at different paces. Threshold is 10 
time units.

1st cycle: Source-0 sends records with timestamp 1,2 and emit watermark 2. 
Throttle-0 has WM 2.
                Source-1 sends records with timestamp 1,2,3 and emit watermark 
3. Throttle-1 has also WM 2.
.
.
.
10th cycle: Source-0 sends records with timestamp 19, 20 and emit watermark 20. 
Throttle-0 has WM 20.
                  Source-1 sends records with timestamp 28, 29, 30 and emit 
watermark 30. Throttle-1 has also WM 20.

11th cycle: Source-0 sends records with timestamp 21,22 and emit watermark 22. 
Throttle-0 has WM 22.
                  Source-1 sends records with timestamp 31,32,33 and emit 
watermark 33. Since, Throttle-1 has a WM of 20 at the beginning of the cycle 
,it will start sleeping a very short amount of time for each incoming record. 
This eventually causes a backpressure to Source-1 and only Source-1. Source-1 
starts to poll less frequently from Kafka.

For this algorithm to work each Throttler should receive records from only one 
source. Otherwise backpressure will be applied to both sources. I achive that 
using a custom partitioner and indexIds. Everything that comes from Source-n 
goes to Throttler-n. Since it is a custom partitioner watermarks gets 
broadcasted to all throttlers.

The problem is I thought Source-0 and Throttler-0 will be colocated in the same 
taskmanager. Unfortunately this is not the case. Source-0 and Throttler-1 can 
end up in TM-0; Source-1 and Throttler-0 at TM-1. This causes a network 
shuffle, one more data serialization/deserialization. I want to avoid that if 
it is possible, since the stream is big.

Regards,  
 
> On 28. Sep 2017, at 23:03, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> To quickly make Kostas' intuition concrete: it's currently not possible to 
> have watermarks broadcast but the data be locally forwarded. The reason is 
> that watermarks and data travel in the same channels so if the watermark 
> needs to be broadcast there needs to be an n to m (in this case m == n) 
> connection pattern between the operations (tasks).
> 
> I think your algorithm should work if you take the correct difference, i.e. 
> throttle when timestamp - "global watermark" > threshold. The inverted diff 
> would be "global watermark" - timestamp. I think you're already doing the 
> correct thing, just wanted to clarify for others who might be reading.
> 
> Did you check on which TaskManagers the taskA and taskB operators run? I 
> think they should still be running on the same TM if resources permit.
> 
> Best,
> Aljoscha
>> On 28. Sep 2017, at 10:25, Kostas Kloudas <k.klou...@data-artisans.com 
>> <mailto:k.klou...@data-artisans.com>> wrote:
>> 
>> Hi Yunus,
>> 
>> I see. Currently I am not sure that you can simply broadcast the watermark 
>> only, without 
>> having a shuffle.
>> 
>> But one thing to notice about your algorithm is that, I am not sure if your 
>> algorithm solves 
>> the problem you encounter.
>> 
>> Your algorithm seems to prioritize the stream with the elements with the 
>> smallest timestamps,
>> rather than throttling fast streams so that slow ones can catch up.
>> 
>> Example: Reading a partition from Kafka that has elements with timestamps 
>> 1,2,3
>> will emit watermark 3 (assuming ascending watermark extractor), while 
>> another task that reads 
>> another partition with elements with timestamps 5,6,7 will emit watermark 7. 
>> With your algorithm, 
>> if I get it right, you will throttle the second partition/task, while allow 
>> the first one to advance, although
>> both read at the same pace (e.g. 3 elements per unit of time).
>> 
>> I will think a bit more on the solution. 
>> 
>> Some sketches that I can find, they all introduce some latency, e.g. 
>> measuring throughput in taskA
>> and sending it to a side output with a taksID, then broadcasting the side 
>> output to a downstream operator
>> which is sth like a coprocess function (taskB) and receives the original 
>> stream and the side output, and 
>> this is the one that checks if “my task" is slow. 
>> 
>> As I said I will think on it a bit more,
>> Kostas
>> 
>>> On Sep 27, 2017, at 6:32 PM, Yunus Olgun <yunol...@gmail.com 
>>> <mailto:yunol...@gmail.com>> wrote:
>>> 
>>> Hi Kostas,
>>> 
>>> Yes, you have summarized well. I want to only forward the data to the next 
>>> local operator, but broadcast the watermark through the cluster.
>>> 
>>> - I can’t set parallelism of taskB to 1. The stream is too big for that. 
>>> Also, the data is ordered at each partition. I don’t want to change that 
>>> order.
>>> 
>>> - I don’t need KeyedStream. Also taskA and taskB will always have the same 
>>> parallelism with each other. But this parallelism can be increased in the 
>>> future.
>>> 
>>> The use case is: The source is Kafka. At our peak hours or when we want to 
>>> run the streaming job with old data from Kafka, always the same thing 
>>> happens. Even at trivial jobs. Some consumers consumes faster than others. 
>>> They produce too much data to downstream but watermark advances slowly at 
>>> the speed of the slowest consumer. This extra data gets piled up at 
>>> downstream operators. When the downstream operator is an aggregation, it is 
>>> ok. But when it is a in-Flink join; state size gets too big, checkpoints 
>>> take much longer and overall the job becomes slower or fails. Also it 
>>> effects other jobs at the cluster.
>>> 
>>> So, basically I want to implement a throttler. It compares timestamp of a 
>>> record and the global watermark. If the difference is larger than a 
>>> constant threshold it starts sleeping 1 ms for each incoming record. This 
>>> way, fast operators wait for the slowest one.
>>> 
>>> The only problem is that, this solution came at the cost of one network 
>>> shuffle and data serialization/deserialization. Since the stream is large I 
>>> want to avoid the network shuffle at the least. 
>>> 
>>> I thought operator instances within a taskmanager would get the same 
>>> indexId, but apparently this is not the case.
>>> 
>>> Thanks,
>>> 
>>>> On 27. Sep 2017, at 17:16, Kostas Kloudas <k.klou...@data-artisans.com 
>>>> <mailto:k.klou...@data-artisans.com>> wrote:
>>>> 
>>>> Hi Yunus,
>>>> 
>>>> I am not sure if I understand correctly the question.
>>>> 
>>>> Am I correct to assume that you want the following?
>>>> 
>>>>                            ———————————> time
>>>> 
>>>>            ProcessA                                                ProcessB
>>>> 
>>>> Task1: W(3) E(1) E(2) E(5)                 W(3) W(7) E(1) E(2) E(5)
>>>> 
>>>> Task2: W(7) E(3) E(10) E(6)                        W(3) W(7) E(3) E(10) 
>>>> E(6)
>>>> 
>>>> 
>>>> In the above, elements flow from left to right and W() stands for 
>>>> watermark and E() stands for element.
>>>> In other words, between Process(TaksA) and Process(TaskB) you want to only 
>>>> forward the elements, but broadcast the watermarks, right?
>>>> 
>>>> If this is the case, a trivial solution would be to set the parallelism of 
>>>> TaskB to 1, so that all elements go through the same node.
>>>> 
>>>> One other solution is what you did, BUT by using a custom partitioner you 
>>>> cannot use keyed state in your process function B because the 
>>>> stream is no longer keyed.
>>>> 
>>>> A similar approach to what you did but without the limitation above, is 
>>>> that in the first processFunction (TaskA) you can append the 
>>>> taskId to the elements themselves and then do a keyBy(taskId) between the 
>>>> first and the second process function.
>>>> 
>>>> These are the solutions that I can come up with, assuming that you want to 
>>>> do what I described.
>>>> 
>>>> But in general, could you please describe a bit more what is your use 
>>>> case? 
>>>> This way we may figure out another approach to achieve your goal. 
>>>> In fact, I am not sure if you earn anything by broadcasting the watermark, 
>>>> other than 
>>>> re-implementing (to some extent) Flink’s windowing mechanism.
>>>> 
>>>> Thanks,
>>>> Kostas
>>>> 
>>>>> On Sep 27, 2017, at 4:35 PM, Yunus Olgun <yunol...@gmail.com 
>>>>> <mailto:yunol...@gmail.com>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I have a simple streaming job such as:
>>>>> 
>>>>> source.process(taskA)
>>>>>           .process(taskB)
>>>>> 
>>>>> I want taskB to access minimum watermark of all parallel taskA instances, 
>>>>> but the data is ordered and should not be shuffled. ForwardPartitioner 
>>>>> uses watermark of only one predecessor. So, I have used a 
>>>>> customPartitioner.
>>>>> 
>>>>> source.process(taskA)
>>>>>           .map(AssignPartitionID)
>>>>>           .partitionCustom(IdPartitioner)
>>>>>           .map(StripPartitionID)
>>>>>           .process(taskB)
>>>>> 
>>>>> At AssignPartitionID function, I attach 
>>>>> getRuntimeContext().getIndexOfThisSubtask() as a partitionId to the 
>>>>> object. At IdPartitioner, I return this partitionId.
>>>>> 
>>>>> This solved the main requirement but I have another concern now,
>>>>> 
>>>>> Network shuffle: I don’t need a network shuffle. I thought within a 
>>>>> taskmanager, indexId of taskA subtasks would be same as indexId of taskB 
>>>>> subtasks. Unfortunately, they are not. Is there a way to make 
>>>>> partitionCustom distribute data like ForwardPartitioner, to the next 
>>>>> local operator? 
>>>>> 
>>>>> As I know, this still requires object serialization/deserialization since 
>>>>> operators can’t be chained anymore. Is there a way to get minimum 
>>>>> watermark from upstream operators without network shuffle and object 
>>>>> serilization/deserialization?
>>>>> 
>>>>> Regards,
>>>> 
>>> 
>> 
> 

Reply via email to