Our implementation has quite a bit more going on just to deal with
serialization of types, but here is pretty much the core of what we do, in
(pseudo) Scala:

import scala.collection.mutable.ListBuffer

import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.streaming.runtime.tasks.{ProcessingTimeCallback, ProcessingTimeService}

class DynamoSink[T](maxDelayTime: Time)
    extends RichSinkFunction[T]
    with ProcessingTimeCallback
    with CheckpointedFunction {

  // the processing-time timer currently registered, if any
  private var curTimer: Option[Long] = None
  // records buffered since the last flush
  private val toWrite: ListBuffer[T] = ListBuffer.empty[T]

  private def timerService: ProcessingTimeService =
    getRuntimeContext.asInstanceOf[StreamingRuntimeContext].getProcessingTimeService

  override def invoke(element: T, context: SinkFunction.Context[_]): Unit = {
    // make sure a flush is scheduled at most maxDelayTime from now
    if (curTimer.isEmpty) {
      val timer = context.currentProcessingTime() + maxDelayTime.toMilliseconds
      timerService.registerTimer(timer, this)
      curTimer = Some(timer)
    }

    toWrite += element
    // DynamoDB's BatchWriteItem takes at most 25 items, so flush a full batch immediately
    if (toWrite.length >= 25) {
      flushBufferedRecordsToDynamo()
    }
  }

  override def onProcessingTime(timestamp: Long): Unit = {
    if (toWrite.nonEmpty) {
      flushBufferedRecordsToDynamo()
    }
    curTimer = None
  }

  def flushBufferedRecordsToDynamo(): Unit = {
    val result      = batchWrite(toWrite)
    val unprocessed = getUnprocessedItems(result)
    // anything DynamoDB didn't accept stays in the buffer for the next flush
    toWrite.clear()
    toWrite ++= unprocessed
  }

  // snapshot and restore the state of the toWrite buffer
  override def snapshotState(context: FunctionSnapshotContext): Unit = ???

  override def initializeState(context: FunctionInitializationContext): Unit = ???

  // write to dynamo
  def batchWrite(toWrite: ListBuffer[T]): List[BatchWriteItemResult] = ???

  // capture any records in the BatchWriteItemResult that didn't get processed
  def getUnprocessedItems(result: List[BatchWriteItemResult]): ListBuffer[T] = ???
}
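
The snapshot/restore methods are elided above; a minimal sketch of how they
could look using operator ListState (the descriptor name and the typeInfo
value are placeholders here, not what we actually ship):

  import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
  import scala.collection.JavaConverters._

  // assumes a TypeInformation[T] ("typeInfo") is handed to the sink's constructor
  @transient private var checkpointedState: ListState[T] = _

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // copy the in-flight buffer into operator state
    checkpointedState.clear()
    toWrite.foreach(checkpointedState.add)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[T]("dynamo-buffer", typeInfo)
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) {
      // refill the buffer so restored records get flushed again (at-least-once)
      checkpointedState.get().asScala.foreach(toWrite += _)
    }
  }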

On Fri, Mar 22, 2019 at 10:59 AM Addison Higham <addis...@gmail.com> wrote:

> Hi there,
>
> We have implemented a Dynamo sink and have had no real issues. It is
> at-least-once, obviously, so we work around that by structuring our
> transformations so that they produce idempotent writes.
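>
> For example (purely illustrative; MyRecord and the table name are made
> up): as long as the key is derived deterministically from the record,
> replaying a record after a restore just overwrites the same item:
>
>   import scala.collection.JavaConverters._
>   import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
>   import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
>
>   // PutItem is an upsert: writing the same key twice leaves one item,
>   // so at-least-once replays are harmless
>   def writeIdempotent(client: AmazonDynamoDB, record: MyRecord): Unit = {
>     val item = Map(
>       "id"    -> new AttributeValue().withS(record.id), // deterministic key
>       "value" -> new AttributeValue().withS(record.value)
>     ).asJava
>     client.putItem(new PutItemRequest().withTableName("my-table").withItem(item))
>   }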
>
> What we do is pretty similar to what you suggest: we collect the records
> in a buffer (which needs to be checkpointed). However, instead of using
> the CheckpointedFunction interface to trigger batch writes, we use a
> RichSinkFunction and a processing-time callback. For each record, we
> ensure there is a processing-time callback set for some configurable time
> in the future and add the record to the buffer. DynamoDB has a max batch
> size of 25 records, so if the batch is full, we immediately trigger a
> write.
>
> The processing-time callback mostly serves situations with low message
> volume, so that we can control the max amount of latency a record will have.
>
> We initially looked at using the CheckpointedFunction approach, but due to
> Dynamo's relatively small batch-write size we knew we would be flushing
> really often and that batches would never get very big, so the state to
> checkpoint would be minimal.
>
> Hopefully that helps!
>
> On Fri, Mar 22, 2019 at 9:49 AM Vivek <vivek.tha...@gmail.com> wrote:
>
>>
>> Hello,
>>
>> I am currently using DynamoDB as a persistent store in my application,
>> and I am in the process of transforming the application into a streaming
>> pipeline using Flink. The pipeline is simple and stateless: consume data
>> from Kafka, apply some transformations, and write records to DynamoDB.
>> I want to collect records in a buffer and batch-write them instead of
>> writing one at a time. I also want retry with exponential backoff for
>> unprocessed records in a batch.
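>>
>> For the retry part, something along these lines is what I have in mind
>> (just a sketch; the attempt cap and base delay are made-up numbers):
>>
>>   import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
>>   import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemRequest, WriteRequest}
>>
>>   // resubmit whatever BatchWriteItem reports as unprocessed, backing off
>>   // exponentially between attempts
>>   def batchWriteWithRetry(client: AmazonDynamoDB,
>>                           items: java.util.Map[String, java.util.List[WriteRequest]]): Unit = {
>>     var pending = items
>>     var backoff = 100L // ms, doubled after every failed attempt
>>     var attempt = 0
>>     while (!pending.isEmpty && attempt < 5) {
>>       val result = client.batchWriteItem(new BatchWriteItemRequest().withRequestItems(pending))
>>       pending = result.getUnprocessedItems
>>       if (!pending.isEmpty) {
>>         Thread.sleep(backoff)
>>         backoff *= 2
>>         attempt += 1
>>       }
>>     }
>>     if (!pending.isEmpty)
>>       throw new RuntimeException("unprocessed items remain after retries")
>>   }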
>>
>> My first question: is DynamoDB a suitable sink? I googled and searched
>> the mailing list archive, but couldn't find anyone with that requirement.
>>
>> What are some of the guidelines around implementing a fault-tolerant sink
>> with at-least-once guarantees? I am looking at the Elasticsearch
>> implementation, and it has the capabilities that I want. Could I
>> implement a DynamoDB sink in a similar way?
>>
>> Basically: implement the CheckpointedFunction interface, collect records
>> in a buffer in the invoke method, and flush the buffer and check errors
>> in the snapshot method.
>> The flush method will call DynamoDB's BatchWriteItem, and if there are
>> any unprocessed records, add them back to the buffer. A FailureHandler
>> will attempt to flush the buffer again, do nothing, or fail the sink,
>> based on the failure-handling policy.
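>>
>> i.e. something like this for the policy (names invented, roughly
>> mirroring the Elasticsearch sink's failure handling):
>>
>>   sealed trait FailurePolicy
>>   case object RetryFlush extends FailurePolicy // try the batch again
>>   case object Ignore     extends FailurePolicy // drop the unprocessed records
>>   case object FailSink   extends FailurePolicy // throw and let the job restart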
>>
>> Please let me know if I am in the right direction.
>>
>> Thank You
>
>
