dianfu commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911637191


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+

Review Comment:
   Update the unit tests for the added functionalities?
   PS: the test_connector.py has been moved to directory 
pyflink/datastream/connectors/tests/. Need to rebase the PR when adding tests.



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new 
query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For 
efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':

Review Comment:
   ```suggestion
       def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
   ```



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -135,6 +279,168 @@ class ClusterBuilder(object):
     def __init__(self, j_cluster_builder):
         self._j_cluster_builder = j_cluster_builder

Review Comment:
   If we introduce class SimpleClusterBuilder, we could set 
self._j_cluster_builder to SimpleClusterBuilder() in the constructor.



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new 
query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For 
efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with 
token awareness.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, 
if an host fail, the
+        next one (following the round-robin order) is tried, until all hosts 
have been tried.
+
+        This policy is not datacenter aware and will include every known 
Cassandra hosts in its
+        round-robin algorithm. If you use multiple datacenter this will be 
inefficient, and you will
+        want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra 
cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a 
new one will be
+    created (in effect, the driver reset the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 
'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each 
reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return 
ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, 
max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits exponentially longer between each 
reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return 
ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry 
should be specified.
+    - IGNORE: no retry should be attempted and the exception should be 
ignored. In that case, the
+              operation that triggered the Cassandra exception will return an 
empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def default_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.

Review Comment:
   It's not clear what's the behavior of the default retry policy. It would be 
great to document it clearly. Besides, it would be great to also update the 
method name accordingly.



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -135,6 +279,168 @@ class ClusterBuilder(object):
     def __init__(self, j_cluster_builder):
         self._j_cluster_builder = j_cluster_builder
 
+    def with_cluster_name(self, name: str) -> 'ClusterBuilder':
+        """
+        An optional name for the creation cluster.
+
+        Note: this is not related to the Cassandra cluster name (though you 
are free to provide the
+        same name).
+        """
+        self._j_cluster_builder.withClusterName(name)

Review Comment:
   Where the _j_cluster_builder comes from? I guess we need to introduce a 
class SimpleClusterBuilder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to