You can use the Timestamp Assigner / Watermark Generator in two different
ways: Per Kafka Partition or per parallel source.

I would usually recommend per Kafka Partition, because if the read position
in the partitions drifts apart (for example some partitions are read at the
tail, some are read a few minutes behind) then your watermarks get messes
up easily, if you do not track them per partition.
There is one instance of the assigner per partition, because its state
might be different for each partition (like "highest seen timestamp so far"
or "millis since last activity").

Why this behaves differently with a JDBC sink versus simply printing is
strange. Does the JDBC sink alter the parallelism or block some parts of
the pipeline?


On Sat, Aug 17, 2019 at 2:42 AM Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com> wrote:

> Hi all,
>
> It was a bit tricky to figure out what was going wrong here, hopefully
> someone can add the missing piece to the puzzle.
>
> I have a Kafka source with a custom AssignerWithPeriodicWatermarks
> timestamp assigner. It's a copy of the AscendingTimestampExtractor with a
> log statement printing each timestamp and watermark produced (along with
> the hash of the assigner instance so I know exactly how far each substream
> has progressed). Attached to that there is a JDBCSinkFunction. I have set
> the whole plan's parallelism to 1 and the max also to 1.
>
> My first surprise was to see there are 16 instances of my assigner
> created, despite there being only one thread using all 16.
>
> My second surprise was to see there were only 4 assigner instances that
> were extracting timestamps.
>
> This meant the whole job's watermark wasn't advancing (and while that's
> not important in this simplified example it is in my real life use case).
>
> If I replace my JDBC sink for a print sink though all 16 assigners get
> fully used (i.e. they all receive messages from which they have to extract
> a timestamp).
>
> What is happening here? I don't want to ignore the unattended Kafka
> partitions or mark them as idle - because I know from using the print sink
> that they do have messages in them. I'm also surprised that there are 16
> instances of the assigner (one per Kafka partition) even though the
> parallelism of the job is one - is that a conscious decision and if so
> what's the reason?
>
> Finally I'd also like to know why only 4 assigners are effectively been
> used, I suspect it's a JDBC default I can override somehow.
>
> Thanks for getting to the bottom of this!
>

Reply via email to