yifan-c commented on code in PR #99:
URL: https://github.com/apache/cassandra-analytics/pull/99#discussion_r1996359837


##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/CdcBuilder.java:
##########
@@ -41,26 +41,26 @@
 public class CdcBuilder
 {
     @NotNull
-    final String jobId;
-    final int partitionId;
+    protected final String jobId;
+    protected final int partitionId;
     @NotNull
     TokenRangeSupplier tokenRangeSupplier = () -> null;
     @NotNull
-    SchemaSupplier schemaSupplier;
+    protected SchemaSupplier schemaSupplier;
     @NotNull
     CassandraSource cassandraSource = CassandraSource.DEFAULT;
     @NotNull
     StatePersister statePersister = StatePersister.STUB;
     @NotNull
-    CdcOptions cdcOptions = CdcOptions.DEFAULT;
+    protected CdcOptions cdcOptions = CdcOptions.DEFAULT;
     @NotNull
     ICdcStats stats = ICdcStats.STUB;
     @Nullable

Review Comment:
   `asyncExecutor` is not actually `@Nullable`: the builder cannot be built if
the field is `null`.
   Should the annotation be updated to `@NotNull`?



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time 
intervals, so we only write the latest CDC state and don't wastefully write 
expired data.
+ */
+public class SidecarStatePersister implements StatePersister
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarStatePersister.class);
+
+    // group latest state by jobId/token range, so we persist independently
+    protected final ConcurrentHashMap<PersistWrapper.Key, PersistWrapper> 
latestState = new ConcurrentHashMap<>();
+    protected final ConcurrentLinkedQueue<TimedFutureWrapper> activeFlush = 
new ConcurrentLinkedQueue<>();
+    private final ThreadLocalMonotonicTimestampGenerator timestampGenerator = 
new ThreadLocalMonotonicTimestampGenerator();
+    private final SidecarCdcOptions sidecarCdcOptions;
+    private final CdcOptions cdcOptions;
+    private final SidecarCdcCassandraClient cassandraClient;
+    private final SidecarCdcStats sidecarCdcStats;
+    private final AsyncExecutor asyncExecutor;
+    volatile long timerId = -1L;
+
+    public SidecarStatePersister(SidecarCdcOptions sidecarCdcOptions,
+                                 CdcOptions cdcOptions,
+                                 SidecarCdcStats sidecarCdcStats,
+                                 SidecarCdcCassandraClient cassandraClient,
+                                 AsyncExecutor asyncExecutor)
+    {
+        this.sidecarCdcOptions = sidecarCdcOptions;
+        this.cdcOptions = cdcOptions;
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.cassandraClient = cassandraClient;
+        this.asyncExecutor = asyncExecutor;
+    }
+
+    // StatePersister implemented methods
+
+    @Override
+    public void persist(String jobId, int partitionId, @Nullable TokenRange 
tokenRange, @NotNull ByteBuffer buf)
+    {
+        final PersistWrapper latest = new PersistWrapper(jobId, partitionId, 
tokenRange, buf, timestampGenerator.next());
+        PersistWrapper.Key key = latest.key();
+        if (!latest.equals(this.latestState.get(key)))
+        {
+            this.latestState.compute(key, (k, prev) -> !latest.equals(prev) ? 
latest : prev);
+        }
+    }
+
+    @NotNull
+    @Override
+    public List<CdcState> loadState(String jobId, int partitionId, @Nullable 
TokenRange tokenRange)
+    {
+        CompressionUtil compressionUtil = 
CdcBridgeFactory.get(cdcOptions.version()).compressionUtil();
+        List<Integer> sizes = new ArrayList<>();
+        // deserialize and merge the CDC state objects into canonical view
+        List<CdcState> result = loadStateForRange(jobId, tokenRange)
+                                .peek(bytes -> sizes.add(bytes.length))
+                                .map(bytes -> 
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes))
+                                .collect(Collectors.toList());
+        int count = sizes.size();
+        int len = sizes.stream().mapToInt(i -> i).sum();
+        LOGGER.debug("Read CDC state from Cassandra jobId={} start={} end={} 
stateCount={} stateSize={}",
+                     jobId, tokenRange == null ? "null" : 
tokenRange.lowerEndpoint(), tokenRange == null ? "null" : 
tokenRange.upperEndpoint(), count, len);
+        sidecarCdcStats.captureCdcConsumerReadFromState(count, len);
+        return result;
+    }
+
+    @VisibleForTesting
+    public Stream<byte[]> loadStateForRange(String jobId, @Nullable TokenRange 
tokenRange)
+    {
+        return cassandraClient
+               .loadStateForRange(jobId, tokenRange);
+    }
+
+    /**
+     * Start the SidecarStatePersister to flush to Cassandra every 
`persistDelay()`
+     */
+    public synchronized void start()
+    {
+        if (timerId >= 0)
+        {
+            // already running
+            return;
+        }
+        this.timerId = asyncExecutor.periodicTimer(this::persistToCassandra, 
sidecarCdcOptions.persistDelay().toMillis());
+    }
+
+    /**
+     * Stop the SidecarStatePersister gracefully, blocking to await for any 
pending flushes to complete.
+     */
+    public void stop()
+    {
+        stop(true);
+    }
+
+    public synchronized void stop(boolean flush)
+    {
+        if (this.timerId < 0)
+        {
+            // not running
+            return;
+        }
+
+        asyncExecutor.cancelTimer(this.timerId);
+        this.timerId = -1;
+
+        if (flush)
+        {
+            flush();
+        }
+    }
+
+    // internal methods
+
+    protected void persistToCassandra()
+    {
+        persistToCassandra(false);
+    }
+
+    protected void persistToCassandra(boolean force)
+    {
+        // clean-up finished futures
+        activeFlush.removeIf(wrapper -> {
+            if (wrapper.allDone())
+            {
+                try
+                {
+                    wrapper.await();
+                    sidecarCdcStats.capturePersistSucceeded(System.nanoTime() 
- wrapper.startTimeNanos);
+                }
+                catch (InterruptedException e)
+                {
+                    LOGGER.warn("Persist failed with InterruptedException", e);
+                    Thread.currentThread().interrupt();
+                    sidecarCdcStats.capturePersistFailed(e);
+                }
+                catch (ExecutionException e)
+                {
+                    LOGGER.warn("Persist failed", e);
+                    sidecarCdcStats.capturePersistFailed(e);
+                }

Review Comment:
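   Maybe just catch `Throwable` to be safe, so that an unexpected unchecked exception cannot escape the `removeIf` predicate and abort the periodic flush.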



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.client.SidecarInstancesProvider;
+import org.apache.cassandra.sidecar.client.StreamBuffer;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.exceptions.TransportFailureException;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+
+public class SidecarCdcClient
+{
+    final Sidecar.ClientConfig config;
+    final SidecarClient sidecarClient;
+    final ICdcStats stats;
+
+    public SidecarCdcClient(Sidecar.ClientConfig config,
+                            SidecarClient sidecarClient,
+                            ICdcStats stats)
+    {
+        this.config = config;
+        this.sidecarClient = sidecarClient;
+        this.stats = stats;
+    }
+
+    public CompletableFuture<List<CommitLog>> 
listCdcCommitLogSegments(CassandraInstance instance)
+    {
+        return sidecarClient.listCdcSegments(toSidecarInstance(instance))
+                            .thenApply(
+                            response ->
+                            response.segmentInfos()
+                                    .stream()
+                                    .map(segment -> (CommitLog) new 
SidecarCdcCommitLogSegment(this, instance, segment, config))
+                                    .collect(Collectors.toList())
+                            ).exceptionally(throwable -> {
+            final Throwable cause = ThrowableUtils.rootCause(throwable);
+            if (cause instanceof TransportFailureException.Nonretryable
+                && ((TransportFailureException.Nonretryable) 
cause).isNotFound())
+            {
+                // Rescue the 404 not found exception - it is a permitted error
+                return Collections.emptyList();
+            }
+            // Rethrow the other exception
+            if (throwable instanceof Error)
+            {
+                throw (Error) throwable;
+            }
+            throw new RuntimeException(cause);
+        });
+    }
+
+    public void streamCdcCommitLogSegment(CassandraInstance instance, String 
segment, HttpRange httpRange, StreamConsumer streamConsumer)
+    {
+        sidecarClient.streamCdcSegments(toSidecarInstance(instance), segment, 
httpRange, new org.apache.cassandra.sidecar.client.StreamConsumer()
+        {
+            @Override
+            public void onRead(StreamBuffer streamBuffer)
+            {
+                streamConsumer.onRead(new 
org.apache.cassandra.spark.utils.streaming.StreamBuffer()
+                {
+                    @Override
+                    public void getBytes(int index, ByteBuffer destination, 
int length)
+                    {
+                        streamBuffer.copyBytes(index, destination, length);
+                    }
+
+                    @Override
+                    public void getBytes(int index, byte[] destination, int 
destinationIndex, int length)
+                    {
+                        streamBuffer.copyBytes(index, destination, 
destinationIndex, length);
+                    }
+
+                    @Override
+                    public byte getByte(int index)
+                    {
+                        return streamBuffer.getByte(index);
+                    }
+
+                    @Override
+                    public int readableBytes()
+                    {
+                        return streamBuffer.readableBytes();
+                    }
+
+                    @Override
+                    public void release()
+                    {
+                        streamBuffer.release();
+                    }
+                });
+            }
+
+            @Override
+            public void onComplete()
+            {
+                streamConsumer.onEnd();
+            }
+
+            @Override
+            public void onError(Throwable throwable)
+            {
+                streamConsumer.onError(throwable);
+            }
+        });
+    }
+
+    protected SidecarInstance toSidecarInstance(CassandraInstance instance)
+    {
+        return new SidecarInstance()
+        {
+            @Override
+            public int port()
+            {
+                return config.effectivePort();
+            }
+
+            @Override
+            public String hostname()
+            {
+                return instance.nodeName();
+            }
+        };
+    }
+
+    public static SidecarCdcClient from(SidecarInstancesProvider 
sidecarInstancesProvider,
+                                        Sidecar.ClientConfig config,
+                                        SecretsProvider secretsProvider,
+                                        ICdcStats stats) throws IOException
+    {
+        return new SidecarCdcClient(config, 
Sidecar.from(sidecarInstancesProvider, config, secretsProvider), stats);
+    }

Review Comment:
   This `from(...)` factory method is unused.



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time 
intervals, so we only write the latest CDC state and don't wastefully write 
expired data.
+ */
+public class SidecarStatePersister implements StatePersister
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarStatePersister.class);
+
+    // group latest state by jobId/token range, so we persist independently
+    protected final ConcurrentHashMap<PersistWrapper.Key, PersistWrapper> 
latestState = new ConcurrentHashMap<>();
+    protected final ConcurrentLinkedQueue<TimedFutureWrapper> activeFlush = 
new ConcurrentLinkedQueue<>();
+    private final ThreadLocalMonotonicTimestampGenerator timestampGenerator = 
new ThreadLocalMonotonicTimestampGenerator();
+    private final SidecarCdcOptions sidecarCdcOptions;
+    private final CdcOptions cdcOptions;
+    private final SidecarCdcCassandraClient cassandraClient;
+    private final SidecarCdcStats sidecarCdcStats;
+    private final AsyncExecutor asyncExecutor;
+    volatile long timerId = -1L;
+
+    public SidecarStatePersister(SidecarCdcOptions sidecarCdcOptions,
+                                 CdcOptions cdcOptions,
+                                 SidecarCdcStats sidecarCdcStats,
+                                 SidecarCdcCassandraClient cassandraClient,
+                                 AsyncExecutor asyncExecutor)
+    {
+        this.sidecarCdcOptions = sidecarCdcOptions;
+        this.cdcOptions = cdcOptions;
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.cassandraClient = cassandraClient;
+        this.asyncExecutor = asyncExecutor;
+    }
+
+    // StatePersister implemented methods
+
+    @Override
+    public void persist(String jobId, int partitionId, @Nullable TokenRange 
tokenRange, @NotNull ByteBuffer buf)
+    {
+        final PersistWrapper latest = new PersistWrapper(jobId, partitionId, 
tokenRange, buf, timestampGenerator.next());
+        PersistWrapper.Key key = latest.key();
+        if (!latest.equals(this.latestState.get(key)))
+        {
+            this.latestState.compute(key, (k, prev) -> !latest.equals(prev) ? 
latest : prev);
+        }
+    }
+
+    @NotNull
+    @Override
+    public List<CdcState> loadState(String jobId, int partitionId, @Nullable 
TokenRange tokenRange)
+    {
+        CompressionUtil compressionUtil = 
CdcBridgeFactory.get(cdcOptions.version()).compressionUtil();
+        List<Integer> sizes = new ArrayList<>();
+        // deserialize and merge the CDC state objects into canonical view
+        List<CdcState> result = loadStateForRange(jobId, tokenRange)
+                                .peek(bytes -> sizes.add(bytes.length))
+                                .map(bytes -> 
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes))

Review Comment:
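   nit: `Stream.peek` exists mainly to support debugging, so consider recording the size inside `map` instead: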
   
   ```suggestion
                                  .map(bytes -> {
                                       sizes.add(bytes.length);
                                       return 
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes);
                                   })
   ```



##########
cassandra-analytics-sidecar-client/src/main/java/org/apache/cassandra/clients/Sidecar.java:
##########
@@ -122,51 +119,6 @@ public static SidecarClient from(SidecarInstancesProvider 
sidecarInstancesProvid
         return buildClient(sidecarConfig, vertx, httpClientConfig, 
sidecarInstancesProvider);
     }
 
-    static String transportModeBasedWriterUserAgent(DataTransport transport)
-    {
-        switch (transport)
-        {
-            case S3_COMPAT:
-                return BuildInfo.WRITER_S3_USER_AGENT;
-            case DIRECT:
-            default:
-                return BuildInfo.WRITER_USER_AGENT;
-        }
-    }
-
-    public static SidecarClient from(SidecarInstancesProvider 
sidecarInstancesProvider, BulkSparkConf conf)
-    {
-        Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true)
-                                                    
.setWorkerPoolSize(conf.getMaxHttpConnections()));
-

Review Comment:
   Just curious: what prevents keeping it in the same file (instead of moving
it to `AnalyticsSidecarClient`)? I think the idea was to consolidate further.



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time 
intervals, so we only write the latest CDC state and don't wastefully write 
expired data.
+ */
+public class SidecarStatePersister implements StatePersister
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarStatePersister.class);
+
+    // group latest state by jobId/token range, so we persist independently
+    protected final ConcurrentHashMap<PersistWrapper.Key, PersistWrapper> 
latestState = new ConcurrentHashMap<>();
+    protected final ConcurrentLinkedQueue<TimedFutureWrapper> activeFlush = 
new ConcurrentLinkedQueue<>();
+    private final ThreadLocalMonotonicTimestampGenerator timestampGenerator = 
new ThreadLocalMonotonicTimestampGenerator();
+    private final SidecarCdcOptions sidecarCdcOptions;
+    private final CdcOptions cdcOptions;
+    private final SidecarCdcCassandraClient cassandraClient;
+    private final SidecarCdcStats sidecarCdcStats;
+    private final AsyncExecutor asyncExecutor;
+    volatile long timerId = -1L;
+
+    public SidecarStatePersister(SidecarCdcOptions sidecarCdcOptions,
+                                 CdcOptions cdcOptions,
+                                 SidecarCdcStats sidecarCdcStats,
+                                 SidecarCdcCassandraClient cassandraClient,
+                                 AsyncExecutor asyncExecutor)
+    {
+        this.sidecarCdcOptions = sidecarCdcOptions;
+        this.cdcOptions = cdcOptions;
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.cassandraClient = cassandraClient;
+        this.asyncExecutor = asyncExecutor;
+    }
+
+    // StatePersister implemented methods
+
+    @Override
+    public void persist(String jobId, int partitionId, @Nullable TokenRange 
tokenRange, @NotNull ByteBuffer buf)
+    {
+        final PersistWrapper latest = new PersistWrapper(jobId, partitionId, 
tokenRange, buf, timestampGenerator.next());
+        PersistWrapper.Key key = latest.key();
+        if (!latest.equals(this.latestState.get(key)))
+        {
+            this.latestState.compute(key, (k, prev) -> !latest.equals(prev) ? 
latest : prev);
+        }
+    }
+
+    @NotNull
+    @Override
+    public List<CdcState> loadState(String jobId, int partitionId, @Nullable 
TokenRange tokenRange)
+    {
+        CompressionUtil compressionUtil = 
CdcBridgeFactory.get(cdcOptions.version()).compressionUtil();
+        List<Integer> sizes = new ArrayList<>();
+        // deserialize and merge the CDC state objects into canonical view
+        List<CdcState> result = loadStateForRange(jobId, tokenRange)
+                                .peek(bytes -> sizes.add(bytes.length))
+                                .map(bytes -> 
CdcState.deserialize(CdcKryoRegister.kryo(), compressionUtil, bytes))
+                                .collect(Collectors.toList());
+        int count = sizes.size();
+        int len = sizes.stream().mapToInt(i -> i).sum();
+        LOGGER.debug("Read CDC state from Cassandra jobId={} start={} end={} 
stateCount={} stateSize={}",
+                     jobId, tokenRange == null ? "null" : 
tokenRange.lowerEndpoint(), tokenRange == null ? "null" : 
tokenRange.upperEndpoint(), count, len);
+        sidecarCdcStats.captureCdcConsumerReadFromState(count, len);
+        return result;
+    }
+
+    @VisibleForTesting
+    public Stream<byte[]> loadStateForRange(String jobId, @Nullable TokenRange 
tokenRange)
+    {
+        return cassandraClient
+               .loadStateForRange(jobId, tokenRange);
+    }
+
+    /**
+     * Start the SidecarStatePersister to flush to Cassandra every 
`persistDelay()`
+     */
+    public synchronized void start()
+    {
+        if (timerId >= 0)
+        {
+            // already running
+            return;
+        }
+        this.timerId = asyncExecutor.periodicTimer(this::persistToCassandra, 
sidecarCdcOptions.persistDelay().toMillis());
+    }
+
+    /**
+     * Stop the SidecarStatePersister gracefully, blocking to await for any 
pending flushes to complete.
+     */
+    public void stop()
+    {
+        stop(true);
+    }
+
+    public synchronized void stop(boolean flush)
+    {
+        if (this.timerId < 0)
+        {
+            // not running
+            return;
+        }
+
+        asyncExecutor.cancelTimer(this.timerId);
+        this.timerId = -1;

Review Comment:
   Relying on the condition `timerId == -1` alone might not be sufficient: the
scheduled run could still be running when `cancelTimer` returns. This does not
need to be addressed in this patch; just calling it out.



##########
cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/CdcBuilder.java:
##########
@@ -41,26 +41,26 @@
 public class CdcBuilder
 {
     @NotNull
-    final String jobId;
-    final int partitionId;
+    protected final String jobId;
+    protected final int partitionId;
     @NotNull
     TokenRangeSupplier tokenRangeSupplier = () -> null;
     @NotNull
-    SchemaSupplier schemaSupplier;
+    protected SchemaSupplier schemaSupplier;
     @NotNull
     CassandraSource cassandraSource = CassandraSource.DEFAULT;
     @NotNull
     StatePersister statePersister = StatePersister.STUB;
     @NotNull
-    CdcOptions cdcOptions = CdcOptions.DEFAULT;
+    protected CdcOptions cdcOptions = CdcOptions.DEFAULT;
     @NotNull
     ICdcStats stats = ICdcStats.STUB;
     @Nullable
-    AsyncExecutor asyncExecutor = null;
+    protected AsyncExecutor asyncExecutor = null;
     @Nullable
-    CommitLogProvider commitLogProvider = null;
+    protected CommitLogProvider commitLogProvider = null;
     @NotNull
-    EventConsumer eventConsumer;
+    protected EventConsumer eventConsumer;
     @NotNull

Review Comment:
   Those are actually required, so they should be annotated `@NotNull` rather than `@Nullable`.



##########
cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCommitLogProviderTests.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SidecarCommitLogProviderTests
+{
+    private static final List<CassandraInstance> INSTANCES = Arrays.asList(
+    new CassandraInstance("0", "local1", "DC1"),
+    new CassandraInstance("100", "local2", "DC1"),
+    new CassandraInstance("200", "local3", "DC1"),
+    new CassandraInstance("300", "local4", "DC1"),
+    new CassandraInstance("400", "local5", "DC1"),
+    new CassandraInstance("500", "local6", "DC1"),
+    new CassandraInstance("1", "local7", "DC2"),
+    new CassandraInstance("101", "local8", "DC2"),
+    new CassandraInstance("201", "local9", "DC2"),
+    new CassandraInstance("301", "local10", "DC2"),
+    new CassandraInstance("401", "local11", "DC2"),
+    new CassandraInstance("501", "local12", "DC2")
+    );
+
+    @Test
+    public void testCommitLogProvider()
+    {
+        CassandraInstance instance1 = INSTANCES.get(0);
+        CassandraInstance instance2 = INSTANCES.get(1);
+        CassandraInstance instance3 = INSTANCES.get(2);
+        CassandraInstance instance4 = INSTANCES.get(3);
+        CassandraInstance instance5 = INSTANCES.get(4);
+        CassandraInstance instance6 = INSTANCES.get(5);
+
+        ClusterConfigProvider clusterConfigProvider = 
getClusterConfigProvider();
+        SidecarCdcClient sidecarCdcClient = mock(SidecarCdcClient.class);
+        Map<CassandraInstance, Integer> instanceListCount = new HashMap<>();
+        MutableInt segmentIdGenerator = new MutableInt(0);
+        Answer<CompletableFuture<List<CommitLog>>> listCommitLogAnswer = 
invocation -> {
+            CassandraInstance instance = invocation.getArgument(0, 
CassandraInstance.class);
+            instanceListCount.compute(instance, (key, prev) -> prev == null ? 
1 : prev + 1);
+            int instanceId = 
Integer.parseInt(instance.nodeName().substring(5));
+            int numLogs = instanceId * 13;
+            List<CommitLog> logs = IntStream.rangeClosed(1, numLogs)
+                                            .mapToObj(segmentId -> 
mockCommitLog(instance, segmentIdGenerator.getAndAdd(1)))
+                                            .collect(Collectors.toList());
+            return CompletableFuture.completedFuture(logs);
+        };
+
+        
when(sidecarCdcClient.listCdcCommitLogSegments(any(CassandraInstance.class))).thenAnswer(listCommitLogAnswer);
+        SidecarCommitLogProvider commitLogProvider = 
getSidecarCommitLogProvider(clusterConfigProvider, sidecarCdcClient);
+
+        // list logs on instance1, instance2, instance3
+        TokenRange tokenRange = TokenRange.openClosed(
+        new BigInteger("-50"),
+        new BigInteger("-10")
+        );
+        List<CommitLog> logs = commitLogProvider
+                               .logs(tokenRange)
+                               .sorted()
+                               .collect(Collectors.toList());
+        int expectedNumberOfLogs = 13 + (2 * 13) + (3 * 13);
+        assertEquals(expectedNumberOfLogs, logs.size());
+        IntStream.range(0, logs.size())
+                 .forEach(index -> assertEquals(index, 
logs.get(index).segmentId()));
+        assertEquals(3, instanceListCount.size());
+        assertEquals(1, instanceListCount.get(instance1));
+        assertEquals(1, instanceListCount.get(instance2));
+        assertEquals(1, instanceListCount.get(instance3));
+
+        // list logs on instance4, instance5, instance6
+        TokenRange tokenRange2 = TokenRange.openClosed(
+        new BigInteger("210"),
+        new BigInteger("280")
+        );
+        List<CommitLog> logs2 = commitLogProvider
+                                .logs(tokenRange2)
+                                .collect(Collectors.toList());
+        expectedNumberOfLogs = (4 * 13) + (5 * 13) + (6 * 13);

Review Comment:
   nit: extract "4, 5, 6 and 13" into named variables to improve readability. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to