tsreaper commented on code in PR #550: URL: https://github.com/apache/flink-table-store/pull/550#discussion_r1121407946
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java: ########## @@ -18,39 +18,42 @@ package org.apache.flink.table.store.table.sink; +import org.apache.flink.table.store.annotation.Experimental; import org.apache.flink.table.store.data.BinaryRow; import org.apache.flink.table.store.data.InternalRow; import org.apache.flink.table.store.file.disk.IOManager; -import org.apache.flink.table.store.file.io.DataFileMeta; - -import java.util.List; +import org.apache.flink.table.store.table.Table; /** - * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to - * provide {@link InternalRow} writing. + * Write of {@link Table} to provide {@link InternalRow} writing. + * + * @since 0.4.0 */ +@Experimental public interface TableWrite extends AutoCloseable { - TableWrite withOverwrite(boolean overwrite); - + /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is set to true. */ TableWrite withIOManager(IOManager ioManager); - SinkRecord write(InternalRow rowData) throws Exception; + /** Calculate which partition {@code row} belongs to. */ + BinaryRow getPartition(InternalRow row); - /** Log record need to preserve original pk (which includes partition fields). */ - SinkRecord toLogRecord(SinkRecord record); + /** Calculate which bucket {@code row} belongs to. */ + int getBucket(InternalRow row); - void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception; + /** Write a row to the writer. */ + void write(InternalRow row) throws Exception; /** - * Notify that some new files are created at given snapshot in given bucket. + * Compact a bucket of a partition. By default, it will determine whether to perform the + * compaction according to the 'num-sorted-run.compaction-trigger' option. If fullCompaction is + * true, it will force a full compaction, which is expensive. * - * <p>Most probably, these files are created by another job. 
Currently this method is only used - * by the dedicated compact job to see files created by writer jobs. + * <p>NOTE: In Java API, full compaction is not automatically executed. If you open + * 'changelog-producer' of 'full-compaction', please execute this method regularly to produce Review Comment: open 'changelog-producer' of 'full-compaction' -> set 'full-compaction' to be 'changelog-producer' ########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/InnerTableWrite.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.table.sink; + +import java.util.List; + +/** Inner {@link TableWrite} contains overwrite setter. */ +public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite { + + InnerTableWrite withOverwrite(boolean overwrite); + + @Override + default List<CommitMessage> prepareCommit() throws Exception { + return prepareCommit(true, BatchWriteBuilder.COMMIT_IDENTIFIER); Review Comment: Assert that this method is called only once. We should avoid users accidentally calling this method multiple times. 
########## flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/InnerTableCommit.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.table.sink; + +import org.apache.flink.table.store.file.operation.Lock; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Inner {@link TableCommit} contains overwrite setter. */ +public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit { + + /** Overwrite writing, same as the 'INSERT OVERWRITE T PARTITION (...)' semantics of SQL. */ + default InnerTableCommit withOverwrite(@Nullable Map<String, String> staticPartition) { + if (staticPartition != null) { + withOverwrite(Collections.singletonList(staticPartition)); + } + return this; + } + + InnerTableCommit withOverwrite(@Nullable List<Map<String, String>> overwritePartitions); + + /** @deprecated lock should pass from table. */ + @Deprecated + InnerTableCommit withLock(Lock lock); + + @Override + default void commit(List<CommitMessage> commitMessages) { Review Comment: Assert that this method is called only once. 
We should avoid users accidentally calling this method multiple times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org