This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 12fdd32 camel-kafka: fix integration test failing on CI
12fdd32 is described below
commit 12fdd32fc6030a50f8cc75634f6510e8ea7ced1e
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Sep 17 14:31:19 2021 +0200
camel-kafka: fix integration test failing on CI
The test kafkaMessageIsConsumedByCamelSeekedToBeginning is failing on CI
because the code was issuing a seek call before any partition had been
assigned to the consumer.
This change ensures the seek is only performed after a partition assignment happens.
---
.../camel/component/kafka/KafkaFetchRecords.java | 13 ------------
.../support/PartitionAssignmentListener.java | 24 +++++++---------------
.../consumer/support/SeekPolicyResumeStrategy.java | 8 --------
.../kafka/integration/KafkaConsumerFullIT.java | 10 +++++++++
4 files changed, 17 insertions(+), 38 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 086432e..720ee8d 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -33,8 +33,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import
org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
-import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -80,10 +78,6 @@ class KafkaFetchRecords implements Runnable {
void preInit() {
createConsumer();
-
- StateRepository<String, String> offsetRepository =
kafkaConsumer.getEndpoint().getConfiguration().getOffsetRepository();
- String seekPolicy =
kafkaConsumer.getEndpoint().getConfiguration().getSeekTo();
- resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer,
offsetRepository, seekPolicy);
}
@Override
@@ -134,9 +128,6 @@ class KafkaFetchRecords implements Runnable {
if (isReconnecting()) {
subscribe();
- // on first run or reconnecting
- resume();
-
// set reconnect to false as the connection and resume is done at
this point
setReconnect(false);
@@ -148,10 +139,6 @@ class KafkaFetchRecords implements Runnable {
startPolling();
}
- protected void resume() {
- resumeStrategy.resume();
- }
-
private void subscribe() {
PartitionAssignmentListener listener = new PartitionAssignmentListener(
threadId, topicName,
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index b67ab0c..ce02e37 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.deserializeOffsetValue;
import static
org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.serializeOffsetKey;
public class PartitionAssignmentListener implements ConsumerRebalanceListener {
@@ -40,6 +39,7 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
private final KafkaConfiguration configuration;
private final KafkaConsumer consumer;
private final Map<String, Long> lastProcessedOffset;
+ private final ResumeStrategy resumeStrategy;
private Supplier<Boolean> stopStateSupplier;
public PartitionAssignmentListener(String threadId, String topicName,
KafkaConfiguration configuration,
@@ -51,13 +51,12 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
this.consumer = consumer;
this.lastProcessedOffset = lastProcessedOffset;
this.stopStateSupplier = stopStateSupplier;
- }
- private void resumeFromOffset(TopicPartition topicPartition, String
offsetState) {
- // The state contains the last read offset, so you need to seek from
the next one
- long offset = deserializeOffsetValue(offsetState) + 1;
- LOG.debug("Resuming partition {} from offset {} from state",
topicPartition.partition(), offset);
- consumer.seek(topicPartition, offset);
+ StateRepository<String, String> offsetRepository =
configuration.getOffsetRepository();
+ String seekPolicy = configuration.getSeekTo();
+
+ LOG.info("Performing resume as {} ", seekPolicy);
+ resumeStrategy = ResumeStrategyFactory.newResumeStrategy(consumer,
offsetRepository, seekPolicy);
}
@Override
@@ -92,15 +91,6 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.debug("onPartitionsAssigned: {} from topic {}", threadId,
topicName);
- StateRepository<String, String> offsetRepository =
configuration.getOffsetRepository();
- if (offsetRepository != null) {
- for (TopicPartition partition : partitions) {
- String offsetState =
offsetRepository.getState(serializeOffsetKey(partition));
- if (offsetState != null && !offsetState.isEmpty()) {
- // The state contains the last read offset, so you need to
seek from the next one
- resumeFromOffset(partition, offsetState);
- }
- }
- }
+ resumeStrategy.resume();
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
index 8e5361d..d7606df 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
@@ -17,8 +17,6 @@
package org.apache.camel.component.kafka.consumer.support;
-import java.time.Duration;
-
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,15 +39,9 @@ public class SeekPolicyResumeStrategy implements
ResumeStrategy {
public void resume() {
if (seekPolicy.equals("beginning")) {
LOG.debug("Seeking from the beginning of topic");
- // This poll to ensure we have an assigned partition
- // otherwise seek won't work
- consumer.poll(Duration.ofMillis(100));
consumer.seekToBeginning(consumer.assignment());
} else if (seekPolicy.equals("end")) {
LOG.debug("Seeking from the end off the topic");
- // This poll to ensure we have an assigned partition
- // otherwise seek won't work
- consumer.poll(Duration.ofMillis(100));
consumer.seekToEnd(consumer.assignment());
}
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index f143bd2..f4a9dd4 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -35,7 +35,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
@@ -45,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests",
matches = "true",
disabledReason = "Runtime conflicts with the
idempotency tests")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
public static final String TOPIC = "test";
@@ -89,6 +93,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
};
}
+ @Order(3)
@Test
public void kafkaMessageIsConsumedByCamel() throws InterruptedException,
IOException {
String propagatedHeaderKey = "PropagatedCustomHeader";
@@ -119,6 +124,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive
propagated header");
}
+ @Order(2)
@Test
public void kafkaRecordSpecificHeadersAreNotOverwritten() throws
InterruptedException, IOException {
String propagatedHeaderKey = KafkaConstants.TOPIC;
@@ -137,6 +143,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
}
@Test
+ @Order(1)
public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws
Exception {
to.expectedMessageCount(5);
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
@@ -150,6 +157,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
to.reset();
to.expectedMessageCount(5);
+
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
// Restart endpoint,
@@ -164,6 +172,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
to.assertIsSatisfied(3000);
}
+ @Order(4)
@Test
public void kafkaMessageIsConsumedByCamelSeekedToEnd() throws Exception {
to.expectedMessageCount(5);
@@ -194,6 +203,7 @@ public class KafkaConsumerFullIT extends
BaseEmbeddedKafkaTestSupport {
to.assertIsSatisfied(3000);
}
+ @Order(5)
@Test
public void headerDeserializerCouldBeOverridden() {
KafkaEndpoint kafkaEndpoint