Hi,

My 2 cents: the inability to programmatically stop a Flink stream gracefully is, 
IMHO, the framework's biggest gap. It's a very common use case: each time you 
want to update the application or change its configuration, you need to stop & 
restart it gracefully, without triggering alerts, data loss, or anything else.
That's why I never use the provided Flink sources "out of the box". I've built a 
framework that encapsulates them, adding a monitoring thread that periodically 
checks for a special "HDFS stop file" and tries to cancel() the source gracefully 
if the user requested a stop by this means (I've found the HDFS-file trick to be 
the easiest way for an external application to reach all task managers running 
on unknown hosts).
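
For illustration, here is a minimal sketch of that monitoring-thread idea. All 
names are mine, and I check a local file instead of HDFS to keep the example 
self-contained (with HDFS you would test existence through the Hadoop FileSystem 
API instead):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: a daemon monitor thread polls for a "stop file"
// and gracefully cancels the wrapped source when the file appears.
public class StoppableSourceSketch {

    interface CancellableSource {
        void run() throws InterruptedException;
        void cancel();
    }

    static Thread startStopFileMonitor(Path stopFile, CancellableSource source,
                                       AtomicBoolean stopped) {
        Thread monitor = new Thread(() -> {
            try {
                while (!stopped.get()) {
                    if (Files.exists(stopFile)) { // user asked for a graceful stop
                        source.cancel();
                        stopped.set(true);
                    }
                    Thread.sleep(100); // polling period
                }
            } catch (InterruptedException ignored) {
            }
        });
        monitor.setDaemon(true);
        monitor.start();
        return monitor;
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean stopped = new AtomicBoolean(false);
        AtomicBoolean running = new AtomicBoolean(true);
        CancellableSource source = new CancellableSource() {
            public void run() throws InterruptedException {
                while (running.get()) { Thread.sleep(10); } // stand-in for the source loop
            }
            public void cancel() { running.set(false); }
        };
        // Simulate the stop request: the stop file already exists.
        Path stopFile = Files.createTempFile("stop", ".flag");
        startStopFileMonitor(stopFile, source, stopped);
        source.run(); // returns once the monitor calls cancel()
        System.out.println("source stopped gracefully: " + !running.get());
    }
}
```

The point is that the monitor cancels the source cooperatively, so no 
interruption (and no HA restart) is triggered.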

I could not use the "special message" trick because in most real production 
environments you cannot, as a client, post a message to a queue just for your 
client's own needs: you don't have the access rights to do so, and you don't 
know how other clients connected to the same data may react to fake 
messages...

Unfortunately, most Flink sources cannot be cancelled gracefully without 
changing part of their code. That's the case for the Kafka source.

- If a Kafka consumer source instance is not connected to any partition (for 
instance because its parallelism exceeds the number of partitions of the 
consumed topic), we end up in an infinite wait in FlinkKafkaConsumerBase.run() 
until the thread is interrupted:

                        // wait until this is canceled
                        final Object waitLock = new Object();
                        while (running) {
                                try {
                                        //noinspection SynchronizationOnLocalVariableOrMethodParameter
                                        synchronized (waitLock) {
                                                waitLock.wait();
                                        }
                                }
                                catch (InterruptedException e) {
                                        if (!running) {
                                                // restore the interrupted state, and fall through the loop
                                                Thread.currentThread().interrupt();
                                        }
                                }
                        }

So either you change the code, or your monitoring thread interrupts the source 
thread -- but that triggers the HA mechanism: the source instance will be 
relaunched n times before the job finally fails.
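
One way to make such a wait cancellable without interruption is to wait with a 
bounded timeout (or have cancel() notify the lock) so the running flag is 
rechecked. A hedged sketch of that pattern, outside of any Flink API (class and 
method names are mine):

```java
// Sketch: the same kind of wait loop, but cancel() flips the flag and
// notifies the lock, so the waiter wakes up without being interrupted.
// The bounded wait(1000) is a safety net in case a notify is missed.
public class CancellableWaitSketch {
    private volatile boolean running = true;
    private final Object waitLock = new Object();

    void runUntilCancelled() throws InterruptedException {
        synchronized (waitLock) {
            while (running) {
                waitLock.wait(1000); // wake at least once per second to recheck 'running'
            }
        }
    }

    void cancel() {
        running = false;
        synchronized (waitLock) {
            waitLock.notifyAll(); // wake the waiter immediately
        }
    }

    public static void main(String[] args) throws Exception {
        CancellableWaitSketch s = new CancellableWaitSketch();
        Thread waiter = new Thread(() -> {
            try { s.runUntilCancelled(); } catch (InterruptedException ignored) {}
        });
        waiter.start();
        Thread.sleep(100); // let the waiter block
        s.cancel();        // graceful stop: no interrupt, no HA restart
        waiter.join(5000);
        System.out.println("waiter finished: " + !waiter.isAlive());
    }
}
```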

- BTW, it's also the case with RMQSource, as "nextDelivery" in RMQSource.run() 
is called without a timeout:
        @Override
        public void run(SourceContext<OUT> ctx) throws Exception {
                while (running) {
                        QueueingConsumer.Delivery delivery = consumer.nextDelivery();

So if no message arrives, the while (running) check is never reached again and 
the source cannot be cancelled without a hard interruption.
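
The fix pattern is to poll with a timeout so the loop condition is re-evaluated 
even when no message arrives (the RabbitMQ client's nextDelivery(long timeout) 
overload returns null on timeout and allows exactly this). A self-contained 
sketch using a BlockingQueue in place of the RMQ consumer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: poll with a timeout so the 'running' flag is rechecked even
// when no message arrives (mirrors nextDelivery(timeout) returning null).
public class TimeoutPollSketch {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);
        List<String> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (running.get()) {
                    // bounded wait instead of a blocking nextDelivery()
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        received.add(msg);
                    }
                    // on timeout msg is null and we simply loop, re-testing 'running'
                }
            } catch (InterruptedException ignored) {
            }
        });
        consumer.start();

        queue.put("a");
        Thread.sleep(300);  // let the consumer drain, then idle on timeouts
        running.set(false); // graceful cancel: no interruption needed
        consumer.join(5000);
        System.out.println("received=" + received + ", stopped=" + !consumer.isAlive());
    }
}
```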

Best regards,
Arnaud


-----Message d'origine-----
De : Ufuk Celebi [mailto:u...@apache.org]
Envoyé : vendredi 29 décembre 2017 10:30
À : Eron Wright <eronwri...@gmail.com>
Cc : Ufuk Celebi <u...@apache.org>; Jaxon Hu <hujiaxu...@gmail.com>; user 
<user@flink.apache.org>; Aljoscha Krettek <aljos...@apache.org>
Objet : Re: How to stop FlinkKafkaConsumer and make job finished?

Yes, that sounds like what Jaxon is looking for. :-) Thanks for the pointer 
Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <eronwri...@gmail.com> wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you
> pass to the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
>
> Eron
>
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Jaxon,
>>
>> I don't think it's possible to control this via the life-cycle
>> methods of your functions.
>>
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>>
>> 1) Extend Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is
>> probably not feasible and some work to get familiar with the code.
>> Just putting it out there.
>>
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>>
>> 3) Use an Http client and cancel your job via the Http endpoint
>>
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after EOF record until cancellation, etc.).
>>
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>>
>> – Ufuk
>>
>>
>> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
>> > I would like to stop FlinkKafkaConsumer consuming data from kafka
>> > manually.
>> > But I find it won't be closed when I invoke the "cancel()" method.
>> > What I am trying to do is add an EOF symbol marking the end of my
>> > Kafka data, and when the FlatMap operator reads the symbol it will
>> > invoke the FlinkKafkaConsumer "cancel()" method. It doesn't work. A
>> > Flink streaming job won't finish unless it gets canceled or fails,
>> > when I use Kafka as the source.
>> >
>> > Can somebody give me some help, thx~~
>
>
