Hi all...

I modified my producer to Avro-encode both the key and the value of the
message. In Control Center it all looks good, and I also got past the magic
byte problem.

I am now getting the error shown further down, though.


I think I got my command wrong, specifically with regard to key.format
and key.field:

/opt/flink/bin/flink run \
/opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
kafka_sync_table \
-Dexecution.checkpointing.interval=10s \
-Dexecution.checkpointing.num-retained=5 \
-Dstate.checkpoints.num-retained=10 \
-Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
--kafka_conf properties.bootstrap.servers=broker:29092 \
--kafka_conf topic=factory_iot \
--kafka_conf value.format=debezium-avro \
--kafka_conf key.format=debezium-avro \
--kafka_conf key.field=siteId \
--kafka_conf properties.group.id=123456 \
--kafka_conf schema.registry.url=http://schema-registry:9081 \
--kafka_conf scan.startup.mode=earliest-offset \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://metastore:9083 \
--warehouse s3a://warehouse/paimon/ \
--database iot \
--table factory_iot \
--table_conf sink.parallelism=4


The value displayed in Control Center for the key of the message is:

{ "siteId": 105 }


Any assistance would be appreciated.



2025-03-26 09:40:54,628 WARN  org.apache.hadoop.metrics2.impl.MetricsConfig                [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2025-03-26 09:40:54,642 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - Scheduled Metric snapshot period at 10 second(s).
2025-03-26 09:40:54,643 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - s3a-file-system metrics system started
java.lang.NoClassDefFoundError: com/google/common/util/concurrent/internal/InternalFutureFailureAccess
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(Unknown Source)
at java.base/java.security.SecureClassLoader.defineClass(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(Unknown Source)
at java.base/java.security.SecureClassLoader.defineClass(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(Unknown Source)
at java.base/java.security.SecureClassLoader.defineClass(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3472)
at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3476)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2134)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at com.google.common.cache.LocalCache.get(LocalCache.java:3962)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3985)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4946)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:274)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:492)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:236)
at org.apache.paimon.flink.action.cdc.serialization.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:39)
at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema.deserialize(KafkaDebeziumAvroDeserializationSchema.java:71)
at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema.deserialize(KafkaDebeziumAvroDeserializationSchema.java:38)
at org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils$KafkaConsumerWrapper.lambda$getRecords$0(KafkaActionUtils.java:346)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils$KafkaConsumerWrapper.getRecords(KafkaActionUtils.java:351)
at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:67)
at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.internal.InternalFutureFailureAccess
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 74 more
flink@f678a5da4643:~$
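
A check that may help narrow this down: the class named in the trace,
InternalFutureFailureAccess, ships in Guava's separate failureaccess artifact
(com.google.guava:failureaccess) since Guava 27.0, so the error may be about a
jar missing from the classpath rather than about key.format or key.field. The
sketch below lists the jars in a directory that contain an entry for a given
class. The function name find_class_in_jars is made up for illustration, and
/opt/flink/lib is only assumed to be the relevant directory because the command
above loads the Paimon action jar from under it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every jar under a directory whose entry names
# mention the given class. Zip entry names are stored uncompressed inside a
# jar, so a plain fixed-string byte search is enough to find them.
find_class_in_jars() {
  local dir="$1" class="$2"
  # Convert the dotted class name to the path form used inside jars.
  local entry="${class//.//}"
  # -a treats the jar as text, -F matches literally, -l prints only file names.
  find "$dir" -type f -name '*.jar' -exec grep -l -a -F -- "$entry" {} +
}

# Example call (the directory is an assumption based on the command above):
#   find_class_in_jars /opt/flink/lib \
#     com.google.common.util.concurrent.internal.InternalFutureFailureAccess
```

If this prints nothing, no jar in that directory carries the class. A match
only shows that the class exists somewhere in the directory, not that it is
visible to the class loader that needs it.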



-- 
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!
