MartijnVisser commented on a change in pull request #19083: URL: https://github.com/apache/flink/pull/19083#discussion_r827019131
##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,72 @@ To use the format you need to add the Flink Parquet dependency to your project:
   <version>{{< version >}}</version>
 </dependency>
 ```
-
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+  <groupId>org.apache.parquet</groupId>
+  <artifactId>parquet-avro</artifactId>
+  <version>${flink.format.parquet.version}</version>
+  <optional>true</optional>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </exclusion>
+    <exclusion>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes. Thus, you can use this format for two kinds of data:

Review comment:
```suggestion
This format is compatible with the new Source that can be used in both batch and streaming execution modes. Thus, you can use this format for two kinds of data:
```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,72 @@ To use the format you need to add the Flink Parquet dependency to your project:
   <version>{{< version >}}</version>
 </dependency>
 ```
-
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+  <groupId>org.apache.parquet</groupId>
+  <artifactId>parquet-avro</artifactId>
+  <version>${flink.format.parquet.version}</version>
+  <optional>true</optional>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </exclusion>
+    <exclusion>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes. Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear
-## Flink RowData
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
-#### Bounded data example
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+**Batch mode**
 ```java
-final LogicalType[] fieldTypes =
-        new LogicalType[] {
-            new DoubleType(), new IntType(), new VarCharType()
-        };
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-        new ParquetColumnarRowInputFormat<>(
-                new Configuration(),
-                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-                500,
-                false,
-                true);
-final FileSource<RowData> source =
-        FileSource.forBulkFileFormat(format, /* Flink Path */)
-        .build();
-final DataStream<RowData> stream =
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// reads bounded data of records from files at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+
+// reads unbounded data of records from files by monitoring the Paths

Review comment:
```suggestion
// Monitor the Paths to read data as unbounded data
```

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,72 @@ To use the format you need to add the Flink Parquet dependency to your project:
   <version>{{< version >}}</version>
 </dependency>
 ```
-
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+  <groupId>org.apache.parquet</groupId>
+  <artifactId>parquet-avro</artifactId>
+  <version>${flink.format.parquet.version}</version>
+  <optional>true</optional>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </exclusion>
+    <exclusion>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes. Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear
-## Flink RowData
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
-#### Bounded data example
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+**Batch mode**
 ```java
-final LogicalType[] fieldTypes =
-        new LogicalType[] {
-            new DoubleType(), new IntType(), new VarCharType()
-        };
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-        new ParquetColumnarRowInputFormat<>(
-                new Configuration(),
-                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-                500,
-                false,
-                true);
-final FileSource<RowData> source =
-        FileSource.forBulkFileFormat(format, /* Flink Path */)
-        .build();
-final DataStream<RowData> stream =
-        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// reads bounded data of records from files at a time

Review comment:
```suggestion
// Parquet rows are decoded in batches
```
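For reference, here is a minimal, self-contained sketch of the pipeline discussed in the hunks above: the `ParquetColumnarRowInputFormat` example that the change removes, wired into a `FileSource` in the default bounded mode, plus the `monitorContinuously(Duration)` call for the continuous unbounded mode. The input path, the 10-second monitoring interval, and the `main` wrapper are illustrative assumptions; the `ParquetColumnarRowInputFormat` constructor arguments are taken from the removed example and may differ between Flink versions.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

public class ParquetRowDataExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Project the Parquet schema onto three fields; rows are decoded in batches of 500.
        // Constructor arguments copied from the removed docs example; signatures may vary across versions.
        final LogicalType[] fieldTypes =
                new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};
        final ParquetColumnarRowInputFormat<FileSourceSplit> format =
                new ParquetColumnarRowInputFormat<>(
                        new Configuration(),
                        RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                        500,    // batch size
                        false,  // do not interpret timestamp columns as UTC
                        true);  // projected field names are case-sensitive

        // Bounded mode (default): list all files under the path and read them once.
        final FileSource<RowData> boundedSource =
                FileSource.forBulkFileFormat(format, new Path("/tmp/parquet-input"))
                        .build();

        // Continuous unbounded mode: additionally monitor the path for new files.
        final FileSource<RowData> unboundedSource =
                FileSource.forBulkFileFormat(format, new Path("/tmp/parquet-input"))
                        .monitorContinuously(Duration.ofSeconds(10))
                        .build();

        // No watermark strategy is needed because the records carry no event timestamps.
        final DataStream<RowData> stream =
                env.fromSource(boundedSource, WatermarkStrategy.noWatermarks(), "parquet-file-source");

        stream.print();
        env.execute("Parquet RowData example");
    }
}
```

Calling `monitorContinuously` is the only change needed to switch the same source definition from bounded to unbounded; everything else stays identical.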
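The `parquet-avro` dependency added in the hunk above enables reading Parquet files as Avro records. A rough sketch of what that could look like with the new Source, assuming the `AvroParquetReaders` helper from `flink-parquet` together with `FileSource.forRecordStreamFormat`; the schema string and input path are made-up placeholders, not part of the change under review.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetAvroExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical Avro schema describing the records stored in the Parquet files.
        final Schema schema =
                new Schema.Parser()
                        .parse(
                                "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
                                        + "{\"name\": \"name\", \"type\": \"string\"},"
                                        + "{\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"]}]}");

        // Bounded source that decodes each Parquet file into Avro GenericRecords.
        final FileSource<GenericRecord> source =
                FileSource.forRecordStreamFormat(
                                AvroParquetReaders.forGenericRecord(schema),
                                new Path("/tmp/parquet-avro-input"))
                        .build();

        final DataStream<GenericRecord> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source");

        stream.print();
        env.execute("Parquet Avro example");
    }
}
```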