Hi Ning,

I don't think it is possible to pause a Kafka source when taking a
savepoint without changing the connector's implementation.

I think the underlying problem is that the Cassandra sink doesn't provide
exactly-once guarantees when the Cassandra query isn't idempotent. If
possible, the cleanest solution would be to implement a new Cassandra sink
(or extend the existing one) based on the TwoPhaseCommitSinkFunction
abstract class
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
and to configure your environment with the exactly-once checkpointing mode.
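A minimal sketch of such a sink could look like the following. This is
untested and simplified: CassandraBatchWriter and its writeBatch call are
hypothetical stand-ins for your actual driver wiring, and it glosses over
the hard part, namely that commit() may be retried after a failure, so the
write itself still has to be made idempotent (e.g. by deduplicating on a
transaction id column), since Cassandra has no cross-statement transactions:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.types.Row;

public class TwoPhaseCassandraSink
        extends TwoPhaseCommitSinkFunction<Row, TwoPhaseCassandraSink.Txn, Void> {

    /** Rows buffered for one checkpoint interval (the "transaction"). */
    public static class Txn {
        public final List<Row> rows = new ArrayList<>();
    }

    /** Hypothetical stand-in for your Cassandra driver/session wiring. */
    public interface CassandraBatchWriter extends java.io.Serializable {
        void writeBatch(List<Row> rows);
    }

    private final CassandraBatchWriter writer;

    public TwoPhaseCassandraSink(CassandraBatchWriter writer) {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
        this.writer = writer;
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn();
    }

    @Override
    protected void invoke(Txn txn, Row value, Context context) {
        // Only buffer; nothing is written to Cassandra yet.
        txn.rows.add(value);
    }

    @Override
    protected void preCommit(Txn txn) {
        // Nothing to flush: the buffered rows are snapshotted with the
        // checkpoint as part of the transaction state.
    }

    @Override
    protected void commit(Txn txn) {
        // Called once the checkpoint has completed. This can be retried,
        // so for true exactly-once semantics the write must be made
        // idempotent (e.g. deduplicate on a transaction id).
        writer.writeBatch(txn.rows);
    }

    @Override
    protected void abort(Txn txn) {
        // Nothing was written yet; just discard the buffer.
        txn.rows.clear();
    }
}

You would then enable exactly-once checkpointing on the environment:

env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

The buffer-until-checkpoint approach trades latency for consistency:
nothing becomes visible in Cassandra until the checkpoint that contains
it completes, which also addresses the original concern about writes
happening after the savepoint.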

--
Niels



On Mon, Oct 22, 2018 at 1:26 AM Ning Shi <nings...@gmail.com> wrote:

> I'm implementing a streaming job that consumes events from Kafka and
> writes results to Cassandra. The writes to Cassandra are not
> idempotent. In preparation for planned maintenance events such as a Flink
> version upgrade or a job update, I'd like to be able to shut down the job
> cleanly.
>
> I understand that cancelling with savepoint is currently not an atomic
> operation, meaning that there may be one or more events being processed
> after the savepoint is taken. In order to prevent any writes going to
> Cassandra after the savepoint is taken, I wonder if it's possible to
> pause the Kafka stream before I take the savepoint.
>
> I've seen people suggest using a control stream and unioning it with the
> Kafka stream to achieve this, but that doesn't actually stop the Kafka
> consumer offsets from advancing. I'm concerned that if I send the signal
> to the control stream and start to drop messages from Kafka, the offsets
> may still advance and the new offsets will be included in the
> savepoint. As a result, recovering from the savepoint will cause data
> loss.
>
> Is there any way to cleanly shut down a job or pause the Kafka source
> prior to taking a savepoint?
>
> Thanks,
>
> --
> Ning
>
