Also relevant for this discussion: Several people (including me) by now were 
floating the idea of reworking the source interface to take away the 
responsibility of stopping/canceling/continuing from a specific source 
implementation and to instead give that power to the system. Currently each 
source does basically this:

class Source<T> {
  public void run(Context ctx, Lock lock) {
    while ("forever long I want and I don't care") {
      synchronized (lock) {
        T output = ReadFrom.externalSystem();
         updateReadPositionState();
        ctx.collect(output);
      }
    }
  }
}

Meaning that any stopping/canceling behaviour requires cooperation from the 
source implementation.

This would be a different idea for a source interface:

abstract class NewSource {
  public abstract boolean start();
  public abstract boolean advance();
  public abstract void close();

  public abstract T getCurrent();
  public abstract Instant getCurrentTimestamp();
  public abstract Instant getWatermark();

  public abstract CheckpointMark getCheckpointMark();
}

Here the driver would sit outside and call the source whenever data should be 
provided. Stop/cancel would not be a feature of the source function but of the 
code that calls it.

Best,
Aljoscha

> On 14. Sep 2017, at 20:03, Eron Wright <eronwri...@gmail.com> wrote:
> 
> I too am curious about stop vs cancel.  I'm trying to understand the 
> motivations a bit more.
> 
> The current behavior of stop is basically that the sources become bounded, 
> leading to the job winding down.
> 
> The interesting question is how best to support 'planned' maintenance 
> procedures such as app upgrade and scale changes.   I think a good 
> enhancement could be to stop precisely at checkpoint time to prevent emission 
> of spurious records.  Today the behavior of 'cancel w/ savepoint' is 
> at-least-once because the two operations aren't atomic.  Earlier I had 
> assumed that 'stop' would evolve in this direction but I suppose we could 
> improve the atomicity of 'cancel /w savepoint' rather than implicating 'stop'.
> 
> A different direction for 'stop' might be to improve the determinism of 
> bounding a streaming job such that the stop point is well-understood in terms 
> of the source.  For example, stopping at a offset provided as a stop 
> parameter.   Today I suppose one would rely on external state to remember the 
> stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.
> 
> On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> Hey Elias,
> 
> sorry for the delay here. No, stop is not deprecated but not fully
> implemented yet. One missing part is migration of the existing source
> functions as you say.
> 
> Let me pull in Till for more details on this. @Till: Is there more
> missing than migrating the sources?
> 
> Here is the PR and discussion for reference:
> https://github.com/apache/flink/pull/750 
> <https://github.com/apache/flink/pull/750>
> 
> I would also really love to see this fully implemented in Flink. I
> don't expect this to happen for the upcoming 1.4 release though.
> 
> – Ufuk
> 
> 
> On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <fearsome.lucid...@gmail.com 
> <mailto:fearsome.lucid...@gmail.com>> wrote:
> > Anyone?
> >
> > On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <fearsome.lucid...@gmail.com 
> > <mailto:fearsome.lucid...@gmail.com>>
> > wrote:
> >>
> >> I was wondering about the status of the flink stop command.  At first
> >> blush it would seem as the preferable way to shutdown a Flink job, but it
> >> depends on StoppableFunction being implemented by sources and I notice that
> >> the Kafka source does not seem to implement it.  In addition, the command
> >> does not -s  --withSavepoint like cancel does.
> >>
> >> Is stop deprecated?
> >
> >
> 

Reply via email to