Hi Arun, Gabor,

Thanks for the feedback. We are using the "Exactly-once using transactional writes" approach, so we don't rely on message keys for idempotent writes. I should have clarified that my question is specific to that approach.
We are following the approach described in this previous list posting:
http://apache-spark-developers-list.1001551.n3.nabble.com/Easy-way-to-get-offset-metatada-with-Spark-Streaming-API-td22406.html

Quoted from that posting:

"I think the right way to look at this is the batchId is just a proxy for offsets that is agnostic to what type of source you are reading from (or how many sources there are). We might call into a custom sink with the same batchId more than once, but it will always contain the same data (there is no race condition, since this is stored in a write-ahead log). As long as you check/commit the batch id in the same transaction as the data you will get exactly once."

We coordinate each micro-batch transaction from the driver and atomically commit the batchId together with the data to a datastore.

So my question is really about this part of the referenced posting: "We might call into a custom sink with the same batchId more than once". Before committing any micro-batch, we can check whether its batchId has already been committed. I know this check is required in the case of driver recovery, because the batchId in the checkpoint file might be out of sync with the batchId that was committed in a datastore transaction.

Are there other cases where Spark might call into a custom sink with the same batchId more than once? That is, do we need to check Spark's current batchId against the datastore for every micro-batch, or can we get away with doing this check only at the time of driver recovery?
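To make the question concrete, here is a rough sketch of the per-batch commit I mean (not our actual code; the JDBC datastore and the "batch_commits"/"events" table and column names are made up for illustration), with the batchId check folded into the same transaction as the data:

import java.sql.{Connection, DriverManager}

// Driver-side commit of one micro-batch: write the rows and record the
// batchId in a single datastore transaction, skipping batchIds that were
// already committed (e.g. a replay after driver recovery).
def commitBatch(jdbcUrl: String, batchId: Long, rows: Seq[(String, String)]): Unit = {
  val conn: Connection = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)

    // Has this batchId already been committed?
    val check = conn.prepareStatement("SELECT 1 FROM batch_commits WHERE batch_id = ?")
    check.setLong(1, batchId)
    val alreadyCommitted = check.executeQuery().next()

    if (!alreadyCommitted) {
      val insert = conn.prepareStatement("INSERT INTO events (k, v) VALUES (?, ?)")
      rows.foreach { case (k, v) =>
        insert.setString(1, k)
        insert.setString(2, v)
        insert.addBatch()
      }
      insert.executeBatch()

      // Record the batchId in the *same* transaction as the data.
      val mark = conn.prepareStatement("INSERT INTO batch_commits (batch_id) VALUES (?)")
      mark.setLong(1, batchId)
      mark.executeUpdate()
    }
    conn.commit()
  } catch {
    case e: Exception =>
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}

The question is then whether the SELECT against batch_commits is needed for every micro-batch, or only for the first batch processed after driver recovery.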
On Thu, Dec 6, 2018 at 1:28 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Eric,
>
> In order to have exactly-once, one needs a replayable source and an
> idempotent sink.
> The cases you've mentioned cover the two main groups of issues.
> Practically any kind of programming problem can end up in duplicated data
> (even in the code that feeds Kafka).
> I don't know why you've asked this, because if the sink sees an already
> processed key then it should just be skipped, and it doesn't matter why it
> is duplicated.
> Cody has a really good write-up about semantics:
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#delivery-semantics
>
> I think if you reach Continuous Processing, this is worth considering:
> "There are currently no automatic retries of failed tasks. Any failure
> will lead to the query being stopped and it needs to be manually restarted
> from the checkpoint."
>
> BR,
> G
>
>
> On Wed, Dec 5, 2018 at 8:36 PM Eric Wohlstadter <wohls...@gmail.com>
> wrote:
>
>> Hi all,
>> We are working on implementing a streaming sink on 2.3.1 with the
>> DataSourceV2 APIs.
>>
>> Can anyone help check whether my understanding is correct with respect
>> to the failure modes that need to be covered?
>>
>> We are assuming that a reliable receiver (such as Kafka) is used as the
>> stream source, and we only want to support micro-batch execution at this
>> time (not yet Continuous Processing).
>>
>> I believe the possible failures that need to be covered are:
>>
>> 1. Task failure: If a task fails, it may have written data to the sink
>> output before failure. Subsequent attempts for a failed task must be
>> idempotent, so that no data is duplicated in the output.
>> 2. Driver failure: If the driver fails, then upon recovery it might
>> replay a micro-batch that was already seen by the sink (if the failure
>> occurred after the sink committed output but before the driver updated
>> the checkpoint). In this case, the sink must be idempotent when a
>> micro-batch is replayed, so that no data is duplicated in the output.
>>
>> Are there any other cases where data might be duplicated in the stream?
>> I.e., if neither of these two failures occurs, is there still a case
>> where data can be duplicated?
>>
>> Thanks for any help checking whether my understanding is correct.