tsreaper commented on code in PR #430:
URL: https://github.com/apache/flink-table-store/pull/430#discussion_r1044204129
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java:
##########
@@ -36,4 +44,24 @@ public interface SnapshotEnumerator {
      */
     @Nullable
     DataTableScan.DataFilePlan enumerate();
+
+    static void validateContinuous(TableSchema schema) {
+        CoreOptions options = new CoreOptions(schema.options());
+        MergeEngine mergeEngine = options.mergeEngine();
+        HashMap<MergeEngine, String> mergeEngineDesc =
+                new HashMap<MergeEngine, String>() {
+                    {
+                        put(MergeEngine.PARTIAL_UPDATE, "Partial update");
+                        put(MergeEngine.AGGREGATE, "Pre-aggregate");
+                    }
+                };
+        if (schema.primaryKeys().size() > 0
+                && mergeEngineDesc.containsKey(mergeEngine)
+                && options.changelogProducer() != FULL_COMPACTION) {
+            throw new ValidationException(
+                    mergeEngineDesc.get(mergeEngine)
+                            + " continuous reading is not supported. "
+                            + "You can use full compaction changelog producer to support streaming reading.");
+        }
+    }

Review Comment:
Why not move this to `ContinuousDataFileSnapshotEnumerator` and rename it as `validate`? This validation is only about streaming reads of data files.
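To make the suggestion concrete, here is a rough sketch of the check living next to the continuous enumerator under the shorter name `validate`. It is not the project's code: the class name `ContinuousDataFileSnapshotEnumeratorSketch`, the two nested enums, and the flattened parameter list are hypothetical stand-ins so the snippet compiles on its own, and `IllegalArgumentException` stands in for `ValidationException`. The sketch also swaps the double-brace `HashMap` for an `EnumMap`, which is my assumption and not part of the request above.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ContinuousDataFileSnapshotEnumerator; only the check is shown.
final class ContinuousDataFileSnapshotEnumeratorSketch {

    // Hypothetical stand-ins for the project's merge engine and changelog producer options.
    enum MergeEngine { DEDUPLICATE, PARTIAL_UPDATE, AGGREGATE }

    enum ChangelogProducer { NONE, INPUT, FULL_COMPACTION }

    // Merge engines that cannot be read continuously without full compaction,
    // mapped to the name used in the error message.
    private static final Map<MergeEngine, String> MERGE_ENGINE_DESC =
            new EnumMap<>(MergeEngine.class);

    static {
        MERGE_ENGINE_DESC.put(MergeEngine.PARTIAL_UPDATE, "Partial update");
        MERGE_ENGINE_DESC.put(MergeEngine.AGGREGATE, "Pre-aggregate");
    }

    private ContinuousDataFileSnapshotEnumeratorSketch() {}

    // Same condition as validateContinuous in the diff, with the schema and
    // options unpacked into plain parameters (an assumption of this sketch).
    static void validate(
            List<String> primaryKeys,
            MergeEngine mergeEngine,
            ChangelogProducer changelogProducer) {
        if (!primaryKeys.isEmpty()
                && MERGE_ENGINE_DESC.containsKey(mergeEngine)
                && changelogProducer != ChangelogProducer.FULL_COMPACTION) {
            // The real code throws ValidationException; IllegalArgumentException is a stand-in.
            throw new IllegalArgumentException(
                    MERGE_ENGINE_DESC.get(mergeEngine)
                            + " continuous reading is not supported. "
                            + "You can use full compaction changelog producer to support streaming reading.");
        }
    }
}
```

With the check scoped to the continuous enumerator, `SnapshotEnumerator` stays a plain interface and batch enumerators never see a streaming-only rule.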