Hi,

After a deep dive into the source code, I guess you are using the
StreamTableEnvironment#fromDataStream method, which only supports
insert-only messages. For your case, I think you should use
StreamTableEnvironment#fromChangelogStream[1], which supports consuming
update rows.

[1]
https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317
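
Here is a minimal sketch of the switch. The sample rows and the column
names id/name/phone are only illustrations based on your
-U[1, test, 123-456-789] output, so adjust them to your real schema:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Each Row carries its change flag in its RowKind
// (-U = UPDATE_BEFORE, +U = UPDATE_AFTER). In your job this stream would
// be the output of your flatMap; fromElements with made-up values is only
// used here to keep the sketch self-contained.
DataStream<Row> changelog = env.fromElements(
        Row.ofKind(RowKind.UPDATE_BEFORE, 1, "test", "123-456-789"),
        Row.ofKind(RowKind.UPDATE_AFTER, 1, "test", "111-222-333"));

// fromChangelogStream accepts all RowKinds, while fromDataStream rejects
// everything except INSERT.
Table table = tEnv.fromChangelogStream(changelog).as("id", "name", "phone");
tEnv.createTemporaryView("changelog_table", table);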

Best,
Ron

完结篇 <2366123...@qq.com> wrote on Sat, Aug 12, 2023 at 02:29:

> Flink:1.15.2
>
> I am now converting the data stream from *DataStream<String>* to
> *DataStream<Row>*.
>
> This is already implemented (*insert-only works fine*), but when the
> DataStream<String> contains *update* information,
>
> the error is:
> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
> input conversion. Conversion expects insert-only records but DataStream API
> record contains: UPDATE_BEFORE*
> at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
> at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
> at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
> at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
>
> *KafkaFlink.java, lines 179-180:*
>
> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE, beforeObject, rowTypeInfo);
> collector.collect(before);
>
> The *before* row that is output is -U[1, test, 123-456-789]
>
> I would like to know: how do I convert a stream containing *update* data
> from *DataStream<String>* to *DataStream<Row>*?
>
