[ https://issues.apache.org/jira/browse/FLINK-33934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cai Liuyang updated FLINK-33934:
--------------------------------
Description:

In our production environment we hit a case that leads to data loss. The job details:
1. A Flink SQL job reads data from a message queue and writes to Hive, selecting only the value field (no metadata fields).
2. The source table uses the raw format.

However, if we select the value field and a metadata field at the same time, the data loss does not occur.

After reviewing the code, we found the cause is the object reuse in the raw format (see code [RawFormatDeserializationSchema|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]). Taking Kafka as an example, object reuse leads to the problem as follows:
1. RawFormatDeserializationSchema is used in the fetcher thread of the SourceOperator. The fetcher thread reads and deserializes data from a Kafka partition, then puts the records into the ElementQueue (see code [SourceOperator FetcherTask|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L64]).
2. The SourceOperator's main thread pulls records from the ElementQueue (which is shared with the fetcher thread) and processes them (see code [SourceOperator main thread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L188]).
3. RawFormatDeserializationSchema's deserialize method returns the same object on every call (see code [reused RowData object|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java#L62]).
4. So, if the ElementQueue still holds elements that have not been consumed, the fetcher thread can overwrite the field of the reused RowData that RawFormatDeserializationSchema::deserialize returned, which leads to data loss.

The reason that selecting the value field and a metadata field at the same time does not trigger the data loss:
- If a metadata field is selected, a new RowData object is created for each record; see code [DynamicKafkaDeserializationSchema deserialize with metadata field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L249].
- If only the value field is selected, the RowData object returned by the format's deserialization schema is reused directly; see code [DynamicKafkaDeserializationSchema deserialize only with value field|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L113].

To solve this problem, I think we should disable object reuse in RawFormatDeserializationSchema.

> Flink SQL Source use raw format maybe lead to data lost
> -------------------------------------------------------
>
>                 Key: FLINK-33934
>                 URL: https://issues.apache.org/jira/browse/FLINK-33934
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime
>            Reporter: Cai Liuyang
>            Priority: Major

-- This message was sent by Atlassian Jira (v8.20.10#820010)
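The race described in steps 1-4 can be sketched in a few lines. This is a hypothetical simplification, not Flink API: `ReusingDeserializer` stands in for RawFormatDeserializationSchema's reuse pattern, and a plain queue stands in for the ElementQueue. Because both enqueued entries are the same reference, the second deserialize call clobbers the first record before the consumer drains the queue:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical deserializer mimicking RawFormatDeserializationSchema's
// object reuse: it returns the SAME mutable row on every call.
class ReusingDeserializer {
    private final String[] reusedRow = new String[1]; // stands in for the reused RowData

    String[] deserialize(byte[] message) {
        reusedRow[0] = new String(message); // overwrite the field in place
        return reusedRow;                   // hand out the same reference each time
    }
}

public class ObjectReuseDemo {
    public static void main(String[] args) {
        ReusingDeserializer schema = new ReusingDeserializer();
        Queue<String[]> elementQueue = new ArrayDeque<>(); // stands in for the shared ElementQueue

        // Fetcher side: two messages are deserialized before the main thread drains.
        elementQueue.add(schema.deserialize("record-1".getBytes()));
        elementQueue.add(schema.deserialize("record-2".getBytes()));

        // Main-thread side: both entries point at the same reused object,
        // so "record-1" has been silently overwritten -> data loss.
        System.out.println(elementQueue.poll()[0]); // prints record-2
        System.out.println(elementQueue.poll()[0]); // prints record-2
    }
}
```

This also shows why selecting a metadata field hides the bug: the metadata path allocates a fresh row per record (the equivalent of copying before `add`), so each queue entry keeps its own value.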