Hi,

I tried a simplified version of the attached code, and the detailed exception is:

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at org$apache$flink$api$java$tuple$Tuple4$1$Converter.toInternal(Unknown Source)
    at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
    at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
    at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
    at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
    ... 14 more

So it looks to me like the error occurs because column f0 is of type Long and cannot be converted to the TIMESTAMP_LTZ type directly. One possible fix might be to change the declaration of tupled4DsTable to:

    Table tupled4DsTable =
        tableEnv.fromDataStream(
                tuple4ds,
                Schema.newBuilder()
                    .columnByExpression(
                        "f0_ts",
                        Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
                    .column("f1", "BIGINT")
                    .column("f2", "STRING")
                    .column("f3", "STRING")
                    .watermark("f0_ts", "SOURCE_WATERMARK()")
                    .build())
            .as("eventTime", "handlingTime", "transactionId", "originalEvent");

Sorry, I'm not an expert in Table / SQL and might have missed or overlooked something.

Best,
Yun

------------------Original Mail ------------------
Sender:HG <hanspeter.sl...@gmail.com>
Send Date:Thu Feb 17 22:07:07 2022
Recipients:user <user@flink.apache.org>
Subject:java.io.IOException: Failed to deserialize consumer record due to/ How to serialize table output for KafkaSink

Hello,

I have to convert the table to a DataStream and am trying to do it with toAppendStream (I just saw that it is deprecated), but I have not been able to do the conversion yet (see the attached code).
Also my final Sink should be Kafka and the format ObjectNode/JSON, so I need a serializer eventually.
What am I doing wrong? Can I convert to an ObjectNode with a serializer directly?
Both toAppendStream and toDataStream fail with the same error.

It fails with (shortened stack trace):

java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54) ~[?:?]
....
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18, leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key size = -1, serialized value size = 3587, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@7fcb6863).
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
..
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema.
Affected record:

    Table result = tableEnv.sqlQuery("select transactionId"
        + ", originalEvent"
        + ", handlingTime"
        + ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime"
        + " from " + tupled4DsTable + " order by eventTime");

    result.printSchema();

    TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType =
        new TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG());  // deprecated and fails
    DataStream<Tuple4<String, String, Long, Long>> dsRow =
        tableEnv.toAppendStream(result, tupleType);  // deprecated and fails
    DataStream<Row> xx = tableEnv.toDataStream(result);  // fails with the same error

Regards
Hans