Hi guys. Curious to understand.
I'm trying to stitch together a Kafka -> Paimon pipeline using the action framework. I got this working with plain JSON. I'm now modifying it so that the messages on the Kafka topic are Avro-serialised, which implies using the debezium-avro CDC reader, as there is no other way to read the data from the topic.

Why would I need to include the "op" field, with default "c", in my schema on Kafka? And why is it now complaining about an "after" tag in the payload? This implies the user needs to add this somewhere along the line, rather than it being a possible configuration setting in the flink command that kicks off the job.

I haven't figured out how to get this working yet; error below.

G

-----------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not a valid schema field: after
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282)
    at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
    at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
    at org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source)
    at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
    at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
    at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
    at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
    at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
    at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 12 more

--
You have the obligation to inform one honestly of the risk, and as a person you are committed to educate yourself to the total risk in any activity!
Once informed & totally aware of the risk, every fool has the right to kill or injure themselves as they see fit!