Hi Yun,

It works. Thank you.

On Fri, Feb 18, 2022, 04:17 Yun Gao <yungao...@aliyun.com> wrote:

> Hi,
>
> I tried with a simplified version of the attached code, and it shows that
> the detailed exception is
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
>     at org$apache$flink$api$java$tuple$Tuple4$1$Converter.toInternal(Unknown Source)
>     at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
>     at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
>     at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
>     at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
>     at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
>     ... 14 more
>
> Thus it looks to me that the error occurs because the column f0 is of
> type Long and cannot be converted into the TIMESTAMP_LTZ type directly.
> To fix this error, one possible solution might be to change the
> declaration of tupled4DsTable to
>
>     Table tupled4DsTable =
>         tableEnv.fromDataStream(
>                 tuple4ds,
>                 Schema.newBuilder()
>                     .columnByExpression(
>                         "f0_ts",
>                         Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
>                     .column("f1", "BIGINT")
>                     .column("f2", "STRING")
>                     .column("f3", "STRING")
>                     .watermark("f0_ts", "SOURCE_WATERMARK()")
>                     .build())
>             .as("eventTime", "handlingTime", "transactionId", "originalEvent");
>
> Sorry, I'm not an expert in Table / SQL and might miss or overlook something.
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:*HG <hanspeter.sl...@gmail.com>
> *Send Date:*Thu Feb 17 22:07:07 2022
> *Recipients:*user <user@flink.apache.org>
> *Subject:*java.io.IOException: Failed to deserialize consumer record due to/ How to serialize table output for KafkaSink
>
>> Hello,
>>
>> I have to convert the table to a DataStream and am trying to do it with
>> toAppendStream (I just saw that it is deprecated).
>> But I have not been able to do the conversion yet (see the attached code).
>> Also, my final sink should be Kafka and the format ObjectNode/JSON,
>> so I need a serializer eventually.
>>
>> What am I doing wrong? Can I convert to an ObjectNode with a serializer
>> directly?
>> Both toAppendStream and toDataStream fail with the same error.
>>
>> It fails with (shortened stack trace):
>> java.io.IOException: Failed to deserialize consumer record due to
>>     at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54) ~[?:?]
>> ....
>> Caused by: java.io.IOException: Failed to deserialize consumer record
>> ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18,
>> leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key
>> size = -1, serialized value size = 3587, headers = RecordHeaders(headers =
>> [], isReadOnly = false), key = null, value = [B@7fcb6863).
>> ...
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>> ...
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> ..
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
>> input conversion from external DataStream API to internal Table API data
>> structures. Make sure that the provided data types that configure the
>> converters are correctly declared in the schema. Affected record:
>>
>>     Table result = tableEnv.sqlQuery("select transactionId" +
>>         ", originalEvent" +
>>         ", handlingTime" +
>>         ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime" +
>>         " from " + tupled4DsTable + " order by eventTime");
>>
>>     result.printSchema();
>>
>>     TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG()); // <-- deprecated and fails
>>     DataStream<Tuple4<String, String, Long, Long>> dsRow = tableEnv.toAppendStream(result, tupleType); // <-- deprecated and fails
>>
>>     DataStream<Row> xx = tableEnv.toDataStream(result); // <-- fails with the same error
>>
>> Regards Hans
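
Note on the root cause discussed in this thread: the "java.lang.Long cannot be cast to java.time.Instant" failure can be reproduced outside Flink. The sketch below is plain Java, not Flink code, and the class and method names are hypothetical. It assumes f0 holds milliseconds since the epoch (the precision argument 3 in TO_TIMESTAMP_LTZ(f0, 3) selects epoch milliseconds) and uses the CreateTime value from the quoted ConsumerRecord as sample input.

```java
import java.time.Instant;

// Hypothetical helper class, for illustration only; not part of Flink.
final class EpochMillisToInstant {

    // Mirrors the failing step: a boxed Long is handed to code that expects an Instant.
    static Instant castDirectly(Object value) {
        return (Instant) value; // throws ClassCastException when value is a Long
    }

    // Explicit conversion, assuming the value is milliseconds since the epoch.
    static Instant fromEpochMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // CreateTime taken from the ConsumerRecord in the quoted stack trace.
        Object f0 = 1645105052689L;

        try {
            castDirectly(f0);
        } catch (ClassCastException e) {
            System.out.println("direct cast failed: " + e.getMessage());
        }

        // Converting explicitly succeeds and yields a proper timestamp.
        System.out.println("converted: " + fromEpochMillis((Long) f0));
    }
}
```

The computed column in the suggested schema plays the role of fromEpochMillis here: the raw Long stays a BIGINT-like value, and the timestamp is derived from it by an explicit conversion rather than by declaring the Long field itself as TIMESTAMP_LTZ.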