Hi there,

We have implemented a DynamoDB sink and have had no real issues. It is, of
course, at-least-once, so we work around that by structuring our
transformations so that they produce idempotent writes.
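
To illustrate what "idempotent writes" can mean here, below is a minimal sketch. It assumes each record carries a natural unique identity (a hypothetical userId plus event timestamp). The primary key is derived deterministically from that identity. DynamoDB's PutItem replaces any existing item that has the same primary key, so replaying a batch after a failure overwrites the same items rather than duplicating them. The in-memory map is only a stand-in for the table, and `IdempotentWrites`, `Event`, and `putAll` are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: deterministic primary keys make replayed writes idempotent.
final class IdempotentWrites {

    // Hypothetical output record; assumes (userId, eventTime) uniquely identifies it.
    static final class Event {
        final String userId;
        final long eventTime;
        final String payload;

        Event(String userId, long eventTime, String payload) {
            this.userId = userId;
            this.eventTime = eventTime;
            this.payload = payload;
        }
    }

    // Same record in, same key out: no random UUIDs or write-time timestamps.
    static String primaryKey(Event e) {
        return e.userId + "#" + e.eventTime;
    }

    // Stand-in for PutItem semantics: an item with an existing key is replaced.
    static void putAll(Map<String, Event> table, List<Event> batch) {
        for (Event e : batch) {
            table.put(primaryKey(e), e);
        }
    }
}
```

The important property is that nothing non-deterministic goes into the key. Otherwise, a replay after restoring from a checkpoint would create new items instead of overwriting the old ones.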

What we do is pretty similar to what you suggest: we collect the records in
a buffer (which needs to be checkpointed). However, instead of using the
CheckpointedFunction interface to trigger batch writes, we use a
RichSinkFunction with a processing-time callback. For each record, we make
sure a processing-time callback is registered for some configurable time in
the future, and then add the record to the buffer. DynamoDB's BatchWriteItem
accepts at most 25 items per request, so if the batch is full, we trigger a
write immediately.

The processing-time callback mainly serves low-volume situations, so that we
can bound the maximum latency any record will see.
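
The buffering scheme described above can be sketched in plain Java. The sketch assumes a hypothetical `BatchingBuffer` class: its `add` method would be called from the sink's `invoke`, and its `onTimer` method would be called by the processing-time callback. The clock is passed in explicitly so the logic is testable. The `Consumer` stands in for the DynamoDB BatchWriteItem call, and `snapshot` returns what would have to go into checkpointed state. Timer registration and synchronization between the timer thread and `invoke` are deliberately left out, and none of this is Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of size- and time-triggered batching; not Flink API.
final class BatchingBuffer<T> {
    static final int MAX_BATCH = 25; // DynamoDB BatchWriteItem limit per request

    private final long maxDelayMillis;           // configurable latency bound
    private final Consumer<List<T>> batchWriter; // stand-in for BatchWriteItem
    private final List<T> buffer = new ArrayList<>();
    private long deadline = -1;                  // -1 means no callback pending

    BatchingBuffer(long maxDelayMillis, Consumer<List<T>> batchWriter) {
        this.maxDelayMillis = maxDelayMillis;
        this.batchWriter = batchWriter;
    }

    // Called for every record (the sink's invoke method).
    void add(T record, long nowMillis) {
        if (deadline < 0) {
            deadline = nowMillis + maxDelayMillis; // ensure a callback is set
        }
        buffer.add(record);
        if (buffer.size() >= MAX_BATCH) {
            flush(); // full batch: write immediately
        }
    }

    // Called when the processing-time callback fires.
    void onTimer(long nowMillis) {
        if (deadline >= 0 && nowMillis >= deadline) {
            flush();
        }
    }

    // Buffered records that would have to be stored in checkpointed state.
    List<T> snapshot() {
        return new ArrayList<>(buffer);
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            batchWriter.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        deadline = -1; // the next record registers a fresh deadline
    }
}
```

With a 1000 ms delay, 30 records arriving at once produce one immediate 25-item write. The remaining 5 records are written when the callback fires at or after the deadline.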

We initially looked at using CheckpointedFunction, but because of DynamoDB's
relatively small batch write size, we knew we would need to flush very
often, so batches would never get very big and the buffered state would be
minimal.

Hopefully that helps!

On Fri, Mar 22, 2019 at 9:49 AM Vivek <vivek.tha...@gmail.com> wrote:

>
> Hello,
>
> I am currently using DynamoDB as a persistent store in my application and
> am in the process of turning the application into a streaming pipeline
> built on Flink. The pipeline is simple and stateless: consume data from
> Kafka, apply some transformations, and write the records to DynamoDB.
> I would like to collect records in a buffer and batch-write them instead
> of writing them one at a time. I also want to add retries with exponential
> backoff for unprocessed records in a batch.
>
> My first question: is DynamoDB a suitable sink? I googled and searched the
> mailing list archive, but couldn't find anyone with that requirement.
>
> What are some guidelines for implementing a fault-tolerant sink with
> at-least-once guarantees? I am looking at the Elasticsearch implementation,
> and it has the capabilities that I want. Could I implement a DynamoDB sink
> in a similar way?
>
> Basically:
> - implement the Checkpointed interface;
> - collect records in a buffer in the invoke method;
> - flush the buffer and check for errors in the snapshot method;
> - have the flush method call DynamoDB's BatchWriteItem and add any
>   unprocessed records back to the buffer;
> - have a FailureHandler either attempt to flush the buffer again, do
>   nothing, or fail the sink, depending on the failure-handling policy.
>
> Please let me know if I am heading in the right direction.
>
> Thank you
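
The retry part of the proposal could look roughly like the sketch below: resubmit unprocessed items with exponential backoff, then apply a failure-handling policy. It rests on several assumptions. `BackoffFlusher`, `FailurePolicy`, and `Sleeper` are hypothetical names. `writeBatch` stands in for a BatchWriteItem call that returns the items DynamoDB reported as unprocessed. The three policy values mirror the options listed in the question: flush again later, do nothing, or fail the sink. Production code would typically also add jitter to the delays.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: retry unprocessed items with exponential backoff,
// then apply a failure-handling policy to whatever is still left.
final class BackoffFlusher<T> {

    enum FailurePolicy { RETRY_LATER, DROP, FAIL }

    // Injectable sleep so tests need not wait; Thread::sleep fits this shape.
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private final Function<List<T>, List<T>> writeBatch; // returns unprocessed items
    private final int maxAttempts;
    private final long baseDelayMillis;
    private final FailurePolicy policy;
    private final Sleeper sleeper;

    BackoffFlusher(Function<List<T>, List<T>> writeBatch, int maxAttempts,
                   long baseDelayMillis, FailurePolicy policy, Sleeper sleeper) {
        this.writeBatch = writeBatch;
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
        this.policy = policy;
        this.sleeper = sleeper;
    }

    // Returns the items to put back into the buffer (non-empty only for RETRY_LATER).
    List<T> flush(List<T> batch) throws InterruptedException {
        List<T> pending = batch;
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            if (attempt > 0) {
                // Delays of base, 2*base, 4*base, ... before each retry.
                sleeper.sleep(baseDelayMillis << (attempt - 1));
            }
            pending = writeBatch.apply(pending);
        }
        if (pending.isEmpty()) {
            return List.of();
        }
        switch (policy) {
            case RETRY_LATER:
                return new ArrayList<>(pending); // re-buffer; the next flush tries again
            case DROP:
                return List.of();                // "do nothing": give up on these items
            default:
                throw new IllegalStateException(pending.size() + " items still unprocessed");
        }
    }
}
```

In a sink, this might be constructed with `Thread::sleep` as the `Sleeper`. Note that sleeping inside the flush blocks the task thread for the whole backoff period, which is one reason to keep `maxAttempts` small and use RETRY_LATER for anything left over.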
