Hi Sandeep,

Looking into the code, can you please elaborate on how the reading thread holds the lock forever? From what I understand of the code, the lock is released after each call to reader.advance(), so the checkpoint should not be blocked forever. Am I missing something? Could you maybe take a thread dump and send the dumps of those two threads?
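
For concreteness, the pattern I am referring to looks roughly like this (a simplified stand-in with placeholder names, not the actual UnboundedSourceWrapper code): the checkpoint lock is taken and released around each advance() call, not held across the whole loop.

    // Simplified stand-in for the reading loop (placeholder names, not the
    // real Beam code): the checkpoint lock is acquired and released around
    // every reader.advance() call, so it is not held continuously.
    class ReaderLoopSketch {
      private final Object checkpointLock = new Object();
      private volatile boolean isRunning = true;

      void run(ReaderStub reader) {
        while (isRunning) {
          synchronized (checkpointLock) {   // acquired for a single iteration
            if (reader.advance()) {
              emit(reader.getCurrent());    // emit while holding the lock
            }
          }                                 // released before the next iteration
        }
      }

      void emit(Object element) { /* hand the element downstream */ }

      interface ReaderStub {
        boolean advance();
        Object getCurrent();
      }
    }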

Thanks,

 Jan

On 7/27/22 15:58, John Casey via dev wrote:
Would it be possible to recreate the experiments to try and isolate variables? Right now the three cases change both the Beam and Flink versions.

On Tue, Jul 26, 2022 at 11:35 PM Kenneth Knowles <k...@apache.org> wrote:

    Bumping this and adding +John Casey
    <mailto:johnjca...@google.com> who knows about KafkaIO and
    unbounded sources, though probably less about the FlinkRunner. It
    seems you have isolated it to the Flink translation logic. I'm not
    sure who would be the best expert to evaluate if that logic is
    still OK.

    Kenn

    On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep
    <sandeep_kath...@intuit.com> wrote:

        Hi,

        We have a stateless application (a minimal sketch of its shape
        follows this list) which:

         1. Reads from Kafka.
         2. Does some stateless transformations by reading from in-memory
            databases and updating the records.
         3. Writes back to Kafka.
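
        A minimal sketch of the pipeline shape (bootstrap servers, topic
        names, and the enrichment DoFn below are placeholders, not our real
        code):

            // Minimal sketch (placeholders, not our real code) of the
            // read-from-Kafka -> stateless enrich -> write-to-Kafka shape.
            import org.apache.beam.sdk.Pipeline;
            import org.apache.beam.sdk.io.kafka.KafkaIO;
            import org.apache.beam.sdk.options.PipelineOptionsFactory;
            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.transforms.ParDo;
            import org.apache.beam.sdk.values.KV;
            import org.apache.kafka.common.serialization.StringDeserializer;
            import org.apache.kafka.common.serialization.StringSerializer;

            public class EnrichmentPipelineSketch {

              // Placeholder for the stateless in-memory enrichment step.
              static class EnrichFn extends DoFn<KV<String, String>, KV<String, String>> {
                @ProcessElement
                public void processElement(@Element KV<String, String> record,
                                           OutputReceiver<KV<String, String>> out) {
                  out.output(KV.of(record.getKey(), record.getValue()));
                }
              }

              public static void main(String[] args) {
                Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
                p.apply(KafkaIO.<String, String>read()
                        .withBootstrapServers("kafka:9092")              // placeholder
                        .withTopic("input-topic")                        // placeholder
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withoutMetadata())
                 .apply("Enrich", ParDo.of(new EnrichFn()))
                 .apply(KafkaIO.<String, String>write()
                        .withBootstrapServers("kafka:9092")              // placeholder
                        .withTopic("output-topic")                       // placeholder
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class));
                p.run().waitUntilFinish();
              }
            }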

        *With Beam 2.23 and Flink 1.9, we are seeing that checkpoints work
        fine (they take at most 1 min).*

        *With Beam 2.29 and Flink 1.12, we are seeing checkpoints take
        longer (up to 6-7 min sometimes).*

        *With Beam 2.38 and Flink 1.14, we are seeing checkpoints
        timing out after 10 minutes.*

        I checked the Beam code and, after some logging and analysis,
        found that the problem is at
        https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307

        We are still using the old API to read from Kafka and not yet
        using KafkaIO based on SplittableDoFn.

        There are two threads:

         1. The legacy source thread, which reads from Kafka and does the
            entire processing.
         2. The thread which emits the watermark on a timer:
            https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474

        Both of these code blocks are synchronized on the same checkpoint
        lock. Under heavy load, the thread reading from Kafka keeps running
        in the while loop forever, so the thread emitting the watermarks
        waits forever for the lock, never emits the watermarks, and the
        checkpoint times out.
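
        Schematically, the watermark side looks like this (a simplified
        sketch with placeholder names, not the actual code at the link
        above): the timer callback has to take the same checkpoint lock
        that the reading loop (first link above) re-acquires on every
        iteration.

            // Simplified sketch (placeholder names, not the real Beam code):
            // the periodic watermark callback synchronizes on the same
            // checkpoint lock as the reading loop.
            class WatermarkTimerSketch {
              private final Object checkpointLock;   // shared with the source loop

              WatermarkTimerSketch(Object checkpointLock) {
                this.checkpointLock = checkpointLock;
              }

              // Called on a processing-time timer; blocks until the reading
              // loop releases the lock.
              void onProcessingTime() {
                synchronized (checkpointLock) {
                  emitWatermark(computeWatermark());
                }
              }

              long computeWatermark() { return System.currentTimeMillis(); }

              void emitWatermark(long watermarkMillis) { /* forward downstream */ }
            }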

        Is this a known issue, and is there a solution? For now, we are
        adding a Thread.sleep(1) roughly once every 10 seconds after the
        synchronized block so that the thread emitting the watermarks can
        be unblocked and run (sketched below).
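
        The workaround is shaped roughly like this (a sketch with
        placeholder names, not our exact patch):

            // Sketch of the workaround (placeholder names, not the exact
            // patch): roughly once every 10 seconds, sleep 1 ms outside the
            // synchronized block so the waiting watermark thread gets a
            // window to acquire the checkpoint lock.
            class ReaderLoopWithYieldSketch {
              private final Object checkpointLock = new Object();
              private volatile boolean isRunning = true;

              void run(ReaderStub reader) throws InterruptedException {
                long lastYieldMillis = System.currentTimeMillis();
                while (isRunning) {
                  synchronized (checkpointLock) {
                    if (reader.advance()) {
                      emit(reader.getCurrent());
                    }
                  }
                  if (System.currentTimeMillis() - lastYieldMillis >= 10_000) {
                    Thread.sleep(1);   // give the watermark thread a chance
                    lastYieldMillis = System.currentTimeMillis();
                  }
                }
              }

              void emit(Object element) { /* hand the element downstream */ }

              interface ReaderStub {
                boolean advance();
                Object getCurrent();
              }
            }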

        One of my colleagues tried to follow up on this (attaching the
        previous email here) but we didn’t get any reply. Any help on
        this would be appreciated.

        Thanks,

        Sandeep
