Hello!

We currently have a job which reads from Kafka and uses punctuated watermarks 
based on the messages we read. We keep track of the watermark for each 
partition and emit a consensus watermark: the smallest watermark across all 
partitions.
We ran into an issue because we were not storing this map of 
partition -> watermark in state: when one of the partitions got delayed and the 
job restarted, we lost track of that partition and emitted a watermark anyway.
Our idea for a solution is to save this map of partition -> watermark into 
state, but we would like to know how Flink behaves when we decrease the 
parallelism, so as to make sure that the instance that reads from Kafka also 
holds the state for that particular partition.
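To be concrete about the consensus step described above, here is a minimal 
plain-Java sketch of the per-partition map and the min-of-all-partitions 
watermark (the class and method names are ours for illustration, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: track the latest watermark seen per Kafka partition and
// emit the consensus watermark, i.e. the minimum across partitions.
public class PartitionWatermarks {
    private final Map<Integer, Long> watermarkByPartition = new HashMap<>();

    // Record the latest watermark observed on a partition
    // (watermarks only move forward, hence the max).
    public void update(int partition, long watermark) {
        watermarkByPartition.merge(partition, watermark, Math::max);
    }

    // Consensus watermark: the smallest per-partition watermark.
    // Long.MIN_VALUE until at least one partition has reported.
    public long consensus() {
        return watermarkByPartition.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PartitionWatermarks wm = new PartitionWatermarks();
        wm.update(1, 100L);
        wm.update(2, 250L);
        wm.update(3, 50L); // the delayed partition holds everyone back
        System.out.println(wm.consensus()); // prints 50
    }
}
```

This is exactly why losing one partition's entry on restart is a problem: 
without it, the minimum is computed over the remaining partitions only and the 
watermark can jump ahead.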

To give an example:

Operator 1: (Reads Partition1)
Partition 1: Watermark1 (Map / State)

Operator 2: (Reads Partition2)
Partition 2: Watermark2 (Map / State)

Operator 3: (Reads Partition3)
Partition 3: Watermark3 (Map / State)


After shrinking:

Operator 1: (Reads Partition1)
Partition 1: Watermark1 (Map / State)

Operator 2: (Reads Partition2, Partition3)
Partition 2: Watermark2 (Map / State)
Partition 3: Watermark3 (Map / State)

Or

Operator 1: (Reads Partition1, Partition3) => HERE we would have a problem, as 
Partition3's state could have been loaded on the other operator.
Partition 1: Watermark1 (Map / State)

Operator 2: (Reads Partition2)
Partition 2: Watermark2 (Map / State)
Partition 3: Watermark3 (Map / State)

For this we are using managed operator state 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state)
with "Even-split redistribution".
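To illustrate our concern, here is our own plain-Java sketch of what we 
understand even-split redistribution to do on rescale: the list-state entries 
of all subtasks are gathered and re-split into roughly even sublists, with no 
regard for which subtask will actually read which Kafka partition (this is a 
simulation of the documented semantics, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of even-split redistribution of operator list state:
// all entries are concatenated and dealt out across the new subtasks.
public class EvenSplitSketch {

    // Redistribute entries across newParallelism subtasks, round-robin.
    static <T> List<List<T>> redistribute(List<T> allEntries, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < allEntries.size(); i++) {
            result.get(i % newParallelism).add(allEntries.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // One (partition -> watermark) entry per original subtask.
        List<String> entries = List.of("P1->W1", "P2->W2", "P3->W3");
        // Shrink from parallelism 3 to 2.
        List<List<String>> after = redistribute(entries, 2);
        System.out.println(after);
        // In this sketch "P3->W3" lands on subtask 0, which is not
        // necessarily the subtask that will read Partition3.
    }
}
```

This is the scenario we are worried about in the second example above: the 
partition assignment and the state redistribution are decided independently, 
so the Partition3 entry may not land where Partition3 is read.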

Could you please give us a hand understanding how Flink behaves in such a 
scenario?

Thank you,
Juan G.
