Also relevant for this discussion: several people (including me) have by now floated the idea of reworking the source interface to take the responsibility for stopping/canceling/continuing away from the individual source implementations and instead give that power to the system. Currently, each source basically does this:

    class Source<T> {
        public void run(Context ctx, Lock lock) {
            // loops "for as long as I want, and I don't care"
            while (true) {
                synchronized (lock) {
                    T output = ReadFrom.externalSystem();
                    updateReadPositionState();
                    ctx.collect(output);
                }
            }
        }
    }

This means that any stopping/canceling behaviour requires the cooperation of the source implementation.

A different source interface could look like this:

    abstract class NewSource<T> {
        public abstract boolean start();
        public abstract boolean advance();
        public abstract void close();
        public abstract T getCurrent();
        public abstract Instant getCurrentTimestamp();
        public abstract Instant getWatermark();
        public abstract CheckpointMark getCheckpointMark();
    }

Here the driver would sit outside and call the source whenever data should be provided. Stop/cancel would not be a feature of the source function but of the code that calls it.

Best,
Aljoscha

> On 14. Sep 2017, at 20:03, Eron Wright <eronwri...@gmail.com> wrote:
>
> I too am curious about stop vs cancel. I'm trying to understand the
> motivations a bit more.
>
> The current behavior of stop is basically that the sources become bounded,
> leading to the job winding down.
>
> The interesting question is how best to support 'planned' maintenance
> procedures such as app upgrades and scale changes. I think a good
> enhancement could be to stop precisely at checkpoint time to prevent the
> emission of spurious records. Today the behavior of 'cancel w/ savepoint' is
> at-least-once because the two operations aren't atomic. Earlier I had
> assumed that 'stop' would evolve in this direction, but I suppose we could
> improve the atomicity of 'cancel w/ savepoint' rather than implicating 'stop'.
>
> A different direction for 'stop' might be to improve the determinism of
> bounding a streaming job, such that the stop point is well-understood in terms
> of the source. For example, stopping at an offset provided as a stop
> parameter. Today I suppose one would rely on external state to remember the
> stop point, e.g.
> FlinkKafkaConsumer010::setStartFromGroupOffsets.
>
> On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Elias,
>
> sorry for the delay here. No, stop is not deprecated, but it is not fully
> implemented yet. One missing part is the migration of the existing source
> functions, as you say.
>
> Let me pull in Till for more details on this. @Till: Is there more
> missing than migrating the sources?
>
> Here is the PR and discussion for reference:
> https://github.com/apache/flink/pull/750
>
> I would also really love to see this fully implemented in Flink. I
> don't expect this to happen for the upcoming 1.4 release, though.
>
> – Ufuk
>
>
> On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> > Anyone?
> >
> > On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <fearsome.lucid...@gmail.com>
> > wrote:
> >>
> >> I was wondering about the status of the flink stop command. At first
> >> blush it would seem to be the preferable way to shut down a Flink job, but
> >> it depends on StoppableFunction being implemented by sources, and I notice
> >> that the Kafka source does not seem to implement it. In addition, the
> >> command does not support -s/--withSavepoint like cancel does.
> >>
> >> Is stop deprecated?
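To make the pull-based `NewSource` idea from Aljoscha's message more concrete, here is a minimal Java sketch of a driver that owns the read loop. It is an illustration under assumptions, not a Flink API: `CheckpointMark` here only holds a read position, and `ListSource`, `SourceDriver`, and the `stopAfter` parameter are hypothetical names. Cancel is a flag checked by the driver, and "stop" is a deterministic stop point expressed in terms of the source, roughly along the lines of Eron's suggestion of stopping at an offset given as a parameter.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the CheckpointMark type in the proposal:
// here it only records the source's read position.
final class CheckpointMark {
    final int position;
    CheckpointMark(int position) { this.position = position; }
}

// The proposed pull-based interface, made generic in T.
abstract class NewSource<T> {
    public abstract boolean start();    // open and read the first record; false if none
    public abstract boolean advance();  // read the next record; false if none
    public abstract void close();
    public abstract T getCurrent();
    public abstract Instant getCurrentTimestamp();
    public abstract Instant getWatermark();
    public abstract CheckpointMark getCheckpointMark();
}

// Hypothetical in-memory source standing in for an external system.
final class ListSource<T> extends NewSource<T> {
    private final List<T> records;
    private int position = -1; // index of the current record

    ListSource(List<T> records) { this.records = records; }

    @Override public boolean start() { position = 0; return position < records.size(); }
    @Override public boolean advance() { position++; return position < records.size(); }
    @Override public void close() { }
    @Override public T getCurrent() { return records.get(position); }
    // Timestamps and watermarks are simply derived from the position here.
    @Override public Instant getCurrentTimestamp() { return Instant.ofEpochMilli(position); }
    @Override public Instant getWatermark() { return Instant.ofEpochMilli(position); }
    @Override public CheckpointMark getCheckpointMark() { return new CheckpointMark(position); }
}

// Hypothetical driver: it owns the read loop, so stop and cancel are
// implemented once here instead of in every source.
final class SourceDriver<T> {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    // Safe to call from another thread; the loop exits before emitting another record.
    void cancel() { cancelled.set(true); }

    // Emits records until the source is exhausted, the driver is cancelled,
    // or stopAfter records have been emitted (a well-defined stop point).
    List<T> run(NewSource<T> source, long stopAfter) {
        List<T> emitted = new ArrayList<>();
        try {
            boolean available = source.start();
            while (available && !cancelled.get() && emitted.size() < stopAfter) {
                emitted.add(source.getCurrent()); // stands in for ctx.collect(...)
                // Advance only if another record may be emitted, so the read
                // position never moves past the stop point.
                available = emitted.size() < stopAfter && source.advance();
            }
        } finally {
            source.close(); // the driver, not the source, decides when reading ends
        }
        return emitted;
    }
}
```

In this shape neither cancel nor the stop point needs any cooperation from `ListSource`. The driver could likewise take `getCheckpointMark()` between two calls to `advance()`, rather than relying on a checkpoint lock held inside the source's own loop.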