[ https://issues.apache.org/jira/browse/FLINK-26548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504120#comment-17504120 ]

Lijie Wang commented on FLINK-26548:
------------------------------------

Hi [~Leo Zhou], thanks for trying it.

-> and the parallelism of the source operator is also 1 even though I set 
*_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8

The "Custom File source" is a non-parallel source, so its parallelism must be 1.

-> the number of records sent by the source operator is always 1

I think this is caused by FLINK-26576: {{env.getParallelism()}} is *-1* 
when the adaptive batch scheduler is used.
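To illustrate how a parallelism of *-1* can collapse the input into a single split, here is a simplified, hypothetical model in Java. It assumes that the split-enumerating operator requests as many splits as the reader parallelism passed to it, that this value comes from {{env.getParallelism()}}, and that it is clamped to at least 1. The class, constant and method names are illustrative, not Flink API, and a real input format may create more splits than requested.

```java
// Hypothetical, simplified model of the legacy file source's split request.
// Assumptions: the requested split count equals the reader parallelism,
// the reader parallelism is taken from env.getParallelism(), and it is
// clamped to at least 1.
public final class LegacySplitCount {

    /** Value reported as "parallelism not set" in this sketch. */
    static final int PARALLELISM_DEFAULT = -1;

    /** Returns how many splits are requested for the given reader parallelism. */
    static int requestedSplits(int readerParallelism) {
        // -1 ("not set") is clamped to 1, so only a single split is requested.
        return Math.max(readerParallelism, 1);
    }

    public static void main(String[] args) {
        System.out.println(requestedSplits(PARALLELISM_DEFAULT)); // 1
        System.out.println(requestedSplits(8)); // 8
    }
}
```

Under these assumptions, one split means operator A emits exactly one record, which matches the symptom in the report.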

Currently, the adaptive batch scheduler cannot tell which operator is the actual 
source. It can only judge by the number of inputs: an operator with no inputs 
is considered a source. 
And I think this problem only occurs with the legacy source, because a new 
source (FLIP-27) does not consist of two operators.
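The "no inputs means source" heuristic can be sketched as follows. This is a hypothetical model, not the scheduler's actual code: {{Vertex}}, {{isTreatedAsSource}} and {{sourceParallelism}} are illustrative names, and the two-vertex graph mirrors the legacy file source, where operator A (the non-parallel "Custom File source") feeds operator B (the reader).

```java
import java.util.List;

// Hypothetical model of source detection by input count.
public final class SourceDetection {

    /** Illustrative job vertex: a name, its upstream vertices, and whether it is non-parallel. */
    static final class Vertex {
        final String name;
        final List<String> inputs;
        final boolean nonParallel;

        Vertex(String name, List<String> inputs, boolean nonParallel) {
            this.name = name;
            this.inputs = inputs;
            this.nonParallel = nonParallel;
        }
    }

    /** The heuristic: a vertex without inputs is treated as a source. */
    static boolean isTreatedAsSource(Vertex v) {
        return v.inputs.isEmpty();
    }

    /** In this sketch a non-parallel source is pinned to 1, otherwise it gets the default. */
    static int sourceParallelism(Vertex v, int defaultSourceParallelism) {
        return v.nonParallel ? 1 : defaultSourceParallelism;
    }

    public static void main(String[] args) {
        // Stands in for jobmanager.adaptive-batch-scheduler.default-source-parallelism = 8.
        int defaultSourceParallelism = 8;
        Vertex a = new Vertex("A: Custom File source", List.of(), true);
        Vertex b = new Vertex("B: file reader", List.of("A: Custom File source"), false);

        for (Vertex v : List.of(a, b)) {
            if (isTreatedAsSource(v)) {
                System.out.println(v.name + " -> source, parallelism "
                        + sourceParallelism(v, defaultSourceParallelism));
            } else {
                System.out.println(v.name + " -> not a source");
            }
        }
    }
}
```

Only A has no inputs, so it is the vertex that would receive the default source parallelism, yet as a non-parallel source it stays at 1. B, which actually reads the files, is not recognized as a source at all.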

> the source parallelism is not set correctly with AdaptiveBatchScheduler
> -----------------------------------------------------------------------
>
>                 Key: FLINK-26548
>                 URL: https://issues.apache.org/jira/browse/FLINK-26548
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.15.0
>            Reporter: zl
>            Priority: Major
>         Attachments: image-2022-03-09-19-00-18-396.png
>
>
> When running *_org.apache.flink.table.tpcds.TpcdsTestProgram_* with 
> {_}*AdaptiveBatchScheduler*{_}, I ran into a problem: the number of records 
> sent by the source operator is always 1, and the parallelism of the source 
> operator is also 1 even though I set 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_* to 8.
> !image-2022-03-09-19-00-18-396.png!
> After some research, I found that operator A is not the actual file 
> reader; it only splits files and assigns the splits to downstream tasks for 
> further processing, while operator B is the actual file reader task. Here, 
> the parallelism of operator B is 64 and the number of records sent by 
> operator A is 1. This means operator A assigned all splits to a single task 
> of operator B, so {*}_the other 63 tasks of operator B are idle_{*}, which is 
> unreasonable.
> In this case, the parallelism of operator B should be 
> *_jobmanager.adaptive-batch-scheduler.default-source-parallelism_*, and the 
> number of records sent by operator A should also be 
> {*}_jobmanager.adaptive-batch-scheduler.default-source-parallelism_{*}.
>  
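The idle-task effect described in the report can be checked with a small, hypothetical Java model. It assumes the splits emitted by operator A are handed to operator B's subtasks round-robin; the real channel selection may differ, but with only one split every strategy leaves all but one reader subtask without work. {{assignRoundRobin}} and {{idleSubtasks}} are illustrative names.

```java
// Hypothetical model: distribute splits over reader subtasks and count idle ones.
public final class SplitDistribution {

    /** Assigns splits 0..numSplits-1 round-robin; returns the split count per subtask. */
    static int[] assignRoundRobin(int numSplits, int readerParallelism) {
        int[] perSubtask = new int[readerParallelism];
        for (int split = 0; split < numSplits; split++) {
            perSubtask[split % readerParallelism]++;
        }
        return perSubtask;
    }

    /** Counts subtasks that received no split. */
    static int idleSubtasks(int[] perSubtask) {
        int idle = 0;
        for (int n : perSubtask) {
            if (n == 0) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // Observed case: operator A emits 1 split, operator B runs 64 subtasks.
        System.out.println("idle = " + idleSubtasks(assignRoundRobin(1, 64))); // idle = 63
        // Expected case: 8 splits for 8 reader subtasks (default source parallelism 8).
        System.out.println("idle = " + idleSubtasks(assignRoundRobin(8, 8))); // idle = 0
    }
}
```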



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
