Hello,

I have to convert a Table to a DataStream and tried to do it with toAppendStream (I just saw that it is deprecated), but I have not been able to do the conversion yet (see the attached code). My final sink should be Kafka, with the records in ObjectNode/JSON format, so I will eventually need a serializer.
What am I doing wrong? Can I convert to an ObjectNode with a serializer directly? Both toAppendStream and toDataStream fail with the same error.

It fails with (shortened stack trace):

java.io.IOException: Failed to deserialize consumer record due to
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54) ~[?:?]
    ....
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18, leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key size = -1, serialized value size = 3587, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@7fcb6863).
    ...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    ...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    ..
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion from external DataStream API to internal Table API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema.
Affected record:

Table result = tableEnv.sqlQuery("select transactionId"
    + ", originalEvent"
    + ", handlingTime"
    + ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime"
    + " from " + tupled4DsTable
    + " order by eventTime");

result.printSchema();

TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG());  // <-- deprecated and fails
DataStream<Tuple4<String, String, Long, Long>> dsRow = tableEnv.toAppendStream(result, tupleType);  // <-- deprecated and fails

DataStream<Row> xx = tableEnv.toDataStream(result);  // <-- fails with the same error

Regards
Hans
code.java
Description: Binary data
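
For reference, here is a minimal plain-Java sketch of what the elapsedTime expression in the query above is meant to compute: for each transactionId, the difference between the current handlingTime and the previous one in eventTime order, falling back to 0 for the first row. It is not the Flink job itself and does not address the conversion error. The Event class, the elapsedTimes helper, and the sample values are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElapsedTimeSketch {

    // Hypothetical stand-in for one input row; field names follow the query.
    static final class Event {
        final String transactionId;
        final long eventTime;
        final long handlingTime;

        Event(String transactionId, long eventTime, long handlingTime) {
            this.transactionId = transactionId;
            this.eventTime = eventTime;
            this.handlingTime = handlingTime;
        }
    }

    // Returns one elapsedTime per event, with events processed in eventTime order.
    static List<Long> elapsedTimes(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTime));

        // Last handlingTime seen per transactionId (the "lag" value).
        Map<String, Long> previous = new HashMap<>();
        List<Long> result = new ArrayList<>();
        for (Event e : sorted) {
            // ifnull(lag(handlingTime), handlingTime): the first row of a
            // transaction has no predecessor, so it falls back to its own value.
            long lag = previous.getOrDefault(e.transactionId, e.handlingTime);
            result.add(e.handlingTime - lag);
            previous.put(e.transactionId, e.handlingTime);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from the real topic.
        List<Event> events = Arrays.asList(
                new Event("tx-1", 1L, 100L),
                new Event("tx-2", 2L, 50L),
                new Event("tx-1", 3L, 130L),
                new Event("tx-2", 4L, 90L));
        System.out.println(elapsedTimes(events)); // prints [0, 0, 30, 40]
    }
}
```

The first event of each transaction yields 0 because the ifnull fallback subtracts handlingTime from itself.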