Hi Sandeep,
looking into the code, can you please elaborate on how the reading
thread holds the lock for ever? From what I understand from the code the
lock is released after each call to reader.advance(). Therefore the
checkpoint should not be blocked "for ever". Am I missing something?
Could you maybe take a thread dump and send the dumps of those two threads?
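The usual way to get such a dump is running jstack <pid> against the task manager JVM. If that is inconvenient, a rough in-process alternative is sketched below using the standard java.lang.management API. The class name LockDump and the output format are made up for illustration and are not part of Beam or Flink.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper; prints which lock each thread waits on and who owns it. */
class LockDump {
  /** Returns one line per live thread: name, state, and the contended lock, if any. */
  static String dump() {
    ThreadMXBean mx = ManagementFactory.getThreadMXBean();
    // Only request monitor/synchronizer details when the JVM supports them.
    boolean monitors = mx.isObjectMonitorUsageSupported();
    boolean synchronizers = mx.isSynchronizerUsageSupported();
    StringBuilder sb = new StringBuilder();
    for (ThreadInfo info : mx.dumpAllThreads(monitors, synchronizers)) {
      sb.append(info.getThreadName()).append(" state=").append(info.getThreadState());
      if (info.getLockName() != null) {
        // The thread is blocked or waiting on this lock; the owner may be null.
        sb.append(" waitingOn=").append(info.getLockName())
            .append(" heldBy=").append(info.getLockOwnerName());
      }
      sb.append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.print(dump());
  }
}
```

A thread shown as BLOCKED with a non-null heldBy entry is waiting for a monitor that another thread currently owns, which is the situation worth confirming for the two threads in question.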
Thanks,
Jan
On 7/27/22 15:58, John Casey via dev wrote:
Would it be possible to rerun the experiments so as to isolate the
variables? Right now the 3 cases change both the Beam and Flink versions.
On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles <k...@apache.org> wrote:
Bumping this and adding +John Casey
<mailto:johnjca...@google.com> who knows about KafkaIO and
unbounded sources, though probably less about the FlinkRunner. It
seems you have isolated it to the Flink translation logic. I'm not
sure who would be the best expert to evaluate if that logic is
still OK.
Kenn
On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep
<sandeep_kath...@intuit.com> wrote:
Hi,
We have a stateless application which
1. Reads from Kafka
2. Applies some stateless transformations by reading from in-memory
databases and updating the records
3. Writes back to Kafka.
*With Beam 2.23 and Flink 1.9, checkpoints work fine (they take at
most 1 min).*
*With Beam 2.29 and Flink 1.12, checkpoints take longer (sometimes up
to 6-7 min).*
*With Beam 2.38 and Flink 1.14, checkpoints time out after 10
minutes.*
I looked into the Beam code and, after some logging and analysis,
found that the problem is at
https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307
We are still using the old API to read from Kafka and not yet
using KafkaIO based on SplittableDoFn.
There are two threads:
1. The legacy source thread, which reads from Kafka and does the entire
processing.
2. The thread which emits watermarks on a timer
https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474
Both of these code blocks are inside synchronized blocks that wait on
the same checkpoint lock. Under heavy load, the thread reading from
Kafka runs for ever in the while loop, and the thread emitting the
watermarks waits for ever to get the lock. As a result the watermarks
are not emitted and the checkpoint times out.
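The contention pattern can be tried outside Beam with a small sketch. This is not the UnboundedSourceWrapper code: the class name, the run() helper and the timings are made up for illustration. A reader thread repeatedly enters and leaves a synchronized block on a shared lock, and a second thread measures how long it waits to enter the same lock once. Java's intrinsic locks give no fairness guarantee, so the measured wait depends on the JVM and the load. The sketch only shows how to measure it, not that starvation must occur.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the two threads sharing the checkpoint lock. */
class CheckpointLockContention {
  /** Reader loop iterations and the time the timer thread waited for the lock. */
  static final class Result {
    final long readerIterations;
    final long timerWaitNanos;

    Result(long readerIterations, long timerWaitNanos) {
      this.readerIterations = readerIterations;
      this.timerWaitNanos = timerWaitNanos;
    }
  }

  static Result run(long readerRunMillis) {
    final Object checkpointLock = new Object();
    final AtomicLong iterations = new AtomicLong();
    final AtomicLong timerWaitNanos = new AtomicLong(-1);

    Thread reader = new Thread(() -> {
      long deadline = System.nanoTime() + readerRunMillis * 1_000_000L;
      while (System.nanoTime() < deadline) {
        synchronized (checkpointLock) {
          // Stands in for reader.advance() plus emitting the element.
          iterations.incrementAndGet();
        }
        // The lock is released here, then requested again right away.
      }
    }, "reader");

    Thread timer = new Thread(() -> {
      long start = System.nanoTime();
      synchronized (checkpointLock) {
        // Stands in for emitting a watermark.
        timerWaitNanos.set(System.nanoTime() - start);
      }
    }, "watermark-timer");

    try {
      reader.start();
      Thread.sleep(10); // let the reader start spinning before the timer competes
      timer.start();
      reader.join();
      timer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for threads", e);
    }
    return new Result(iterations.get(), timerWaitNanos.get());
  }

  public static void main(String[] args) {
    Result r = run(200);
    System.out.println("reader iterations: " + r.readerIterations);
    System.out.println("timer waited (ms): " + r.timerWaitNanos / 1_000_000.0);
  }
}
```

If the timer's wait comes out close to the reader's whole run time, the reader is effectively keeping the lock to itself even though it releases it on every iteration.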
Is this a known issue, and is there a solution? For now we call
Thread.sleep(1) once every 10 seconds, after the synchronized block,
so that the thread emitting the watermarks can be unblocked and run.
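Roughly, the workaround has the following shape. This is a simplified sketch and not the patched Beam code: the class name, the runReader() helper and its parameters are made up for illustration, and the loop body is only a placeholder for the real read and emit logic. The point is that the sleep happens outside the synchronized block, so the lock is free while the reader sleeps.

```java
/** Hypothetical reader loop that periodically pauses outside the lock. */
class PeriodicYieldReader {
  /**
   * Spins on the lock for runMillis and sleeps 1 ms outside the lock once per
   * yieldIntervalMillis. Returns how many times the loop paused.
   */
  static long runReader(Object checkpointLock, long runMillis, long yieldIntervalMillis) {
    long intervalNanos = yieldIntervalMillis * 1_000_000L;
    long start = System.nanoTime();
    long deadline = start + runMillis * 1_000_000L;
    long nextYield = start + intervalNanos;
    long yields = 0;

    while (System.nanoTime() < deadline) {
      synchronized (checkpointLock) {
        // Placeholder for reader.advance() and emitting the element.
      }
      if (System.nanoTime() >= nextYield) {
        try {
          // Not holding the lock here, so a waiting thread can take it.
          Thread.sleep(1);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
        yields++;
        nextYield = System.nanoTime() + intervalNanos;
      }
    }
    return yields;
  }

  public static void main(String[] args) {
    // 10 s interval as in the description above; run for 25 s to see two pauses.
    System.out.println("pauses: " + runReader(new Object(), 25_000, 10_000));
  }
}
```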
One of my colleagues tried to follow up on this (attaching the
previous email here) but we didn’t get any reply. Any help on
this would be appreciated.
Thanks,
Sandeep