OK, so it seems we have all the libraries in place. Below is the schema
registered on my Kafka topic, followed by the payload being posted, and then
the Debezium CDC error complaining about the "after" field.


*Avro Schema*

{
    "name": "factory_iot_value",
    "doc": "Factory Telemetry/IoT measurements",
    "namespace": "factory_iot.avro",
    "type": "record",
    "fields": [
        {"name": "ts", "type": "long", "doc": "UTC Timestamp",
"logicalType": "timestamp-millis"},
        {"name": "metadata", "type": [
                {"type": "record",
                 "name": "metadata",
                 "fields": [
                    {"name": "siteId","type": "int", "doc": "Factory/Site ID"},
                    {"name": "deviceId","type": "int", "doc": "Device ID"},
                    {"name": "sensorId","type": "int", "doc": "Sensor
on Device ID"},
                    {"name": "unit","type": "string", "doc":
"Measurement units of sensor"},
                    {"name": "ts_human","type": ["null", "string"],
"default": null, "doc": "Human readable Timestamp"},
                    {"name": "location", "type": ["null",
                        {"name": "location","type": "record",
                            "fields": [
                                {"name": "latitude", "type": "double"},
                                {"name": "longitude", "type": "double"}
                            ]
                        }
                    ], "default": null, "doc": "GPS Coords of Factory/Site"},
                    {"name": "deviceType", "type": ["null", "string"],
"default": null, "doc": "Device Description"}
                    ]
                }
            ]
        },
        {"name": "measurement", "type": "double", "doc": "Measurement
retried at UTC timestamp for Sensor on Device located at Site"},
        {"name": "op", "type": "string", "default": "c"}
    ]
}
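
Reading ahead to the error below, Paimon's DebeziumAvroRecordParser fails while
looking up an "after" field, so it apparently expects the value to be a
Debezium-style change-event envelope (before/after/source/op) rather than the
flat record above. Purely as a sketch of that shape, reusing my row fields and
with made-up names ("Envelope", "factory_iot_value_row", "metadata_row",
"location_row") and a deliberately simplified "source" field (real Debezium
emits a full source record there), an envelope schema might look something
like this:

{
    "type": "record",
    "name": "Envelope",
    "namespace": "factory_iot.avro",
    "doc": "Sketch only: Debezium-style envelope around the factory_iot_value row",
    "fields": [
        {"name": "before", "type": ["null", {
            "type": "record",
            "name": "factory_iot_value_row",
            "doc": "Same fields as the flat schema above",
            "fields": [
                {"name": "ts", "type": "long"},
                {"name": "metadata", "type": {
                    "type": "record", "name": "metadata_row", "fields": [
                        {"name": "siteId", "type": "int"},
                        {"name": "deviceId", "type": "int"},
                        {"name": "sensorId", "type": "int"},
                        {"name": "unit", "type": "string"},
                        {"name": "ts_human", "type": ["null", "string"], "default": null},
                        {"name": "location", "type": ["null", {
                            "type": "record", "name": "location_row", "fields": [
                                {"name": "latitude", "type": "double"},
                                {"name": "longitude", "type": "double"}
                            ]}], "default": null},
                        {"name": "deviceType", "type": ["null", "string"], "default": null}
                    ]}},
                {"name": "measurement", "type": "double"}
            ]}], "default": null},
        {"name": "after", "type": ["null", "factory_iot_value_row"], "default": null},
        {"name": "source", "type": ["null", "string"], "default": null,
         "doc": "Simplified for the sketch; Debezium normally emits a full source record"},
        {"name": "op", "type": "string", "doc": "c / u / d / r"},
        {"name": "ts_ms", "type": ["null", "long"], "default": null}
    ]
}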

*Payload*

{
    "timestamp" : "2024-10-02T00:00:00.869Z",
    "metadata" : {
        "siteId" : 1009,
        "deviceId" : 1042,
        "sensorId" : 10180,
        "unit" : "Psi",
        "ts_human" : "2024-10-02T00:00:00.869Z",
        "location": {
            "latitude": -26.195246,
            "longitude": 28.034088
        },
        "deviceType" : "Oil Pump"
    },
    "measurement" : 1013.3997
}

*Error*

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not a valid schema field: after
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Unknown Source)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
*Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after*
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282)
        at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
        at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
        at org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
        at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown Source)
        at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
        at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
        at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source)
        at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
        at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
        at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
        at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
        at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
        at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 12 more
flink@01ab4a472184:~/lib/flink$
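
For what it's worth, the trace shows DebeziumAvroRecordParser.extractRecords
calling GenericRecord.get("after") while deriving the table schema, which is
presumably why it rejects my flat record. Assuming the envelope sketch above,
the logical shape of each message (ignoring Avro's JSON encoding of unions)
would then need to be something like:

{
    "before": null,
    "after": {
        "ts": 1727827200869,
        "metadata": {
            "siteId": 1009,
            "deviceId": 1042,
            "sensorId": 10180,
            "unit": "Psi",
            "ts_human": "2024-10-02T00:00:00.869Z",
            "location": {"latitude": -26.195246, "longitude": 28.034088},
            "deviceType": "Oil Pump"
        },
        "measurement": 1013.3997
    },
    "source": null,
    "op": "c",
    "ts_ms": 1727827200869
}

This is just my guess at the expected shape, not something I've verified
against the Paimon docs; 1727827200869 is simply 2024-10-02T00:00:00.869Z
expressed in epoch milliseconds.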




