danny0405 commented on code in PR #5890:
URL: https://github.com/apache/hudi/pull/5890#discussion_r903660059


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##########
@@ -250,6 +261,71 @@ private void testWriteToHoodie(
     }
 
     TestData.checkWrittenFullData(tempFile, expected);
+  }
+
+  private void testWriteToHoodieWithCluster(
+      Configuration conf,
+      String jobName,
+      int checkpoints,
+      Map<String, List<String>> expected) throws Exception {
+
+    StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new 
JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
 
+    boolean isMor = 
conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
+
+    DataStream<RowData> dataStream;
+    if (isMor) {
+      TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+      format.setFilesFilter(FilePathFilter.createDefaultFilter());
+      TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+      format.setCharsetName("UTF-8");
+
+      dataStream = execEnv
+          // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+          .readFile(format, sourcePath, 
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+          .map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+          .setParallelism(1);
+    } else {
+      dataStream = execEnv
+          // use continuous file source to trigger checkpoint
+          .addSource(new ContinuousFileSource.BoundedSourceFunction(new 
Path(sourcePath), checkpoints))
+          .name("continuous_file_source")
+          .setParallelism(1)
+          .map(record -> 
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+          .setParallelism(4);
+    }
+
+    int parallelism = execEnv.getParallelism();
+    DataStream<HoodieRecord> hoodieRecordDataStream = 
Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, 
parallelism, hoodieRecordDataStream);
+    execEnv.addOperator(pipeline.getTransformation());

Review Comment:
   We should use `Pipelines.append` instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to