Hi,

After a deep dive into the source code, I guess you are using the StreamTableEnvironment#fromDataStream method, which only supports insert-only messages. For your case, I think you should use StreamTableEnvironment#fromChangelogStream [1], which supports consuming update rows.
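To make the difference concrete, here is a minimal plain-Java sketch of the two behaviours. It is only a model and not Flink code: the `Kind` enum, the `Change` class, and the `insertOnly` and `changelog` methods are hypothetical stand-ins for illustration. The `Kind` names mirror the row kinds in your output (`-U` is UPDATE_BEFORE). The model assumes rows are keyed by their first field.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of insert-only vs. changelog conversion. NOT Flink code.
final class ChangelogSketch {

    // Hypothetical stand-in for the change flag carried by each row.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for a row: change flag, key, and one payload field.
    static final class Change {
        final Kind kind;
        final int id;
        final String name;

        Change(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    // Models an insert-only conversion: any record that is not INSERT is rejected.
    static Map<Integer, String> insertOnly(List<Change> changes) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.kind != Kind.INSERT) {
                throw new IllegalStateException(
                        "Conversion expects insert-only records but record contains: " + c.kind);
            }
            table.put(c.id, c.name);
        }
        return table;
    }

    // Models a changelog-aware conversion: every change flag is applied to the table.
    static Map<Integer, String> changelog(List<Change> changes) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.name);   // add the row or its new version
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.id);        // retract the old version
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(Kind.INSERT, 1, "test"),
                new Change(Kind.UPDATE_BEFORE, 1, "test"),
                new Change(Kind.UPDATE_AFTER, 1, "test2"));

        System.out.println(changelog(changes)); // prints {1=test2}

        try {
            insertOnly(changes);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // rejected at the UPDATE_BEFORE record
        }
    }
}
```

In the actual job, the corresponding change is to pass your DataStream<Row> to StreamTableEnvironment#fromChangelogStream instead of StreamTableEnvironment#fromDataStream, keeping the RowKind set on each Row as you already do.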
[1] https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317

Best,
Ron

完结篇 <2366123...@qq.com> wrote on Sat, Aug 12, 2023 at 02:29:

> Flink: 1.15.2
>
> I am now going to change the data stream from *DataStream<String>* to
> *DataStream<ROW>*
>
> Already implemented (*insert only works fine*), but when
> DataStream<String> contains *update* information
>
> The error is:
> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
> input conversion. Conversion expects insert-only records but DataStream API
> record contains: UPDATE_BEFORE*
>     at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
>     at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
>     at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
>     at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
>     at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
>
> *KafkaFlink.java:179-180 lines of code*
>
> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE,
> beforeObject, rowTypeInfo);
> collector.collect(before);
>
> The before data output is -U[1, test, 123-456-789]
>
> I would like to know: How to convert the stream containing *update* data
> from *DataStream<String>* to *DataStream<ROW>*
>