Our implementation has quite a bit more going on just to deal with serialization of types, but here is pretty much the core of what we do, in (pseudo) Scala:
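
    import scala.collection.mutable.ListBuffer
    import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult
    // Flink 1.x (2019-era) package locations
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
    import org.apache.flink.streaming.runtime.tasks.{ProcessingTimeCallback, ProcessingTimeService}

    // Type parameters and constructor arguments are elided; maxDelayTime is assumed to be one of them.
    class DynamoSink[...](...)
        extends RichSinkFunction[T]
        with ProcessingTimeCallback
        with CheckpointedFunction {

      // DynamoDB's BatchWriteItem accepts at most 25 put/delete requests per call
      private val MaxBatchSize = 25

      private var curTimer: Option[Long] = None
      private val toWrite: ListBuffer[T] = new ListBuffer[T]

      def timerService: ProcessingTimeService =
        getRuntimeContext.asInstanceOf[StreamingRuntimeContext].getProcessingTimeService

      // Ensure a flush is scheduled no later than maxDelayTime after `now`
      private def ensureTimer(now: Long): Unit =
        if (curTimer.isEmpty) {
          val timer = now + maxDelayTime.toMilliseconds
          timerService.registerTimer(timer, this)
          curTimer = Some(timer)
        }

      override def invoke(element: T, context: SinkFunction.Context[_]): Unit = {
        ensureTimer(context.currentProcessingTime())
        toWrite += element
        // a full batch is written immediately
        if (toWrite.length >= MaxBatchSize) {
          flushBufferedRecordsToDynamo()
        }
      }

      // Bounds latency on low-volume streams
      override def onProcessingTime(timestamp: Long): Unit = {
        if (toWrite.nonEmpty) {
          flushBufferedRecordsToDynamo()
        }
        curTimer = None
        // re-arm so re-queued unprocessed items are retried even if no new elements arrive
        if (toWrite.nonEmpty) {
          ensureTimer(timestamp)
        }
      }

      def flushBufferedRecordsToDynamo(): Unit = {
        val result = batchWrite(toWrite)
        val unprocessed = getUnprocessedItems(result)
        toWrite.clear()
        toWrite ++= unprocessed
      }

      // snapshot and restore the state of the toWrite buffer
      override def snapshotState(context: FunctionSnapshotContext): Unit = ???
      override def initializeState(context: FunctionInitializationContext): Unit = ???

      // write to DynamoDB; must split toWrite into requests of at most MaxBatchSize items,
      // because re-queued unprocessed items can push the buffer past the limit
      def batchWrite(toWrite: ListBuffer[T]): List[BatchWriteItemResult] = ???

      // capture any items in the BatchWriteItemResult that didn't get processed
      def getUnprocessedItems(result: List[BatchWriteItemResult]): ListBuffer[T] = ???
    }

On Fri, Mar 22, 2019 at 10:59 AM Addison Higham <addis...@gmail.com> wrote:

> Hi there,
>
> We have implemented a DynamoDB sink and have had no real issues. Obviously,
> it is at-least-once, so we work around that by structuring our
> transformations so that they produce idempotent writes.
>
> What we do is pretty similar to what you suggest: we collect the records
> in a buffer (which needs to be checkpointed).
> However, instead of using the CheckpointedFunction interface to trigger
> batch writes, we use a RichSinkFunction with a processing-time callback.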
> For each record, we ensure there is a processing-time callback set for some
> configurable time in the future and add the record to the buffer. DynamoDB has
> a maximum batch size of 25 records, so if the batch is full, we immediately
> trigger a write.
>
> The processing-time callback mostly serves low-volume periods, so that we
> can bound the maximum latency a record will see.
>
> We initially looked at using CheckpointedFunction to trigger writes, but
> because of DynamoDB's relatively small batch write size, we knew we would
> need to flush very often, so batches would never get very big and the
> buffered state would be minimal.
>
> Hopefully that helps!
>
> On Fri, Mar 22, 2019 at 9:49 AM Vivek <vivek.tha...@gmail.com> wrote:
>
>> Hello,
>>
>> I am currently using DynamoDB as a persistent store in my application
>> and am in the process of turning the application into a streaming pipeline
>> built on Flink. The pipeline is simple and stateless: consume data from
>> Kafka, apply some transformations, and write records to DynamoDB.
>> I want to collect records in a buffer and batch-write them instead of
>> writing them one at a time. I also want retries with exponential backoff
>> for unprocessed records in a batch.
>>
>> My first question: is DynamoDB a suitable sink? I searched Google and the
>> mailing list archive but couldn't find anyone with that requirement.
>>
>> What are the guidelines for implementing a fault-tolerant sink with
>> at-least-once guarantees? I am looking at the Elasticsearch implementation,
>> and it has the capabilities I want. Could I implement a DynamoDB sink
>> in a similar way?
>>
>> Basically: implement the CheckpointedFunction interface, collect records
>> in a buffer in the invoke method, and flush the buffer and check for errors
>> in the snapshot method. The flush method will call DynamoDB's BatchWriteItem
>> and add any unprocessed records back to the buffer. A FailureHandler will
>> then attempt to flush the buffer again, do nothing, or fail the sink,
>> based on the failure-handling policy.
>>
>> Please let me know if I am heading in the right direction.
>>
>> Thank you
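To make the buffering behavior of the DynamoSink snippet easier to test in isolation, here is a minimal sketch with Flink and DynamoDB stripped out. `BatchBuffer` is a hypothetical name, and `writeBatch` is a hypothetical function standing in for `batchWrite` plus `getUnprocessedItems`: it receives one batch and returns whatever DynamoDB left unprocessed. The buffer flushes as soon as 25 items are held; `flush()` is what a processing-time callback or `snapshotState` would call. It also splits the buffer into chunks of at most 25, since re-queued unprocessed items could otherwise push a single request past the BatchWriteItem limit.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Flink-free model of the sink's buffering logic (not the thread's actual code).
final class BatchBuffer[T](
    writeBatch: Seq[T] => Seq[T], // hypothetical: writes one batch, returns the unprocessed items
    maxBatchSize: Int = 25         // DynamoDB BatchWriteItem accepts at most 25 put/delete requests
) {
  require(maxBatchSize > 0, "maxBatchSize must be positive")

  private val buffer = ListBuffer.empty[T]

  /** Buffer one element and flush as soon as a full batch is available. */
  def add(element: T): Unit = {
    buffer += element
    if (buffer.length >= maxBatchSize) flush()
  }

  /** Write everything buffered; intended to be called from a timer or before a checkpoint. */
  def flush(): Unit = {
    // Chunk so no request exceeds maxBatchSize, even after unprocessed items were re-queued.
    val chunks = buffer.toList.grouped(maxBatchSize).toList
    buffer.clear()
    // Re-queue whatever each write reports as unprocessed.
    chunks.foreach(chunk => buffer ++= writeBatch(chunk))
  }

  /** Items still waiting to be written (what snapshotState would need to persist). */
  def pending: List[T] = buffer.toList
}
```

For example, adding 30 items with a writer that always succeeds sends one 25-item batch immediately and leaves the remaining 5 pending until the next `flush()`.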
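Vivek also asked about retrying unprocessed records with exponential backoff, which is what the AWS BatchWriteItem documentation recommends for `UnprocessedItems`. The sketch below assumes the same kind of hypothetical `writeBatch` function (returns the unprocessed items); `Backoff.writeWithBackoff` and its defaults (5 attempts, 50 ms base delay, 5 s cap) are illustrative choices, not values from this thread. `sleep` is injectable so the delays can be checked without actually waiting.

```scala
import scala.annotation.tailrec

object Backoff {
  /** Retries `writeBatch` on whatever it reports as unprocessed, doubling the delay
    * before each retry (capped at maxDelayMs). Returns the items still unprocessed
    * after maxAttempts writes; the caller decides whether to re-buffer, drop, or fail. */
  def writeWithBackoff[T](
      items: Seq[T],
      writeBatch: Seq[T] => Seq[T], // hypothetical: returns the unprocessed items
      maxAttempts: Int = 5,
      baseDelayMs: Long = 50L,
      maxDelayMs: Long = 5000L,
      sleep: Long => Unit = ms => Thread.sleep(ms)
  ): Seq[T] = {
    @tailrec
    def loop(pending: Seq[T], attempt: Int): Seq[T] =
      if (pending.isEmpty || attempt >= maxAttempts) pending
      else {
        if (attempt > 0) {
          // retry n waits baseDelayMs * 2^(n-1): 50, 100, 200, ... ms;
          // the exponent is bounded so the shift cannot overflow
          val exponent = math.min(attempt - 1, 30)
          sleep(math.min(maxDelayMs, baseDelayMs * (1L << exponent)))
        }
        loop(writeBatch(pending), attempt + 1)
      }
    loop(items, 0)
  }
}
```

Whatever is still returned after the last attempt is what a failure-handling policy like the one Vivek describes would act on: put it back in the buffer, drop it, or fail the sink. Note that `Thread.sleep` blocks the calling thread, so long backoff delays will stall the sink.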