Liu created FLINK-28208:
---------------------------

             Summary: The method createBatchSink in class HiveTableSink should setParallelism for map operator
                 Key: FLINK-28208
                 URL: https://issues.apache.org/jira/browse/FLINK-28208
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Hive
    Affects Versions: 1.16.0
            Reporter: Liu

The problem was found when using the Adaptive Batch Scheduler. In this case, a simple SQL query like "select * from * where *" generates three operators: source, map, and sink. The map's parallelism is set to -1 by default, which differs from the parallelism of the source and the sink. As a result, the three operators cannot be chained together.

The reason is that the method createBatchSink adds a map operator but does not call setParallelism on it. The changed code is as follows:

{code:java}
private DataStreamSink<Row> createBatchSink(
        DataStream<RowData> dataStream,
        DataStructureConverter converter,
        StorageDescriptor sd,
        HiveWriterFactory recordWriterFactory,
        OutputFileConfig fileNaming,
        final int parallelism)
        throws IOException {
    ...
    return dataStream
            .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
            .setParallelism(parallelism) // Newly added so the map matches the sink's parallelism
            .writeUsingOutputFormat(builder.build())
            .setParallelism(parallelism);
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)