Hi Alexis,

Could you clarify what you mean by "If I add more slots to the task
manager, I see the transformations actually start in parallel even though I
submit the job with 'flink run -p 1'"?
Are you asking if multiple slots are working simultaneously, or if a single
JobVertex contains multiple subtasks?

Note that the number of slots and the parallelism are not the same concept:
parallelism is the number of subtasks of a JobVertex, while slots are the
resources those subtasks are deployed into. A Flink batch job can run with
only a single slot; when more slots become available, Flink schedules and
deploys more tasks in parallel, provided their upstream tasks have finished.
If you want only one slot to be active at a time, you can limit the resources
of the cluster, for instance by setting "slotmanager.number-of-slots.max" to 1.
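
As a rough sketch of that setting, assuming it goes into the cluster's Flink
configuration file (conf/flink-conf.yaml, or conf/config.yaml in newer
releases; the exact file depends on your version and deployment):

```yaml
# Cap the total number of slots the cluster allocates, so that only one
# slot can be active at a time.
slotmanager.number-of-slots.max: 1

# Assumption: on a standalone cluster the slot count is instead determined by
# the slots each TaskManager offers, so this would be the option to lower:
# taskmanager.numberOfTaskSlots: 1
```

As far as I recall, the documentation notes that
"slotmanager.number-of-slots.max" does not take effect on standalone
clusters, so please check which of the two applies to your setup.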

If you intend for each JobVertex to have a parallelism of 1 and you find
that this isn't being enforced when using the "flink run -p 1" command, it
would be helpful to have more detailed information to assist with
troubleshooting, including the version of Flink in use and the JobManager
logs.

Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote on Sat, 6 Jul 2024 at 15:35:

> Hi Junrui,
>
> Thanks for the confirmation. I tested some more and I'm seeing a strange
> behavior.
>
> I'm currently testing a single source stream that is fed to 6 identical
> transformations. The State Processor API requires batch mode and, from what
> I can tell, I must specify a parallelism of 1 in the job, otherwise it
> freezes. However, if I add more slots to the task manager, I see the
> transformations actually start in parallel even though I submit the job
> with "flink run -p 1". Is this expected of batch mode?
>
> Additionally, regardless of how much memory I give to the task manager,
> some transformations finish in around 6 seconds, and then the others need
> more than 1 minute even though it's the same transformation, and each one
> writes around 70MB to my local disk. The flame graph shows the slow
> operators are just parked due to an ArrayBlockingQueue whose size is
> hard-coded as 16 in the Flink sources. Am I missing something crucial for tuning
> such jobs?
>
> Regards,
> Alexis.
>
> On Sat, 6 Jul 2024, 03:29 Junrui Lee, <jrlee....@gmail.com> wrote:
>
>> Hi Alexis,
>>
>> For the SavepointWriter, I've briefly looked over the code and the write
>> operation is enforced as non-parallel.
>>
>> Best,
>> Junrui
>>
>> Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote on Sat, 6 Jul 2024 at 01:27:
>>
>>> Hi Gabor,
>>>
>>> Thanks for the quick response. What about SavepointWriter? In my case
>>> I'm actually writing a job that will read from an existing savepoint and
>>> modify some of its data to write a new one.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Fri, 5 Jul 2024 at 17:37, Gabor Somogyi <
>>> gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> It depends. When one uses SavepointLoader to read only the metadata,
>>>> it's non-parallel.
>>>> SavepointReader however is basically a normal batch job with all its
>>>> features.
>>>>
>>>> G
>>>>
>>>>
>>>> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Really quick question: when using the State Processor API, are all
>>>>> transformations performed in a non-parallel fashion?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
