dianfu commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911793908


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 
'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new 
query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For 
efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with 
token awareness.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, 
if an host fail, the
+        next one (following the round-robin order) is tried, until all hosts 
have been tried.
+
+        This policy is not datacenter aware and will include every known 
Cassandra hosts in its
+        round-robin algorithm. If you use multiple datacenter this will be 
inefficient, and you will
+        want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra 
cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a 
new one will be
+    created (in effect, the driver reset the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, 
max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        The default load reconnection policy.
+
+        A reconnection policy that waits exponentially longer between each 
reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return 
ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 
'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each 
reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return 
ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry 
should be specified.
+    - IGNORE: no retry should be attempted and the exception should be 
ignored. In that case, the
+              operation that triggered the Cassandra exception will return an 
empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def consistency_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+
+        This policy retries queries in only two cases:
+        - On a read timeout, retries once on the same host if enough replicas 
replied but data was
+          not retrieved.
+        - On a write timeout, retries once on the same host if we timeout 
while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, the query is retried 
on the next host.
+          Do not retry on read or write failures.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """

Review Comment:
   Add LoggingRetryPolicy?



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 
'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new 
query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For 
efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with 
token awareness.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, 
if an host fail, the
+        next one (following the round-robin order) is tried, until all hosts 
have been tried.
+
+        This policy is not datacenter aware and will include every known 
Cassandra hosts in its
+        round-robin algorithm. If you use multiple datacenter this will be 
inefficient, and you will
+        want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra 
cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a 
new one will be
+    created (in effect, the driver reset the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, 
max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        The default load reconnection policy.
+
+        A reconnection policy that waits exponentially longer between each 
reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return 
ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 
'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each 
reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return 
ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry 
should be specified.
+    - IGNORE: no retry should be attempted and the exception should be 
ignored. In that case, the
+              operation that triggered the Cassandra exception will return an 
empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def consistency_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+
+        This policy retries queries in only two cases:
+        - On a read timeout, retries once on the same host if enough replicas 
replied but data was
+          not retrieved.
+        - On a write timeout, retries once on the same host if we timeout 
while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, the query is retried 
on the next host.
+          Do not retry on read or write failures.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """
+        A retry policy that never retries (nor ignores).
+        """
+        JFallthroughRetryPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies. \
+            FallthroughRetryPolicy
+        return RetryPolicy(JFallthroughRetryPolicy.INSTANCE)
+
+
+class SpeculativeExecutionPolicy(object):
+    """
+    The policy that decides if the driver will send speculative queries to the 
next hosts when the
+    current host takes too long to respond.
+
+    Note that only idempotent statements will be speculatively retried.
+    """
+
+    def __init__(self, j_speculative_execution_policy):
+        self._j_speculative_execution_policy = j_speculative_execution_policy
+

Review Comment:
   Also add PercentileSpeculativeExecutionPolicy?



##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metrics;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.policies.AddressTranslator;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.Policies;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+/** A Simple ClusterBuilder which is currently used in PyFlink Cassandra 
connector. */
+public class SimpleClusterBuilder extends ClusterBuilder implements 
Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static Cluster.Builder clusterBuilder;

Review Comment:
   ```suggestion
       private final Cluster.Builder clusterBuilder;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to