tsreaper commented on code in PR #315: URL: https://github.com/apache/flink-table-store/pull/315#discussion_r996658295
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java: ########## @@ -122,48 +139,61 @@ public void write(KeyValue kv) throws Exception { @Override public long memoryOccupancy() { - return memTable.memoryOccupancy(); + return writeBuffer.memoryOccupancy(); } @Override public void flushMemory() throws Exception { - if (memTable.size() > 0) { + boolean success = writeBuffer.flushMemory(); + if (!success) { + flushWriteBuffer(); + } + } + + private void flushWriteBuffer() throws Exception { + if (writeBuffer.size() > 0) { if (compactManager.shouldWaitCompaction()) { // stop writing, wait for compaction finished trySyncLatestCompaction(true); } - // write changelog file - // - // NOTE: We must first call memTable.rawIterator(), then call memTable.mergeIterator(). - // Otherwise memTable.rawIterator() will generate sorted, but not yet merged, data. See - // comments on MemTable for more details. - if (changelogProducer == ChangelogProducer.INPUT) { - KeyValueDataFileWriter changelogWriter = writerFactory.createChangelogFileWriter(0); - changelogWriter.write(memTable.rawIterator()); - changelogWriter.close(); - changelogFiles.add(changelogWriter.result()); + final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter = + changelogProducer == ChangelogProducer.INPUT + ? writerFactory.createRollingChangelogFileWriter(0) + : null; Review Comment: Create `changelogWriter` when creating `MergeTreeWriter`. Then in `MergeTreeWriter#write` we write into `changelogWriter` and `writeBuffer` at the same time. In this way we can keep the order of changelog. ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java: ########## @@ -189,6 +189,30 @@ public class CoreOptions implements Serializable { .withDescription( "Amount of data to build up in memory before converting to a sorted on-disk file."); + public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE = + ConfigOptions.key("write-buffer-spillable") + .booleanType() + .noDefaultValue() + .withDescription( + Description.builder() + .text("Whether the write buffer can be spillable.") + .list( + text( + "When `input` changelog-producer is not enabled, it is enabled by default."), + text( + "When `input` changelog-producer is started, it is closed by default," Review Comment: When changelog producer is set to input it is also enabled by default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org