[ https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648708#comment-17648708 ]
Dmitry Yaraev commented on FLINK-30314:
---------------------------------------

[~echauchot], [~martijnvisser] Thank you for your efforts in verifying the issue. It would be great to have this implemented (as a bug fix or a new feature). As promised, I ran several tests using the code from the repository above, and I was able to reproduce the issue with version 1.16.0 as well. I changed the code a bit so that it writes its output to the console:

{code:java|title=TestCompressedJson.java|borderStyle=solid}
package examples.flinktest;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TestCompressedJson {

    public static void main(final String[] args) {
        final String inputDir = args[0];

        // Initialize the Table environment in batch mode.
        final EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        final TableEnvironment tableEnv = TableEnvironment.create(environmentSettings);

        // Create the source table over the input directory, using the JSON format.
        tableEnv.executeSql(
                String.format(
                        "CREATE TEMPORARY TABLE SourceTable (my_field1 BIGINT, my_field2 INT, my_field3 VARCHAR(2147483647)) "
                                + "WITH ('connector' = 'filesystem', 'path' = '%s', 'format' = 'json')",
                        inputDir));

        // Create a print sink so that the rows show up on the console.
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE SinkTable (my_field1 BIGINT, my_field2 INT, my_field3 VARCHAR(2147483647)) "
                        + "WITH ('connector' = 'print')");

        final Table sourceTable = tableEnv.from("SourceTable");
        sourceTable.executeInsert("SinkTable");
    }
}
{code}

Also, I added a few more lines to the data file so that it contained 30 lines. Then I ran the code with 3 different inputs (a sketch for generating them appears below):
# input.json (without compression)
# input.json.gz
# input.json.zip

There were no errors for any of the runs, and the outputs were as follows:

{code:java|title=input.json|borderStyle=solid}
4> +I[4646464, 654, test0]
4> +I[4646464, 654, test1]
4> +I[4646464, 654, test2]
4> +I[4646464, 654, test3]
4> +I[4646464, 654, test4]
4> +I[4646464, 654, test5]
4> +I[4646464, 654, test6]
4> +I[4646464, 654, test7]
4> +I[4646464, 654, test8]
4> +I[4646464, 654, test9]
4> +I[4646464, 654, test10]
4> +I[4646464, 654, test11]
4> +I[4646464, 654, test12]
4> +I[4646464, 654, test13]
4> +I[4646464, 654, test14]
4> +I[4646464, 654, test15]
4> +I[4646464, 654, test16]
4> +I[4646464, 654, test17]
4> +I[4646464, 654, test18]
4> +I[4646464, 654, test19]
4> +I[4646464, 654, test20]
4> +I[4646464, 654, test21]
4> +I[4646464, 654, test22]
4> +I[4646464, 654, test23]
4> +I[4646464, 654, test24]
4> +I[4646464, 654, test25]
4> +I[4646464, 654, test26]
4> +I[4646464, 654, test27]
4> +I[4646464, 654, test28]
4> +I[4646464, 654, test29]
{code}

{code:java|title=input.json.gz|borderStyle=solid}
4> +I[4646464, 654, test0]
4> +I[4646464, 654, test1]
4> +I[4646464, 654, test2]
{code}

There was no output at all for input.json.zip: the job exited silently, producing neither records nor errors. I would still keep this ticket as a bug.
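For reference, here is a minimal sketch of how the three test inputs could be generated. The JSON field layout is an assumption inferred from the table schema and the printed rows above, and the class name is hypothetical; the original data file may have looked different:

{code:java|title=GenerateTestInput.java|borderStyle=solid}
package examples.flinktest;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper for producing the 30-line test input and its
// compressed copies. The record layout is inferred from the table schema
// and the console output above, not taken from the original test data.
public class GenerateTestInput {

    public static void main(final String[] args) throws IOException {
        final StringBuilder lines = new StringBuilder();
        for (int i = 0; i < 30; i++) {
            lines.append("{\"my_field1\": 4646464, \"my_field2\": 654, \"my_field3\": \"test")
                    .append(i)
                    .append("\"}\n");
        }
        final byte[] bytes = lines.toString().getBytes(StandardCharsets.UTF_8);

        // Plain line-delimited JSON file.
        Files.write(Paths.get("input.json"), bytes);

        // GZIP-compressed copy of the same content.
        try (OutputStream gz =
                new GZIPOutputStream(Files.newOutputStream(Paths.get("input.json.gz")))) {
            gz.write(bytes);
        }

        // ZIP archive containing the same content as a single entry.
        try (ZipOutputStream zip =
                new ZipOutputStream(Files.newOutputStream(Paths.get("input.json.zip")))) {
            zip.putNextEntry(new ZipEntry("input.json"));
            zip.write(bytes);
            zip.closeEntry();
        }
    }
}
{code}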
> Unable to read all records from compressed line-delimited JSON files using
> Table API
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-30314
>                 URL: https://issues.apache.org/jira/browse/FLINK-30314
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>    Affects Versions: 1.15.2
>            Reporter: Dmitry Yaraev
>            Priority: Major
>
> I am reading gzipped JSON line-delimited files in batch mode using the
> [FileSystem Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
> For reading the files, a new table is created with the following configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
>   `my_field1` BIGINT,
>   `my_field2` INT,
>   `my_field3` VARCHAR(2147483647)
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'path-to-input-dir',
>   'format' = 'json',
>   'json.ignore-parse-errors' = 'false',
>   'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory there are two files, input-00000.json.gz and
> input-00001.json.gz. As the filenames suggest, the files are compressed with
> GZIP, and each of them contains 10 records. The issue is that only 2 records
> are read from each file (4 in total). If decompressed versions of the same
> data files are used, all 20 records are read.
> As far as I understand, the problem may be that the split length used when
> reading a file is in fact the length of the compressed file. So a file is
> closed before all of its records are read, because the read position of the
> decompressed stream exceeds the split length.
> It probably makes sense to add a flag to {{FSDataInputStream}} so that we
> could identify whether the underlying file is compressed. The flag could be
> set to true in {{InputStreamFSInputWrapper}}, because that class is used for
> wrapping compressed file streams. With such a flag it would be possible to
> recognize non-splittable compressed files and rely only on the end of the
> stream when reading them.
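To make the proposal above concrete, here is a minimal compilable sketch. {{FSDataInputStream}} and {{InputStreamFSInputWrapper}} are real Flink classes, but {{isCompressed()}} does not exist in Flink today; the stand-in class names and the reader-side check below only illustrate the suggested change, they are not actual Flink API:

{code:java|title=ProposedFlagSketch.java|borderStyle=solid}
import java.io.IOException;
import java.io.InputStream;

// Sketch only: isCompressed() is the *proposed* flag from the description
// above, shown on a stand-in class so the idea compiles on its own.
abstract class FSDataInputStreamSketch extends InputStream {

    public abstract void seek(long desired) throws IOException;

    public abstract long getPos() throws IOException;

    // Proposed flag: true when the stream wraps a decompressed,
    // non-splittable source. Per the proposal, InputStreamFSInputWrapper
    // (which Flink uses to wrap decompressed streams) would return true.
    public boolean isCompressed() {
        return false; // default: plain, seekable file stream
    }
}

class SplitReaderSketch {

    // Reader-side check: for compressed files the split length refers to the
    // *compressed* bytes, so comparing it against the decompressed read
    // position truncates the input. With the flag, the reader can instead
    // rely solely on end-of-stream for compressed files.
    static boolean moreRecordsInSplit(
            final FSDataInputStreamSketch stream,
            final long splitStart,
            final long splitLength,
            final int lastBytesRead)
            throws IOException {
        if (stream.isCompressed()) {
            return lastBytesRead != -1; // read until EOF of the decompressed stream
        }
        return stream.getPos() < splitStart + splitLength; // respect the split end
    }
}
{code}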