Aljoscha, would it be correct to characterize your idea as a 'pull' source rather than the current 'push' source? It would be interesting to look at the existing connectors to see how hard it would be to reverse their orientation. For example, the source might require a buffer pool.
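Here is a rough sketch of what reversing the orientation could look like. It assumes an existing push-style reader is kept running on its own thread: the reader pushes records into a bounded buffer, and the external driver pulls them out. The class and its `emit`/`poll` methods are hypothetical and not a Flink API. The `ArrayBlockingQueue` only stands in for the buffer pool mentioned above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical adapter: a push-style reader thread hands records to a bounded
// buffer, and the driver pulls them out at its own pace.
public class PushToPullAdapter<T> {
    private final BlockingQueue<T> buffer;

    public PushToPullAdapter(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Called from the push-style reader thread; blocks while the buffer is full,
    // which applies backpressure to the reader.
    public void emit(T record) throws InterruptedException {
        buffer.put(record);
    }

    // Called by the driver; returns null if nothing arrived within the timeout.
    public T poll(long timeoutMillis) throws InterruptedException {
        return buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        PushToPullAdapter<Integer> adapter = new PushToPullAdapter<>(2);

        // Stand-in for an existing push-style source loop.
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    adapter.emit(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();

        // The driver decides how many records to pull and when to stop.
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            Integer record = adapter.poll(1000);
            if (record == null) {
                break; // nothing arrived in time
            }
            sum += record;
        }
        reader.join();
        System.out.println("sum=" + sum);
    }
}
```

`put` blocks when the buffer is full, so a slow driver throttles the reader thread. The price is an extra thread and a hand-off per record. This is presumably part of what makes reversing the existing connectors non-trivial.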
On Fri, Sep 15, 2017 at 9:05 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Also relevant for this discussion: Several people (including me) have by now
> been floating the idea of reworking the source interface to take away the
> responsibility of stopping/canceling/continuing from a specific source
> implementation and to instead give that power to the system. Currently each
> source does basically this:
>
> class Source<T> {
>     public void run(Context ctx, Lock lock) {
>         while ("forever long I want and I don't care") {
>             synchronized (lock) {
>                 T output = ReadFrom.externalSystem();
>                 updateReadPositionState();
>                 ctx.collect(output);
>             }
>         }
>     }
> }
>
> Meaning that any stopping/canceling behaviour requires cooperation from
> the source implementation.
>
> This would be a different idea for a source interface:
>
> abstract class NewSource<T> {
>     public abstract boolean start();
>     public abstract boolean advance();
>     public abstract void close();
>
>     public abstract T getCurrent();
>     public abstract Instant getCurrentTimestamp();
>     public abstract Instant getWatermark();
>
>     public abstract CheckpointMark getCheckpointMark();
> }
>
> Here the driver would sit outside and call the source whenever data should
> be provided. Stop/cancel would not be a feature of the source function but
> of the code that calls it.
>
> Best,
> Aljoscha
>
> On 14. Sep 2017, at 20:03, Eron Wright <eronwri...@gmail.com> wrote:
>
> I too am curious about stop vs cancel. I'm trying to understand the
> motivations a bit more.
>
> The current behavior of stop is basically that the sources become bounded,
> leading to the job winding down.
>
> The interesting question is how best to support 'planned' maintenance
> procedures such as app upgrade and scale changes. I think a good
> enhancement could be to stop precisely at checkpoint time to prevent
> emission of spurious records. Today the behavior of 'cancel w/ savepoint'
> is at-least-once because the two operations aren't atomic.
> Earlier I had assumed that 'stop' would evolve in this direction, but I
> suppose we could improve the atomicity of 'cancel w/ savepoint' rather than
> implicating 'stop'.
>
> A different direction for 'stop' might be to improve the determinism of
> bounding a streaming job such that the stop point is well-understood in
> terms of the source. For example, stopping at an offset provided as a stop
> parameter. Today I suppose one would rely on external state to remember
> the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.
>
> On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> Hey Elias,
>>
>> sorry for the delay here. No, stop is not deprecated, but it is not fully
>> implemented yet. One missing part is migration of the existing source
>> functions, as you say.
>>
>> Let me pull in Till for more details on this. @Till: Is there more
>> missing than migrating the sources?
>>
>> Here is the PR and discussion for reference:
>> https://github.com/apache/flink/pull/750
>>
>> I would also really love to see this fully implemented in Flink. I
>> don't expect this to happen for the upcoming 1.4 release though.
>>
>> – Ufuk
>>
>>
>> On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>> > Anyone?
>> >
>> > On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <fearsome.lucid...@gmail.com>
>> > wrote:
>> >>
>> >> I was wondering about the status of the flink stop command. At first
>> >> blush it would seem to be the preferable way to shut down a Flink job,
>> >> but it depends on StoppableFunction being implemented by sources, and I
>> >> notice that the Kafka source does not seem to implement it. In addition,
>> >> the command does not support -s / --withSavepoint like cancel does.
>> >>
>> >> Is stop deprecated?
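The following is a minimal sketch of the driver-side control Aljoscha describes, combined with Eron's idea of stopping at an offset supplied as a stop parameter. `PullSource` is a hypothetical, trimmed-down variant of the proposed `NewSource`. It drops timestamps, watermarks and checkpoint marks, and it adds a made-up `getCurrentOffset()`. `ListSource` and `SourceDriver` are also invented for the example. None of this is an existing Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class PullDriverDemo {

    // Hypothetical, trimmed-down take on the pull-style interface from the thread
    // (timestamps, watermarks and checkpoint marks omitted for brevity).
    interface PullSource<T> {
        boolean start();          // move to the first record; false if none
        boolean advance();        // move to the next record; false if none
        T getCurrent();
        long getCurrentOffset();  // hypothetical: position of the current record
        void close();
    }

    // Toy source over an in-memory list, standing in for something like a Kafka partition.
    static class ListSource implements PullSource<String> {
        private final List<String> data;
        private int pos = -1;

        ListSource(List<String> data) { this.data = data; }
        public boolean start() { pos = 0; return pos < data.size(); }
        public boolean advance() { pos++; return pos < data.size(); }
        public String getCurrent() { return data.get(pos); }
        public long getCurrentOffset() { return pos; }
        public void close() { }
    }

    // The driver owns the loop, so stop/cancel are decided here, not in the source.
    static class SourceDriver<T> {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);

        void cancel() { cancelled.set(true); }

        // Emits records until the source reports no more data, the driver is
        // cancelled, or the current record's offset reaches stopOffset (exclusive).
        List<T> run(PullSource<T> source, long stopOffset) {
            List<T> out = new ArrayList<>();
            try {
                boolean available = source.start();
                while (available && !cancelled.get()) {
                    if (source.getCurrentOffset() >= stopOffset) {
                        break; // deterministic stop point supplied by the caller
                    }
                    out.add(source.getCurrent());
                    available = source.advance();
                }
            } finally {
                source.close();
            }
            return out;
        }
    }

    public static void main(String[] args) {
        PullSource<String> src = new ListSource(List.of("a", "b", "c", "d", "e"));
        List<String> emitted = new SourceDriver<String>().run(src, 3);
        System.out.println(emitted); // prints [a, b, c]
    }
}
```

The loop lives in the driver, so cancellation is only a flag checked between records. The stop point is deterministic in terms of source offsets. A real driver could equally choose to stop right after a checkpoint completes. The toy driver treats `advance()` returning false as end of input. For an unbounded source, a driver would more likely wait and retry.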