JingsongLi commented on code in PR #411:
URL: https://github.com/apache/flink-table-store/pull/411#discussion_r1036797509


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/SnapshotEnumerator.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Enumerate incremental changes from newly created snapshots.
+ *
+ * <p>The first call to this enumerator will produce a {@link DataTableScan.DataFilePlan} containing
+ * the base files for the following incremental changes (or just return null if there are no base
+ * files).
+ *
+ * <p>Following calls to this enumerator will produce {@link DataTableScan.DataFilePlan}s containing
+ * incremental changed files. If there is currently no newer snapshots, null will be returned
+ * instead.
+ */
+public interface SnapshotEnumerator extends Callable<DataTableScan.DataFilePlan> {}

Review Comment:
   Could this not be a `Callable`? I find it hard to locate the callers in an IDE. Something like:
   ```
   SnapshotEnumerator {
      @Nullable
      DataTableScan.DataFilePlan enumerate();
   }
   ```
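   A compilable form of that idea might look like the sketch below (illustration only; `DataTableScan.DataFilePlan` and the `javax.annotation.Nullable` annotation are the ones already used in this PR, and the javadoc just restates the null semantics described in this class):
   ```
   /** Enumerates incremental changes from newly created snapshots. */
   public interface SnapshotEnumerator {

       /**
        * Produces the next {@link DataTableScan.DataFilePlan}, or returns null if no newer
        * snapshot is available yet.
        */
       @Nullable
       DataTableScan.DataFilePlan enumerate();
   }
   ```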



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.snapshot.DeltaSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.FullCompactionChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.InputChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/** Unbounded {@link FlinkSource} for reading records. It continuously monitors new snapshots. */
+public class ContinuousFileStoreSource extends FlinkSource {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+    private final long discoveryInterval;
+
+    public ContinuousFileStoreSource(
+            FileStoreTable table,
+            long discoveryInterval,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate predicate,
+            @Nullable Long limit) {
+        super(table, projectedFields, predicate, limit);
+        this.table = table;
+        this.discoveryInterval = discoveryInterval;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) {
+        DataTableScan scan = table.newScan();
+        if (predicate != null) {
+            scan.withFilter(predicate);
+        }
+
+        Long nextSnapshotId;

Review Comment:
   Minor:
   ```
   Long nextSnapshotId = null;
   Collection<FileStoreSourceSplit> splits = new ArrayList<>();
   if (checkpoint != null) {
       nextSnapshotId = checkpoint.currentSnapshotId();
       splits = checkpoint.splits();
   }
   ```



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java:
##########
@@ -67,7 +78,12 @@ public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throw
             view.readFully(bytes);
             splits.add(splitSerializer.deserialize(version, bytes));
         }
+
         long currentSnapshotId = view.readLong();
-        return new PendingSplitsCheckpoint(splits, currentSnapshotId);
+        if (currentSnapshotId == INVALID_SNAPSHOT) {

Review Comment:
   Minor: this could be a ternary `? :`.
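   For example (a sketch only, assuming the new branch simply maps the sentinel back to a nullable `Long`):
   ```
   long currentSnapshotId = view.readLong();
   return new PendingSplitsCheckpoint(
           splits, currentSnapshotId == INVALID_SNAPSHOT ? null : currentSnapshotId);
   ```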



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileStoreSource.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.snapshot.DeltaSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.FullCompactionChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.InputChangelogSnapshotEnumerator;
+import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/** Unbounded {@link FlinkSource} for reading records. It continuously monitors new snapshots. */
+public class ContinuousFileStoreSource extends FlinkSource {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+    private final long discoveryInterval;
+
+    public ContinuousFileStoreSource(
+            FileStoreTable table,
+            long discoveryInterval,
+            @Nullable int[][] projectedFields,
+            @Nullable Predicate predicate,
+            @Nullable Long limit) {
+        super(table, projectedFields, predicate, limit);
+        this.table = table;
+        this.discoveryInterval = discoveryInterval;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+            SplitEnumeratorContext<FileStoreSourceSplit> context,
+            PendingSplitsCheckpoint checkpoint) {
+        DataTableScan scan = table.newScan();
+        if (predicate != null) {
+            scan.withFilter(predicate);
+        }
+
+        Long nextSnapshotId;
+        Collection<FileStoreSourceSplit> splits;
+        if (checkpoint == null) {
+            nextSnapshotId = null;
+            splits = new ArrayList<>();
+        } else {
+            nextSnapshotId = checkpoint.currentSnapshotId();
+            splits = checkpoint.splits();
+        }
+
+        return new ContinuousFileSplitEnumerator(
+                context,
+                splits,
+                nextSnapshotId,
+                discoveryInterval,
+                createSnapshotEnumerator(scan, nextSnapshotId));
+    }
+
+    private SnapshotEnumerator createSnapshotEnumerator(DataTableScan scan, Long nextSnapshotId) {

Review Comment:
   Maybe we can extract this method to `SnapshotEnumerator`:
   ```
   private static SnapshotEnumerator createSnapshotEnumerator(FileStoreTable table, DataTableScan scan, Long nextSnapshotId)
   ```
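   That way each source would only need the factory (signature sketch only; the body would just move over from `ContinuousFileStoreSource#createSnapshotEnumerator`, and the dispatch on startup mode / changelog producer is an assumption based on the enumerator implementations imported in this PR):
   ```
   // hypothetical static factory on SnapshotEnumerator; parameter list mirrors the suggestion above
   static SnapshotEnumerator create(FileStoreTable table, DataTableScan scan, @Nullable Long nextSnapshotId) {
       // move the existing dispatch from ContinuousFileStoreSource here
       throw new UnsupportedOperationException("sketch only");
   }
   ```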



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/** Abstract class for all {@link SnapshotEnumerator}s which enumerate record related data files. */
+public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final CoreOptions.LogStartupMode startupMode;
+    private @Nullable final Long startupMillis;
+
+    private @Nullable Long nextSnapshotId;
+
+    public DataFileSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            @Nullable Long nextSnapshotId) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startupMode = startupMode;
+        this.startupMillis = startupMillis;
+
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    @Override
+    public DataTableScan.DataFilePlan call() {
+        if (nextSnapshotId == null) {
+            return firstCall();
+        } else {
+            return nextCall();
+        }
+    }
+
+    private DataTableScan.DataFilePlan firstCall() {
+        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
+            return null;
+        }
+
+        DataTableScan.DataFilePlan plan;
+        switch (startupMode) {
+            case FULL:
+                plan = scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+                break;
+            case FROM_TIMESTAMP:
+                Preconditions.checkNotNull(
+                        startupMillis,
+                        String.format(
+                                "%s can not be null when you use %s for %s",
+                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                                CoreOptions.LOG_SCAN.key()));
+                startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
+                plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+                break;
+            case LATEST:
+                plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown log startup mode " + startupMode.name());
+        }
+
+        nextSnapshotId = startingSnapshotId + 1;
+        return plan;
+    }
+
+    private DataTableScan.DataFilePlan nextCall() {
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                LOG.debug(
+                        "Next snapshot id {} does not exist, wait for the 
snapshot generation.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+
+            if (shouldSkipSnapshot(snapshot)) {
+                nextSnapshotId++;
+                continue;
+            }
+
+            LOG.debug("Find snapshot id {}.", nextSnapshotId);
+            DataTableScan.DataFilePlan plan = getPlan(scan.withSnapshot(nextSnapshotId));
+            nextSnapshotId++;
+            return plan;
+        }
+    }
+
+    protected abstract boolean shouldSkipSnapshot(Snapshot snapshot);

Review Comment:
   Minor: revert to `shouldReadSnapshot`?
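   With that name the check above would just flip, e.g.:
   ```
   if (!shouldReadSnapshot(snapshot)) {
       nextSnapshotId++;
       continue;
   }
   ```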



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/** Abstract class for all {@link SnapshotEnumerator}s which enumerate record related data files. */
+public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final CoreOptions.LogStartupMode startupMode;
+    private @Nullable final Long startupMillis;
+
+    private @Nullable Long nextSnapshotId;
+
+    public DataFileSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            @Nullable Long nextSnapshotId) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startupMode = startupMode;
+        this.startupMillis = startupMillis;
+
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    @Override
+    public DataTableScan.DataFilePlan call() {
+        if (nextSnapshotId == null) {
+            return firstCall();
+        } else {
+            return nextCall();
+        }
+    }
+
+    private DataTableScan.DataFilePlan firstCall() {
+        Long startingSnapshotId = snapshotManager.latestSnapshotId();
+        if (startingSnapshotId == null) {
+            LOG.debug("There is currently no snapshot. Wait for the snapshot generation.");
+            return null;
+        }
+
+        DataTableScan.DataFilePlan plan;
+        switch (startupMode) {
+            case FULL:
+                plan = scan.withKind(ScanKind.ALL).withSnapshot(startingSnapshotId).plan();
+                break;
+            case FROM_TIMESTAMP:
+                Preconditions.checkNotNull(
+                        startupMillis,
+                        String.format(
+                                "%s can not be null when you use %s for %s",
+                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                                CoreOptions.LOG_SCAN.key()));
+                startingSnapshotId = snapshotManager.earlierThanTimeMills(startupMillis);
+                plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+                break;
+            case LATEST:
+                plan = new DataTableScan.DataFilePlan(startingSnapshotId, Collections.emptyList());
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown log startup mode " + startupMode.name());
+        }
+
+        nextSnapshotId = startingSnapshotId + 1;
+        return plan;
+    }
+
+    private DataTableScan.DataFilePlan nextCall() {

Review Comment:
   `nextEnumerate`?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializer.java:
##########
@@ -46,19 +48,28 @@ public int getVersion() {
     public byte[] serialize(PendingSplitsCheckpoint pendingSplitsCheckpoint) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+
         view.writeInt(pendingSplitsCheckpoint.splits().size());
         for (FileStoreSourceSplit split : pendingSplitsCheckpoint.splits()) {
             byte[] bytes = splitSerializer.serialize(split);
             view.writeInt(bytes.length);
             view.write(bytes);
         }
-        view.writeLong(pendingSplitsCheckpoint.currentSnapshotId());
+
+        Long currentSnapshotId = pendingSplitsCheckpoint.currentSnapshotId();
+        if (currentSnapshotId == null) {

Review Comment:
   Minor: `view.writeLong(currentSnapshotId == null ? INVALID_SNAPSHOT : currentSnapshotId);`



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java:
##########
@@ -82,31 +85,19 @@ public TableStreamingReader(
     }
 
     @Nullable
-    public Iterator<RowData> nextBatch() throws IOException {
+    public Iterator<RowData> nextBatch() throws Exception {
         if (enumerator == null) {
             DataTableScan scan = table.newScan();
             if (predicate != null) {
                 scan.withFilter(predicate);
             }
-            DataTableScan.DataFilePlan plan = scan.plan();
-            if (plan.snapshotId == null) {
-                return null;
-            }
-            long snapshotId = plan.snapshotId;
             enumerator =
-                    new SnapshotEnumerator(
-                            table.location(),
-                            scan.withIncremental(true),
-                            table.options().changelogProducer(),
-                            snapshotId);
-            return read(plan);
-        } else {
-            SnapshotEnumerator.EnumeratorResult result = enumerator.call();
-            if (result == null) {
-                return null;
-            }
-            return read(result.plan);
+                    new DeltaSnapshotEnumerator(

Review Comment:
   Maybe we can put these in the constructor? After this change, initialization no longer needs to generate a batch, so the enumerator could be created up front.
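   i.e. roughly (a sketch; the `DeltaSnapshotEnumerator` arguments are cut off in this hunk, so they stay elided here too):
   ```
   // in the constructor: build the enumerator once, so nextBatch() only polls it
   DataTableScan scan = table.newScan();
   if (predicate != null) {
       scan.withFilter(predicate);
   }
   this.enumerator = new DeltaSnapshotEnumerator(/* same arguments as in nextBatch() above */);
   ```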



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/snapshot/DataFileSnapshotEnumerator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source.snapshot;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.operation.ScanKind;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+/** Abstract class for all {@link SnapshotEnumerator}s which enumerate record related data files. */
+public abstract class DataFileSnapshotEnumerator implements SnapshotEnumerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataFileSnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+    private final DataTableScan scan;
+    private final CoreOptions.LogStartupMode startupMode;
+    private @Nullable final Long startupMillis;
+
+    private @Nullable Long nextSnapshotId;
+
+    public DataFileSnapshotEnumerator(
+            Path tablePath,
+            DataTableScan scan,
+            CoreOptions.LogStartupMode startupMode,
+            @Nullable Long startupMillis,
+            @Nullable Long nextSnapshotId) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.startupMode = startupMode;
+        this.startupMillis = startupMillis;
+
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    @Override
+    public DataTableScan.DataFilePlan call() {
+        if (nextSnapshotId == null) {
+            return firstCall();
+        } else {
+            return nextCall();
+        }
+    }
+
+    private DataTableScan.DataFilePlan firstCall() {

Review Comment:
   `tryFirstEnumerate`? Because there may be no first enumeration when there is no snapshot yet.


