Hi Caizhi,

Here is my current configuration:

import com.klarna.flink.connectors.dynamodb.{DynamoDBSinkConfig, FlinkDynamoDBSink}
import org.apache.flink.types.Row

// dynamoDBBuilder and mapper are defined elsewhere in the job
val dynamoDBSinkConfig: DynamoDBSinkConfig =
  (new DynamoDBSinkConfig.Builder).batchSize(50).queueLimit(20).build()

new FlinkDynamoDBSink[Row](
  dynamoDBBuilder,
  "tablename",
  dynamoDBSinkConfig,
  mapper
)


I think this performs batch writes.
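
For reference, the sink is attached to the aggregated stream roughly like
this (resultStream is only a placeholder name for the DataStream[Row] coming
out of the 10-minute window):

// Illustrative wiring only; resultStream stands in for the aggregated stream.
resultStream.addSink(
  new FlinkDynamoDBSink[Row](dynamoDBBuilder, "tablename", dynamoDBSinkConfig, mapper)
)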




On Tue, Dec 7, 2021 at 6:34 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> Thanks for the clarification.
>
> I'm not familiar with DynamoDB, so you might need to modify this connector
> a bit. Does a WriteRequest immediately send the write to the database? If
> so, you may want to cache the requests in memory instead and send them only
> at snapshots. See [1] for the code that handles incoming records and [2]
> for the snapshot logic.
>
> [1]
> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L127
> [2]
> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202
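>
> Something along these lines might work (untested sketch; sendBatch below is
> only a placeholder for whatever call actually issues the DynamoDB batch
> write):
>
> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
>
> import scala.collection.mutable.ArrayBuffer
>
> // Cache incoming records in memory and flush them only when a checkpoint
> // (snapshot) is taken, instead of writing on every invoke().
> class BufferingDynamoDBSink[T](sendBatch: Seq[T] => Unit)
>   extends RichSinkFunction[T] with CheckpointedFunction {
>
>   private val buffer = ArrayBuffer.empty[T]
>
>   override def invoke(value: T): Unit = {
>     buffer += value // only cache the record, do not write yet
>   }
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     sendBatch(buffer.toList) // flush everything collected since the last snapshot
>     buffer.clear()
>   }
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     // for exactly-once you would also keep the buffer in operator state here
>   }
> }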
>
> Jing Lu <ajin...@gmail.com> wrote on Wed, Dec 8, 2021 at 10:22:
>
>> Hi Caizhi,
>>
>> Thanks for your reply! The database is DynamoDB. The connector I use is
>> https://github.com/klarna-incubator/flink-connector-dynamodb. My source
>> is a continuous event stream. My Flink version is 1.12.
>>
>> Best,
>> Jing
>>
>> On Tue, Dec 7, 2021 at 6:15 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> Which database are you referring to? If there is no officially supported
>>> connector for this database, you can create your own sink operator
>>> with GenericWriteAheadSink.
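>>>
>>> Roughly, such a sink is a subclass of GenericWriteAheadSink (skeleton only,
>>> untested; you also need a CheckpointCommitter implementation, and the
>>> operator is attached with stream.transform rather than addSink):
>>>
>>> import org.apache.flink.api.common.typeutils.TypeSerializer
>>> import org.apache.flink.streaming.runtime.operators.{CheckpointCommitter, GenericWriteAheadSink}
>>> import org.apache.flink.types.Row
>>>
>>> // sendValues() receives all records collected since the previous
>>> // checkpoint, so the actual database write goes in there.
>>> class MyWriteAheadSink(committer: CheckpointCommitter,
>>>                        serializer: TypeSerializer[Row],
>>>                        jobId: String)
>>>   extends GenericWriteAheadSink[Row](committer, serializer, jobId) {
>>>
>>>   override def sendValues(values: java.lang.Iterable[Row],
>>>                           checkpointId: Long,
>>>                           timestamp: Long): Boolean = {
>>>     val it = values.iterator()
>>>     while (it.hasNext) {
>>>       writeToDatabase(it.next()) // placeholder for the real batch write
>>>     }
>>>     true // return true only if all records were written successfully
>>>   }
>>>
>>>   private def writeToDatabase(row: Row): Unit = {
>>>     // placeholder
>>>   }
>>> }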
>>>
>>> Note that if you're using Flink < 1.14 and your source is bounded (that
>>> is to say, it will eventually come to an end and finish the job), you
>>> might lose the last bit of results. See [1] for details.
>>>
>>> [1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
>>>
>>> Jing Lu <ajin...@gmail.com> wrote on Wed, Dec 8, 2021 at 05:51:
>>>
>>>> Hi, community
>>>>
>>>> I have a Kafka stream and want to use Flink for 10-minute aggregations.
>>>> However, the number of events is large, and writes to the output database
>>>> are throttled for a few seconds every hour. I was thinking of writing from
>>>> Flink to another Kafka stream and using a second Flink app to write to the
>>>> database. Would this smooth out the writes? What should I do in the second
>>>> Flink app?
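>>>>
>>>> What I have in mind for the second app is roughly the following (the
>>>> topic name, schema, and sink are only placeholders):
>>>>
>>>> import java.util.Properties
>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema
>>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>>>>
>>>> // The second job only reads the aggregated results back from Kafka and
>>>> // writes them to the database, so Kafka absorbs any throttling.
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val kafkaProps = new Properties() // bootstrap.servers, group.id, ...
>>>> val databaseSink: SinkFunction[String] = ??? // the actual database sink goes here
>>>>
>>>> env
>>>>   .addSource(new FlinkKafkaConsumer[String]("aggregated-results", new SimpleStringSchema(), kafkaProps))
>>>>   .addSink(databaseSink)
>>>> env.execute("kafka-to-database")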
>>>>
>>>>
>>>> Thanks,
>>>> Jing
>>>>
>>>
