Hi all,

Apologies for not following up sooner. Thank you Austin for creating
FLINK-22698. It seems that the issue is well understood and a fix is
currently under development/review. Please let me know if there is anything
additional that I can do. I look forward to testing out a new version of
Flink that includes this fix.

Thanks again,
Jose

On Tue, May 18, 2021 at 4:38 PM Austin Cawley-Edwards <
[email protected]> wrote:

> Hey all,
>
> Thanks for the details, John! Hmm, that doesn't look too good either 😬
> but probably a different issue with the RMQ source/sink. Hopefully, the
> new FLIP-27 sources will help you guys out there! The upcoming HybridSource
> in FLIP-150 [1] might also be interesting to you in finely controlling
> sources.
>
> @Jose Vargas <[email protected]> I've created FLINK-22698 [2] to
> track your issue. Do you have a small reproducible case or GitHub repo? Also,
> would you be able to provide a little more detail about the Flink job that you
> see this issue in, e.g. overall parallelism, the parallelism of the
> sources/sinks, and the checkpointing mode?
>
> Best,
> Austin
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> [2]: https://issues.apache.org/jira/browse/FLINK-22698
>
> On Thu, May 13, 2021 at 9:25 PM John Morrow <[email protected]>
> wrote:
>
>> Hi Jose, hey Austin!!
>>
>> I know we were just recently looking at trying to consume a fixed number
>> of messages from an RMQ source, process them and output them to an RMQ
>> sink. As a naive first attempt at stopping the job when the target number
>> of messages had been processed, we put a counter state in the process
>> function and tried throwing an exception when the counter >= the target
>> message count.
>>
>> The job had:
>>
>>    - parallelism: 1
>>    - checkpointing: 1000 (1 sec)
>>    - restartStrategy: noRestart
>>    - prefetchCount: 100
>>
>> Running it with 150 messages in the input queue and 150 also as the
>> target number, at the end the queues had:
>>
>>    - output queue - 150
>>    - input queue - 50
>>
>> So it looks like it did transfer all the messages, but some unack'd ones
>> also got requeued back at the source, so they end up as duplicates. I know
>> throwing an exception in the Flink job is not the same as triggering a
>> stateful shutdown, but it might be hitting similar unack issues.
>>
>> John
>>
>> ------------------------------
>> *From:* Austin Cawley-Edwards <[email protected]>
>> *Sent:* Thursday 13 May 2021 16:49
>> *To:* Jose Vargas <[email protected]>; John Morrow <
>> [email protected]>
>> *Cc:* user <[email protected]>
>> *Subject:* Re: RabbitMQ source does not stop unless message arrives in
>> queue
>>
>> Hey Jose,
>>
>> Thanks for bringing this up – it indeed sounds like a bug. There is
>> ongoing work to update the RMQ source to the new interface, which might
>> address some of these issues (or should, if it is not already), tracked in
>> FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
>> you like me to?
>>
>> At my previous company, we only consumed one Rabbit queue per
>> application, so we didn't run into this exactly but did see other weird
>> behavior in the RMQ source that could be related. I'm going to cc @John
>> Morrow <[email protected]> who might be able to contribute to
>> what he's seen working with the source, if he's around. I remember some
>> messages not properly being ack'ed during a stateful shutdown via the
>> Ververica Platform's stop-with-savepoint functionality that you mention,
>> though that might be more related to FLINK-20244[2], perhaps.
>>
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-20628
>> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>>
>> On Thu, May 13, 2021 at 10:23 AM Jose Vargas <[email protected]>
>> wrote:
>>
>> Hi,
>>
>> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
>> Flink's RabbitMQ source has some surprising behavior when a
>> stop-with-savepoint request is made.
>>
>> *Expected Behavior:*
>> The stop-with-savepoint request stops the job with a FINISHED state.
>>
>> *Actual Behavior:*
>> The stop-with-savepoint request either times out or hangs indefinitely
>> unless a message arrives in all the queues that the job consumes from after
>> the stop-with-savepoint request is made.
>>
>>
>> I know that one possible workaround is to send a sentinel value to each
>> of the queues consumed by the job that the deserialization schema checks in
>> its isEndOfStream method. However, this is somewhat cumbersome and
>> complicates the continuous delivery of a Flink job. For example,
>> Ververica Platform will trigger a stop-with-savepoint for the user if one
>> of many possible Flink configurations for a job is changed. The
>> stop-with-savepoint can then hang indefinitely because only some of the
>> RabbitMQ sources will have reached a FINISHED state.
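For reference, the sentinel workaround could look roughly like the sketch
below. To keep it runnable without Flink on the classpath, the Schema
interface is a stand-in that mirrors only the deserialize and isEndOfStream
methods of Flink's DeserializationSchema. SentinelStringSchema and the
"__STOP__" payload are hypothetical names, not something from our job.

```java
import java.nio.charset.StandardCharsets;

final class SentinelSketch {

    /** Stand-in for the two DeserializationSchema methods used here. */
    interface Schema<T> {
        T deserialize(byte[] message);
        boolean isEndOfStream(T nextElement);
    }

    /** Treats one agreed-upon payload as the marker that ends the stream. */
    static final class SentinelStringSchema implements Schema<String> {
        private final String sentinel;

        SentinelStringSchema(String sentinel) {
            this.sentinel = sentinel;
        }

        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // The source stops consuming once this returns true.
            return sentinel.equals(nextElement);
        }
    }

    public static void main(String[] args) {
        SentinelStringSchema schema = new SentinelStringSchema("__STOP__");
        String regular = schema.deserialize("order-42".getBytes(StandardCharsets.UTF_8));
        String marker = schema.deserialize("__STOP__".getBytes(StandardCharsets.UTF_8));
        System.out.println(schema.isEndOfStream(regular)); // prints "false"
        System.out.println(schema.isEndOfStream(marker));  // prints "true"
    }
}
```

The catch is that something outside the job has to publish the marker to
every consumed queue at the right moment, which is the cumbersome part.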
>>
>> I have attached the TaskManager thread dump after the stop-with-savepoint
>> request was made. Almost every thread is either sleeping or waiting around
>> for locks to be released, and then there are a handful of threads trying to
>> read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
>> method.
>>
>> Ideally, once a stop-with-savepoint request is made, the threads trying
>> to read data from RabbitMQ would be interrupted so that all RabbitMQ
>> sources would reach a FINISHED state.
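One generic way to get that behavior, sketched in plain Java, is to have the
consuming loop wait for deliveries with a timeout and re-check a cancel flag
between waits, so it can exit even when the queue stays empty. This is only
an illustration of the pattern. It is not the RMQSource code and not
necessarily how the connector will be fixed. PollingSourceSketch and its
methods are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollingSourceSketch implements Runnable {
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    @Override
    public void run() {
        try {
            while (running) {
                // Bounded wait: poll returns null on timeout, so the flag is re-checked.
                String message = deliveries.poll(50, TimeUnit.MILLISECONDS);
                if (message != null) {
                    System.out.println("received " + message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    void cancel() {
        running = false;
    }

    /** Starts the loop on an empty queue, cancels it, and reports whether it exited. */
    static boolean stopsWithoutMessages() {
        PollingSourceSketch source = new PollingSourceSketch();
        Thread worker = new Thread(source);
        worker.start();
        source.cancel();
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped=" + stopsWithoutMessages());
    }
}
```

With an untimed blocking read instead of the timed poll, the loop would only
notice the cancel after the next message arrives, which matches the behavior
described above.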
>>
>> Regular checkpoints and savepoints complete successfully; it is only with the
>> stop-with-savepoint request that I see this behavior.
>>
>>
>> Respectfully,
>>
>>
>> Jose Vargas
>>
>> Software Engineer, Data Engineering
>>
>> E: [email protected]
>>
>> fiscalnote.com <https://www.fiscalnote.com>  |  info.cq.com
>> <http://www.info.cq.com>  | rollcall.com <https://www.rollcall.com>
>>
>>

-- 

Jose Vargas

Software Engineer, Data Engineering

E: [email protected]

fiscalnote.com <https://www.fiscalnote.com>  |  info.cq.com
<http://www.info.cq.com>  | rollcall.com <https://www.rollcall.com>
