[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800267#comment-17800267 ]
Feng Jin commented on FLINK-33934:
----------------------------------

[~cailiuyang] Does this mean that we should turn off object reuse for all DeserializationSchema implementations?

> Flink SQL source using the raw format may lead to data loss
> -----------------------------------------------------------
>
>                 Key: FLINK-33934
>                 URL: https://issues.apache.org/jira/browse/FLINK-33934
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
>            Reporter: Cai Liuyang
>            Priority: Major
>
> In production we encountered a case of data loss. The job setup:
> 1. A Flink SQL job that reads data from a message queue and writes to Hive (it selects only the value field, no metadata fields).
> 2. The source table uses the raw format.
>
> However, if we select the value field and a metadata field at the same time, the data loss does not occur.
>
> After reviewing the code, we found that the cause is the object reuse in the raw format (see [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Why object reuse leads to this problem (taking Kafka as an example):
> 1. RawFormatDeserializationSchema runs in the fetcher thread of the SourceOperator. The fetcher thread reads and deserializes data from a Kafka partition, then puts the data into the elementQueue (see [SourceOperator FetchTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
> 2. The SourceOperator's main thread pulls data from the elementQueue (which is shared with the fetcher thread) and processes it (see [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
> 3. RawFormatDeserializationSchema's deserialize function always returns the same object (the [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
> 4. So, if the elementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the fields of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.
>
> The reason that selecting the value field together with a metadata field does not trigger the data loss:
> If a metadata field is selected, a new RowData object is returned (see [DynamicKafkaDeserializationSchema deserialize with metadata fields|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249]), whereas if only the value field is selected, the RowData object returned by the format's deserialization schema is reused as-is (see [DynamicKafkaDeserializationSchema deserialize with only the value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113]).
>
> To solve this problem, I think we should remove the object reuse from RawFormatDeserializationSchema.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
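The buffering-plus-reuse interaction described in the issue can be reproduced in a few lines of plain Java. The classes below are hypothetical stand-ins, not Flink's actual API: the "deserializer" returns one reused mutable array on every call, so two records buffered in a queue end up pointing at the same object and the first record's value is lost, which is exactly the hazard when a reusing format feeds the SourceOperator's shared elementQueue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch (hypothetical classes, not Flink's real API) of the
// object-reuse hazard: a deserializer that returns the same mutable
// object per call silently corrupts records buffered in a queue.
public class ReuseHazardDemo {

    // Mimics a reusing format like RawFormatDeserializationSchema:
    // one output object, mutated in place on every deserialize call.
    static class ReusingDeserializer {
        private final byte[] reused = new byte[1];

        byte[] deserialize(byte value) {
            reused[0] = value;   // overwrite the shared instance
            return reused;       // every call returns the SAME reference
        }
    }

    // The proposed fix: allocate a fresh object per record.
    static class CopyingDeserializer {
        byte[] deserialize(byte value) {
            return new byte[] { value };
        }
    }

    public static void main(String[] args) {
        // Reusing: both queue entries point at the same array, so
        // record 1 is lost as soon as record 2 is deserialized.
        ReusingDeserializer bad = new ReusingDeserializer();
        Deque<byte[]> queue = new ArrayDeque<>();
        queue.add(bad.deserialize((byte) 1));
        queue.add(bad.deserialize((byte) 2)); // clobbers record 1
        System.out.println("reusing: first record = " + queue.pollFirst()[0]); // prints 2, not 1

        // Copying: each record keeps its own value.
        CopyingDeserializer good = new CopyingDeserializer();
        Deque<byte[]> queue2 = new ArrayDeque<>();
        queue2.add(good.deserialize((byte) 1));
        queue2.add(good.deserialize((byte) 2));
        System.out.println("copying: first record = " + queue2.pollFirst()[0]); // prints 1
    }
}
```

Note the sketch is single-threaded on purpose: the corruption needs no race at all, only that a second deserialize call happens before the first buffered record is consumed, which is precisely what happens when the fetcher thread runs ahead of the main thread.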