Think I'll eventually pass the library issues, actually remove some and the
error stays the same so I'm going to assume those libraries were actually
not required.

So I'm consuming from a Confluent Kafka topic, which only gets
records added to be inserted into paimon, no updates ever...

My payload is avro serialized. but the below command results in the lower
error... expecting the extracted payload to be debezium structured....

*Error stack.*








































*org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Not a valid schema field: after at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at java.base/java.security.AccessController.doPrivileged(Native Method) at
java.base/javax.security.auth.Subject.doAs(Unknown Source) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)Caused
by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after
at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282) at
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
at
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
at
org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown
Source) at
java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown
Source) at
java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown
Source) at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown
Source) at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown
Source) at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown
Source) at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown
Source) at
org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
at
org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
at
org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
at
org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
at
org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
Source) at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 moreflink@01ab4a472184:~/lib/flink$ *

Command:
/opt/flink/bin/flink run \
/opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
kafka_sync_table \
-Dexecution.checkpointing.interval=10s \
-Dexecution.checkpointing.num-retained=5 \
-Dstate.checkpoints.num-retained=10 \
-Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
--kafka_conf properties.bootstrap.servers=broker:29092 \
--kafka_conf topic=factory_iot \
--kafka_conf value.format=debezium-avro \
--kafka_conf properties.group.id=123456 \
--kafka_conf schema.registry.url=http://schema-registry:9081 \
--kafka_conf scan.startup.mode=earliest-offset \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://metastore:9083 \
--warehouse s3a://warehouse/paimon/ \
--database iot \
--table factory_iot \
--table_conf sink.parallelism=4

I've already modified my Avro schema to include a op field with default
value of c

{
"name": "factory_iot_value",
"doc": "Factory Telemetry/IoT measurements",
"type": "record",
"fields": [
{"name": "ts", "type": "long"},
{"name": "metadata", "type": [
{"type": "record",
"name": "metadata",
"fields": [
{"name": "siteId","type": "int"},
{"name": "deviceId","type": "int"},
{"name": "sensorId","type": "int"},
{"name": "unit","type": "string"},
{"name": "ts_human","type": ["null", "string"], "default": null},
{"name": "location", "type": ["null",
{"name": "location","type": "record",
"fields": [
{"name": "latitude", "type": "double"},
{"name": "longitude", "type": "double"}
]
}
], "default": null},
{"name": "deviceType", "type": ["null", "string"], "default": null}
]
}
]
},
{"name": "measurement", "type": "double"},
{"name": "op", "type": "string", "default": "c"}
]
}

-- 
You have the obligation to inform one honestly of the risk, and as a person
you are committed to educate yourself to the total risk in any activity!

Once informed & totally aware of the risk,
every fool has the right to kill or injure themselves as they see fit!

Reply via email to