tsreaper commented on code in PR #550: URL: https://github.com/apache/flink-table-store/pull/550#discussion_r1116611229
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java: ########## @@ -38,6 +39,8 @@ FileStoreWrite<T> withIOManager(IOManager ioManager); + FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool); Review Comment: Why do we need this? ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java: ########## @@ -131,7 +130,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException { } @Override - public TableWrite newWrite(String commitUser) { + public TableWriteImpl<?> newWrite(String commitUser) { Review Comment: `TableWriteImpl<?>` -> `TableWriteImpl<RowData>`? There are also similar issues in `ChangelogValueCountFileStoreTable` and `ChangelogWithKeyFileStoreTable`. ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java: ########## @@ -84,6 +97,58 @@ public interface Table extends Serializable { /** Create a {@link TableRead} to read {@link Split}s. */ TableRead newRead(); + /** Create a {@link TableWrite} to write {@link InternalRow}s. */ + TableWrite newWrite(String commitUser); + + /** + * Create a {@link TableWrite} to write {@link InternalRow}s. + * + * <p>NOTE: This method will generate random commitUser. If you want to generate multiple + * commits, please use commitUser that is consistent with {@link #newCommit}. + */ + default TableWrite newWrite() { + return newWrite(UUID.randomUUID().toString()); + } + + /** Create a {@link TableCommit} to commit {@link CommitMessage}s. */ + TableCommit newCommit(String commitUser); + + /** + * Create a {@link TableCommit} to commit {@link CommitMessage}s. + * + * <p>NOTE: This method will generate random commitUser. If you want to generate multiple + * commits, please use commitUser that is consistent with {@link #newWrite}. 
+ */ + default TableCommit newCommit() { + return newCommit(UUID.randomUUID().toString()); + } + + /** + * Delete according to filters. + * + * <p>NOTE: This method is only suitable for deletion of small amount of data. + */ + default void deleteWhere(String commitUser, List<Predicate> filters, Lock.Factory lockFactory) { Review Comment: We can randomly generate a `commitUser`. Just make sure both `TableWrite` and `TableCommit` use the same `commitUser`. This method doesn't seem suitable here, because all other methods, like `newWrite` and `newCommit`, only return a utility object without performing an actual operation, but this method is different. Maybe move this method to some `TableUtils` utility class? ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java: ########## @@ -18,121 +18,47 @@ package org.apache.flink.table.store.table.sink; -import org.apache.flink.table.store.file.manifest.ManifestCommittable; -import org.apache.flink.table.store.file.operation.FileStoreCommit; -import org.apache.flink.table.store.file.operation.FileStoreExpire; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.file.operation.PartitionExpire; +import org.apache.flink.table.store.table.Table; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; /** - * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide - * snapshot commit and expiration. + * Commit of {@link Table} to provide {@link CommitMessage} committing. + * + * <p>According to the options, the expiration of snapshots and partitions will be completed in + * commit. 
+ * + * @since 0.4.0 */ -public class TableCommit implements AutoCloseable { - - private final FileStoreCommit commit; - @Nullable private final FileStoreExpire expire; - @Nullable private final PartitionExpire partitionExpire; - - @Nullable private List<Map<String, String>> overwritePartitions = null; - @Nullable private Lock lock; - - public TableCommit( - FileStoreCommit commit, - @Nullable FileStoreExpire expire, - @Nullable PartitionExpire partitionExpire) { - this.commit = commit; - this.expire = expire; - this.partitionExpire = partitionExpire; - } - - public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) { - if (overwritePartition != null) { - this.overwritePartitions = Collections.singletonList(overwritePartition); - } - return this; - } +@Experimental +public interface TableCommit extends AutoCloseable { - public TableCommit withOverwritePartitions( - @Nullable List<Map<String, String>> overwritePartitions) { - this.overwritePartitions = overwritePartitions; + default TableCommit withOverwritten() { Review Comment: This method name is also confusing. The name suggests that nothing should be overwritten when calling this method? ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/Table.java: ########## @@ -84,6 +97,58 @@ public interface Table extends Serializable { /** Create a {@link TableRead} to read {@link Split}s. */ TableRead newRead(); + /** Create a {@link TableWrite} to write {@link InternalRow}s. */ + TableWrite newWrite(String commitUser); + + /** + * Create a {@link TableWrite} to write {@link InternalRow}s. + * + * <p>NOTE: This method will generate random commitUser. If you want to generate multiple + * commits, please use commitUser that is consistent with {@link #newCommit}. 
+ */ + default TableWrite newWrite() { + return newWrite(UUID.randomUUID().toString()); + } Review Comment: This method and also the `newCommit()` method seem useless, because `TableWrite` and `TableCommit` must be used together and must have the same `user` (otherwise the written records will not be committed). ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java: ########## @@ -18,121 +18,47 @@ package org.apache.flink.table.store.table.sink; -import org.apache.flink.table.store.file.manifest.ManifestCommittable; -import org.apache.flink.table.store.file.operation.FileStoreCommit; -import org.apache.flink.table.store.file.operation.FileStoreExpire; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.file.operation.Lock; -import org.apache.flink.table.store.file.operation.PartitionExpire; +import org.apache.flink.table.store.table.Table; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; /** - * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide - * snapshot commit and expiration. + * Commit of {@link Table} to provide {@link CommitMessage} committing. + * + * <p>According to the options, the expiration of snapshots and partitions will be completed in + * commit. 
+ * + * @since 0.4.0 */ -public class TableCommit implements AutoCloseable { - - private final FileStoreCommit commit; - @Nullable private final FileStoreExpire expire; - @Nullable private final PartitionExpire partitionExpire; - - @Nullable private List<Map<String, String>> overwritePartitions = null; - @Nullable private Lock lock; - - public TableCommit( - FileStoreCommit commit, - @Nullable FileStoreExpire expire, - @Nullable PartitionExpire partitionExpire) { - this.commit = commit; - this.expire = expire; - this.partitionExpire = partitionExpire; - } - - public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) { - if (overwritePartition != null) { - this.overwritePartitions = Collections.singletonList(overwritePartition); - } - return this; - } +@Experimental +public interface TableCommit extends AutoCloseable { - public TableCommit withOverwritePartitions( - @Nullable List<Map<String, String>> overwritePartitions) { - this.overwritePartitions = overwritePartitions; + default TableCommit withOverwritten() { + withOverwritten(Collections.emptyMap()); return this; } - public TableCommit withLock(Lock lock) { - commit.withLock(lock); - - if (expire != null) { - expire.withLock(lock); + default TableCommit withOverwritten(@Nullable Map<String, String> staticPartition) { Review Comment: Why change the method name? Specifically, why use `withOverwritten` instead of `withOverwrite`? It is hard for users to understand what they'll overwrite. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org