tsreaper commented on code in PR #430:
URL: https://github.com/apache/flink-table-store/pull/430#discussion_r1044204129


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java:
##########
@@ -36,4 +44,24 @@ public interface SnapshotEnumerator {
      */
     @Nullable
     DataTableScan.DataFilePlan enumerate();
+
+    static void validateContinuous(TableSchema schema) {
+        CoreOptions options = new CoreOptions(schema.options());
+        MergeEngine mergeEngine = options.mergeEngine();
+        HashMap<MergeEngine, String> mergeEngineDesc =
+                new HashMap<MergeEngine, String>() {
+                    {
+                        put(MergeEngine.PARTIAL_UPDATE, "Partial update");
+                        put(MergeEngine.AGGREGATE, "Pre-aggregate");
+                    }
+                };
+        if (schema.primaryKeys().size() > 0
+                && mergeEngineDesc.containsKey(mergeEngine)
+                && options.changelogProducer() != FULL_COMPACTION) {
+            throw new ValidationException(
+                    mergeEngineDesc.get(mergeEngine)
+                            + " continuous reading is not supported. "
+                            + "You can use full compaction changelog producer to support streaming reading.");
+        }
+    }

Review Comment:
   Why not move this to `ContinuousDataFileSnapshotEnumerator` and rename it to `validate`? This validation only concerns streaming reads of data files.
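
   As a self-contained sketch of the suggested shape (a static `validate` hosted by the continuous enumerator), using stand-in enums and an `IllegalArgumentException` in place of the real Flink Table Store types, which are not available here. It also swaps the double-brace `HashMap` initialization from the diff for a static `EnumMap`, which avoids creating an anonymous subclass:

   ```java
   import java.util.EnumMap;
   import java.util.List;
   import java.util.Map;

   public class ContinuousValidationSketch {
       // Hypothetical stand-ins for the real MergeEngine / ChangelogProducer options.
       enum MergeEngine { DEDUPLICATE, PARTIAL_UPDATE, AGGREGATE }
       enum ChangelogProducer { NONE, INPUT, FULL_COMPACTION }

       // EnumMap instead of the anonymous-subclass ("double brace") HashMap.
       private static final Map<MergeEngine, String> MERGE_ENGINE_DESC =
               new EnumMap<>(MergeEngine.class);

       static {
           MERGE_ENGINE_DESC.put(MergeEngine.PARTIAL_UPDATE, "Partial update");
           MERGE_ENGINE_DESC.put(MergeEngine.AGGREGATE, "Pre-aggregate");
       }

       // Same check as the diff, phrased against plain parameters so the
       // sketch compiles without the Flink Table Store classes.
       static void validate(
               List<String> primaryKeys, MergeEngine mergeEngine, ChangelogProducer producer) {
           if (!primaryKeys.isEmpty()
                   && MERGE_ENGINE_DESC.containsKey(mergeEngine)
                   && producer != ChangelogProducer.FULL_COMPACTION) {
               throw new IllegalArgumentException(
                       MERGE_ENGINE_DESC.get(mergeEngine)
                               + " continuous reading is not supported. "
                               + "You can use full compaction changelog producer to support streaming reading.");
           }
       }

       public static void main(String[] args) {
           // Full-compaction changelog producer: streaming read is allowed.
           validate(List.of("id"), MergeEngine.PARTIAL_UPDATE, ChangelogProducer.FULL_COMPACTION);
           try {
               validate(List.of("id"), MergeEngine.AGGREGATE, ChangelogProducer.NONE);
           } catch (IllegalArgumentException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```

   The rejection condition itself is unchanged from the diff; only its home, name, and map construction differ.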



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
