Hi guys.

Curious to understand.

I'm busy trying to stitch together a Kafka -> Paimon pipeline using the
action framework.
I got this working using straight JSON.

Currently modifying to be AVRO serialised onto the Kafka topic, this
implies needing to use the debezium-avro CDC reader as there is no other
way to read the data from the topic.

Why would I need to include in my schema on Kafka the "op" field.. with
default "c".
Why is it now complaining about an "after" tag in the payload. This implies
the user needs "add" this somewhere along the line vs having it be a
possible configuration setting in the flink command to kick off the job.

haven't figured out how to get this working yet, error below

G

-----------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Not a valid schema field: after
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field:
after
at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282)
at
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
at
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
at
org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown
Source)
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown
Source)
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown
Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown
Source)
at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown
Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source)
at
org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
at
org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
at
org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
at
org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
at
org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 more

-- 
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!

Reply via email to