tsreaper commented on code in PR #433: URL: https://github.com/apache/flink-table-store/pull/433#discussion_r1046746710
########## flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java: ########## @@ -18,99 +18,70 @@ package org.apache.flink.table.store.connector.sink; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.store.file.predicate.PredicateConverter; -import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.file.utils.OffsetRowData; import org.apache.flink.table.store.table.FileStoreTable; -import org.apache.flink.table.store.table.sink.TableWrite; -import org.apache.flink.table.store.table.source.DataSplit; -import org.apache.flink.table.store.table.source.Split; -import org.apache.flink.table.store.table.source.TableScan; -import org.apache.flink.table.types.logical.RowType; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; +import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; /** A dedicated operator for manual triggered compaction. */ public class StoreCompactOperator extends PrepareCommitOperator { - private static final Logger LOG = LoggerFactory.getLogger(StoreCompactOperator.class); - private final FileStoreTable table; - private final String commitUser; - @Nullable private final Map<String, String> compactPartitionSpec; + private final StoreSinkWrite.Provider storeSinkWriteProvider; + private final boolean isStreaming; - private TableScan scan; - private TableWrite write; + private transient StoreSinkWrite write; + private transient RowDataSerializer partitionSerializer; + private transient OffsetRowData reusedPartition; public StoreCompactOperator( FileStoreTable table, - String commitUser, - @Nullable Map<String, String> compactPartitionSpec) { + StoreSinkWrite.Provider storeSinkWriteProvider, + boolean isStreaming) { + Preconditions.checkArgument( Review Comment: We'll introduce a `TableStoreCompactJob` so user can submit this job class to Flink. This job class will disable "write.compaction-skip" so user does not need to care about it. This `checkArgument` is just for sanity check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org