Thank you, Addison. This is very helpful.

On Fri, Mar 22, 2019 at 10:12 AM Addison Higham <addis...@gmail.com> wrote:

> Our implementation has quite a bit more going on just to deal with
> serialization of types, but here is pretty much the core of what we do in
> (pseudo) Scala:
>
> class DynamoSink[...](...) extends RichSinkFunction[T] with
> CheckpointedFunction with ProcessingTimeCallback {
>
>   private var curTimer: Option[Long] = None
>   private var toWrite: ListBuffer[T] = new ListBuffer[T]
>   def timerService: ProcessingTimeService =
>     getRuntimeContext.asInstanceOf[StreamingRuntimeContext].getProcessingTimeService
>
>   override def invoke(element: T, context: Context[_]): Unit = {
>     if (curTimer.isEmpty) {
>       val timer = context.currentProcessingTime() + maxDelayTime.toMilliseconds
>       timerService.registerTimer(timer, this)
>       curTimer = Some(timer)
>     }
>
>     toWrite += element
>     if (toWrite.length >= 25) {
>       flushBufferedRecordsToDynamo()
>     }
>   }
>
>   override def onProcessingTime(timestamp: Long): Unit = {
>     if (toWrite.nonEmpty) {
>       flushBufferedRecordsToDynamo()
>     }
>     curTimer = None
>   }
>
>   def flushBufferedRecordsToDynamo(): Unit = {
>     val result      = batchWrite(toWrite)
>     val unprocessed = getUnprocessedItems(result)
>     toWrite.clear()
>     toWrite ++= unprocessed
>   }
>
>   // snapshot and restore the state of the toWrite buffer
>   override def snapshotState(context: FunctionSnapshotContext): Unit = ???
>
>   override def initializeState(context: FunctionInitializationContext): Unit = ???
>
>   // write to dynamo
>   def batchWrite(toWrite: ListBuffer[T]): List[BatchWriteItemResult] = ???
>
>   // capture any items in the BatchWriteItemResult that didn't get processed
>   def getUnprocessedItems(result: List[BatchWriteItemResult]): ListBuffer[T] = ???
>
> }
>
> On Fri, Mar 22, 2019 at 10:59 AM Addison Higham <addis...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> We have implemented a DynamoDB sink and have had no real issues.
>> It is at-least-once, though, so we work around that by structuring our
>> transformations so that they produce idempotent writes.
>>
>> What we do is pretty similar to what you suggest: we collect the records
>> in a buffer (which needs to be checkpointed). However, instead of using
>> the CheckpointedFunction interface to trigger batch writes, we use a
>> RichSinkFunction with a processing-time callback. For each record, we
>> ensure a processing-time callback is set for some configurable time in
>> the future and add the record to the buffer. DynamoDB has a max batch
>> size of 25 records, so if the batch is full, we immediately trigger a
>> write.
>>
>> The processing-time callback mostly serves situations with low message
>> volume, so that we can bound the maximum latency a record will have.
>>
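The size-or-deadline policy described above can also be modelled without Flink. What follows is a minimal sketch in plain Scala, not the sink from this thread: `BatchBuffer`, `flushFn`, `onTime`, and the explicit `now` arguments are hypothetical names, and the caller plays the role of the timer service. Two choices are assumptions of the sketch rather than anything stated in the thread: each flush sends at most `maxBatch` items, and the deadline is re-armed when unprocessed items remain after a timer-driven flush.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-alone model of the buffering policy; no Flink or AWS types.
// flushFn writes one batch and returns the items that were NOT processed.
class BatchBuffer[T](maxBatch: Int, maxDelayMs: Long)(flushFn: Seq[T] => Seq[T]) {
  private val buffer = ListBuffer.empty[T]
  private var deadline: Option[Long] = None

  // Called once per record; `now` is the current processing time in ms.
  def add(element: T, now: Long): Unit = {
    // Make sure a deadline exists so a lone record cannot wait forever.
    if (deadline.isEmpty) deadline = Some(now + maxDelayMs)
    buffer += element
    if (buffer.length >= maxBatch) flush()
  }

  // Called by the owner as time advances; flushes once the deadline has passed.
  def onTime(now: Long): Unit =
    if (deadline.exists(_ <= now)) {
      if (buffer.nonEmpty) flush()
      // Assumption of this sketch: re-arm when unprocessed items are left over.
      deadline = if (buffer.isEmpty) None else Some(now + maxDelayMs)
    }

  def pending: Int = buffer.length

  // Sends at most maxBatch items and puts unprocessed ones back in the buffer.
  private def flush(): Unit = {
    val batch = buffer.take(maxBatch).toList
    buffer.remove(0, batch.length)
    buffer ++= flushFn(batch)
  }
}
```

With `maxBatch = 25` this mirrors the 25-record limit mentioned above. As in the quoted code, a size-triggered flush leaves the existing deadline in place.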
>> We initially looked at using the CheckpointedFunction, but due to
>> DynamoDB's relatively small batch write size we knew that we would need
>> to flush really often and that batches would never get very big, so
>> state would be minimal.
>>
>> Hopefully that helps!
>>
>> On Fri, Mar 22, 2019 at 9:49 AM Vivek <vivek.tha...@gmail.com> wrote:
>>
>>>
>>> Hello,
>>>
>>> I am currently using DynamoDB as a persistent store in my application,
>>> and I am in the process of turning the application into a streaming
>>> pipeline using Flink. The pipeline is simple and stateless: it consumes
>>> data from Kafka, applies some transformations, and writes records to
>>> DynamoDB. I would like to collect records in a buffer and batch write
>>> them instead of writing one at a time. I also want to add retries with
>>> exponential backoff for unprocessed records in a batch.
>>>
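Retrying unprocessed records with exponential backoff, as asked about above, could look roughly like the sketch below. It is plain Scala with no AWS SDK calls: `Backoff`, `writeWithRetry`, `delayMs`, and the injected `write` and `sleep` functions are hypothetical names, and `write` stands in for whatever call returns the unprocessed subset of a batch.

```scala
import scala.annotation.tailrec

object Backoff {
  // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxDelayMs.
  // The shift is limited to 20 bits to keep the multiplication from overflowing.
  def delayMs(attempt: Int, baseDelayMs: Long, maxDelayMs: Long): Long =
    math.min(maxDelayMs, baseDelayMs << math.min(attempt, 20))

  // Calls `write` until nothing is left unprocessed or `maxRetries` retries are used up.
  // `write` returns the subset of its input that was not processed.
  // Returns whatever is still unprocessed so the caller can re-buffer it or fail.
  def writeWithRetry[T](
      items: Seq[T],
      maxRetries: Int,
      baseDelayMs: Long,
      maxDelayMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms)
  )(write: Seq[T] => Seq[T]): Seq[T] = {
    @tailrec
    def loop(pending: Seq[T], attempt: Int): Seq[T] = {
      val unprocessed = write(pending)
      if (unprocessed.isEmpty || attempt >= maxRetries) unprocessed
      else {
        sleep(delayMs(attempt, baseDelayMs, maxDelayMs))
        loop(unprocessed, attempt + 1)
      }
    }
    if (items.isEmpty) items else loop(items, 0)
  }
}
```

Backoff schemes commonly add random jitter to the delay; it is left out here so the behaviour stays deterministic. Note that sleeping inside a sink blocks the operator thread, so the retry count and maximum delay would need to stay small.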
>>> My first question: is DynamoDB a suitable sink? I googled and searched
>>> the mailing list archive, but couldn’t find anyone with that requirement.
>>>
>>> What are some of the guidelines for implementing a fault-tolerant
>>> sink with at-least-once guarantees? I am looking at the Elasticsearch
>>> sink implementation, and it has the capabilities that I want. Could I
>>> just implement a DynamoDB sink in a similar way?
>>>
>>> Basically: implement the CheckpointedFunction interface,
>>> collect records in a buffer in the invoke method, and flush the buffer
>>> and check for errors in the snapshot method.
>>> The flush method will call DynamoDB’s BatchWriteItem and, if there are
>>> any unprocessed records, add them back to the buffer. A FailureHandler
>>> will attempt to flush the buffer again, do nothing, or fail the sink,
>>> depending on the failure-handling policy.
>>>
>>> Please let me know if I am headed in the right direction.
>>>
>>> Thank you
>>
>>
