[ https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648708#comment-17648708 ]

Dmitry Yaraev commented on FLINK-30314:
---------------------------------------

[~echauchot], [~martijnvisser] Thank you for your efforts in verifying the 
issue. It would be great to have this implemented (as a bug fix or a new 
feature). As promised, I ran several tests using the code from the above 
repository, and I was able to reproduce the issue with version 1.16.0 as well. 
I changed the code a bit so that it writes its output to the console:
{code:java|title=TestCompressedJson.java|borderStyle=solid}
package examples.flinktest;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TestCompressedJson {

    public static void main(final String[] args) {
        final String inputDir = args[0];

        // init Table Env
        final EnvironmentSettings environmentSettings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        final TableEnvironment tableEnv = TableEnvironment.create(environmentSettings);

        // create source
        // create source table reading line-delimited JSON from the input directory
        tableEnv.executeSql(
                String.format(
                        "CREATE TEMPORARY TABLE SourceTable (my_field1 BIGINT, "
                                + "my_field2 INT, my_field3 VARCHAR(2147483647)) "
                                + "WITH ('connector' = 'filesystem', 'path' = '%s', 'format' = 'json')",
                        inputDir));

        // create sink table that prints every row to the console
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE SinkTable (my_field1 BIGINT, "
                        + "my_field2 INT, my_field3 VARCHAR(2147483647)) "
                        + "WITH ('connector' = 'print')");

        final Table sourceTable = tableEnv.from("SourceTable");
        sourceTable.executeInsert("SinkTable");
    }
}{code}
Also, I added a few more lines to the data file so that it contained 30 lines 
(a sample of the records is shown right after this list). Then I ran the code 
with 3 different inputs:
 # input.json (without compression)
 # input.json.gz
 # input.json.zip
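
For reference, the data file consisted of line-delimited JSON records of the 
following shape (the exact field values here are reconstructed from the table 
schema and the printed output below):
{code:java|title=input.json (sample lines)|borderStyle=solid}
{"my_field1": 4646464, "my_field2": 654, "my_field3": "test0"}
{"my_field1": 4646464, "my_field2": 654, "my_field3": "test1"}
{"my_field1": 4646464, "my_field2": 654, "my_field3": "test2"}
{code}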

There were no errors for any of the runs, and the outputs were as follows:
{code:java|title=input.json|borderStyle=solid}
4> +I[4646464, 654, test0]
4> +I[4646464, 654, test1]
4> +I[4646464, 654, test2]
4> +I[4646464, 654, test3]
4> +I[4646464, 654, test4]
4> +I[4646464, 654, test5]
4> +I[4646464, 654, test6]
4> +I[4646464, 654, test7]
4> +I[4646464, 654, test8]
4> +I[4646464, 654, test9]
4> +I[4646464, 654, test10]
4> +I[4646464, 654, test11]
4> +I[4646464, 654, test12]
4> +I[4646464, 654, test13]
4> +I[4646464, 654, test14]
4> +I[4646464, 654, test15]
4> +I[4646464, 654, test16]
4> +I[4646464, 654, test17]
4> +I[4646464, 654, test18]
4> +I[4646464, 654, test19]
4> +I[4646464, 654, test20]
4> +I[4646464, 654, test21]
4> +I[4646464, 654, test22]
4> +I[4646464, 654, test23]
4> +I[4646464, 654, test24]
4> +I[4646464, 654, test25]
4> +I[4646464, 654, test26]
4> +I[4646464, 654, test27]
4> +I[4646464, 654, test28]
4> +I[4646464, 654, test29]{code}
{code:java|title=input.json.gz|borderStyle=solid}
4> +I[4646464, 654, test0]
4> +I[4646464, 654, test1]
4> +I[4646464, 654, test2] {code}
There was no output at all for input.json.zip: the job exited silently, with 
neither records nor errors. I would still keep this ticket classified as a 
bug.

> Unable to read all records from compressed line-delimited JSON files using 
> Table API
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-30314
>                 URL: https://issues.apache.org/jira/browse/FLINK-30314
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>    Affects Versions: 1.15.2
>            Reporter: Dmitry Yaraev
>            Priority: Major
>
> I am reading gzipped JSON line-delimited files in the batch mode using 
> [FileSystem 
> Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
>  For reading the files a new table is created with the following 
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
>   `my_field1` BIGINT,
>   `my_field2` INT,
>   `my_field3` VARCHAR(2147483647)
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'path-to-input-dir',
>   'format' = 'json',
>   'json.ignore-parse-errors' = 'false',
>   'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-00000.json.gz and 
> input-00001.json.gz. As it comes from the filenames, the files are compressed 
> with GZIP. Each of the files contains 10 records. The issue is that only 2 
> records from each file are read (4 in total). If decompressed versions of the 
> same data files are used, all 20 records are read.
> As far as I understand, the problem may be related to the fact that the split 
> length used when the files are read is in fact the length of the compressed 
> file. So the files are closed before all records have been read from them, 
> because the read position of the decompressed file stream exceeds the split 
> length. For example, if a .gz file is 1 KB on disk but decompresses to 10 KB, 
> the reader would presumably stop after consuming about 1 KB of decompressed 
> data.
> Probably, it makes sense to add a flag to {{FSDataInputStream}} so that we 
> could identify whether the file is compressed or not. The flag can be set to 
> true in {{InputStreamFSInputWrapper}}, because that class is used for 
> wrapping compressed file streams. With such a flag it would be possible to 
> treat non-splittable compressed files differently and rely only on the end 
> of the stream.
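>
> A minimal sketch of what that could look like (this is only an illustration 
> of the idea, not actual Flink code; the names {{markCompressed}} and 
> {{isCompressed}} are made up):
> {code:java|title=Sketch of the proposed flag (hypothetical)|borderStyle=solid}
> // Hypothetical sketch, not actual Flink code: FSDataInputStream gains a flag
> // that wrappers around decompressed streams set, so record readers can ignore
> // the (compressed) split length and simply read until the end of the stream.
> public abstract class FSDataInputStream extends java.io.InputStream {
>
>     private boolean compressed; // false by default
>
>     // Would be called by wrappers such as InputStreamFSInputWrapper.
>     protected void markCompressed() {
>         this.compressed = true;
>     }
>
>     // Readers would check this before enforcing the split end.
>     public boolean isCompressed() {
>         return compressed;
>     }
>
>     public abstract void seek(long desired) throws java.io.IOException;
>
>     public abstract long getPos() throws java.io.IOException;
> }
> {code}
> A reader that sees {{isCompressed()}} return true would then stop only at the 
> end of the stream instead of at the split boundary.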


