Hi, 

I tried with a simplified version of the attached code, and it shows that the
detailed exception is

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
 at org$apache$flink$api$java$tuple$Tuple4$1$Converter.toInternal(Unknown Source)
 at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
 at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
 at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
 at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
 at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
 ... 14 more

Thus it looks to me that the error is caused by column f0 being of type Long,
which cannot be converted into the declared TIMESTAMP_LTZ type directly
(the external conversion class for TIMESTAMP_LTZ is java.time.Instant, hence
the ClassCastException). To fix this, one possible solution might be to change
the declaration of tupled4DsTable to

Table tupled4DsTable =
        tableEnv.fromDataStream(
                        tuple4ds,
                        Schema.newBuilder()
                                .columnByExpression(
                                        "f0_ts",
                                        Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
                                .column("f1", "BIGINT")
                                .column("f2", "STRING")
                                .column("f3", "STRING")
                                .watermark("f0_ts", "SOURCE_WATERMARK()")
                                .build())
                .as("eventTime", "handlingTime", "transactionId", "originalEvent");

Sorry, I'm not an expert in Table / SQL, so I might have missed or overlooked something.
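
As for the other half of your question (getting the result into Kafka as
JSON): I have not run this against your job, but a rough sketch could look
like the code below. The broker address, topic name, and the use of Jackson
for building the JSON are placeholders / my assumptions, not taken from your
code:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Convert the (append-only) result Table back to a DataStream<Row>.
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Build a JSON string per row with Jackson; the field names follow your query.
ObjectMapper mapper = new ObjectMapper();
DataStream<String> jsonStream = resultStream
        .map(row -> {
            ObjectNode node = mapper.createObjectNode();
            node.put("transactionId", (String) row.getField("transactionId"));
            node.put("originalEvent", (String) row.getField("originalEvent"));
            node.put("handlingTime", (Long) row.getField("handlingTime"));
            node.put("elapsedTime", (Long) row.getField("elapsedTime"));
            return mapper.writeValueAsString(node);
        })
        .returns(Types.STRING);

// Sink the JSON strings to Kafka; "broker:9092" and "output-topic" are placeholders.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

jsonStream.sinkTo(sink);

Alternatively, it might be simpler to declare a second Kafka table with
'connector' = 'kafka' and 'format' = 'json' and write to it with
result.executeInsert(...), so that the JSON format handles the serialization
for you.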


Best,
Yun


 ------------------Original Mail ------------------
Sender: HG <hanspeter.sl...@gmail.com>
Send Date: Thu Feb 17 22:07:07 2022
Recipients: user <user@flink.apache.org>
Subject: java.io.IOException: Failed to deserialize consumer record due to/ How to serialize table output for KafkaSink

Hello,

I have to convert the table to a DataStream and tried to do it with toAppendStream
(I just saw that it is deprecated).
But I have not been able to do the conversion yet (see the attached code).
Also, my final sink should be Kafka and the format ObjectNode/JSON,
so I will eventually need a serializer.

What am I doing wrong? Can I convert to an ObjectNode with a serializer 
directly?
Both toAppendStream and toDataStream fail with the same error.

It fails with (shortened stack trace)
java.io.IOException: Failed to deserialize consumer record due to
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54) ~[?:?]
....
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18, leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key size = -1, serialized value size = 3587, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@7fcb6863).
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
..
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record:
Table result = tableEnv.sqlQuery("select transactionId" +
        ", originalEvent" +
        ", handlingTime" +
        ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime" +
        " from " + tupled4DsTable + " order by eventTime");

result.printSchema();

TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()); // <-- deprecated and fails
DataStream<Tuple4<String, String, Long, Long>> dsRow = tableEnv.toAppendStream(result, tupleType); // <-- deprecated and fails
DataStream<Row> xx = tableEnv.toDataStream(result); // <-- fails with the same error

Regards Hans
