[ https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754376#comment-17754376 ]
Hang Ruan commented on FLINK-29039:
-----------------------------------

This bug is the same as https://issues.apache.org/jira/browse/FLINK-25132. We cannot get the right result when the deserializer reuses the object and the connector puts deserialized records in a collection: the collection then contains the same object multiple times.

> RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus the result will always be the last row value
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29039
>                 URL: https://issues.apache.org/jira/browse/FLINK-29039
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.15.1
>         Environment: This issue was discovered on macOS Big Sur.
>            Reporter: Marco A. Villalobos
>            Assignee: Hang Ruan
>            Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus the result will always be the last row value.
>
> Given this program:
> {code:java}
> package mvillalobos.bug;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class IsThisABatchSQLBug {
>
>     public static void main(String[] args) {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
>                 "    `file.path` STRING NOT NULL METADATA,\n" +
>                 "    `file.name` STRING NOT NULL METADATA,\n" +
>                 "    `file.size` BIGINT NOT NULL METADATA,\n" +
>                 "    `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
>                 "    line STRING\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'filesystem',\n" +
>                 "    'format' = 'raw'\n" +
>                 ");");
>
>         tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
>                 "WITH (\n" +
>                 "    'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
>                 ") LIKE historical_raw_source_template;");
>
>         final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
>         output.print();
>     }
> }
> {code}
> and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {code:java}
> one
> two
> three
> four
> five
> six
> seven
> eight
> nine
> ten
> {code}
> The print results are:
> {code:java}
> +----+--------------------------------+
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> +----+--------------------------------+
> 10 rows in set
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
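The failure mode described in the comment can be reproduced outside Flink in a few lines of plain Java. The sketch below uses a hypothetical `MutableRow` class standing in for a reused `RowData` instance; it is not Flink code. The "deserializer" mutates one shared object per record, and the "reader" stores that same reference in a collection without copying, so every element ends up showing the last value:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the object-reuse pitfall (hypothetical classes, not Flink API).
public class ObjectReuseBug {

    // Stand-in for a mutable RowData-like record.
    static class MutableRow {
        String value;
    }

    public static void main(String[] args) {
        MutableRow reused = new MutableRow();           // one instance, reused for every record
        List<MutableRow> collected = new ArrayList<>();

        for (String line : new String[] {"one", "two", "three"}) {
            reused.value = line;    // "deserialize" into the reused object
            collected.add(reused);  // shallow copy: only the reference is stored
        }

        // Every element is the same object, so all print the last value.
        for (MutableRow row : collected) {
            System.out.println(row.value);  // prints "three" three times
        }
    }
}
```

This mirrors the reported symptom exactly: ten input lines, but every printed row shows `ten`.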
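The usual remedy for this class of bug is to copy each reused record before it is stored. The sketch below is a hypothetical illustration of that idea, not the actual Flink patch; `MutableRow` and its `copy()` method are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the remedy (hypothetical classes, not the actual Flink fix):
// copy the reused record before collecting it, so the collection holds
// independent objects instead of one shared reference.
public class CopyBeforeCollect {

    static class MutableRow {
        String value;

        // Creates an independent copy of this record.
        MutableRow copy() {
            MutableRow c = new MutableRow();
            c.value = this.value;
            return c;
        }
    }

    public static void main(String[] args) {
        MutableRow reused = new MutableRow();
        List<MutableRow> collected = new ArrayList<>();

        for (String line : new String[] {"one", "two", "ten"}) {
            reused.value = line;
            collected.add(reused.copy());  // new instance per record
        }

        for (MutableRow row : collected) {
            System.out.println(row.value);  // prints one, two, ten
        }
    }
}
```

The trade-off is one extra allocation per record, which is why connectors often make object reuse opt-in rather than copying unconditionally.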