Hi all... modified my producer to now avro encode both the key and the value of the message: on Control Center it's all looking good, also got past the magic byte problem.
Getting the error as lower down though. Thinking I got my comment wrong, specifically with regard to the key.format and key.field /opt/flink/bin/flink run \ /opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \ kafka_sync_table \ -Dexecution.checkpointing.interval=10s \ -Dexecution.checkpointing.num-retained=5 \ -Dstate.checkpoints.num-retained=10 \ -Dpipeline.name='sync-kafka-topic-to-paimon-s3' \ --kafka_conf properties.bootstrap.servers=broker:29092 \ --kafka_conf topic=factory_iot \ --kafka_conf value.format=debezium-avro \ --kafka_conf key.format=debezium-avro \ --kafka_conf key.field=siteId \ --kafka_conf properties.group.id=123456 \ --kafka_conf schema.registry.url=http://schema-registry:9081 \ --kafka_conf scan.startup.mode=earliest-offset \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://metastore:9083 \ --warehouse s3a://warehouse/paimon/ \ --database iot \ --table factory_iot \ --table_conf sink.parallelism=4 the value displayed in control center for the key of the message is: { "siteId": 105 } Any assistance would be appreciated. 2025-03-26 09:40:54,628 WARN org.apache.hadoop.metrics2.impl.MetricsConfig [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties 2025-03-26 09:40:54,642 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - Scheduled Metric snapshot period at 10 second(s). 2025-03-26 09:40:54,643 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl [] - s3a-file-system metrics system started java.lang.NoClassDefFoundError: com/google/common/util/concurrent/internal/InternalFutureFailureAccess at java.base/java.lang.ClassLoader.defineClass1(Native Method) at java.base/java.lang.ClassLoader.defineClass(Unknown Source) at java.base/java.security.SecureClassLoader.defineClass(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.defineClass1(Native Method) at java.base/java.lang.ClassLoader.defineClass(Unknown Source) at java.base/java.security.SecureClassLoader.defineClass(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.defineClass1(Native Method) at java.base/java.lang.ClassLoader.defineClass(Unknown Source) at java.base/java.security.SecureClassLoader.defineClass(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(Unknown Source) at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3472) at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3476) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2134) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) at com.google.common.cache.LocalCache.get(LocalCache.java:3962) at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3985) at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4946) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:274) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.read(AbstractKafkaAvroDeserializer.java:492) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:259) at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:236) at org.apache.paimon.flink.action.cdc.serialization.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:39) at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema.deserialize(KafkaDebeziumAvroDeserializationSchema.java:71) at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema.deserialize(KafkaDebeziumAvroDeserializationSchema.java:38) at org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils$KafkaConsumerWrapper.lambda$getRecords$0(KafkaActionUtils.java:346) at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source) at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) at org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils$KafkaConsumerWrapper.getRecords(KafkaActionUtils.java:351) at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:67) at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67) at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150) at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117) at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215) at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367) at java.base/java.security.AccessController.doPrivileged(Native Method) at java.base/javax.security.auth.Subject.doAs(Unknown Source) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335) Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.internal.InternalFutureFailureAccess at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) at java.base/java.lang.ClassLoader.loadClass(Unknown Source) ... 74 more flink@f678a5da4643:~$ -- You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity! Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!