MartijnVisser commented on a change in pull request #19083:
URL: https://github.com/apache/flink/pull/19083#discussion_r827019131



##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,72 @@ To use the format you need to add the Flink Parquet dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format for two kinds of data:

Review comment:
       ```suggestion
   This format is compatible with the new Source that can be used in both batch and streaming execution modes.
   Thus, you can use this format for two kinds of data:
   ```
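
For readers of this thread, a minimal sketch of what consuming Avro records from Parquet could look like once the `parquet-avro` dependency is in place. It assumes the `AvroParquetReaders` helper from `flink-parquet`; the schema, path, and source name are placeholders, not part of this PR:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder Avro schema describing the records stored in the Parquet files.
final Schema schema =
        new Schema.Parser()
                .parse(
                        "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
                                + "{\"name\": \"name\", \"type\": \"string\"},"
                                + "{\"name\": \"age\", \"type\": \"int\"}]}");

// Bounded FileSource that decodes each Parquet row into an Avro GenericRecord.
final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema),
                        new Path("/path/to/parquet"))
                .build();

// No watermark strategy: the records carry no event timestamps.
final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source");
```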

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,72 @@ To use the format you need to add the Flink Parquet dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
 
-## Flink RowData
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-#### Bounded data example
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+**Batch mode**
 
 ```java
-final LogicalType[] fieldTypes =
-  new LogicalType[] {
-  new DoubleType(), new IntType(), new VarCharType()
-  };
 
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-  new ParquetColumnarRowInputFormat<>(
-  new Configuration(),
-  RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-  500,
-  false,
-  true);
-final FileSource<RowData> source =
-  FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .build();
-final DataStream<RowData> stream =
-  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// reads bounded data of records from files at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+        
+// reads unbounded data of records from files by monitoring the Paths

Review comment:
       ```suggestion
   // Monitor the Paths to read data as unbounded data
   ```
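
Spelled out, the two modes the hunk describes might look like the sketch below, built on the `monitorContinuously(Duration)` call mentioned above. The helper method names, the path, and the 10-second interval are illustrative only, and the `BulkFormat` argument stands for whatever implementation is in use (e.g. a `ParquetColumnarRowInputFormat`):

```java
import java.time.Duration;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;

class FileSourceModes {

    // Bounded mode: list the files under the path once and read them all.
    static FileSource<RowData> bounded(BulkFormat<RowData, FileSourceSplit> format) {
        return FileSource.forBulkFileFormat(format, new Path("/path/to/parquet"))
                .build();
    }

    // Continuous unbounded mode: keep monitoring the path for new files, here every 10 seconds.
    static FileSource<RowData> continuous(BulkFormat<RowData, FileSourceSplit> format) {
        return FileSource.forBulkFileFormat(format, new Path("/path/to/parquet"))
                .monitorContinuously(Duration.ofSeconds(10L))
                .build();
    }
}
```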

##########
File path: docs/content/docs/connectors/datastream/formats/parquet.md
##########
@@ -39,46 +39,72 @@ To use the format you need to add the Flink Parquet dependency to your project:
        <version>{{< version >}}</version>
 </dependency>
 ```
- 
+
+To read Avro records, you will need to add the `parquet-avro` dependency:
+
+```xml
+<dependency>
+    <groupId>org.apache.parquet</groupId>
+    <artifactId>parquet-avro</artifactId>
+    <version>${flink.format.parquet.version}</version>
+    <optional>true</optional>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format for two kinds of data:
-- Bounded data
-- Unbounded data: monitors a directory for new files that appear 
 
-## Flink RowData
+- Bounded data: lists all files and reads them all.
+- Unbounded data: monitors a directory for new files that appear.
 
-#### Bounded data example
+By default, a File Source is created in the bounded mode, to turn the source into the continuous unbounded mode you can call
+`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` additionally .
 
-In this example, you will create a DataStream containing Parquet records as Flink RowDatas. The schema is projected to read only the specified fields ("f7", "f4" and "f99").
-Flink will read records in batches of 500 records. The first boolean parameter specifies that timestamp columns will be interpreted as UTC.
-The second boolean instructs the application that the projected Parquet fields names are case-sensitive.
-There is no watermark strategy defined as records do not contain event timestamps.
+**Batch mode**
 
 ```java
-final LogicalType[] fieldTypes =
-  new LogicalType[] {
-  new DoubleType(), new IntType(), new VarCharType()
-  };
 
-final ParquetColumnarRowInputFormat<FileSourceSplit> format =
-  new ParquetColumnarRowInputFormat<>(
-  new Configuration(),
-  RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
-  500,
-  false,
-  true);
-final FileSource<RowData> source =
-  FileSource.forBulkFileFormat(format,  /* Flink Path */)
-  .build();
-final DataStream<RowData> stream =
-  env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+// reads bounded data of records from files at a time

Review comment:
       ```suggestion
   // Parquet rows are decoded in batches
   ```
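
To make the "decoded in batches" wording concrete, this is roughly the example the hunk removes, restated with imports and inline comments. The 500-row batch size, the two boolean flags, and the projected fields "f7", "f4", "f99" follow the removed snippet; the Hadoop `Configuration` import, the path, and the source name are assumptions:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Projected read schema: only the fields "f7", "f4" and "f99" are materialized.
final LogicalType[] fieldTypes =
        new LogicalType[] {new DoubleType(), new IntType(), new VarCharType()};

final ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
                new Configuration(),
                RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
                500,   // Parquet rows are decoded in batches of 500
                false, // do not interpret timestamp columns as UTC
                true); // projected field names are case-sensitive

final FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, new Path("/path/to/parquet")).build();

// No watermark strategy: the records carry no event timestamps.
final DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```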



