If I'm understanding you correctly, you're just trying to do some data reduction so that you write data for each key once every five minutes rather than for every CDC update. Is that correct? You also want to keep the most recent state for each key you've ever seen so you don't apply writes out of order.
The code you've provided isn't quite right AFAICT. The issue is that the window never gets PURGED, so the data just continues to accumulate in the window. This will grow without bound. My advice would be to take a look at ProcessFunction and write one that does exactly what you want rather than messing around with windows and triggers for this use case. It will be much simpler in the end.

-Jamie

On Thu, Jan 17, 2019 at 4:32 PM knur <cristian.k...@gmail.com> wrote:

> Hello there.
>
> So we have some Postgres tables that are mutable, and we want to create a
> snapshot of them in S3 every X minutes. So we plan to use Debezium to send a
> CDC log of every row change into a Kafka topic, and then have Flink keep the
> latest state of each row to save that data into S3 subsequently.
>
> Our current job looks like this and works fairly well in most cases:
>
> // checkpoint interval is set to run every 10 minutes
>
> kafkaSource
>     .keyBy { it.id }
>     .window(GlobalWindows.create())
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
>     .reduce { left, right ->
>         if (left.timestamp() > right.timestamp()) {
>             left
>         } else {
>             right
>         }
>     }
>     .addSink(StreamingFileSink
>         .forBulkFormat(Path(outputDir), ParquetAvroWriters.forGenericRecord(avroSchema))
>         .withBucketAssigner(DateTimeBucketAssigner("'date='yyyy-MM-dd/'hour='HH/'minute='mm"))
>         .build())
>
> We use `GlobalWindows.create()` because we want to hold in Flink's state ALL
> the changes sent into Kafka (the reduce function, according to the docs,
> will make sure to evict all events except the last one).
>
> This works, but we know there could be some edge cases. For instance, if the
> trigger fires around the same time as a checkpoint, we could get into a
> position where StreamingFileSink rolls an incomplete set of all the events
> triggered.
>
> So a couple of questions:
>
> 1. Is there a way to mark the events with the timestamp of the trigger that
>    fired them?
> 2. Is the approach we took fine? (Keep in mind that we will deal with giant
>    tables, so a batch job that queries them every N seconds is not an option.)
> 3. Do you foresee any other edge cases?
>
> Thanks for taking a look at this.
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
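
To make the ProcessFunction suggestion above a bit more concrete, here is a rough sketch of what such a function might look like. It is only one possible shape, not a tested implementation: it assumes Flink 1.x (where `open` takes a `Configuration`), and `RowChange`, `newer`, `LatestPerKey` and `intervalMs` are hypothetical names made up for illustration. Only the `id` and timestamp fields are implied by the job quoted above.

```kotlin
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type; the real job uses its own CDC record class.
data class RowChange(val id: String, val timestamp: Long, val payload: String = "")

// Keep whichever change has the larger timestamp; on a tie the incoming one wins.
fun newer(current: RowChange?, incoming: RowChange): RowChange =
    if (current != null && current.timestamp > incoming.timestamp) current else incoming

// Emits the latest change per key at most once per interval (processing time),
// and only for keys that actually changed since their last emission.
class LatestPerKey(private val intervalMs: Long) :
    KeyedProcessFunction<String, RowChange, RowChange>() {

    @Transient private lateinit var latest: ValueState<RowChange>
    @Transient private lateinit var pendingTimer: ValueState<Long>

    override fun open(parameters: Configuration) {
        latest = runtimeContext.getState(
            ValueStateDescriptor("latest", RowChange::class.java))
        pendingTimer = runtimeContext.getState(
            ValueStateDescriptor("pendingTimer", Types.LONG))
    }

    override fun processElement(value: RowChange, ctx: Context, out: Collector<RowChange>) {
        val current: RowChange? = latest.value()
        val winner = newer(current, value)
        if (winner === current) return // stale, out-of-order update: drop it

        latest.update(winner)
        if (pendingTimer.value() == null) {
            // First change since the last emission: schedule one flush for this key.
            val fireAt = ctx.timerService().currentProcessingTime() + intervalMs
            ctx.timerService().registerProcessingTimeTimer(fireAt)
            pendingTimer.update(fireAt)
        }
    }

    override fun onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector<RowChange>) {
        // Emit the newest row, but keep it in state so later stale updates are still rejected.
        latest.value()?.let { out.collect(it) }
        pendingTimer.clear()
    }
}
```

It would replace the window/trigger/reduce part of the job, along the lines of `kafkaSource.keyBy { it.id }.process(LatestPerKey(5 * 60 * 1000L))` followed by the same sink. State here is one row plus one timer per key, so it grows with the number of distinct keys rather than with the number of updates. Note that this emits only keys that changed in the interval; if a full snapshot of every key is needed on each tick, the timer would have to be re-registered in `onTimer` instead of cleared.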