Thank you for the response.

Yes currently in my PoC I'm using standalone integration.
Does ` spec.taskManager.replicas` has any effect when using native mode?

The reason I'm asking is that I need to know what is the "cacpacity" of
particular session cluster before I will submit the job into it.
And the way how I was doing this was this:

try (KubernetesClient kubernetesClient = new
KubernetesClientBuilder().build()) {
            MixedOperation<FlinkDeployment,
KubernetesResourceList<FlinkDeployment>,
io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>>
                resources =
kubernetesClient.resources(FlinkDeployment.class);

            List<FlinkDeployment> items =
resources.inNamespace("default").list().getItems();
            for (FlinkDeployment item : items) {
                System.out.println("Flink Deployments: " + item);
                System.out.println("Number of TM replicas: " +
item.getSpec().getTaskManager().getReplicas());
            }
        }


Thanks,
Krzysztof

czw., 31 sie 2023 o 10:44 Gyula Fóra <gyula.f...@gmail.com> napisał(a):

> I guess your question is in the context of the standalone integration
> because native session deployments automatically add TMs on the fly as more
> are necessary.
>
> For standalone mode you should be able to configure
> `spec.taskManager.replicas` and if I understand correctly that will not
> shut down the running jobs.
> If you have problems please share your FlinkDeployment yaml and the
> operator logs in a JIRA ticket.
>
> In any case the native mode is probably better fit for your use-case.
>
> Gyula
>
> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Just want to broth this up in case it was missed in the other
>> messages/queries :)
>>
>> TL:DR
>> How to add TM to Flink Session cluster via Java K8s client if Session
>> Cluster has running jobs?
>>
>> Thanks,
>> Krzysztof
>>
>> pt., 25 sie 2023 o 23:48 Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> napisał(a):
>>
>>> Hi community,
>>> I have a use case where I would like to add an extra TM) to a running
>>> Flink session cluster that has Flink jobs deployed. Session cluster
>>> creation, job submission and cluster patching is done using flink k8s
>>> operator Java API. The Details of this are presented here [1]
>>>
>>> I would like to ask, what is a recommended path to add a TM to existing
>>> Session Cluster that currently runs number of Flink jobs using Java API.
>>> For simplicity lets assume that I dont want to resume jobs from a
>>> savepoint, just redeploy them.
>>>
>>> When executing steps from [1] I'm facing an issue where Session jobs are
>>> not redeployed on patched Session cluster however kubectl shows that there
>>> is FlinkSessionJob subbmited to the k8s.
>>>
>>> Additionally when I'm trying to delete FlinkSessionJob from kubectl,
>>> Flink k8s operator throws an exception described in [1]. In fact the state
>>> of that Flink deployment requires few steps to clean it up after that
>>> patch.
>>>
>>>
>>> [1]
>>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>>
>>

Reply via email to