Hi Ning,

I don't think it is possible to pause a Kafka source upon taking a savepoint without making changes to the implementation.

I think your problem is that the Cassandra sink doesn't support exactly-once guarantees when the Cassandra query isn't idempotent. If possible, the cleanest solution would be to implement a new Cassandra sink, or extend the existing one, based on the TwoPhaseCommitSinkFunction interface (https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html), and to set your environment to the exactly-once guarantee.
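To give you an idea, below is a rough, untested sketch of what such a sink could look like (assuming the Flink 1.6 TwoPhaseCommitSinkFunction API and the DataStax 3.x driver; the class name, keyspace, table, and contact point are all made up). Cassandra has no real transactions, so the "transaction" here is just a buffer of statements that travels with the checkpoint and is executed on commit. Since commit() can be retried after a failure, this is only truly exactly-once if re-running the batch is harmless, so treat it as a starting point rather than a finished solution:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

// Hypothetical sink: buffers CQL statements per checkpoint and only
// writes them to Cassandra once the checkpoint has completed.
public class TwoPhaseCassandraSink
        extends TwoPhaseCommitSinkFunction<String, List<String>, Void> {

    private transient Cluster cluster;
    private transient Session session;

    public TwoPhaseCassandraSink() {
        // The pending statements are serialized into the checkpoint
        // so they survive a restore.
        super(new ListSerializer<>(StringSerializer.INSTANCE),
                VoidSerializer.INSTANCE);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); // assumption
        session = cluster.connect("my_keyspace");                         // assumption
    }

    @Override
    protected List<String> beginTransaction() {
        // A "transaction" is just a buffer of CQL statements.
        return new ArrayList<>();
    }

    @Override
    protected void invoke(List<String> txn, String value, Context context) {
        // Illustrative only; use bound statements in real code.
        txn.add("INSERT INTO events (id, payload) VALUES (uuid(), '" + value + "')");
    }

    @Override
    protected void preCommit(List<String> txn) {
        // Nothing to flush here; the buffer is part of the checkpoint state.
    }

    @Override
    protected void commit(List<String> txn) {
        // Called once the checkpoint containing this buffer completes.
        // May run again during recovery, hence the idempotence caveat.
        BatchStatement batch = new BatchStatement();
        for (String cql : txn) {
            batch.add(new SimpleStatement(cql));
        }
        session.execute(batch);
    }

    @Override
    protected void abort(List<String> txn) {
        txn.clear();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}

You would then enable exactly-once checkpointing in the job, e.g.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);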
--
Niels

On Mon, Oct 22, 2018 at 1:26 AM Ning Shi <nings...@gmail.com> wrote:
> I'm implementing a streaming job that consumes events from Kafka and
> writes results to Cassandra. The writes to Cassandra are not
> idempotent. In preparation for planned maintenance events like a Flink
> version upgrade or a job update, I'd like to be able to shut down the
> job cleanly.
>
> I understand that cancelling with a savepoint is currently not an atomic
> operation, meaning that there may be one or more events being processed
> after the savepoint is taken. In order to prevent any writes from going
> to Cassandra after the savepoint is taken, I wonder if it's possible to
> pause the Kafka stream before I take the savepoint.
>
> I've seen people suggest using a control stream and unioning it with the
> Kafka stream to achieve this, but that doesn't really stop the Kafka
> consumer offsets from advancing. I'm concerned that if I send the signal
> to the control stream and start to drop messages from Kafka, the offsets
> may still advance and the new offsets will be included in the
> savepoint. As a result, recovering from the savepoint will cause data
> loss.
>
> Is there any way to cleanly shut down a job or pause the Kafka source
> prior to taking a savepoint?
>
> Thanks,
>
> --
> Ning