Flink??1.15.2
I am now going to change the data stream from DataStream<String> to DataStream<ROW> Already implemented (insert only works fine), but when DataStream<String> contains update information The error is: Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input conversion. Conversion expects insert-only records but DataStream API record contains: UPDATE_BEFORE at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180) at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62) at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103) at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118) at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) kafkaflink.java:179-180 lines of code Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE, beforeObject, rowTypeInfo); collector. collect(before);The before data output is -U[1, test, 123-456-789] I would like to know : How to convert the stream containing update data from DataStream<String> to DataStream<ROW>