Hi Caizhi,

Here is my current configuration:
val dynamoDBSinkConfig: DynamoDBSinkConfig = (new DynamoDBSinkConfig.Builder)
  .batchSize(50)
  .queueLimit(20)
  .build()

new FlinkDynamoDBSink[Row](
  dynamoDBBuilder,
  "tablename",
  dynamoDBSinkConfig,
  mapper
)

I think this is a batch write.

On Tue, Dec 7, 2021 at 6:34 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> Thanks for the clarification.
>
> I'm not familiar with DynamoDB, so you might want to modify this connector
> a bit. Does a WriteRequest immediately send write requests to the database?
> If so, you may want to cache the requests in memory instead and send them
> only at snapshots. See [1] for the code that handles incoming records and
> [2] for snapshots.
>
> [1]
> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L127
> [2]
> https://github.com/klarna-incubator/flink-connector-dynamodb/blob/10e8abaec2ef5473618efba41c31484dadb9ad39/src/main/java/com/klarna/flink/connectors/dynamodb/FlinkDynamoDBSink.java#L202
>
> Jing Lu <ajin...@gmail.com> 于2021年12月8日周三 10:22写道:
>
>> Hi Caizhi,
>>
>> Thanks for your reply! The database is DynamoDB. The connector I use is
>> https://github.com/klarna-incubator/flink-connector-dynamodb. My source
>> is a continuous event stream. My Flink version is 1.12.
>>
>> Best,
>> Jing
>>
>> On Tue, Dec 7, 2021 at 6:15 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> Which database are you referring to? If there is no officially supported
>>> connector for this database, you can create your own sink operator
>>> with GenericWriteAheadSink.
>>>
>>> Note that if you're using Flink < 1.14 and your source is bounded
>>> (that is to say, your source will eventually come to an end and finish
>>> the job), you might lose the last bit of results. See [1] for details.
>>>
>>> [1] https://lists.apache.org/thread/qffl2pvnng9kkd51z5xp65x7ssnnm526
>>>
>>> Jing Lu <ajin...@gmail.com> 于2021年12月8日周三 05:51写道:
>>>
>>>> Hi community,
>>>>
>>>> I have a Kafka stream and want to use Flink for 10-minute aggregations.
>>>> However, the number of events is large, and writes to the output database
>>>> are throttled for a few seconds every hour. I was thinking of writing from
>>>> Flink to another Kafka stream and using a second Flink app to write to the
>>>> database. Will this smooth out the writes? What should I do for the second
>>>> Flink app?
>>>>
>>>> Thanks,
>>>> Jing
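
For reference, my rough understanding of the "cache the requests in memory and send them
only at snapshots" suggestion above is something like the sketch below. This is only an
illustration: BufferedDynamoDBSink and flushToDynamoDB are placeholder names, not part of
the klarna connector; the real change would go into FlinkDynamoDBSink itself.

    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.types.Row

    import scala.collection.mutable.ArrayBuffer

    // Sketch only: buffer incoming rows in memory and issue the DynamoDB
    // batch write when a checkpoint snapshot is taken.
    class BufferedDynamoDBSink extends RichSinkFunction[Row] with CheckpointedFunction {

      // rows received since the last checkpoint, kept in memory only
      private val buffer = ArrayBuffer.empty[Row]

      override def invoke(value: Row): Unit = {
        buffer += value // cache the row; nothing is sent to DynamoDB here
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        flushToDynamoDB(buffer.toList) // send all buffered rows as batch writes
        buffer.clear()
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        // optionally restore the buffer from operator state after a failure
      }

      private def flushToDynamoDB(rows: List[Row]): Unit = {
        // placeholder: turn rows into DynamoDB batch write requests and send them
      }
    }

A real implementation would also have to keep the buffer in operator state (or replay it
from the source) so that rows buffered between checkpoints are not lost on failure.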