Thank you, Addison. This is very helpful.

On Fri, Mar 22, 2019 at 10:12 AM Addison Higham <addis...@gmail.com> wrote:
> Our implementation has quite a bit more going on just to deal with
> serialization of types, but here is pretty much the core of what we do in
> (pseudo) Scala:
>
> class DynamoSink[...](...) extends RichSinkFunction[T]
>     with CheckpointedFunction with ProcessingTimeCallback {
>
>   private var curTimer: Option[Long] = None
>   private var toWrite: ListBuffer[T] = new ListBuffer[T]
>   def timerService: ProcessingTimeService =
>     getRuntimeContext.asInstanceOf[StreamingRuntimeContext].getProcessingTimeService
>
>   override def invoke(element: T, context: Context[_]): Unit = {
>     // make sure a flush is scheduled at most maxDelayTime from now
>     if (curTimer.isEmpty) {
>       val timer = context.currentProcessingTime() + maxDelayTime.toMilliseconds
>       timerService.registerTimer(timer, this)
>       curTimer = Some(timer)
>     }
>
>     toWrite += element
>     // DynamoDB batch writes take at most 25 items, so flush as soon as the batch is full
>     if (toWrite.length >= 25) {
>       flushBufferedRecordsToDynamo()
>     }
>   }
>
>   override def onProcessingTime(timestamp: Long): Unit = {
>     if (toWrite.nonEmpty) {
>       flushBufferedRecordsToDynamo()
>     }
>     curTimer = None
>   }
>
>   def flushBufferedRecordsToDynamo(): Unit = {
>     val result = batchWrite(toWrite)
>     val unprocessed = getUnprocessedItems(result)
>     toWrite.clear()
>     // keep whatever DynamoDB did not accept so the next flush retries it
>     toWrite ++= unprocessed
>   }
>
>   // snapshot and restore the state of the toWrite buffer
>   override def snapshotState(context: FunctionSnapshotContext): Unit = ???
>
>   override def initializeState(context: FunctionInitializationContext): Unit = ???
>
>   // write to dynamo
>   def batchWrite(toWrite: ListBuffer[T]): List[BatchWriteItemResult] = ???
>
>   // capture any results in the BatchWriteItemResult that didn't get processed
>   def getUnprocessedItems(result: List[BatchWriteItemResult]): ListBuffer[T] = ???
>
> }
>
> On Fri, Mar 22, 2019 at 10:59 AM Addison Higham <addis...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> We have implemented a DynamoDB sink and have had no real issues. It is,
>> of course, at-least-once, so we work around that by structuring our
>> transformations so that they produce idempotent writes.
>>
>> What we do is pretty similar to what you suggest: we collect the records
>> in a buffer (which needs to be checkpointed).
>> However, instead of using the CheckpointedFunction interface to trigger
>> batch writes, we use a RichSinkFunction with a processing-time callback.
>> For each record, we ensure there is a processing-time callback set for some
>> configurable time in the future and add the record to the buffer. DynamoDB
>> has a max batch size of 25 records, so if the batch is full, we immediately
>> trigger a write.
>>
>> The processing-time callback serves mostly for situations with low message
>> volume, so that we can control the maximum latency a record will have.
>>
>> We initially looked at flushing from CheckpointedFunction, but because
>> DynamoDB's batch write size is relatively small, we knew we would be
>> flushing very often anyway, so batches would never get very big and the
>> state would be minimal.
>>
>> Hopefully that helps!
>>
>> On Fri, Mar 22, 2019 at 9:49 AM Vivek <vivek.tha...@gmail.com> wrote:
>>
>>>
>>> Hello,
>>>
>>> I am currently using DynamoDB as a persistent store in my application,
>>> and I am in the process of transforming the application into a streaming
>>> pipeline using Flink. The pipeline is simple and stateless: consume data
>>> from Kafka, apply some transformations, and write records to DynamoDB.
>>> I want to collect records in a buffer and batch write them instead
>>> of writing one at a time. I also want to add retries with exponential
>>> backoff for unprocessed records in a batch.
>>>
>>> My first question: is DynamoDB a suitable sink? I googled and searched
>>> the mailing list archive, but couldn’t find anyone with that requirement.
>>>
>>> What are some of the guidelines around implementing a fault-tolerant
>>> sink with at-least-once guarantees? I am looking at the Elasticsearch
>>> implementation, and it does have the capabilities that I want. Could I
>>> just try implementing a DynamoDB sink in a similar way?
>>>
>>> Basically: implement the CheckpointedFunction interface, collect records
>>> in a buffer in the invoke method, and flush the buffer and check for
>>> errors in the snapshot method.
>>> The flush method will call DynamoDB's BatchWriteItem, and if there are any
>>> unprocessed records, add them back to the buffer. A FailureHandler will
>>> attempt to flush the buffer again, do nothing, or fail the sink, based on
>>> the failure-handling policy.
>>>
>>> Please let me know if I am heading in the right direction.
>>>
>>> Thank you
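Both Vivek's plan and the `flushBufferedRecordsToDynamo` method in Addison's sketch re-queue whatever BatchWriteItem reports as unprocessed. Below is a minimal, Flink-independent sketch of retrying those leftovers with exponential backoff. It assumes a hypothetical `writeBatch` function standing in for a BatchWriteItem call that returns the items that were *not* written; `RetryPolicy`, `BatchRetry`, and the default delays are illustrative names and values, not Flink or AWS SDK APIs. The 25-item chunk size reflects DynamoDB's per-request limit for BatchWriteItem.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

// Illustrative retry settings (assumed names and defaults, not from Flink or the AWS SDK).
final case class RetryPolicy(
    maxRetries: Int = 5,
    baseDelay: FiniteDuration = 50.millis,
    maxDelay: FiniteDuration = 5.seconds,
    // Injected so tests (or a caller) can avoid real blocking.
    sleep: FiniteDuration => Unit = d => Thread.sleep(d.toMillis)
)

object BatchRetry {
  // DynamoDB's BatchWriteItem accepts at most 25 put/delete requests per call.
  val MaxBatchSize = 25

  /** Writes `items` in chunks of at most 25 and retries whatever comes back
    * unprocessed, waiting baseDelay, 2*baseDelay, 4*baseDelay, ... (capped at
    * maxDelay) between passes.
    *
    * `writeBatch` is a hypothetical stand-in for a BatchWriteItem call and
    * must return the items that were NOT written.
    *
    * Returns the items still unprocessed after `maxRetries` retries
    * (empty when everything was written).
    */
  def writeWithBackoff[T](items: Seq[T], policy: RetryPolicy = RetryPolicy())(
      writeBatch: Seq[T] => Seq[T]
  ): Seq[T] = {
    @tailrec
    def loop(pending: Vector[T], attempt: Int): Vector[T] = {
      // One pass: send every chunk once and collect the leftovers.
      val unprocessed = pending.grouped(MaxBatchSize).flatMap(writeBatch).toVector
      if (unprocessed.isEmpty || attempt >= policy.maxRetries) unprocessed
      else {
        // Exponential backoff; the shift is capped to avoid Duration overflow.
        val raw = policy.baseDelay * (1L << math.min(attempt, 20))
        val delay = if (raw > policy.maxDelay) policy.maxDelay else raw
        policy.sleep(delay)
        loop(unprocessed, attempt + 1)
      }
    }
    loop(items.toVector, 0)
  }
}
```

A non-empty result still needs a failure policy like the FailureHandler Vivek describes: keep the items buffered, drop them, or throw so the job fails and Flink restarts it from the last completed checkpoint. Note that sleeping inside `invoke` or a timer callback blocks the sink's task thread and backpressures the pipeline; Addison's sketch avoids that by leaving leftovers in the buffer for the next size- or timer-triggered flush.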
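Addison's workaround for at-least-once delivery is to make the writes idempotent. One common way to get there, sketched here under assumptions, is to derive each item's primary key deterministically from the record and write full item values with overwrite semantics (as DynamoDB's PutItem replaces an existing item with the same key) rather than deltas. `InMemoryTable`, `DailyCount`, and `IdempotentWrites` are hypothetical names, and an in-memory map stands in for the table.

```scala
import scala.collection.mutable

// Toy stand-in for a DynamoDB table (an assumption, not the AWS API):
// put replaces any existing item with the same primary key, like PutItem.
final class InMemoryTable[K, V] {
  private val items = mutable.Map.empty[K, V]
  def put(key: K, value: V): Unit = items(key) = value
  def contents: Map[K, V] = items.toMap
}

// Hypothetical record type: it carries the full value, not an increment.
final case class DailyCount(userId: String, day: String, count: Long)

object IdempotentWrites {
  type Key = (String, String)

  // Deterministic key: the same record always maps to the same item.
  def keyOf(r: DailyCount): Key = (r.userId, r.day)

  def writeAll(table: InMemoryTable[Key, DailyCount], records: Seq[DailyCount]): Unit =
    records.foreach(r => table.put(keyOf(r), r))
}
```

Replaying records after a restore simply rewrites the same items, so the table ends up in the same state as a single delivery. The same replay against an increment-style update (for example an UpdateItem `ADD` on a counter) would double-count the replayed records, which is why the records need to carry absolute values.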