jyothsnakonisa commented on code in PR #99:
URL: https://github.com/apache/cassandra-analytics/pull/99#discussion_r2002031033


##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.client.StreamBuffer;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.exceptions.TransportFailureException;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+
+public class SidecarCdcClient

Review Comment:
   Do we need a separate `SidecarCdcClient` class? Now that `streamCdcSegments` 
and `listCdcSegments` have been added to `SidecarClient`, maybe this class is no 
longer needed. All it does is delegate to those two methods and process the 
response; maybe we can move that response-processing logic elsewhere.



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcOptions.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public interface SidecarCdcOptions

Review Comment:
   We already have this interface in Sidecar:
https://github.com/apache/cassandra-sidecar/blob/trunk/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
   
   We can import the one from Sidecar and get rid of this interface.



##########
scripts/build-sidecar.sh:
##########
@@ -44,7 +44,7 @@ if [ "x${LOCAL_SIDECAR_REPO}" != "x" ]; then
 else
   SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-f3bcbba3dcd81b640711baa35a76a2d949ce6c5e}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-c1260886642e789fa15a3fb900251bea529fcc09}"

Review Comment:
   Since there is now an OSS Sidecar release, can we use that version instead?



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.time.Duration;
+
+import o.a.c.sidecar.client.shaded.common.response.data.CdcSegmentInfo;
+import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
+import org.apache.cassandra.cdc.api.CommitLog;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.spark.data.FileType;
+import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
+import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The SidecarCdcCommitLogSegment provides a CommitLog source for reading the
+ * CommitLog segments over the Sidecar HTTP API.
+ */
+public class SidecarCdcCommitLogSegment implements CommitLog
+{
+    private final CassandraInstance instance;
+    private final CdcSegmentInfo segment;
+    private final CassandraFileSource<CommitLog> source;
+    private final ICdcStats stats;
+
+    public SidecarCdcCommitLogSegment(SidecarCdcClient sidecar,
+                                      CassandraInstance instance,
+                                      CdcSegmentInfo segment,
+                                      Sidecar.ClientConfig clientConfig)
+    {
+        this.instance = instance;
+        this.segment = segment;
+        this.stats = sidecar.stats;
+
+        final SidecarCdcCommitLogSegment thisLog = this;
+        this.source = new CassandraFileSource<CommitLog>()
+        {
+            public void request(long start, long end, StreamConsumer consumer)
+            {
+                sidecar.streamCdcCommitLogSegment(instance, segment.name, HttpRange.of(start, end), consumer);
+            }
+
+            public CommitLog cassandraFile()
+            {
+                return thisLog;
+            }
+
+            public long maxBufferSize()
+            {
+                return clientConfig.maxBufferSize();
+            }
+
+            public long chunkBufferSize()
+            {
+                return clientConfig.chunkBufferSize(fileType());
+            }
+
+            public FileType fileType()
+            {
+                return FileType.COMMITLOG;
+            }
+
+            public long size()
+            {
+                return segment.idx;
+            }
+
+            @Nullable
+            @Override
+            public Duration timeout()
+            {
+                final int timeout = clientConfig.timeoutSeconds();

Review Comment:
   We have not been using `final` for local variables. Can we please remove 
`final` here and in other places, for the sake of consistency?
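
   A minimal sketch of the requested style, for illustration only. The rest of 
`timeout()` is not shown in this hunk, so the conversion to `Duration` and the 
`null` return for a non-positive value are assumptions, and `TimeoutSketch` and 
`toTimeout` are hypothetical names rather than code from the PR.

```java
import java.time.Duration;

// Hypothetical stand-in for the body of the anonymous CassandraFileSource#timeout();
// returning null for a non-positive value is an assumption, not taken from the PR.
final class TimeoutSketch
{
    static Duration toTimeout(int timeoutSeconds)
    {
        // Plain local variable with no `final` modifier, as the comment requests
        int timeout = timeoutSeconds;
        return timeout <= 0 ? null : Duration.ofSeconds(timeout);
    }

    public static void main(String[] args)
    {
        System.out.println(toTimeout(30)); // prints PT30S
        System.out.println(toTimeout(0));  // prints null
    }
}
```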



##########
cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarStatePersister.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.ThreadLocalMonotonicTimestampGenerator;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.cdc.CdcKryoRegister;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.StatePersister;
+import org.apache.cassandra.cdc.state.CdcState;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.apache.cassandra.spark.utils.ThrowableUtils;
+import org.apache.cassandra.util.CompressionUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SidecarStatePersister buffers CDC state and flushes at regular time intervals, so we only write the latest CDC state and don't wastefully write expired data.
+ */
+public class SidecarStatePersister implements StatePersister

Review Comment:
   I see a `StatePersister` in Sidecar, but the implementation is a little 
different between the two classes. Can we potentially get rid of the one in 
Sidecar?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

