Hi Yun
It works.
Thank you

On Fri, Feb 18, 2022, 04:17 Yun Gao <yungao...@aliyun.com> wrote:

> Hi,
>
> I tried with a simplied version of the attached code, and it shows the
> detailed exception is
>
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to 
> java.time.Instant
>
>  at org$apache$flink$api$java$tuple$Tuple4$1$Converter.toInternal(Unknown 
> Source)
>
>  at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
>
>  at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
>
>  at 
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
>
>  at 
> org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
>
>  at 
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
>  ... 14 more
>
> Thus it looks to me that the error is caused by the column f0 is of the
> type Long and could not be transformed into the
> TIMESTAMP_LTZ type directly. To fix this error, one possible solution
> might be change the declaration of tupled4DsTable as
>
> Table tupled4DsTable =
>         tableEnv.fromDataStream(
>                         tuple4ds,
>                         Schema.newBuilder()
>                                 .columnByExpression(
>                                         "f0_ts",
>                                         
> Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
>                                 .column("f1", "BIGINT")
>                                 .column("f2", "STRING")
>                                 .column("f3", "STRING")
>                                 .watermark("f0_ts", "SOURCE_WATERMARK()")
>                                 .build())
>                 .as("eventTime", "handlingTime", "transactionId", 
> "originalEvent");
>
> ​Sorry I'm not an expert in Table / SQL and might miss or overlook something.
>
>
>
> Best,
>
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*HG <hanspeter.sl...@gmail.com>
> *Send Date:*Thu Feb 17 22:07:07 2022
> *Recipients:*user <user@flink.apache.org>
> *Subject:*java.io.IOException: Failed to deserialize consumer record due
> to/ How to serialize table output for KafkaSink
>
>> Hello,
>>
>> I have to convert the table to Datastream and try to do it with
>> toAppendStream (just saw that it is deprecated )
>> But I have not been able to do the conversion as yet. (See the attached
>> code).
>> Also my final Sink should be Kafka and the format ObjectNode/JSON.
>> So I need a serializer eventually.
>>
>> What am I doing wrong? Can I convert to an ObjectNode with a serializer
>> directly?
>> Both toAppendStream and toDataStream fail with the same error.
>>
>> It fails with (shortened stack trace)
>> java.io.IOException: Failed to deserialize consumer record due to
>>         at
>> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54)
>> ~[?:?]
>> ....
>> Caused by: java.io.IOException: Failed to deserialize consumer record
>> ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18,
>> leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key
>> size = -1, serialized value size = 3587, headers = RecordHeaders(headers =
>> [], isReadOnly = false), key = null, value = [B@7fcb6863).
>> ...
>> Caused by:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> ...
>> Caused by:
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>>         at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> ..
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
>> input conversion from external DataStream API to internal Table API data
>> structures. Make sure that the provided data types that configure the
>> converters are correctly declared in the schema. Affected record:
>>
>> Table result = tableEnv.sqlQuery("select transactionId" +
>>         ", originalEvent" +
>>         ", handlingTime" +
>>         ", handlingTime - ifnull(lag(handlingTime) over (partition by 
>> transactionId order by eventTime), handlingTime) as elapsedTime" +
>>         " from " + tupled4DsTable + " order by eventTime");
>>
>> result.printSchema();
>>
>>
>> *TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new 
>> TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()); 
>> <-- deprecated and failsDataStream<Tuple4<String, String, Long, Long>> dsRow 
>> = tableEnv.toAppendStream(result, tupleType)*;                               
>>               *<-- deprecated and fails*
>>
>> *DataStream<Row> xx = tableEnv.toDataStream(result); <-- fails with the same 
>> error*
>>
>> Regards Hans
>>
>>

Reply via email to