This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3635a7c7cd4 kafka manual commit classloading issue (#17372)
3635a7c7cd4 is described below
commit 3635a7c7cd413533e490149a40cd967d06478da0
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 6 12:13:52 2025 +0000
kafka manual commit classloading issue (#17372)
CAMEL-21833: Move kamelet kafka manual commit to camel-kafka due to
classloading problems. Remove dependency on camel-kafka in camel-kamelet.
---
.../camel/component}/kafka/BatchManualCommit.java | 30 ++++++++++++++--------
.../camel/component}/kafka/ManualCommit.java | 10 ++++++--
.../consumer/DefaultKafkaManualAsyncCommit.java | 1 -
.../kafka/consumer/DefaultKafkaManualCommit.java | 10 +++++---
.../batching/KafkaRecordBatchingProcessor.java | 5 ++++
components/camel-kamelet/pom.xml | 5 ----
.../kafka/KafkaHeaderDeserializer.java | 6 +----
.../utils/transform/MessageTimestampRouter.java | 5 ++--
.../kamelet/utils/transform/RegexRouter.java | 5 ++--
.../kamelet/utils/transform/TimestampRouter.java | 5 ++--
.../kamelet/utils/transform/kafka/ValueToKey.java | 3 +--
.../kamelet/utils/transform/RegexRouterTest.java | 5 ++--
12 files changed, 49 insertions(+), 41 deletions(-)
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
similarity index 55%
rename from
components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
rename to
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
index 8b97d3b9ff1..3bf8c2495f2 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/BatchManualCommit.java
@@ -14,29 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kamelet.utils.transform.kafka;
+package org.apache.camel.component.kafka;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BatchManualCommit implements Processor {
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchManualCommit.class);
+
@Override
public void process(Exchange exchange) throws Exception {
- List<?> exchanges = exchange.getMessage().getBody(List.class);
- if (exchanges.size() > 0) {
- final Object tmp = exchanges.get(exchanges.size() - 1);
- if (tmp instanceof Exchange element) {
- KafkaManualCommit manual
- =
element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
- if (manual != null) {
- manual.commit();
+ KafkaManualCommit manual
+ =
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
+ if (manual == null) {
+ List<?> exchanges = exchange.getMessage().getBody(List.class);
+ if (exchanges != null && !exchanges.isEmpty()) {
+ Object obj = exchanges.get(exchanges.size() - 1);
+ if (obj instanceof Exchange last) {
+ manual =
last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
}
}
}
+
+ if (manual != null) {
+ LOG.debug("Performing Kafka Batch manual commit: {}", manual);
+ manual.commit();
+ } else {
+ LOG.debug("Cannot perform Kafka Batch manual commit due header: {}
is missing", KafkaConstants.MANUAL_COMMIT);
+ }
}
}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
similarity index 76%
rename from
components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
rename to
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
index db92916f2b9..8fa06ab73aa 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/ManualCommit.java
@@ -14,20 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kamelet.utils.transform.kafka;
+package org.apache.camel.component.kafka;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ManualCommit implements Processor {
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchManualCommit.class);
+
@Override
public void process(Exchange exchange) throws Exception {
KafkaManualCommit manual =
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
if (manual != null) {
+ LOG.debug("Performing Kafka manual commit: {}", manual);
manual.commit();
+ } else {
+ LOG.debug("Cannot perform Kafka manual commit due header: {} is
missing", KafkaConstants.MANUAL_COMMIT);
}
}
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index e38761c4ef7..7eea8cdc5f4 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -23,7 +23,6 @@ public class DefaultKafkaManualAsyncCommit extends
DefaultKafkaManualCommit impl
KafkaManualCommitFactory.KafkaRecordPayload recordPayload,
CommitManager commitManager) {
super(camelExchangePayload, recordPayload);
-
this.commitManager = commitManager;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
index 5aa71a8ccfc..617526cae0a 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualCommit.java
@@ -66,8 +66,6 @@ public abstract class DefaultKafkaManualCommit implements
KafkaManualCommit {
/**
* Gets the Camel Exchange payload
- *
- * @return
*/
public KafkaManualCommitFactory.CamelExchangePayload
getCamelExchangePayload() {
return camelExchangePayload;
@@ -75,10 +73,14 @@ public abstract class DefaultKafkaManualCommit implements
KafkaManualCommit {
/**
* Gets the Kafka record payload
- *
- * @return
*/
public KafkaManualCommitFactory.KafkaRecordPayload getKafkaRecordPayload()
{
return kafkaRecordPayload;
}
+
+ @Override
+ public String toString() {
+ return "KafkaManualCommit[topic=" + getTopicName() + ", offset=" +
getRecordOffset() + "]";
+ }
+
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index 09247cddc39..44193f89c71 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -159,8 +159,13 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
Message message = exchange.getMessage();
var exchanges = exchangeList.stream().toList();
message.setBody(exchanges);
+
try {
if (configuration.isAllowManualCommit()) {
+ Exchange last = exchanges.isEmpty() ? null :
exchanges.get(exchanges.size() - 1);
+ if (last != null) {
+ message.setHeader(KafkaConstants.MANUAL_COMMIT,
last.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT));
+ }
manualCommitResultProcessing(camelKafkaConsumer, exchange);
} else {
autoCommitResultProcessing(camelKafkaConsumer, exchange,
exchanges.size());
diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index b6f9a7149b5..018a567d0f1 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -71,11 +71,6 @@
<version>1.2</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-kafka</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jackson</artifactId>
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
index 6cb8f2f1b02..7f5eae8041a 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -23,7 +23,6 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.support.SimpleTypeConverter;
/**
@@ -84,12 +83,9 @@ public class KafkaHeaderDeserializer implements Processor {
/**
* Exclude special Kafka headers from auto deserialization.
- *
- * @param entry
- * @return
*/
private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
- return !entry.getKey().equals(KafkaConstants.HEADERS) &&
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
+ return !entry.getKey().equals("kafka.HEADERS") &&
!entry.getKey().equals("CamelKafkaManualCommit");
}
public void setEnabled(String enabled) {
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
index 3b1ef2eb3dc..048a74ec032 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class MessageTimestampRouter {
@@ -55,7 +54,7 @@ public class MessageTimestampRouter {
}
Object rawTimestamp = null;
- String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
+ String topicName = ex.getMessage().getHeader("kafka.TOPIC",
String.class);
for (String key : splittedKeys) {
if (ObjectHelper.isNotEmpty(key)) {
rawTimestamp = body.get(key);
@@ -83,7 +82,7 @@ public class MessageTimestampRouter {
replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
}
- ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
+ ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
}
}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
index 517f1d0b787..4bbc27a56ab 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
@@ -21,7 +21,6 @@ import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class RegexRouter {
@@ -29,12 +28,12 @@ public class RegexRouter {
public void process(
@ExchangeProperty("regex") String regex,
@ExchangeProperty("replacement") String replacement, Exchange ex) {
Pattern regexPattern = Pattern.compile(regex);
- String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
+ String topicName = ex.getMessage().getHeader("kafka.TOPIC",
String.class);
if (ObjectHelper.isNotEmpty(topicName)) {
final Matcher matcher = regexPattern.matcher(topicName);
if (matcher.matches()) {
final String topicUpdated = matcher.replaceFirst(replacement);
- ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
topicUpdated);
+ ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC",
topicUpdated);
}
}
String ceType = ex.getMessage().getHeader("ce-type", String.class);
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
index 9c660497f0e..db41dcbf013 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
@@ -25,7 +25,6 @@ import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class TimestampRouter {
@@ -41,7 +40,7 @@ public class TimestampRouter {
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
Long timestamp = null;
- String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
+ String topicName = ex.getMessage().getHeader("kafka.TOPIC",
String.class);
Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
if (rawTimestamp instanceof Long) {
timestamp = (Long) rawTimestamp;
@@ -62,7 +61,7 @@ public class TimestampRouter {
replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
}
- ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
+ ex.getMessage().setHeader("kafka.OVERRIDE_TOPIC", updatedTopic);
}
}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
index 8333ab25cf6..208ab32b0b9 100644
---
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.InvalidPayloadException;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
public class ValueToKey {
@@ -48,7 +47,7 @@ public class ValueToKey {
}
}
- ex.getMessage().setHeader(KafkaConstants.KEY, key);
+ ex.getMessage().setHeader("kafka.KEY", key);
}
boolean filterNames(String fieldName, List<String> splittedFields) {
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
index c1837a86718..d782a05fb89 100644
---
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.kamelet.utils.transform;
import org.apache.camel.Exchange;
-import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Assertions;
@@ -42,10 +41,10 @@ class RegexRouterTest {
void shouldReplaceFieldToPlainJson() throws Exception {
Exchange exchange = new DefaultExchange(camelContext);
- exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic);
+ exchange.getMessage().setHeader("kafka.TOPIC", topic);
processor.process(".*ll.*", "newTopic", exchange);
- Assertions.assertEquals("newTopic",
exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC));
+ Assertions.assertEquals("newTopic",
exchange.getMessage().getHeader("kafka.OVERRIDE_TOPIC"));
}
}