hlteoh37 commented on code in PR #151:
URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1726540874


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
     public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
             ConfigOptions.key("flink.shard.discovery.intervalmillis")
                     .longType()
-                    .defaultValue(10000L)
+                    .defaultValue(60000L)
                     .withDescription("The interval between each attempt to discover new shards.");
 
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT =
+            ConfigOptions.key("flink.describestream.numretries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry describestream call if it returns a retriable exception");

Review Comment:
   nit: `retryable`



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
     public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
             ConfigOptions.key("flink.shard.discovery.intervalmillis")
                     .longType()
-                    .defaultValue(10000L)
+                    .defaultValue(60000L)
                     .withDescription("The interval between each attempt to discover new shards.");
 
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT =
+            ConfigOptions.key("flink.describestream.numretries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry describestream call if it returns a retriable exception");
+
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =

Review Comment:
   Can we just use the `Duration` type directly rather than use `min` and `intType`?
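   
   For reference, a minimal sketch of the `Duration`-typed option, reusing the key and constant name from the later hunk in this file (the default value and description are assumptions for illustration, not from the PR):
   ```java
   public static final ConfigOption<Duration> DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =
           ConfigOptions.key("flink.describestream.backoff.mindelay")
                   .durationType() // java.time.Duration; no separate unit suffix needed
                   .defaultValue(Duration.ofMillis(1000)) // assumed default, not from the PR
                   .withDescription(
                           "Base delay for exponential backoff between describestream retries.");
   ```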



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+        for (int i = 0;
+                i < describeStreamInconsistencyResolutionCount
+                        && !streamDisabled
+                        && shardGraphTracker.inconsistencyDetected();
+                i++) {
+            ListShardsResult shardsToResolveInconsistencies =
+                    streamProxy.listShards(
+                            streamArn, shardGraphTracker.getEarliestClosedLeafNode());
+            shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
+        }
+
+        if (shardGraphTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in describestream which we were not able to resolve. First leaf node on which inconsistency was detected:"
+                            + shardGraphTracker.getEarliestClosedLeafNode());
             return;

Review Comment:
   Would be good to add a reason why we return here and not throw an exception.
   
   I think we should also mention it in the description of `DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT` and in the connector overview!
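   
   For example, something along these lines (wording is only a suggestion, not from the PR):
   ```java
   // Intentionally tolerate the unresolved shard graph instead of failing the
   // job: shard discovery runs periodically, so the next discovery attempt
   // will retry resolution. We skip this cycle rather than throw.
   return;
   ```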



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/ShardGraphTracker.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * Class to track the state of shard graph created as part of describestream operation. This will
+ * track for inconsistent shards returned due to describestream operation. Caller will have to call
+ * describestream again with the chronologically first leaf node to resolve inconsistencies.

Review Comment:
   Q: Should we make this logic part of this class? e.g. rename as `ShardGraphConsistencyDetector`? Not 100% sure of this suggestion. WDYT @gguptp @karubian 



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+        for (int i = 0;
+                i < describeStreamInconsistencyResolutionCount
+                        && !streamDisabled
+                        && shardGraphTracker.inconsistencyDetected();
+                i++) {
+            ListShardsResult shardsToResolveInconsistencies =
+                    streamProxy.listShards(
+                            streamArn, shardGraphTracker.getEarliestClosedLeafNode());
+            shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
+        }
+
+        if (shardGraphTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in describestream which we were not able to resolve. First leaf node on which inconsistency was detected:"
+                            + shardGraphTracker.getEarliestClosedLeafNode());
             return;
         }
 
-        for (DynamoDbStreamsShardSplit split : splits) {
-            splitAssignment.get(subtaskId).remove(split);
-            assignedSplitIds.remove(split.splitId());
-            unassignedSplits.add(split);
+        splitTracker.addSplits(shardGraphTracker.getNodes());
+        splitTracker.cleanupGarbageSplits(
+                shardGraphTracker.getNodes().stream()
+                        .map(shard -> shard.shardId())
+                        .collect(Collectors.toSet()));

Review Comment:
   nit: should the method just be `removeSplits()`? 



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
     public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
             ConfigOptions.key("flink.shard.discovery.intervalmillis")
                     .longType()
-                    .defaultValue(10000L)
+                    .defaultValue(60000L)
                     .withDescription("The interval between each attempt to discover new shards.");
 
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT =
+            ConfigOptions.key("flink.describestream.numretries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry describestream call if it returns a retriable exception");
+
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =
+            ConfigOptions.key("flink.describestream.backoff.mindelay")
+                    .intType()

Review Comment:
   Let's use the `Duration` type here as well.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java:
##########
@@ -29,15 +29,19 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 /** Used to serialize and deserialize the {@link DynamoDbStreamsSourceEnumeratorState}. */
 @Internal
 public class DynamoDbStreamsSourceEnumeratorStateSerializer
         implements SimpleVersionedSerializer<DynamoDbStreamsSourceEnumeratorState> {
 
-    private static final int CURRENT_VERSION = 0;
+    private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));

Review Comment:
   we have not actually released this connector - so we don't need to up the 
serializer version even though we are making backwards incompatible changes. 
This is different from the KDS connector. Can we keep the `CURRENT_VERSION` as 
0?
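   
   A minimal sketch of that suggestion, assuming the standard `SimpleVersionedSerializer#getVersion()` contract:
   ```java
   // Single version: nothing has shipped yet, so no compatibility set is needed.
   private static final int CURRENT_VERSION = 0;

   @Override
   public int getVersion() {
       return CURRENT_VERSION;
   }
   ```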



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+        for (int i = 0;
+                i < describeStreamInconsistencyResolutionCount
+                        && !streamDisabled
+                        && shardGraphTracker.inconsistencyDetected();
+                i++) {
+            ListShardsResult shardsToResolveInconsistencies =
+                    streamProxy.listShards(
+                            streamArn, shardGraphTracker.getEarliestClosedLeafNode());
+            shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
+        }
+
+        if (shardGraphTracker.inconsistencyDetected()) {
+            LOG.error(
+                    "There are inconsistencies in describestream which we were not able to resolve. First leaf node on which inconsistency was detected:"
+                            + shardGraphTracker.getEarliestClosedLeafNode());
             return;
         }
 
-        for (DynamoDbStreamsShardSplit split : splits) {
-            splitAssignment.get(subtaskId).remove(split);
-            assignedSplitIds.remove(split.splitId());
-            unassignedSplits.add(split);
+        splitTracker.addSplits(shardGraphTracker.getNodes());

Review Comment:
   This is a little confusing - can we name the `splitGraphTracker` something like `splitGraphManager`?



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+        boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+        int describeStreamInconsistencyResolutionCount =
+                sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);

Review Comment:
   can we abstract this away to at least a `private` method in this class, or 
even better a separate class like `DDBStreamConsistencyTracker`? Would make the 
code for `processDiscoveredSplits` easier to read (and test, if it's in a 
separate class)
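   
   A rough sketch of the `private`-method variant, reusing only identifiers that already appear in this diff (the method name is illustrative):
   ```java
   /** Re-queries describestream until the shard graph is consistent or retries are exhausted. */
   private void resolveShardGraphInconsistencies(
           ShardGraphTracker shardGraphTracker, boolean streamDisabled) {
       int maxRetries =
               sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
       for (int i = 0;
               i < maxRetries && !streamDisabled && shardGraphTracker.inconsistencyDetected();
               i++) {
           // Resume listing from the earliest closed leaf node to fill graph gaps.
           ListShardsResult result =
                   streamProxy.listShards(streamArn, shardGraphTracker.getEarliestClosedLeafNode());
           shardGraphTracker.addNodes(result.getShards());
       }
   }
   ```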



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> initialDiscoverSplits() {
      *
      * @return list of discovered splits
      */
-    private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
-
-        // Any shard discovered after the initial startup should be read from the start, since they
-        // come from resharding
-        return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
-    }
-
-    private List<DynamoDbStreamsShardSplit> mapToSplits(
-            List<Shard> shards, InitialPosition initialPosition) {
-        StartingPosition startingPosition;
-        switch (initialPosition) {
-            case LATEST:
-                startingPosition = StartingPosition.latest();
-                break;
-            case TRIM_HORIZON:
-            default:
-                startingPosition = StartingPosition.fromStart();
-        }
-
-        List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
-        for (Shard shard : shards) {
-            splits.add(new DynamoDbStreamsShardSplit(streamArn, shard.shardId(), startingPosition));
-        }
-
-        return splits;
-    }
-
-    /**
-     * This method assigns a given set of DynamoDb Streams splits to the readers currently
-     * registered on the cluster. This assignment is done via a side-effect on the {@link
-     * SplitEnumeratorContext} object.
-     *
-     * @param discoveredSplits list of discovered splits
-     * @param throwable thrown when discovering splits. Will be null if no throwable thrown.
-     */
-    private void assignSplits(
-            List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable throwable) {
-        if (throwable != null) {
-            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
-        }
-
-        if (context.registeredReaders().size() < context.currentParallelism()) {
-            LOG.info(
-                    "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, Registered readers: {}",
-                    context.currentParallelism(),
-                    context.registeredReaders().size());
-            unassignedSplits.addAll(discoveredSplits);
-            return;
-        }
-
-        Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>();
-        for (DynamoDbStreamsShardSplit split : unassignedSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-        unassignedSplits.clear();
-        for (DynamoDbStreamsShardSplit split : discoveredSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-
-        updateLastSeenShardId(discoveredSplits);
-        updateSplitAssignment(newSplitAssignments);
-        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+    private List<Shard> periodicallyDiscoverSplits() {
+        return streamProxy.listShards(streamArn, null);
     }
 
     private void assignSplitToSubtask(
             DynamoDbStreamsShardSplit split,
             Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) {
-        if (assignedSplitIds.contains(split.splitId())) {
+        if (splitTracker.isAssigned(split.splitId())) {
            LOG.info(
                     "Skipping assignment of shard {} from stream {} because it is already assigned.",

Review Comment:
   Do we expect this to be printed constantly, or just when there are data inconsistencies? If this is only printed during data inconsistencies, then we should keep it as `INFO`



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -43,6 +45,15 @@ public enum InitialPosition {
                     .defaultValue(10000L)
                     .withDescription("The interval between each attempt to discover new shards.");
 
+    public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+    public static final Duration MIN_DDB_STREAMS_SHARD_RETENTION = Duration.ofHours(6);

Review Comment:
   Let's add a comment on why this is 6 hours!


