Re: No execution.target specified in your configuration file

2020-12-20 Thread Kostas Kloudas
Hi Ben,

You can try using
StreamExecutionEnvironment streamExecutionEnvironment =
    StreamExecutionEnvironment.getExecutionEnvironment();
instead of directly creating a new one. This will allow it to pick up the
configuration parameters you pass through the command line.
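
As a minimal sketch (the class name, job name, and sample data below are hypothetical), an entry point built this way picks up execution.target and the other options supplied on the command line:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyJob {
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() reads the configuration coming from the
        // CLI / flink-conf.yaml, including execution.target, instead of
        // relying on values hard-coded in the job.
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c").print();

        env.execute("example-job");
    }
}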

I hope this helps,
Kostas

On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
>
> I was wondering if I could get help with the issue described in this 
> stackoverflow post.


See lag end-to-end

2020-12-20 Thread Rex Fenley
Hello,

Is there some proxy for seeing the relative time it takes for records to
make it through an entire job plan? Maybe checkpoint alignment time would
be a proxy for this? Are there metrics for that, or something else that would
provide a signal here?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com | BLOG | FOLLOW US | LIKE US



Re: Flink 1.12

2020-12-20 Thread Yang Wang
I am afraid only the state handle is stored in the ConfigMap. The real
state data is stored in
the distributed storage configured via "high-availability.storageDir". I
believe you could find
more information in this class KubernetesStateHandleStore[1].

How did you determine that the checkpointing information is stored twice? That
should not happen.

[1].
https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53
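
For reference, a minimal flink-conf.yaml sketch of the Kubernetes HA settings being discussed; the cluster-id and storageDir values are placeholders, and any shared storage reachable by the JobManager should work for the storageDir:

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink-ha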


Best,
Yang

Boris Lublinsky wrote on Sun, Dec 20, 2020 at 12:49 AM:

> Thanks Yang,
> This is still confusing.
> I did more experiments and see that checkpointing information is stored
> twice - in config map and in high-availability.storageDir
> Do we need this duplication?
> Do we need to specify high-availability.storageDir as defined in
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration
> Or just specifying
>
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> Is sufficient?
>
>
>
> On Dec 17, 2020, at 10:09 PM, Yang Wang  wrote:
>
> The latest successful checkpoint pointer is stored in the ConfigMap, as
> well as the JobGraph pointer.
> They could help us recover the running jobs before you delete the K8s
> deployment. If the HA ConfigMaps
> are deleted, then when you create a Flink cluster with the same
> cluster-id, it could not recover from the latest
> successful checkpoint automatically.
>
> Best,
> Yang
>
>
>
>
> Boris Lublinsky wrote on Fri, Dec 18, 2020 at 11:42 AM:
>
>> Also re reading
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>>
>> This does not seem right:
>> To keep HA data while restarting the Flink cluster, simply delete the
>> deployment (via kubectl delete deploy ). All the Flink
>> cluster related resources will be deleted (e.g. JobManager Deployment,
>> TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps
>> will be retained because they do not set the owner reference. When
>> restarting the cluster, all previously running jobs will be recovered and
>> restarted from the latest successful checkpoint.
>>
>> The last successful checkpoint is not in the config maps, but rather on the
>> persistent volume. The config map can be safely deleted. If you restart the JM, it
>> will create a new leader anyway, so I would suggest adding the owner
>> reference there.
>>
>>
>> On Dec 17, 2020, at 8:49 PM, Yang Wang  wrote:
>>
>> Hi Boris,
>>
>> Thanks for your follow-up response and for trying the new KubernetesHAService.
>>
>> 1. It is a valid bug. We are not setting the service account for
>> TaskManager pod. Before the KubernetesHAService is introduced, it works
>> fine because the TaskManager does not need to access the K8s resource(e.g.
>> ConfigMap) directly. I have created a ticket[1] to support setting service
>> account for TaskManager.
>> 2. If you directly delete the JobManager deployment, then the HA related
>> ConfigMap will be retained. This is by-design behavior. Because the job
>> does not reach a terminal state (SUCCEED, FAILED, CANCELED), we need the
>> cluster to be able to recover in the future. If all the jobs in the application
>> reach a terminal state, all the HA related ConfigMaps will be cleaned
>> up automatically. You could cancel the job and verify that. Refer here[2]
>> for more information.
>>
>> For the PVC based storage, if it could support multiple read-write then
>> the KubernetesHAService should work. Actually, it feels like a distributed
>> storage.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-20664
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>
>> Best,
>> Yang
>>
>> Boris Lublinsky wrote on Fri, Dec 18, 2020 at 7:16 AM:
>>
>>> And K8 native HA works,
>>> But there are 2 bugs in this implementation.
>>>
>>> 1. Task manager pods are running under the default service account, which fails
>>> because it does not have access to the config maps that hold the endpoints'
>>> information. I had to add permissions to the default service account to make it
>>> work. Ideally both JM and TM pods should run under the same service
>>> account.
>>> 2. When a Flink application is deleted, it clears the main config map,
>>> but not the ones used for leader election
>>>
>>>
>>> And finally it works fine with PVC-based storage, as long as it is
>>> ReadWriteMany.
>>>
>>>
>>> On Dec 15, 2020, at 8:40 PM, Yang Wang  wrote:
>>>
>>> Hi Boris,
>>>
>>> What is -p 10?
>>>
>>> It is the same as --parallelism 10. It sets the default parallelism to 10.
>>>
>>> does it require a special container build?
>>>
>>> No, the official flink docker image could be used
>>> directly. Unfortunately, we do not have the image yet, and we are trying to
>>> figure that out.
>>> You could follow the instructions below to build your own image.
>>>
>>>
>>> git clone https://github.com

Re: Flink 1.12

2020-12-20 Thread Boris Lublinsky
I understand this.
State storage is defined by state.checkpoints.dir, for example:
state.checkpoints.dir: file:///mnt/flink/storage/checkpoints

I am talking about the storage reference being defined in 2 places.
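
For concreteness, a sketch of the two places meant here, with placeholder paths:

# externalized checkpoint data
state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
# HA metadata: job graphs and pointers to the latest checkpoints
high-availability.storageDir: file:///mnt/flink/storage/recovery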


> On Dec 20, 2020, at 8:05 PM, Yang Wang  wrote:
> 
> I am afraid only the state handle is stored in the ConfigMap. The real state 
> data is stored in
> the distributed storage configured via "high-availability.storageDir". I 
> believe you could find
> more information in this class KubernetesStateHandleStore[1].
> 
> How could you find that the checkpointing information is stored twice? It 
> should not happen.
> 
> [1]. 
> https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53
> 
> Best,
> Yang
> 

Re: No execution.target specified in your configuration file

2020-12-20 Thread Ben Beasley
That worked. Thank you, Kostas.

From: Kostas Kloudas 
Date: Sunday, December 20, 2020 at 7:21 AM
To: Ben Beasley 
Cc: user@flink.apache.org 
Subject: Re: No execution.target specified in your configuration file
Hi Ben,

You can try using StreamExecutionEnvironment
streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
instead of directly creating a new one. This will allow it to pick up the
configuration parameters you pass through the command line.

I hope this helps,
Kostas

On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
>
> I was wondering if I could get help with the issue described in this 
> stackoverflow post.


Re: Flink 1.12

2020-12-20 Thread Yang Wang
IIUC, "state.checkpoints.dir" specifies an external checkpoint path,
which will not be cleaned up unless
the users configure it explicitly[1].

However, "high-availability.storageDir" will be cleaned up
automatically when all the jobs in the application
reach a terminal state. Moreover, not only the checkpoints, but also
the generated job graphs and user jars/artifacts
are stored in this storage. You could check the content of this directory.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
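
As an illustration of the retention setting referenced in [1], a sketch of enabling externalized checkpoints programmatically; the class name, interval, and sample data are placeholders, and the same retention policy can instead be set via execution.checkpointing.externalized-checkpoint-retention in flink-conf.yaml:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s and keep the last completed checkpoint on
        // cancellation, so it is not cleaned up together with the job.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromElements(1, 2, 3).print();
        env.execute("retained-checkpoints-example");
    }
}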

Best,
Yang

Boris Lublinsky wrote on Mon, Dec 21, 2020 at 10:18 AM:

> I understand this.
> State storage is defined by state.checkpoints.dir, for example:
>
> state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
>
>
> I am talking about the storage reference being defined in 2 places.

Re: See lag end-to-end

2020-12-20 Thread Yun Gao
Hi Rex,

   I think Latency Marker is what you need [1].


Best,
 Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking
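
For example, a rough sketch of turning latency tracking on from the job code; the class name, 1000 ms interval, and sample data are illustrative only, and the interval can also be set cluster-wide via metrics.latency.interval in flink-conf.yaml:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Sources periodically emit LatencyMarkers; downstream operators report
        // the approximate marker travel time from the sources as a histogram metric.
        env.getConfig().setLatencyTrackingInterval(1000L);

        env.fromElements("a", "b", "c").print();
        env.execute("latency-tracking-example");
    }
}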

--
Sender: Rex Fenley
Date: 2020/12/21 04:57:59
Recipient: user
Cc: Brad Davis
Subject: See lag end-to-end

Hello,

Is there some proxy for seeing the relative time it takes for records to make it
through an entire job plan? Maybe checkpoint alignment time would be a proxy
for this? Are there metrics for that, or something else that would provide a signal
here?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US


a question about KubernetesConfigOptions

2020-12-20 Thread Debasish Ghosh
Hello -

In
https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
the various supported options are declared as constants.
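
For context, the declarations in that class look roughly like the following (a sketch rather than a verbatim copy, so the exact default value and description may differ):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch of how an option is declared in KubernetesConfigOptions.
public static final ConfigOption<Double> JOB_MANAGER_CPU =
        ConfigOptions.key("kubernetes.jobmanager.cpu")
                .doubleType()
                .defaultValue(1.0)
                .withDescription("The number of cpu used by the job manager");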

I see that there is no support for options like Volumes and VolumeMounts.
Also I see entries for JOB_MANAGER_CPU and TASK_MANAGER_CPU but not for
JOB_MANAGER_MEMORY and TASK_MANAGER_MEMORY. How do we accommodate these if
we want to pass them as well? I see that the class is annotated
with @PublicEvolving - just wanted to clarify whether these are planned to be
added in the future.

regards.
-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg