Hello, I am working on a streaming application with Apache Flink which is supposed to provide end-to-end exactly-once delivery guarantees. The application is roughly built like this:
environment.addSource(consumer)
    .map(... idempotent transformations ...)
    .map(new DatabaseFunction())
    .map(... idempotent transformations ...)
    .addSink(producer)

Both source and sink are Kafka connectors and thus support exactly-once delivery guarantees. The tricky part is the .map() containing the DatabaseFunction. Its job is to:

1) look up the incoming message in an Oracle database
2a) if it is not already stored there, insert it and publish the incoming message unchanged
2b) otherwise, combine the incoming update with the previous contents from the database and store the combined update back in the database
3) output the result of 2) to the next operator

This logic leads to inconsistent data being published to the sink when a failure occurs after the DatabaseFunction has already executed but before the message has been published to the sink. My understanding is that in such a scenario all operator states are reverted to the last checkpoint. Since the .map() operator is stateless, nothing happens there, so only the consumer and producer states are reverted. The message is therefore reprocessed from the beginning (the source) and thus processed *again* by the DatabaseFunction. However, the DatabaseFunction is not idempotent (because of steps 1)-3) above), so it produces a different output than in the first run.

My question is: how can I ensure transaction safety in this application? Essentially, I would need to use database transactions within the DatabaseFunction and commit them only once the messages are also committed to the Kafka sink. However, I don't know how to achieve this. I have read about the two-phase commit protocol in Flink (https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html), but I cannot find examples of how to implement it in detail for stream operators (NOT sinks). All the documentation I find only covers the two-phase commit protocol for sinks.

Should I, in this case, simply implement CheckpointedFunction and hook into initializeState/snapshotState to roll back/commit the database transactions? Would that already make things work? I am a bit confused because there seem to be no hooks for the pre-commit/commit/abort signals. I am also afraid that this might introduce scaling issues: depending on the message throughput, committing database actions only once per checkpoint interval might blow the temp tablespace in the Oracle database.

Thanks in advance for any help.

Best regards

Patrick Fial
Client Platform Developer, Information Design One AG
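PS: to make the current behaviour more concrete, here is roughly what the DatabaseFunction looks like today. The Message type, its combineWith() method, the table/column names and the JDBC connection details are simplified placeholders, not our real code; the point is only that the commit happens per message and independently of the Kafka sink, which is why a replay produces different output:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/** Simplified stand-in for our actual message type. */
class Message {
    public String key;
    public String payload;

    public Message() {}
    public Message(String key, String payload) { this.key = key; this.payload = payload; }

    public Message combineWith(String storedPayload) {
        // placeholder for the real merge logic
        return new Message(key, storedPayload + payload);
    }
}

public class DatabaseFunction extends RichMapFunction<Message, Message> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/SERVICE", "user", "password");
        connection.setAutoCommit(false);
    }

    @Override
    public Message map(Message incoming) throws Exception {
        Message result;
        // 1) look up the incoming message
        try (PreparedStatement stmt = connection.prepareStatement(
                "SELECT payload FROM messages WHERE msg_key = ?")) {
            stmt.setString(1, incoming.key);
            try (ResultSet rs = stmt.executeQuery()) {
                if (!rs.next()) {
                    // 2a) not stored yet: insert and forward the incoming message unchanged
                    try (PreparedStatement ins = connection.prepareStatement(
                            "INSERT INTO messages (msg_key, payload) VALUES (?, ?)")) {
                        ins.setString(1, incoming.key);
                        ins.setString(2, incoming.payload);
                        ins.executeUpdate();
                    }
                    result = incoming;
                } else {
                    // 2b) already stored: combine with the previous contents and store back
                    result = incoming.combineWith(rs.getString("payload"));
                    try (PreparedStatement upd = connection.prepareStatement(
                            "UPDATE messages SET payload = ? WHERE msg_key = ?")) {
                        upd.setString(1, result.payload);
                        upd.setString(2, result.key);
                        upd.executeUpdate();
                    }
                }
            }
        }
        connection.commit(); // committed here, long before the Kafka transaction commits
        // 3) output the result to the next operator
        return result;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}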
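PPS: and this is roughly what I have in mind when asking about CheckpointedFunction (again just a sketch, reusing the placeholder Message type and connection details from above, with the lookup/insert-or-combine logic factored out). I am not at all sure this is correct, since snapshotState fires before the checkpoint is globally complete and there is no explicit pre-commit/commit/abort hook; and it is exactly this "one open transaction per checkpoint interval" pattern that I fear might blow the temp tablespace:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.sql.Connection;
import java.sql.DriverManager;

public class TransactionalDatabaseFunction extends RichMapFunction<Message, Message>
        implements CheckpointedFunction {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/SERVICE", "user", "password");
        connection.setAutoCommit(false); // keep everything in one open transaction
    }

    @Override
    public Message map(Message incoming) throws Exception {
        // same lookup / insert-or-combine logic as in the DatabaseFunction sketch above,
        // but WITHOUT calling connection.commit() per message
        return lookupAndUpsert(connection, incoming);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // commit all database changes made since the last checkpoint
        connection.commit();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to restore from Flink state; uncommitted database work from a failed run
        // should have been rolled back implicitly when the old connection died
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.rollback();
            connection.close();
        }
    }

    private Message lookupAndUpsert(Connection c, Message m) throws Exception {
        // placeholder for the logic shown in the previous sketch
        return m;
    }
}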