I think I'll eventually get past the library issues; I actually removed some and the error stays the same, so I'm going to assume those libraries were not actually required.
So I'm consuming from a Confluent Kafka topic that only ever receives new records to be inserted into Paimon, never updates. My payload is Avro-serialized, but the command below fails with the error below it, even though I expected the extracted payload to be treated as Debezium-structured.

*Error stack:*

```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not a valid schema field: after
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282)
    at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
    at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
    at org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source)
    at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
    at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
    at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
    at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
    at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
    at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 12 more
```

*Command:*

```bash
/opt/flink/bin/flink run \
  /opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
  kafka_sync_table \
  -Dexecution.checkpointing.interval=10s \
  -Dexecution.checkpointing.num-retained=5 \
  -Dstate.checkpoints.num-retained=10 \
  -Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
  --kafka_conf properties.bootstrap.servers=broker:29092 \
  --kafka_conf topic=factory_iot \
  --kafka_conf value.format=debezium-avro \
  --kafka_conf properties.group.id=123456 \
  --kafka_conf schema.registry.url=http://schema-registry:9081 \
  --kafka_conf scan.startup.mode=earliest-offset \
  --catalog_conf metastore=hive \
  --catalog_conf uri=thrift://metastore:9083 \
  --warehouse s3a://warehouse/paimon/ \
  --database iot \
  --table factory_iot \
  --table_conf sink.parallelism=4
```

I've already modified my Avro schema to include an `op` field with a default value of `"c"`:

```json
{
  "name": "factory_iot_value",
  "doc": "Factory Telemetry/IoT measurements",
  "type": "record",
  "fields": [
    {"name": "ts", "type": "long"},
    {"name": "metadata", "type": [
      {"type": "record", "name": "metadata", "fields": [
        {"name": "siteId", "type": "int"},
        {"name": "deviceId", "type": "int"},
        {"name": "sensorId", "type": "int"},
        {"name": "unit", "type": "string"},
        {"name": "ts_human", "type": ["null", "string"], "default": null},
        {"name": "location", "type": ["null", {"name": "location", "type": "record", "fields": [
          {"name": "latitude", "type": "double"},
          {"name": "longitude", "type": "double"}
        ]}], "default": null},
        {"name": "deviceType", "type": ["null", "string"], "default": null}
      ]}
    ]},
    {"name": "measurement", "type": "double"},
    {"name": "op", "type": "string", "default": "c"}
  ]
}
```
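From the stack trace, `DebeziumAvroRecordParser.extractRecords` fails on a `get("after")` call against the value record, so my reading is that `value.format=debezium-avro` expects the full Debezium envelope (`before`/`after`/`op`, plus `source`/`ts_ms` metadata) rather than a flat record with an `op` column bolted on. For reference, here's a minimal sketch of what I understand such an envelope schema to look like, with my record nested as the `before`/`after` payload; the `Value` fields are trimmed to two for brevity, and whether Paimon tolerates a missing `source` block is an assumption on my part:

```json
{
  "type": "record",
  "name": "Envelope",
  "namespace": "factory_iot",
  "fields": [
    {"name": "before", "type": ["null", {
      "type": "record",
      "name": "Value",
      "fields": [
        {"name": "ts", "type": "long"},
        {"name": "measurement", "type": "double"}
      ]
    }], "default": null},
    {"name": "after", "type": ["null", "Value"], "default": null},
    {"name": "op", "type": "string"},
    {"name": "ts_ms", "type": ["null", "long"], "default": null}
  ]
}
```

For insert-only traffic like mine, every message would then carry `op = "c"`, a populated `after`, and a null `before`.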