ok, so it seems we have all the libraries in place.below is the schema registered on my Kafka topic, followed by the payload being posted and then the Debezium CDC error... with the "after" tag problem.
*Avro Schema* { "name": "factory_iot_value", "doc": "Factory Telemetry/IoT measurements", "namespace": "factory_iot.avro", "type": "record", "fields": [ {"name": "ts", "type": "long", "doc": "UTC Timestamp", "logicalType": "timestamp-millis"}, {"name": "metadata", "type": [ {"type": "record", "name": "metadata", "fields": [ {"name": "siteId","type": "int", "doc": "Factory/Site ID"}, {"name": "deviceId","type": "int", "doc": "Device ID"}, {"name": "sensorId","type": "int", "doc": "Sensor on Device ID"}, {"name": "unit","type": "string", "doc": "Measurement units of sensor"}, {"name": "ts_human","type": ["null", "string"], "default": null, "doc": "Human readable Timestamp"}, {"name": "location", "type": ["null", {"name": "location","type": "record", "fields": [ {"name": "latitude", "type": "double"}, {"name": "longitude", "type": "double"} ] } ], "default": null, "doc": "GPS Coords of Factory/Site"}, {"name": "deviceType", "type": ["null", "string"], "default": null, "doc": "Device Description"} ] } ] }, {"name": "measurement", "type": "double", "doc": "Measurement retried at UTC timestamp for Sensor on Device located at Site"}, {"name": "op", "type": "string", "default": "c"} ] } *payload.* { "timestamp" : "2024-10-02T00:00:00.869Z", "metadata" : { "siteId" : 1009, "deviceId" : 1042, "sensorId" : 10180, "unit" : "Psi", "ts_human" : "2024-10-02T00:00:00.869Z", "location": { "latitude": -26.195246, "longitude": 28.034088 }, "deviceType" : "Oil Pump" }, "measurement" : 1013.3997 } *Error* org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not a valid schema field: after at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270) at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367) at java.base/java.security.AccessController.doPrivileged(Native Method) at java.base/javax.security.auth.Subject.doAs(Unknown Source) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)*Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after* at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282) at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205) at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94) at org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73) at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown Source) at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown Source) at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown Source) at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown Source) at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source) at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70) at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67) at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150) at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117) at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215) at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ... 12 more flink@01ab4a472184:~/lib/flink$ -- You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity! Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!