Hi Andrey,

Thanks for your feedback. I am not sure I understand it 100% correctly, but
storing my data in Flink state (in addition to the Oracle database) is not an
option, because to my knowledge Flink state does not allow the arbitrary lookup
queries I need. Also, given the logic described in my original post, the
database access is never going to be idempotent; that is inherent in the
required insert/update logic.
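
To illustrate the first point, a minimal sketch (class and type names are made
up): keyed Flink state only offers a point lookup for the current key, with no
equivalent of an arbitrary SQL query against the state.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: keyed state is partitioned by key, so the only lookup
// available inside the operator is a read of the *current* key's state.
// There is no way to query the state by arbitrary attributes, as SQL allows.
public class PointLookupOnly extends KeyedProcessFunction<String, String, String> {

  private transient ValueState<String> stored;

  @Override
  public void open(Configuration parameters) {
    stored = getRuntimeContext().getState(
        new ValueStateDescriptor<>("stored", String.class));
  }

  @Override
  public void processElement(String value, Context ctx, Collector<String> out)
      throws Exception {
    String previous = stored.value(); // point lookup, current key only
    stored.update(value);
    out.collect(previous == null ? value : previous + "|" + value);
  }
}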

regards
Patrick


--

Patrick Fial

Client Platform Developer

Information Design One AG


Phone +49 69 244 502 38

Web www.id1.de



Information Design One AG, Baseler Straße 10, 60329 Frankfurt am Main

Commercial register: Amtsgericht Frankfurt am Main, HRB 52596

Executive board: Robert Peters, Benjamin Walther; Supervisory board: Christian
Hecht (chair)


On 19 March 2019 at 17:59:22, Andrey Zagrebin (and...@ververica.com) wrote:

Hi Patrick,

One approach I would try is to use Flink state and sync it with the database in
initializeState and CheckpointListener.notifyCheckpointComplete.
Basically, issue only idempotent updates to the database, and only once the
last checkpoint is securely taken and the records before it will not be
processed again.
The caveat is that the database might contain stale data between checkpoints.
Once the current state is synced with the database, depending on your
application, it might even be cleared from Flink state.
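
A rough sketch of what I mean (Record, combine() and upsertIntoOracle() are
placeholders, and the per-checkpoint buffering is simplified):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class StateSyncedDatabaseFunction
    implements MapFunction<Record, Record>, CheckpointedFunction, CheckpointListener {

  private transient ListState<Record> checkpointedPending; // restored after a failure
  private final TreeMap<Long, List<Record>> pendingPerCheckpoint = new TreeMap<>();
  private List<Record> current = new ArrayList<>();

  @Override
  public Record map(Record incoming) {
    Record combined = combine(incoming); // app-specific merge against Flink state
    current.add(combined);
    return combined;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    // freeze this interval's records; they go to the database only after the
    // checkpoint is confirmed
    pendingPerCheckpoint.put(ctx.getCheckpointId(), current);
    current = new ArrayList<>();
    checkpointedPending.clear();
    for (List<Record> batch : pendingPerCheckpoint.values()) {
      for (Record r : batch) {
        checkpointedPending.add(r);
      }
    }
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    checkpointedPending = ctx.getOperatorStateStore().getListState(
        new ListStateDescriptor<>("pending", Record.class));
    if (ctx.isRestored()) {
      // the database may be stale between checkpoints; re-buffer the
      // unconfirmed records (duplicates are harmless: updates are idempotent)
      for (Record r : checkpointedPending.get()) {
        current.add(r);
      }
    }
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // records up to this checkpoint will never be replayed, so the idempotent
    // upsert (e.g. an Oracle MERGE) is now safe
    Map<Long, List<Record>> confirmed = pendingPerCheckpoint.headMap(checkpointId, true);
    for (List<Record> batch : confirmed.values()) {
      upsertIntoOracle(batch);
    }
    confirmed.clear();
  }

  private Record combine(Record incoming) { /* placeholder: app-specific merge */ return incoming; }
  private void upsertIntoOracle(List<Record> batch) { /* placeholder: JDBC MERGE */ }
}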

I am also cc'ing Piotr and Kostas; maybe they have more ideas.

Best,
Andrey

On Tue, Mar 19, 2019 at 10:09 AM Patrick Fial <patrick.f...@id1.de> wrote:
Hello,

I am working on a streaming application with Apache Flink that is supposed to
provide end-to-end exactly-once delivery guarantees. The application is roughly
built like this:

environment.addSource(consumer)
  .map(... idempotent transformations ...)
  .map(new DatabaseFunction())
  .map(... idempotent transformations ...)
  .addSink(producer)

Both the source and the sink are Kafka connectors and thus support exactly-once
delivery guarantees.

The tricky part is the .map() containing the DatabaseFunction (sketched below).
Its job is to:
1) look up the incoming message in an Oracle database
2a) if it is not already stored, insert it and publish the incoming message
unchanged
2b) otherwise, combine the incoming update with the previous contents from the
database and store the combined update back in the database
3) output the result of 2) to the next operator
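
As a sketch (Record, the SQL and the connection handling are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch of the DatabaseFunction logic. Note that it is not idempotent:
// running it twice for the same message combines the message with its own
// previous result.
public class DatabaseFunction extends RichMapFunction<Record, Record> {

  private transient Connection connection;

  @Override
  public void open(Configuration parameters) throws Exception {
    connection = DriverManager.getConnection("jdbc:oracle:thin:@//host:1521/SVC"); // placeholder URL
  }

  @Override
  public Record map(Record incoming) throws Exception {
    Record existing = lookup(incoming.key());         // 1) SELECT by message key
    if (existing == null) {
      insert(incoming);                               // 2a) INSERT new record
      return incoming;                                // 3) publish unchanged
    } else {
      Record combined = existing.mergeWith(incoming); // 2b) combine with stored data
      update(combined);                               //     UPDATE stored record
      return combined;                                // 3) publish combined update
    }
  }

  private Record lookup(String key) { /* placeholder: SELECT ... WHERE key = ? */ return null; }
  private void insert(Record r)     { /* placeholder: INSERT INTO ... */ }
  private void update(Record r)     { /* placeholder: UPDATE ... SET ... */ }
}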

This logic leads to inconsistent data being published to the sink in the case
of a failure where the DatabaseFunction has already been executed but the
message has not yet been published to the sink.

My understanding is that in such a scenario all operator states are reverted to
the last checkpoint. Since the .map() operator is stateless, nothing happens
there, so only the consumer and producer states are reverted. This leads to the
message being reprocessed from the beginning (the source) and thus being
processed *again* by the DatabaseFunction. However, the DatabaseFunction is not
idempotent (because of steps 1)-3) explained above), so the second run produces
a different output than the first.

The question is: how can I ensure transaction safety in this application?

Basically, I would need to use database transactions within the
DatabaseFunction and commit them only when the messages are also committed to
the Kafka sink. However, I don't know how to achieve this.

I read about the two-phase commit protocol in Flink
(https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html),
but I fail to find examples of how to implement it in detail for stream
operators (NOT sinks). All the documentation I find refers only to using the
two-phase commit protocol for sinks. Should I, in this case, implement only
CheckpointedFunction and hook into initializeState/snapshotState to roll
back/commit database transactions? Would that already make things work? I am a
bit confused because there seem to be no hooks for the
pre-commit/commit/abort signals.
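
What I have in mind is roughly the following (hypothetical; the connection
handling and the gap between snapshotState and the checkpoint notification are
simplified):

import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical sketch: one open database transaction per checkpoint interval,
// with snapshotState acting as "pre-commit" and notifyCheckpointComplete as
// "commit". There is no explicit abort hook; a failed run's transaction is
// rolled back by the database when its connection dies.
public class TransactionalDatabaseFunction extends RichMapFunction<Record, Record>
    implements CheckpointedFunction, CheckpointListener {

  private transient Connection connection;

  @Override
  public void open(Configuration parameters) throws Exception {
    connection = DriverManager.getConnection("jdbc:oracle:thin:@//host:1521/SVC"); // placeholder
    connection.setAutoCommit(false); // writes stay uncommitted until the checkpoint completes
  }

  @Override
  public Record map(Record incoming) throws Exception {
    // lookup/insert/update as before, inside the open transaction
    return processAgainstDatabase(incoming);
  }

  @Override
  public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    // "pre-commit": all statements of this interval have been issued, but the
    // transaction is NOT committed yet; uncommitted data accumulates in Oracle
    // until the checkpoint completes (the scaling concern below)
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    // "abort" is implicit: after a restore, the replayed records are simply
    // processed again inside a fresh transaction
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    connection.commit(); // "commit": the checkpoint is durable, make the writes visible
  }

  private Record processAgainstDatabase(Record r) { /* placeholder */ return r; }
}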

Anyway, I am also afraid that this might introduce scaling issues: depending on
the message throughput, committing database actions only once per checkpoint
interval might blow up the temp tablespace in the Oracle database.

Thanks in advance for any help.

best regards
Patrick Fial


