Flink sink never executes

2020-12-21 Thread Ben Beasley
First off I want to thank the folks in this email list for their help thus far.

I’m facing another strange issue: if I add a window to my stream, the sink no
longer executes, although the sink executes fine without the windowing. I
described my problem on Stack Overflow so that the code is easy to read.

I wonder if anyone can help me once more; I believe the solution could be
simple for someone familiar with the code. I believe I’ve followed the
tutorials and articles on the Flink website correctly.


Re: No execution.target specified in your configuration file

2020-12-21 Thread Kostas Kloudas
Glad I could help!

On Mon, Dec 21, 2020 at 3:42 AM Ben Beasley  wrote:
>
> That worked. Thank you, Kostas.
>
>
>
> From: Kostas Kloudas 
> Date: Sunday, December 20, 2020 at 7:21 AM
> To: Ben Beasley 
> Cc: user@flink.apache.org 
> Subject: Re: No execution.target specified in your configuration file
>
> Hi Ben,
>
> You can try using StreamExecutionEnvironment streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> instead of directly creating a new one. This will allow it to pick up the
> configuration parameters you pass through the command line.
>
> I hope this helps,
> Kostas
>
> On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
> >
> > I was wondering if I could get help with the issue described in this 
> > stackoverflow post.
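
A minimal sketch of the difference Kostas describes, assuming the job is submitted with the CLI (flink run) so that execution.target and other client-side configuration are available; the job body is a placeholder:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnvironmentExample {
        public static void main(String[] args) throws Exception {
            // Picks up the configuration passed by the client (e.g. execution.target),
            // because the submitting client installs a context environment factory.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // By contrast, constructing an environment directly, e.g.
            //     StreamExecutionEnvironment.createLocalEnvironment();
            // ignores the client-side configuration, which can lead to errors such as
            // "No execution.target specified in your configuration".
            env.fromElements(1, 2, 3).print();
            env.execute("environment example");
        }
    }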


checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Using a session cluster with three taskmanagers; cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.  This does not seem to be 
related to EFS limits/throttling.  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?

Also, I noticed we did not set state.checkpoints.dir; I assume it is using 
high-availability.storageDir.  Is that correct?

For now we plan on setting

execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true

and also explicitly set state.checkpoints.dir.
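
For reference, a minimal sketch of how these values could be set from code rather than flink-conf.yaml; the checkpoint interval and the checkpoint path are placeholders, not values from this thread:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuning {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            env.enableCheckpointing(60_000); // placeholder interval: 1 minute

            CheckpointConfig cc = env.getCheckpointConfig();
            cc.setCheckpointTimeout(60 * 60 * 1000L);   // execution.checkpointing.timeout: 60 min
            cc.setTolerableCheckpointFailureNumber(12); // tolerable-failed-checkpoints: 12
            cc.enableUnalignedCheckpoints();            // execution.checkpointing.unaligned: true

            // Equivalent of setting state.checkpoints.dir explicitly (placeholder path).
            env.setStateBackend(new FsStateBackend("file:///mnt/efs/flink/checkpoints"));

            env.fromElements(1, 2, 3).print();
            env.execute("checkpoint tuning example");
        }
    }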



Queryable state on task managers that are not running the job

2020-12-21 Thread Martin Boyanov
Hi,
I'm running a long-running Flink job in cluster mode and I'm interested in
using the queryable state functionality.
I have the following problem: when I query the Flink task managers (i.e.
the queryable state proxy), it is possible to hit a task manager which
doesn't have the requested state, because the job is not running on that
task manager.
For example, I might have a cluster with 5 task managers, but the job is
deployed on only 3 of those. If my query hits either of the two idle task
managers, I naturally get an error message that the job does not exist.
My current solution is to size the cluster appropriately so that there are
no idle task managers. I was wondering if there is a better solution or if
this could be handled better in the future?
Thanks in advance.
Kind regards,
Martin
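
A hedged sketch of one client-side workaround for the situation described above: try each known proxy address in turn and fall back to the next one when a task manager does not serve the job's state. It assumes the flink-queryable-state-client-java dependency; the host list, port, state name, and state descriptor are illustrative placeholders, not values from this thread:

    import java.util.List;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    public class QueryableStateWithFallback {

        public static Long query(List<String> proxyHosts, int proxyPort,
                                 JobID jobId, String key) throws Exception {
            // Must match the descriptor the job registered via setQueryable(...).
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("sum", Types.LONG);

            Exception lastError = null;
            for (String host : proxyHosts) {
                QueryableStateClient client = new QueryableStateClient(host, proxyPort);
                try {
                    ValueState<Long> state = client
                            .getKvState(jobId, "query-name", key, Types.STRING, descriptor)
                            .get();
                    return state.value();
                } catch (Exception e) {
                    // Likely "job not found" on an idle task manager; try the next proxy.
                    lastError = e;
                } finally {
                    client.shutdownAndWait();
                }
            }
            throw new Exception("No queryable state proxy served the requested state", lastError);
        }
    }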


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread Till Rohrmann
What exactly are the problems when the checkpoint recovery does not work?
Even if the ZooKeeper connection is temporarily lost, which leads to
the JobMaster losing leadership and the job being suspended, the next
leader should continue where the first job stopped because of the lost
ZooKeeper connection.

What happens under the hood when restoring from a savepoint is that the
savepoint is inserted into the CompletedCheckpointStore, where the other
checkpoints are also stored. If a failure then happens, Flink will first try
to recover from a checkpoint/savepoint in the CompletedCheckpointStore, and
only if this store does not contain any checkpoints/savepoints will it use
the savepoint with which the job was started. The CompletedCheckpointStore
persists the checkpoint/savepoint information by writing the pointers to
ZooKeeper.

Cheers,
Till

On Mon, Dec 21, 2020 at 11:38 AM vishalovercome  wrote:

> Thanks for your reply!
>
> What I have seen is that the job terminates when there's an intermittent loss
> of connectivity with ZooKeeper. This is in fact the most common reason why
> our jobs are terminating at this point. Worse, the job is unable to restore from
> a checkpoint during some (not all) of these terminations. Under these
> scenarios, won't the job try to recover from a savepoint?
>
> I've gone through various tickets reporting stability issues due to
> ZooKeeper that you've mentioned you intend to resolve soon. But until the
> ZooKeeper-based HA is stable, should we assume that it will repeatedly
> restore from savepoints? I would rather rely on Kafka offsets to resume
> where the job left off rather than savepoints.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
Thanks for your reply!

What I have seen is that the job terminates when there's an intermittent loss
of connectivity with ZooKeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, the job is unable to restore from
a checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint?

I've gone through various tickets reporting stability issues due to
ZooKeeper that you've mentioned you intend to resolve soon. But until the
ZooKeeper-based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on Kafka offsets to resume
where the job left off rather than savepoints.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-21 Thread vishalovercome
Thanks for your reply!

What I have seen is that the job terminates when there's an intermittent loss
of connectivity with ZooKeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, the job is unable to restore from
a checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint?

I've gone through various tickets reporting stability issues due to
ZooKeeper that you've mentioned you intend to resolve soon. But until the
ZooKeeper-based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on Kafka offsets to resume
where the job left off rather than savepoints.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

For the second issue, have you also set the state backend type? I'm asking 
because, except for the default heap state backend, the other state backends 
should throw an exception if state.checkpoints.dir is not set. Since the heap 
state backend stores all snapshots in the JM's memory, they cannot be 
recovered after a JM failover, which makes it unsuitable for production use. 
Therefore, if used in a production environment, it might be better to switch 
to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 
1.9.2. There can be different causes for checkpoint timeouts; one possibility 
is back pressure from an operator that cannot process its records in time, 
which would block the checkpoints. I would check the back pressure [1] first, 
and if there is indeed back pressure, you might try unaligned checkpoints or 
resolve the back pressure by increasing the parallelism of the slow operators.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html
 



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Mon Dec 21 17:50:15 2020
Recipients:user@flink.apache.org 
Subject:checkpointing seems to be throttled.

Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.

  
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.
We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.  This does not seem to be 
related to EFS limits/throttling.  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?
Also, I noticed we did not set state.checkpoints.dir; I assume it is using 
high-availability.storageDir.  Is that correct?
For now we plan on setting
execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true
and also explicitly set state.checkpoints.dir


Re: [Help Required:]-Unable to submit job from REST API

2020-12-21 Thread Yun Gao

Hi Puneet,

   From the doc, submitting a job via the REST API should be done by sending a POST 
request to /jars/:jarid/run [1]. The response "Not found" means the REST 
API server does not recognize the requested path.

Best,
 Yun





 [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html#jars-jarid-run--
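
For illustration, a hedged sketch of such a POST request using Java's HttpClient. The jar id, entry class, and program arguments are copied from the original mail; the host is a placeholder and 8081 is only the default REST port. The query-parameter names should be checked against the REST API reference [1] (which documents, e.g., entry-class and program-args):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RunJarViaRest {
        public static void main(String[] args) throws Exception {
            // Note the /jars/ prefix in the path, which the original request was missing.
            String jarId = "45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar";
            String url = "http://host-ip:8081/jars/" + jarId + "/run"
                    + "?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json"
                    + "&entryClass=com.orchestrator.flowExecution.GraphExecutor";

            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }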
Sender:Puneet Kinra
Date:2020/12/21 19:47:31
Recipient:user
Theme:[Help Required:]-Unable to submit job from REST API

Hi All,

Unable to submit job from REST API (Flink-Monitoring API),

Steps followed:

1) Load the jar using the upload API.
2) Can see the jar in the /tmp/flink-web folder.
3) Try to run the jar using the following.

Request

http://host-ip/45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar/run?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json&entryClass=com.orchestrator.flowExecution.GraphExecutor
 

Response:

{
"errors": [
"Not found."
]
}

-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
e-mail :puneet.ki...@customercentria.com




[Help Required:]-Unable to submit job from REST API

2020-12-21 Thread Puneet Kinra
Hi All,

Unable to submit job from REST API (Flink-Monitoring API),

Steps followed:

1) Load the jar using the upload API.
2) Can see the jar in the /tmp/flink-web folder.
3) Try to run the jar using the following.

Request

http://host-ip/45f30ad6-c8fb-4c2c-9fbf-c4f56acdd9d9_stream-processor-jar-with-dependencies.jar/run?programArgs=/users/puneet/app/orchestrator/PropertiesStream_back.json&entryClass=com.orchestrator.flowExecution.GraphExecutor


Response:

{
"errors": [
"Not found."
]
}

-- 
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com

e-mail :puneet.ki...@customercentria.com


checkpoint delay consume message

2020-12-21 Thread nick toker
Hello,

We noticed the following behavior:
If we enable Flink checkpoints, there is a delay between
the time we write a message to the Kafka topic and the time the Flink Kafka
connector consumes this message.
The delay is closely related to checkpointInterval and/or
minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a
message from Kafka will be one of these parameters.

Could you please advise how we can remove/control this delay?

We use Flink 1.11.2.

BR
nick


RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS-mounted directory.
We do monitor backpressure through the REST API periodically and we do not see any.


From: Yun Gao 
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.


Hi Edward,

For the second issue, have you also set the state backend type? I'm asking 
because, except for the default heap state backend, the other state backends 
should throw an exception if state.checkpoints.dir is not set. Since the heap 
state backend stores all snapshots in the JM's memory, they cannot be 
recovered after a JM failover, which makes it unsuitable for production use. 
Therefore, if used in a production environment, it might be better to switch 
to a state backend like RocksDB.

   For the checkpoint timeout, AFAIK there have been no large changes since 
1.9.2. There can be different causes for checkpoint timeouts; one possibility 
is back pressure from an operator that cannot process its records in time, 
which would block the checkpoints. I would check the back pressure [1] first, 
and if there is indeed back pressure, you might try unaligned checkpoints or 
resolve the back pressure by increasing the parallelism of the slow operators.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/monitoring/back_pressure.html



--Original Mail --
Sender:Colletta, Edward <edward.colle...@fmr.com>
Send Date:Mon Dec 21 17:50:15 2020
Recipients:user@flink.apache.org
Subject:checkpointing seems to be throttled.
Using session cluster with three taskmanagers, cluster.evenly-spread-out-slots 
is set to true.  13 jobs running.  Average parallelism of each job is 4.
Flink version 1.11.2, Java 11.
Running on AWS EC2 instances with EFS for high-availability.storageDir.


We are seeing very high checkpoint times and experiencing timeouts.  The 
checkpoint timeout is the default 10 minutes.  This does not seem to be 
related to EFS limits/throttling.  We started experiencing these timeouts 
after upgrading from Flink 1.9.2/Java 8.  Are there any known issues which 
cause very high checkpoint times?

Also I noticed we did not set state.checkpoints.dir, I assume it is using 
high-availability.storageDir.  Is that correct?

For now we plan on setting

execution.checkpointing.timeout: 60 min
execution.checkpointing.tolerable-failed-checkpoints: 12
execution.checkpointing.unaligned: true

and also explicitly set state.checkpoints.dir



Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
 Hi Nick,

Are you using EXACTLY_ONCE semantics? If so, the sink would use 
transactions and only commit the transaction on checkpoint completion to ensure 
end-to-end exactly-once. A detailed description can be found in [1].


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
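
To make the mechanism concrete, a hedged sketch of an exactly-once Kafka sink as described in [1]; the broker address, topic, and records are placeholders. With Semantic.EXACTLY_ONCE the producer writes inside a Kafka transaction that is committed only when the checkpoint completes, so consumers reading with isolation.level=read_committed see the records only after that commit:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ExactlyOnceSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Transactions are committed when a checkpoint completes.
            env.enableCheckpointing(10_000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
            props.setProperty("transaction.timeout.ms", "900000"); // keep below the broker limit

            FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                    "output-topic",                                 // placeholder topic
                    (KafkaSerializationSchema<String>) (element, timestamp) ->
                            new ProducerRecord<>("output-topic",
                                    element.getBytes(StandardCharsets.UTF_8)),
                    props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

            env.fromElements("a", "b", "c").addSink(sink);
            env.execute("exactly-once sink sketch");
        }
    }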

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick



Re: RE: checkpointing seems to be throttled.

2020-12-21 Thread Yun Gao
Hi Edward,

   Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, 
it requires a path parameter, and that path serves as the state.checkpoints.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting state.backend: filesystem in the 
config file and enabling checkpointing, and it indeed threw an exception:

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.&lt;init&gt;(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.&lt;init&gt;(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there is no backpressure, I think it might be helpful 
to look at the time decomposition for the checkpoint in the checkpoint history page 
in the Web UI to see which phase takes too long.


Best,
 Yun



 --Original Mail --
Sender:Colletta, Edward 
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao , user@flink.apache.org 

Subject:RE: checkpointing seems to be throttled.

Thanks for the quick response.
We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any. 
From: Yun Gao  
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: checkpointing seems to be throttled.
Hi Edward,

For the second issue, have you also set the statebackend type? I'm asking 
so because except for the default heap statebackend, other statebackends should 
throws exception if the state.checkpoint.dir is not set. Since heap 
statebackend stores all the snapshots in the JM's memory, it could not be 
recovered after JM failover, which makes it not suitable for production usage. 
Therefore, if used in production env then it might better to switch to 
stateback

RE: RE: checkpointing seems to be throttled.

2020-12-21 Thread Colletta, Edward
Doh!   Yeah, we set the state backend in code and I read the flink-conf.yaml 
file and use the high-availability storage dir.


From: Yun Gao 
Sent: Monday, December 21, 2020 11:28 AM
To: Colletta, Edward ; user@flink.apache.org
Subject: Re: RE: checkpointing seems to be throttled.


Hi Edward,

   Are you setting the FsStateBackend via code or via flink-conf.yaml? If via code, 
it requires a path parameter, and that path serves as the state.checkpoints.dir. If 
via flink-conf.yaml, I tried on 1.12 by setting state.backend: filesystem in the 
config file and enabling checkpointing, and it indeed threw an exception:

  org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Cannot create the file system state backend: The configuration 
does not specify the checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
​ at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
​ at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
​ at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
​ at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
​ at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
​ at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
​ at java.security.AccessController.doPrivileged(Native Method)
​ at javax.security.auth.Subject.doAs(Subject.java:422)
​ at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
​ at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
​ at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.checkpoints.dir'
​ at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
​ at 
org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:122)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.loadStateBackend(StreamExecutionEnvironment.java:863)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:819)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.&lt;init&gt;(StreamExecutionEnvironment.java:237)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.&lt;init&gt;(StreamContextEnvironment.java:67)
​ at 
org.apache.flink.client.program.StreamContextEnvironment.lambda$setAsContext$4(StreamContextEnvironment.java:156)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$getExecutionEnvironment$12(StreamExecutionEnvironment.java:2089)
​ at java.util.Optional.map(Optional.java:215)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2089)
​ at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:2070)
​ at CheckpointTest.main(CheckpointTest.java:26)
​ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
​ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
​ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
​ at java.lang.reflect.Method.invoke(Method.java:498)
​ at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
​ ... 11 more
​
​
   For the timeout, if there is no backpressure, I think it might be helpful 
to look at the time decomposition for the checkpoint in the checkpoint history page 
in the Web UI to see which phase takes too long.


Best,
 Yun


--Original Mail --
Sender:Colletta, Edward <edward.colle...@fmr.com>
Send Date:Tue Dec 22 00:04:03 2020
Recipients:Yun Gao <yungao...@aliyun.com>, user@flink.apache.org
Subject:RE: checkpointing seems to be throttled.
Thanks for the quick response.

We are using FsStateBackend, and I did see checkpoint files and directories in 
the EFS mounted directory.
We do monitor backpressure through rest api periodically and we do not see any.


From: Yun Gao <yungao...@aliyun.com>
Sent: Monday, December 21, 2020 10:40 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
Subject: Re: checkpointing se

Re: checkpoint delay consume message

2020-12-21 Thread nick toker
Hi,

I am confused.

The delay is in the source, when reading the message, not in the sink.

Nick

On Mon, Dec 21, 2020 at 18:12, Yun Gao <yungao...@aliyun.com> wrote:

>  Hi Nick,
>
> Are you using EXACTLY_ONCE semantics? If so, the sink would use
> transactions and only commit the transaction on checkpoint completion to
> ensure end-to-end exactly-once. A detailed description can be found in [1].
>
>
> Best,
>  Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> --
> Sender:nick toker
> Date:2020/12/21 23:52:34
> Recipient:user
> Theme:checkpoint delay consume message
>
> Hello,
>
> We noticed the following behavior:
> If we enable the flink checkpoints, we saw that there is a delay between
> the time we write a message to the KAFKA topic and the time the flink kafka
> connector consumes this message.
> The delay is closely related to checkpointInterval and/or
> minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a
> message from KAFKA will be one of these parameters.
>
> Could you please advise how we can remove/control this delay?
>
> we use flink 1.11.2
>
> BR
> nick
>
>


NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Yuval Itzchakov
Hi,

While trying to execute a query via TableEnvironment.sqlQuery in Flink
1.12, I receive the following exception:

java.lang.NullPointerException
:114, RelMetadataQuery (org.apache.calcite.rel.metadata)
:76, RelMetadataQuery (org.apache.calcite.rel.metadata)
get:39, FlinkRelOptClusterFactory$$anon$1
(org.apache.flink.table.planner.calcite)
get:38, FlinkRelOptClusterFactory$$anon$1
(org.apache.flink.table.planner.calcite)
getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
create:108, LogicalFilter (org.apache.calcite.rel.logical)
createFilter:344, RelFactories$FilterFactoryImpl
(org.apache.calcite.rel.core)
convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:823, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
convertSqlQuery:795, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
convert:250, SqlToOperationConverter
(org.apache.flink.table.planner.operations)
parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
$anonfun$translateTemplate$2:476, Foo$ (Foo)
apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
evaluateNow:361, FiberContext (zio.internal)
$anonfun$evaluateLater$1:778, FiberContext (zio.internal)
run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)

This seems to be coming from the FlinkRelMetadataQuery class attempting to
initialize all handlers:

[image: image.png]

This seems to be coming from the calcite shaded JAR
inside "flink-table-planner-blink-1.12"

Has anyone run into this issue? I saw a thread in the Chinese user group,
but I don't understand what's been said there (
https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
-- 
Best Regards,
Yuval Itzchakov.


Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
I don't know how to reproduce it, but what I've observed are three kinds of
termination when connectivity with ZooKeeper is somehow disrupted. I don't
think it's an issue with ZooKeeper, as it has supported a much bigger Kafka
cluster for a few years.

1. The first kind is exactly this -
https://github.com/apache/flink/pull/11338. Basically, temporary loss of
connectivity or a rolling upgrade of ZooKeeper will cause the job to terminate.
It will eventually restart from where it left off.
2. The second kind is when the job terminates and restarts for the same reason
but is unable to recover from a checkpoint. I think it's similar to this -
https://issues.apache.org/jira/browse/FLINK-19154. If upgrading to 1.12.0
(from 1.11.2) will fix the second issue then I'll upgrade.
3. The third kind is where the job repeatedly restarts as it's unable to establish
a session with ZooKeeper. I don't know if reducing the session timeout will help
here, but in this case I'm forced to disable ZooKeeper HA entirely, as the
job cannot even restart.

I could create a JIRA ticket for discussing ZooKeeper itself if you suggest,
but the issues of ZooKeeper and savepoints are related, as I'm not sure what
will happen in each of the above cases.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


numRecordsOutPerSecond metric and side outputs

2020-12-21 Thread Alexey Trenikhun
Hello,
Does the numRecordsOutPerSecond metric take into account the number of records sent to 
side outputs, or does it provide the rate only for the main output?

Thanks,
Alexey
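
For readers, a brief sketch of what "side output" refers to here: records emitted through ctx.output(...) with an OutputTag rather than through the main Collector. Whether numRecordsOutPerSecond counts both paths is exactly the open question above, so the sketch only illustrates the two emission paths; the tag name and data are placeholders:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            final OutputTag<Integer> sideTag = new OutputTag<Integer>("side") {};

            SingleOutputStreamOperator<Integer> mainStream = env
                    .fromElements(1, 2, 3, 4)
                    .process(new ProcessFunction<Integer, Integer>() {
                        @Override
                        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                            if (value % 2 == 0) {
                                out.collect(value);         // main output
                            } else {
                                ctx.output(sideTag, value); // side output
                            }
                        }
                    });

            mainStream.print();                         // main output records
            mainStream.getSideOutput(sideTag).print();  // side output records
            env.execute("side output example");
        }
    }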


Re: NullPointerException while calling TableEnvironment.sqlQuery in Flink 1.12

2020-12-21 Thread Danny Chan
Hi Yuval Itzchakov ~

The thread you pasted has a different stack trace from your case.

In the pasted thread, the JaninoRelMetadataProvider was missing because we
only set it once in a thread-local variable; when the RelMetadataQuery was
used in a different worker thread, the missing JaninoRelMetadataProvider caused
an NPE.

For your case, based on the stack trace, this line throws:

RelMetadataQuery line 114:

super(null);

But this line actually allows a null argument and should not throw.

Can you give a reproducible case here so that we can debug and find more
evidence?
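
For reference, a minimal self-contained scaffold one could start such a reproduction from, assuming the blink planner in Flink 1.12; the table definition and query are hypothetical and only exercise the sqlQuery path:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlQueryRepro {
        public static void main(String[] args) {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Hypothetical bounded datagen source, just to drive the planner.
            tEnv.executeSql(
                    "CREATE TABLE src (id INT, name STRING) WITH ("
                            + " 'connector' = 'datagen',"
                            + " 'number-of-rows' = '10')");

            // The WHERE clause goes through the same convertWhere/LogicalFilter
            // path that appears in the stack trace above.
            tEnv.sqlQuery("SELECT * FROM src WHERE id > 10").execute().print();
        }
    }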

Yuval Itzchakov wrote on Tue, Dec 22, 2020 at 1:52 AM:

> Hi,
>
> While trying to execute a query via TableEnvironment.sqlQuery in Flink
> 1.12, I receive the following exception:
>
> java.lang.NullPointerException
> :114, RelMetadataQuery (org.apache.calcite.rel.metadata)
> :76, RelMetadataQuery (org.apache.calcite.rel.metadata)
> get:39, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> get:38, FlinkRelOptClusterFactory$$anon$1
> (org.apache.flink.table.planner.calcite)
> getMetadataQuery:178, RelOptCluster (org.apache.calcite.plan)
> create:108, LogicalFilter (org.apache.calcite.rel.logical)
> createFilter:344, RelFactories$FilterFactoryImpl
> (org.apache.calcite.rel.core)
> convertWhere:1042, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelectImpl:666, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertSelect:644, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQueryRecursive:3438, SqlToRelConverter (org.apache.calcite.sql2rel)
> convertQuery:570, SqlToRelConverter (org.apache.calcite.sql2rel)
> org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:165,
> FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> rel:157, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
> toQueryOperation:823, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convertSqlQuery:795, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> convert:250, SqlToOperationConverter
> (org.apache.flink.table.planner.operations)
> parse:78, ParserImpl (org.apache.flink.table.planner.delegation)
> sqlQuery:639, TableEnvironmentImpl (org.apache.flink.table.api.internal)
> $anonfun$translateTemplate$2:476, Foo$ (Foo)
> apply:-1, 644680650 (ai.hunters.pipeline.Processors$$$Lambda$1597)
> evaluateNow:361, FiberContext (zio.internal)
> $anonfun$evaluateLater$1:778, FiberContext (zio.internal)
> run:-1, 289594359 (zio.internal.FiberContext$$Lambda$617)
> runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
> run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
> run:748, Thread (java.lang)
>
> This seems to be coming from the FlinkRelMetadataQuery class attempting to
> initialize all handlers:
>
> [image: image.png]
>
> This seems to be coming from the calcite shaded JAR
> inside "flink-table-planner-blink-1.12"
>
> Has anyone run into this issue? I saw a thread in the Chinese user group
> but I don't understand what's been said there (
> https://www.mail-archive.com/user-zh@flink.apache.org/msg05874.html)
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: a question about KubernetesConfigOptions

2020-12-21 Thread Yang Wang
Hi Debasish Ghosh,

Thanks for the attention on native K8s integration of Flink.

1. Volumes and volume mounts are not supported now, and we are
trying to get this done via pod templates. Refer to [1] for more information.

2. Currently, Flink has different CPU config options for different
deployments, but for memory, all deployments share the same config
options. You can find more information in [2].
* yarn.appmaster.vcores
* yarn.containers.vcores
* kubernetes.jobmanager.cpu
* kubernetes.taskmanager.cpu

3. You are right. This class is PublicEvolving and we may introduce more
config options in the future (e.g. pod-template-related ones).


[1].
https://lists.apache.org/thread.html/rf2e7b9be96f2bd5106d08ffb573d55f70a8acfb0b814a21d8b50d747%40%3Cdev.flink.apache.org%3E
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup.html
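
For illustration, a hedged sketch of setting the CPU options listed above together with the shared memory options from the memory setup page [2], via a Flink Configuration object; the values are placeholders, and the same keys could equally be passed on the command line with -D:

    import org.apache.flink.configuration.Configuration;

    public class KubernetesResourceConfig {
        public static Configuration build() {
            Configuration conf = new Configuration();

            // CPU options are deployment-specific (here: native Kubernetes).
            conf.setString("kubernetes.jobmanager.cpu", "1.0");
            conf.setString("kubernetes.taskmanager.cpu", "2.0");

            // Memory options are shared across all deployment targets [2].
            conf.setString("jobmanager.memory.process.size", "1600m");
            conf.setString("taskmanager.memory.process.size", "4096m");

            return conf;
        }
    }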

Debasish Ghosh wrote on Mon, Dec 21, 2020 at 3:21 PM:

> Hello -
>
> In
> https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
> the various supported options are declared as constants.
>
> I see that there is no support for options like Volumes and VolumeMounts.
> Also I see entries for JOB_MANAGER_CPU and TASK_MANAGER_CPU but not for
> JOB_MANAGER_MEMORY and TASK_MANAGER_MEMORY. How do we accommodate these if
> we want to pass them as well ? I see that the class is annotated
> with @PublicEvolving - just wanted to clarify if these are planned to be
> added in future.
>
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Re: Re: checkpoint delay consume message

2020-12-21 Thread Yun Gao
Hi nick,

   Sorry, I initially thought that the data was also written into Kafka with Flink. 
So it can be ensured that there is no delay on the write side, right? Does 
the delay on the read side keep existing?

Best,
 Yun




 --Original Mail --
Sender:nick toker 
Send Date:Tue Dec 22 01:43:50 2020
Recipients:Yun Gao 
CC:user 
Subject:Re: checkpoint delay consume message

Hi,

I am confused.

The delay is in the source, when reading the message, not in the sink.

Nick

On Mon, Dec 21, 2020 at 18:12, Yun Gao <yungao...@aliyun.com> wrote:

 Hi Nick,

Are you using EXACTLY_ONCE semantics? If so, the sink would use 
transactions and only commit the transaction on checkpoint completion to ensure 
end-to-end exactly-once. A detailed description can be found in [1].


Best,
 Yun


[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

--
Sender:nick toker
Date:2020/12/21 23:52:34
Recipient:user
Theme:checkpoint delay consume message

Hello,

We noticed the following behavior:
If we enable the flink checkpoints, we saw that there is a delay between the 
time we write a message to the KAFKA topic and the time the flink kafka 
connector consumes this message.
The delay is closely related to checkpointInterval and/or 
minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a message 
from KAFKA will be one of these parameters.

Could you please advise how we can remove/control this delay?

we use flink 1.11.2

BR
nick