harangozop commented on code in PR #24654:
URL: https://github.com/apache/pulsar/pull/24654#discussion_r2336112120
##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java:
##########
@@ -162,38 +165,103 @@ public void open(Map<String, Object> config,
SourceContext sourceContext) throws
sourceTask.initialize(sourceTaskContext);
sourceTask.start(taskConfig);
}
+ private void onOffsetsFlushed(Throwable error, CompletableFuture<Void>
snapshotFlushFuture) {
+ if (error != null) {
+ log.error("Failed to flush offsets to storage: ", error);
+ offsetWriter.cancelFlush();
+ snapshotFlushFuture.completeExceptionally(new Exception("No
Offsets Added Error", error));
+ return;
+ }
+ try {
+ sourceTask.commit();
+ if (log.isDebugEnabled()) {
+ log.debug("Finished flushing offsets to storage");
+ }
+ snapshotFlushFuture.complete(null);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.warn("Flush interrupted, cancelling", ie);
+ offsetWriter.cancelFlush();
+ snapshotFlushFuture.completeExceptionally(new Exception("Failed to
commit offsets", ie));
+ } catch (Throwable t) {
+ log.warn("Flush failed, cancelling", t);
+ offsetWriter.cancelFlush();
+ snapshotFlushFuture.completeExceptionally(new Exception("Failed to
commit offsets", t));
+ }
+ }
+
+ private void triggerOffsetsFlushIfNeeded() {
+ final CompletableFuture<Void> snapshotFlushFuture =
flushFutureRef.get();
+ // Only flush when we have a batch in flight, nothing outstanding, and
a pending future
+ if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() ||
outstandingRecords.get() != 0) {
+ return;
+ }
+ if (!flushing.compareAndSet(false, true)) {
+ return; // someone else is flushing
+ }
+ try {
+ if (offsetWriter.beginFlush()) {
+ offsetWriter.doFlush((error, ignored) -> {
+ try {
+ onOffsetsFlushed(error, snapshotFlushFuture);
+ } finally {
+ flushing.set(false);
+ }
+ });
+ } else {
+ try {
+ onOffsetsFlushed(null, snapshotFlushFuture);
+ } finally {
+ flushing.set(false);
+ }
+ }
+ } catch (ConnectException alreadyFlushing) {
+ // Another thread initiated the flush; let their callback complete
the future.
+ // Keep 'flushing' = true until read() finalizes the batch.
+ } catch (Exception t) {
+ try {
+ onOffsetsFlushed(t, snapshotFlushFuture);
+ } finally {
+ flushing.set(false);
+ }
+ }
+ }
@Override
public synchronized Record<T> read() throws Exception {
while (true) {
if (currentBatch == null) {
- flushFuture = new CompletableFuture<>();
List<SourceRecord> recordList = sourceTask.poll();
if (recordList == null || recordList.isEmpty()) {
continue;
}
outstandingRecords.addAndGet(recordList.size());
currentBatch = recordList.iterator();
+
+ final CompletableFuture<Void> newFuture = new
CompletableFuture<>();
+ flushFutureRef.set(newFuture);
}
if (currentBatch.hasNext()) {
AbstractKafkaSourceRecord<T> processRecord =
processSourceRecord(currentBatch.next());
if (processRecord == null || processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
- continue;
+ triggerOffsetsFlushIfNeeded();
Review Comment:
The `continue` here is redundant.
This branch is the last statement in the `while` body; after the call to
`triggerOffsetsFlushIfNeeded()`, execution falls through to the end of the
loop body and the next iteration begins anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]