pengmide commented on code in PR #20120: URL: https://github.com/apache/flink/pull/20120#discussion_r911803654
########## flink-python/pyflink/datastream/connectors/cassandra.py: ########## @@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions': return self +# ---- Classes introduced to construct the ClusterBuilder ---- + + +class LoadBalancingPolicy(object): + """ + The policy that decides which Cassandra hosts to contact for each new query. + + The LoadBalancingPolicy is informed of host up/down events. For efficiency purposes, the policy + is expected to exclude down hosts from query plans. + """ + + def __init__(self, j_load_balancing_policy): + self._j_load_balancing_policy = j_load_balancing_policy + + @staticmethod + def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy': + """ + The default load balancing policy. + + The default load balancing policy is DCAwareRoundRobinPolicy with token awareness. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy()) + + @staticmethod + def round_robin_policy() -> 'LoadBalancingPolicy': + """ + A Round-robin load balancing policy. + + This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the + next one (following the round-robin order) is tried, until all hosts have been tried. + + This policy is not datacenter aware and will include every known Cassandra host in its + round-robin algorithm. If you use multiple datacenters, this will be inefficient, and you will + want to use the DCAwareRoundRobinPolicy load balancing policy instead. + """ + JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy + return LoadBalancingPolicy(JRoundRobinPolicy()) + + +class ReconnectionPolicy(object): + """ + Policy that decides how often the reconnection to a dead node is attempted. 
+ + Note that if the driver receives a push notification from the Cassandra cluster that a node is + UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be + created (in effect, the driver resets the scheduler). + + The default ExponentialReconnectionPolicy is usually adequate. + """ + + def __init__(self, j_reconnection_policy): + self._j_reconnection_policy = j_reconnection_policy + + @staticmethod + def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \ + -> 'ReconnectionPolicy': + """ + The default reconnection policy. + + A reconnection policy that waits exponentially longer between each reconnection attempt + (but keeps a constant delay once a maximum delay is reached). + """ + JExponentialReconnectionPolicy = get_gateway().jvm. \ + com.datastax.driver.core.policies.ExponentialReconnectionPolicy + return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms)) + + @staticmethod + def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy': + """ + A reconnection policy that waits a constant time between each reconnection attempt. + """ + JConstantReconnectionPolicy = get_gateway().jvm.\ + com.datastax.driver.core.policies.ConstantReconnectionPolicy + return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms)) + + +class RetryPolicy(object): + """ + A policy that defines a default behavior to adopt when a request fails. + + There are three possible decisions: + - RETHROW: no retry should be attempted and an exception should be thrown. + - RETRY: the operation will be retried. The consistency level of the retry should be specified. + - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the + operation that triggered the Cassandra exception will return an empty result set. 
+ """ + + def __init__(self, j_retry_policy): + self._j_retry_policy = j_retry_policy + + @staticmethod + def consistency_retry_policy() -> 'RetryPolicy': + """ + The default retry policy. + + This policy retries queries only in the following cases: + - On a read timeout, retries once on the same host if enough replicas replied but data was + not retrieved. + - On a write timeout, retries once on the same host if we timeout while writing the + distributed log used by batch statements. + - On an unavailable exception, retries once on the next host. + - On a request error, such as a client timeout, the query is retried on the next host. + It does not retry on read or write failures. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return RetryPolicy(JPolicies.defaultRetryPolicy()) + + @staticmethod + def fallthrough_retry_policy() -> 'RetryPolicy': + """ + A retry policy that never retries (nor ignores). + """ + JFallthroughRetryPolicy = get_gateway().jvm.com.datastax.driver.core.policies. \ + FallthroughRetryPolicy + return RetryPolicy(JFallthroughRetryPolicy.INSTANCE) + + +class SpeculativeExecutionPolicy(object): + """ + The policy that decides if the driver will send speculative queries to the next hosts when the + current host takes too long to respond. + + Note that only idempotent statements will be speculatively retried. + """ + + def __init__(self, j_speculative_execution_policy): + self._j_speculative_execution_policy = j_speculative_execution_policy + Review Comment: What about fixing this in a separate PR as this PR is already very big? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org