Hey Jaxon,

I don't think it's possible to control this via the life-cycle methods
of your functions.

Note that Flink currently does not support graceful stop in a
meaningful manner; you can only cancel running jobs. Here is what
comes to mind for cancelling on EOF:

1) Extend the Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not feasible, as it is quite some work to get familiar with
the code. Just putting it out there.
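A related shortcut, in case you want to avoid patching the connector
itself: the Kafka consumer checks DeserializationSchema#isEndOfStream
for every record, so a custom schema can signal end-of-stream. A
minimal sketch in Java (the "EOF" string marker and string payloads
are just assumptions for illustration):

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    // Stops the Kafka source once the (assumed) EOF marker arrives.
    public class EofStringSchema implements DeserializationSchema<String> {

        private static final String EOF_MARKER = "EOF"; // your marker

        @Override
        public String deserialize(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // true => the consuming subtask stops and the source finishes
            return EOF_MARKER.equals(nextElement);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

Keep in mind that each parallel consumer instance stops on its own, so
you would likely need a marker record in every partition.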

2) Throw a "SuccessException" that fails the job. Easy, but not nice.
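If you go with 2), something along these lines should work; the
SuccessException class, the "EOF" marker, and the inline source are
illustrative stand-ins for your setup:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class EofJob {

        // Marker exception: the job "fails" with it, and the driver
        // treats that particular failure as success.
        public static class SuccessException extends RuntimeException {}

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b", "EOF") // stand-in for your Kafka source
               .flatMap(new FlatMapFunction<String, String>() {
                   @Override
                   public void flatMap(String value, Collector<String> out) {
                       if ("EOF".equals(value)) {        // assumed EOF marker
                           throw new SuccessException(); // fail on purpose
                       }
                       out.collect(value);
                   }
               })
               .print();

            try {
                env.execute("eof-demo");
            } catch (Exception e) {
                if (!causedBySuccess(e)) {
                    throw e; // a real failure
                }
                // SuccessException in the cause chain: clean shutdown.
            }
        }

        private static boolean causedBySuccess(Throwable t) {
            for (Throwable c = t; c != null; c = c.getCause()) {
                if (c instanceof SuccessException) {
                    return true;
                }
            }
            return false;
        }
    }

One caveat: if a restart strategy is configured, the job restarts on
the exception instead of terminating, so restarts need to be off for
this trick.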

3) Use an HTTP client and cancel your job via the REST endpoint
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
Easy, but not nice, since you need quite some logic in your function
(e.g. ignoring records after the EOF record until the cancellation
takes effect, etc.).
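For 3), a rough sketch of the cancel call itself, using plain
HttpURLConnection against the endpoint from the linked docs (DELETE
/jobs/:jobid/cancel); host, port, and job id are placeholders you
would need to fill in:

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class JobCanceller {

        // Cancels a job via the JobManager's REST endpoint.
        // Assumes the default web port (8081) and a known job id.
        public static void cancel(String host, int port, String jobId)
                throws Exception {
            URL url = new URL(
                "http://" + host + ":" + port + "/jobs/" + jobId + "/cancel");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("DELETE");
            int status = conn.getResponseCode();
            conn.disconnect();
            if (status != 200) {
                throw new RuntimeException("Cancel failed, HTTP " + status);
            }
        }
    }

You would still have to look up the job id first (the same REST API
has endpoints that list running jobs).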

Maybe Aljoscha (cc'd) has an idea how to do this in a better way.

– Ufuk


On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
> I would like to manually stop FlinkKafkaConsumer from consuming data from
> Kafka. But I find it won't be closed when I invoke the "cancel()" method.
> What I am trying to do is add an EOF symbol marking the end of my Kafka
> data, and when the FlatMap operator reads the symbol it will invoke the
> FlinkKafkaConsumer "cancel()" method. It doesn't work: the Flink streaming
> job won't finish unless it gets canceled or fails, when I use Kafka as the
> source.
>
> Can somebody help me out with this? Thanks!
