[ 
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754376#comment-17754376
 ] 

Hang Ruan commented on FLINK-29039:
-----------------------------------

This bug is the same as https://issues.apache.org/jira/browse/FLINK-25132.

The result is wrong when the deserializer reuses its output object and the 
connector puts the deserialized records into a collection. Every element of 
the collection then references the same object, so all entries show the last 
value that was deserialized.
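
A minimal sketch of the failure mode, in plain Java without any Flink 
dependency. MutableRow, ReusingDeserializer and collect are hypothetical names 
made up for this illustration; they only stand in for a reused RowData, a 
deserializer that reuses its output object, and a reader that buffers records. 
This is not the actual Flink code or the actual fix.

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {

    /** Hypothetical mutable record, standing in for a reused RowData. */
    static final class MutableRow {
        String line;

        /** Returns an independent copy of the current contents. */
        MutableRow copy() {
            MutableRow r = new MutableRow();
            r.line = line;
            return r;
        }
    }

    /** Hypothetical deserializer that always hands back the same instance. */
    static final class ReusingDeserializer {
        private final MutableRow reuse = new MutableRow();

        MutableRow deserialize(String raw) {
            reuse.line = raw; // overwrites what the previous call returned
            return reuse;
        }
    }

    /** Buffers all records in a list, optionally copying each one first. */
    public static List<String> collect(String[] input, boolean copy) {
        ReusingDeserializer deserializer = new ReusingDeserializer();
        List<MutableRow> buffer = new ArrayList<>();
        for (String raw : input) {
            MutableRow row = deserializer.deserialize(raw);
            // Without the copy, every list slot points at the same object.
            buffer.add(copy ? row.copy() : row);
        }
        List<String> out = new ArrayList<>();
        for (MutableRow row : buffer) {
            out.add(row.line);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] input = {"one", "two", "three"};
        System.out.println(collect(input, false)); // [three, three, three]
        System.out.println(collect(input, true));  // [one, two, three]
    }
}
```

Without the copy, the buffer holds three references to one object, which 
matches the repeated "ten" rows in the report below. Copying each record 
before buffering it is one way to avoid this, shown here only as an 
illustration.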

> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29039
>                 URL: https://issues.apache.org/jira/browse/FLINK-29039
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.15.1
>         Environment: This issue was discovered on MacOS Big Sur.
>            Reporter: Marco A. Villalobos
>            Assignee: Hang Ruan
>            Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value.
>  
> Given this program:
> {code:java}
> package mvillalobos.bug;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class IsThisABatchSQLBug {
>    public static void main(String[] args) {
>      final StreamExecutionEnvironment env =
>            StreamExecutionEnvironment.getExecutionEnvironment();
>      env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>      final StreamTableEnvironment tableEnv =
>            StreamTableEnvironment.create(env);
>      tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
>            "        `file.path`              STRING NOT NULL METADATA,\n" +
>            "        `file.name`              STRING NOT NULL METADATA,\n" +
>            "        `file.size`              BIGINT NOT NULL METADATA,\n" +
>            "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
>            "        line                    STRING\n" +
>            "      ) WITH (\n" +
>            "        'connector' = 'filesystem', \n" +
>            "        'format' = 'raw'\n" +
>            "      );");
>      tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
>            "      WITH (\n" +
>            "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
>            "      ) LIKE historical_raw_source_template;");
>      final TableResult output =
>            tableEnv.from("historical_raw_source").select($("line")).execute();
>      output.print();
>   }
> }
> {code}
> and this sample.csv file in the 
> '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {code:java}
> one
> two
> three
> four
> five
> six
> seven
> eight
> nine
> ten {code}
> The printed results are:
> {code:java}
> +----+--------------------------------+
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> +----+--------------------------------+
> 10 rows in set {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
