CodeSmell commented on code in PR #11935:
URL: https://github.com/apache/camel/pull/11935#discussion_r1388285546
##########
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.camel.component.kafka.integration;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.CamelKafkaUtil;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Tests the basic breakOnFirstError functionality using allowManualCommit
+ * and KafkaManualCommit. Because the test relies on the Camel default of
+ * using the NOOP Commit Manager, the route implementation MUST manage
+ * all offset commits itself.
+ */
+class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT extends
BaseEmbeddedKafkaTestSupport {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.class);
+
+ public static final String ROUTE_ID =
"breakOnFirstErrorBatchOnExceptionIT";
+ public static final String TOPIC = "test-foobar";
+
+ @EndpointInject("kafka:" + TOPIC
+ + "?groupId=KafkaBreakOnFirstErrorIT"
+ + "&autoOffsetReset=earliest"
+ + "&autoCommitEnable=false"
+ + "&allowManualCommit=true"
+ + "&breakOnFirstError=true"
+ + "&maxPollRecords=3"
+ + "&pollTimeoutMs=1000"
+ +
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ +
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+ private Endpoint from;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String>
producer;
+
+ @BeforeEach
+ public void before() {
+ Properties props = getDefaultProperties();
+ producer = new
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ MockConsumerInterceptor.recordsCaptured.clear();
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+ }
+
+ /**
+ * will continue to retry the message that is in error
+ */
+ @Test
+ public void kafkaBreakOnFirstErrorBasicCapabilityWithoutOnExcepton()
throws Exception {
+ to.reset();
+ to.expectedMessageCount(7);
+
+ // old behavior before the fix
+ // message-3 causes an error
+ // and breakOnFirstError will cause it to be retried 1x
+ // then we move on
+ //to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-3", "message-4", "message-5");
Review Comment:
IMO this new behavior is what the route really wants
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]