PLC/Scada/Sensor anomaly detection

2016-05-03 Thread Ivan
Hello!
Has anyone used Flink in "production" for PLC anomaly detection?
Any pointers/docs to check?


Best regards,

Iván Venzor C.


Is there a way to enable checkpointing from flink-conf.yaml?

2017-07-26 Thread Ivan

Hi Flink users,

We are using Flink as the runtime for our Beam jobs, which works great. Recently we 
wanted to enable a restart strategy 
in our Flink cluster. From the documentation I see that the restart strategy will only 
work when checkpointing is enabled, so I'm trying to find out if it's 
possible to enable checkpointing from flink-conf.yaml, equivalent to the call 
"flinkStreamEnv.enableCheckpointing(checkpointInterval);" on the 
StreamExecutionEnvironment.


The reason we want to configure it through flink-conf.yaml is that we use 
Helm to create Flink clusters on demand for dedicated jobs, which works 
great in our Kubernetes environment. With Beam, if we want to enable checkpointing, 
we have to create FlinkPipelineOptions, which ties the otherwise runner-agnostic 
pipeline code to Flink (much like using a Hibernate Session directly in JPA code). So we 
are trying to find a way to enable it from flink-conf.yaml.


A sample flink-conf.yaml is below.

  flink-conf.yaml: |
    blob.server.port: 6124
    jobmanager.rpc.address: address-cache-flink-jobmanager
    jobmanager.rpc.port: 6123
    jobmanager.heap.mb: 256
    taskmanager.heap.mb: 756
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 16
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9100-9101
    metrics.reporter.prom.prefix: flink_jm_
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 300 s
    state.backend: filesystem
    state.backend.fs.checkpointdir: file:///var/nfs/ephem_store/flink/checkpoints
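
For reference, the Beam-side route we are trying to avoid looks roughly like the sketch 
below (the checkpoint interval and the overall wiring are illustrative; FlinkPipelineOptions, 
FlinkRunner, and PipelineOptionsFactory are the real Beam classes):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BeamCheckpointingSketch {
    public static void main(String[] args) {
        // Flink-specific options leak into the otherwise runner-agnostic pipeline code,
        // which is exactly what we would like to avoid by using flink-conf.yaml instead.
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setCheckpointingInterval(60_000L); // e.g. checkpoint every 60 s

        Pipeline pipeline = Pipeline.create(options);
        // ... apply transforms ...
        pipeline.run();
    }
}
```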




Re: Is there a way to enable checkpointing from flink-conf.yaml?

2017-07-26 Thread Ivan
One more thing I found a little confusing in the Restart Strategies documentation 
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html>:



"The default restart strategy is set via Flink's configuration file 
flink-conf.yaml. The configuration parameter restart-strategy defines 
which strategy is taken. If checkpointing is not enabled, the 'no 
restart' strategy is used. If checkpointing is activated and the restart 
strategy has not been configured, the fixed-delay strategy is used with 
Integer.MAX_VALUE restart attempts. See the following list of available 
restart strategies to learn what values are supported."


I think the quoted default behavior only takes effect when no 
restart strategy is defined in flink-conf.yaml. As written, it is quite easy to 
misread it as saying that restart strategies are only enabled when 
checkpointing is enabled and that the "no restart" strategy is used otherwise.
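
To make the distinction concrete, here is a minimal sketch of setting the same fixed-delay 
strategy programmatically, independent of checkpointing (a sketch for illustration, not code 
from this thread; a per-job strategy set in code takes precedence over the flink-conf.yaml default):

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitRestartStrategy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same values as in the flink-conf.yaml above: 3 attempts, 300 s delay between attempts.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(300, TimeUnit.SECONDS)));

        // ... rest of the pipeline, then env.execute(...)
    }
}
```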



Thanks

On 26/07/2017 10:32 PM, Chesnay Schepler wrote:

There is no option that enables checkpointing for all jobs.

If you have control over *all* jobs, as a *hack*, you could load the 
configuration manually (I don't think it is exposed through the 
execution environment) using 
"GlobalConfiguration.loadConfiguration()", manually check it for 
whatever setting you put in the config, and enable checkpointing based on that.

Note that we discourage the use of the GlobalConfiguration in general, 
and the above may not work anymore in an upcoming version.
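
A minimal sketch of that hack, assuming a custom key such as "my.checkpoint.interval" is 
added to flink-conf.yaml (the key name is illustrative, not a built-in Flink option):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFromConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Loads flink-conf.yaml from the FLINK_CONF_DIR visible to this process.
        Configuration conf = GlobalConfiguration.loadConfiguration();

        // "my.checkpoint.interval" is a made-up key; <= 0 means "leave checkpointing off".
        long interval = conf.getLong("my.checkpoint.interval", -1L);
        if (interval > 0) {
            env.enableCheckpointing(interval);
        }

        // ... build the rest of the pipeline, then env.execute(...)
    }
}
```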






Exception on s3 committer

2020-08-28 Thread Ivan Yang
Hi all,

We got this exception after a job restart. Does anyone know what may lead to 
this situation, and how to get past this checkpoint issue? Prior to this, the 
job had failed due to “Checkpoint expired before completing.” We are S3-heavy, 
writing out 10K files to S3 every 10 minutes using StreamingFileSink/BulkFormat 
to various S3 prefixes. Thanks in advance. -Ivan
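
For context, a minimal sketch of the kind of sink setup described above (the bucket path, 
schema handling, and bucket assigner format are assumptions for illustration; with bulk 
formats the sink rolls part files on every checkpoint). The full exception follows below.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class S3ParquetSinkSketch {
    public static StreamingFileSink<GenericRecord> build(Schema schema) {
        return StreamingFileSink
                // Bulk (Parquet) format: part files are rolled on every checkpoint.
                .forBulkFormat(new Path("s3://my-bucket/cbd/late-landing"),
                        ParquetAvroWriters.forGenericRecord(schema))
                // Illustrative event_date=/event_hour= style bucketing similar to the prefixes above.
                .withBucketAssigner(new DateTimeBucketAssigner<>("'event_date='yyyy-MM-dd'/event_hour='HH"))
                .build();
    }

    public static void attach(DataStream<GenericRecord> records, Schema schema) {
        records.addSink(build(schema));
    }
}
```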

2020-08-28 15:17:58
java.io.IOException: Recovering commit failed for object 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does 
not exist and MultiPart Upload 
3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
 is not valid.
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:122)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:74)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804: 
com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does 
not exist. The upload ID may be invalid, or the upload may have been aborted or 
completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; 
Request ID: 9A99AFAD80A8F202; S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=), 
S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:222)
at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:267)
at 
org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:84)
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:85)
... 23 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified 
upload does not exist. The upload ID may be

Re: Exception on s3 committer

2020-08-31 Thread Ivan Yang
Hi Yun,

Thank you so much for your suggestions.

(1) The job couldn't restore from the last checkpoint. The exception is in my 
original email.
(2) No, I didn't change any multipart upload settings. 
(3) The file is gone. I have another batch process that reads Flink's output S3 
bucket and pushes objects to another bucket. Upon a successful read and write, the 
batch job deletes the file. What puzzles me is that if Flink hadn't 
successfully committed the multipart file, it should not have been visible to the batch 
job. It looks like what happened is: while Flink tried to commit the multipart file, 
it crashed and restarted. The file was committed to S3 successfully, but the 
acknowledgement was not recorded on the Flink side. In between, the batch job consumed 
the file. I don't know if that's possible.

Thanks
Ivan

> On Aug 30, 2020, at 11:10 PM, Yun Gao  wrote:
> 
> 
> Hi Ivan,
> 
> I think there might be some points to check:
> 
> 1. Is the job restored from the latest successful checkpoint after the restart?
> 2. Have you ever changed the timeout settings for uncompleted multipart uploads?
> 3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 exist or not?
> 
> Best,
>  Yun

Flink performance tuning on operators

2020-05-14 Thread Ivan Yang
Hi,

We have a Flink job that reads data from an input stream, converts each 
event from a JSON string to an Avro object, and finally writes to Parquet files using 
StreamingFileSink with an OnCheckpointRollingPolicy of 5 minutes. It is basically a 
stateless job. Initially, we used one map operator to convert the JSON string to an 
Avro object; inside the map function it goes from String -> JsonObject -> Avro 
object. 

DataStream avroData = data.map(new JsonToAVRO());

When we tried to break the map operator into two, one for String to JsonObject 
and another for JsonObject to Avro: 

DataStream jsonData = data.map(new StringToJson());
DataStream avroData = jsonData.map(new JsonToAvroSchema());

The benchmark shows a significant performance hit when breaking it down into two 
operators. We are trying to understand the Flink internals to see why there is such a big 
difference. The setup uses state backend = filesystem and an S3 bucket for checkpoints. 
Our event object has 300+ attributes.
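
For reference, a self-contained toy sketch (not the job above) showing the two-map 
structure: two consecutive maps with the same parallelism are chained into one task by 
default, and object reuse is a switch sometimes tried when measuring per-operator 
overhead. Whether either point is relevant to the slowdown above is an open question.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainedMapsDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Optional: avoid defensive copies between chained operators. Only safe if the
        // user functions do not cache or mutate input objects across invocations.
        env.getConfig().enableObjectReuse();

        DataStream<String> raw = env.fromElements("{\"a\":1}", "{\"b\":2}");

        // Two consecutive maps with the same parallelism are chained into a single task,
        // so no serialization or network hop happens between them.
        DataStream<String> step1 = raw.map((MapFunction<String, String>) s -> s.trim());
        DataStream<Integer> step2 = step1.map((MapFunction<String, Integer>) String::length);

        step2.print();
        env.execute("chained maps demo");
    }
}
```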


Thanks
Ivan

Flink on Kubernetes

2020-05-21 Thread Ivan Yang
Hi,

I have set up Flink 1.9.1 on Kubernetes on AWS EKS with one JobManager pod and 10 
TaskManager pods, one pod per EC2 instance. The job runs fine. After a while, for 
some reason, one pod (TaskManager) crashed and then restarted. After 
that, the job got into a bad state: all the parallel subtasks were showing different 
colors (orange, purple) on the console. I basically had to stop the entire job. 
My question is: should a TaskManager restart affect the entire cluster/job, or 
should it join back gracefully?

My second question is about auto-scaling a Flink cluster on Kubernetes. If I 
add more nodes/pods (TaskManager containers) to the cluster, will a running 
Flink job redistribute load to the additional resources, or do I have to stop at a 
savepoint and restart the job?

Thanks and regards.
Ivan

Completed Job List in Flink UI

2020-06-18 Thread Ivan Yang
Hello,

In the Flink web UI Overview tab, the "Completed Job List" displays recently completed or 
cancelled jobs only for a short period of time; after a while, they are gone. The 
JobManager is up and has never restarted. Is there a config key to keep job 
history in the Completed Job List for a longer time? I am using Flink 1.9.

Thank you in advance.

Ivan

Flink 1.11 job stop with save point timeout error

2020-07-23 Thread Ivan Yang
Hello everyone,

We recently upgraded Flink from 1.9.1 to 1.11.0 and found one strange behavior: when 
we stop a job to a savepoint we get the timeout error below.
I checked the Flink web console; the savepoint is created in S3 within 1 second. The 
job is fairly simple, so 1 second for savepoint generation is expected. We use a 
Kubernetes deployment. I clocked it: it takes about 60 seconds before it returns this 
error. Afterwards, the job is hanging (it still says RUNNING, but is actually 
not doing anything), and I need to run another command to cancel it. Does anyone have 
an idea what's going on here? BTW, "flink stop" worked in 1.9.1 for us before.



flink@flink-jobmanager-fcf5d84c5-sz4wk:~$ flink stop 
88d9b46f59d131428e2a18c9c7b3aa3f
Suspending job "88d9b46f59d131428e2a18c9c7b3aa3f" with a savepoint.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"88d9b46f59d131428e2a18c9c7b3aa3f".
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.TimeoutException
at 
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at 
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
... 9 more
flink@flink-jobmanager-fcf5d84c5-sz4wk:~$ 


Thanks in advance,
Ivan

Re: Flink 1.11 job stop with save point timeout error

2020-07-24 Thread Ivan Yang
Hi Robert,
Below is the job manager log after issuing the “flink stop” command


2020-07-24 19:24:12,388 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 1 (type=CHECKPOINT) @ 1595618652138 for job 
853c59916ac33dfbf46503b33289929e.
2020-07-24 19:24:13,914 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 1 for job 853c59916ac33dfbf46503b33289929e (7146 bytes in 1774 ms).
2020-07-24 19:27:59,299 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Triggering stop-with-savepoint for job 
853c59916ac33dfbf46503b33289929e.
2020-07-24 19:27:59,655 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 2 (type=SYNC_SAVEPOINT) @ 1595618879302 for job 
853c59916ac33dfbf46503b33289929e.
2020-07-24 19:28:00,962 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 2 for job 853c59916ac33dfbf46503b33289929e (7147 bytes in 1240 ms).
==

It looks normal to me. 

In the Kubernetes deployment cluster, we set up a metrics reporter; it has these 
keys in flink-conf.yaml:

# Metrics Reporter Configuration
metrics.reporters: wavefront
metrics.reporter.wavefront.interval: 60 SECONDS
metrics.reporter.wavefront.env: prod
metrics.reporter.wavefront.class: com.x.flink.monitor.WavefrontReporter
metrics.reporter.wavefront.host: xx
metrics.reporter.wavefront.token: xx
metrics.scope.tm: flink.taskmanager

Could this reporter interval interfere with the JobManager? I tested the same job in a 
standalone 
Flink 1.11.0 without the reporter: flink stop worked, with no hanging and no 
timeout. The same reporter is also used in the 1.9.1 version, where we didn't have an 
issue with "flink stop".

Thanks 
Ivan


> On Jul 24, 2020, at 5:15 AM, Robert Metzger  wrote:
> 
> Hi Ivan,
> thanks a lot for your message. Can you post the JobManager log here as well? 
> It might contain additional information on the reason for the timeout.
> 



Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Ivan Yang
Hi Rahul,

Try increasing taskmanager.network.memory.max to 1 GB, basically double what 
you have now. However, you only have 4 GB RAM for the entire TM, so it seems out of 
proportion to have 1 GB of network buffers with 4 GB of total RAM. Reducing the number of 
shuffles will require fewer network buffers, but if your job needs the shuffles, 
then you may consider adding more memory to the TM.

Thanks,
Ivan

> On Jul 31, 2020, at 2:02 PM, Rahul Patwari  wrote:
> 
> Hi,
> 
> We are observing "Insufficient number of Network Buffers" issue Sporadically 
> when Flink is upgraded from 1.4.2 to 1.8.2.
> The state of the tasks with this issue translated from DEPLOYING to FAILED. 
> Whenever this issue occurs, the job manager restarts. Sometimes, the issue 
> goes away after the restart.
> As we are not getting the issue consistently, we are in a dilemma of whether 
> to change the memory configurations or not.
> 
> Min recommended No. of Network Buffers: (8 * 8) * 8 * 4 = 2048
> The exception says that 13112 no. of network buffers are present, which is 6x 
> the recommendation.
> 
> Is reducing the no. of shuffles the only way to reduce the no. of network 
> buffers required?
> 
> Thanks,
> Rahul 
> 
> configs:
> env: Kubernetes 
> Flink: 1.8.2
> using default configs for memory.fraction, memory.min, memory.max.
> using 8 TM, 8 slots/TM
> Each TM is running with 1 core, 4 GB Memory.
> 
> Exception:
> java.io.IOException: Insufficient number of network buffers: required 2, but 
> only 0 available. The total number of network buffers is currently set to 
> 13112 of 32768 bytes each. You can increase this number by setting the 
> configuration keys 'taskmanager.network.memory.fraction', 
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:138)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:311)
> at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:271)
> at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
> at java.lang.Thread.run(Thread.java:748)



Re: sporadic "Insufficient no of network buffers" issue

2020-07-31 Thread Ivan Yang
Yes, increase taskmanager.network.memory.fraction in your case. Also, reducing 
the parallelism will reduce the number of network buffers required for your job. I 
never used 1.4.x, so I don't know about it.

Ivan

> On Jul 31, 2020, at 11:37 PM, Rahul Patwari  
> wrote:
> 
> Thanks for your reply, Ivan.
> 
> I think taskmanager.network.memory.max is by default 1GB. 
> In my case, the network buffers memory is 13112 * 32768 = around 400MB which 
> is 10% of the TM memory as by default taskmanager.network.memory.fraction is 
> 0.1.
> Do you mean to increase taskmanager.network.memory.fraction?
> If Flink is upgraded from 1.4.2 to 1.8.2 does the application need more 
> network buffers?
> Can this issue happen sporadically? sometimes this issue is not seen when the 
> job manager is restarted.
> I am thinking whether having fewer network buffers is the root cause (or) if 
> the root cause is something else which triggers this issue.
> 



Flink Kubernetes HA

2021-06-22 Thread Ivan Yang
Hi Dear Flink users,

We recently enabled the ZooKeeper-less HA in our Kubernetes Flink 
deployment. The setup has
high-availability.storageDir: s3://some-bucket/recovery 


Since we have a relatively short 7-day retention policy on the S3 bucket, HA will fail 
if the submittedJobGraphxx object 
<https://s3.console.aws.amazon.com/s3/object/flink-checkpointing-prod-eu-central-1?region=eu-central-1&prefix=recovery/default/submittedJobGraph5b30c5214899>
 is deleted by S3. If we remove the retention policy, the completedCheckpointxx files 
<https://s3.console.aws.amazon.com/s3/object/flink-checkpointing-prod-eu-central-1?prefix=recovery/default/completedCheckpoint001fd6e39810>
 keep growing. The only way I can think of is to use a pattern-based 
file retention policy in S3. Before I do that, is there any config key 
available in Flink that I can tune so that not all the completedCheckpoint* files are 
kept in HA?

Thanks,
Ivan


 

Re: Flink Kubernetes HA

2021-06-23 Thread Ivan Yang
Thanks for the reply. Yes, we are seeing all the completedCheckpoint files, and 
they keep growing. We will revisit our k8s setup, ConfigMaps, etc.

> On Jun 23, 2021, at 2:09 AM, Yang Wang  wrote:
> 
> Hi Ivan,
> 
> Regarding "completedCheckpoint files will keep growing", do you mean too many 
> files exist in the S3 bucket?
> 
> AFAIK, if the K8s HA services work normally, only one completedCheckpoint 
> file will be retained. Once a
> new one is generated, the old one will be deleted.
> 
> 
> Best,
> Yang
> 



TaskManager crash after cancelling a job

2021-07-26 Thread Ivan Yang
Dear Flink experts,

We recently ran into an issue during job cancellation after upgrading to 1.13. 
After we issue a cancel (from the Flink console or flink cancel {jobid}), a few 
subtasks get stuck in the CANCELLING state. Once it gets into that situation, the 
behavior is consistent: those "cancelling" tasks never become CANCELED. 
After 3 minutes the job stops and, as a result, a number of TaskManagers are 
lost. It takes about another 5 minutes for those lost TaskManagers to 
rejoin the JobManager; then we can restart the job from the previous 
checkpoint. We found this stack trace from the hanging (cancelling) TaskManager.
==
sun.misc.Unsafe.park(Native Method) 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
 java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) 
java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637) 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
java.lang.Thread.run(Thread.java:748)
===

Here is some background information about our job and setup.
1) The job is relatively large: we have 500+ parallelism and 2000+ subtasks. 
It mainly reads from a Kinesis stream, performs some transformations, and 
fans out to multiple output S3 buckets. It's a stateless ETL job.
2) The same code and setup running on smaller environments don't seem to have 
this cancel-failure problem. 
3) We have been using Flink 1.11.0 for the same job and never saw this cancel 
failure or TaskManager loss.
4) With the upgrade to 1.13 we also added Kubernetes HA (ZooKeeper-less). 
Previously we did not use HA. 

Cancel-and-restart from the previous checkpoint is our regular procedure to 
support daily operation, and this 10-minute TM restart cycle basically 
slows down our throughput. I am trying to understand what leads to this situation, 
hoping that maybe a configuration change will smooth things out. Any 
suggestion to shorten the wait is also welcome; it appears there are timeouts on the 
TaskManager and JobManager that can be adjusted to speed it up. But we would really 
like to avoid getting stuck in cancellation if we can. 

Thanks you, hoping to get some insight knowledge here.

Ivan

Re: TaskManager crash after cancelling a job

2021-07-28 Thread Ivan Yang
Hi Yangze,

I deployed 1.13.1, and the same problem exists. It seems the cancellation logic has 
changed since 1.11.0 (which is the version we had been running for almost a 
year). In 1.11.0, during cancellation, we saw some subtasks stay in the 
CANCELLING state for some time, but eventually the job would be cancelled and no 
TaskManagers were lost, so we could start the job right away. In the new version 
1.13.x, it kills the TaskManagers where those stuck subtasks were running, 
and it then takes another 4-5 minutes for the TaskManagers to rejoin. Can you 
point me to the code that manages the job cancellation routine? I want to understand 
the logic there.

Thanks,
Ivan

> On Jul 26, 2021, at 7:22 PM, Yangze Guo  wrote:
> 
> Hi, Ivan
> 
> My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> take another look? If that is the case, you can upgrade to 1.13.1.
> 
> Best,
> Yangze Guo
> 



Just failed while starting

2021-08-18 Thread Ivan Yang
 have 4 jobs running in the 
cluster; 3 small jobs never had the problem. The issue only happened to the one 
large job, which has 10 times more parallelism than the other, smaller jobs. Can 
someone help me with what the root cause of the issue is and how to avoid it?

Thanks,
Ivan




Field names must be unique. Found duplicates

2021-11-28 Thread Ivan Budanaev
I am getting the *Field names must be unique. Found duplicates* error when
aggregating over the descriptor columns in the HOP windowing function.
A full description of the error can be found here.
Is there a way to use the descriptor column in an aggregation without
duplicating it?
Thank you.


How to get top N elements in a DataSet?

2017-01-24 Thread Ivan Mushketyk
Hi,

I have a dataset of tuples with two fields, id and rating, and I need to
find the 10 elements with the highest rating in this dataset. I found a
solution, but I think it's suboptimal and there should be a better
way to do it.

The best thing that I came up with is to partition the dataset by rating, sort
locally, and write the partitioned dataset to disk:

dataset
  .partitionCustom(new Partitioner<Double>() {
    @Override
    public int partition(Double key, int numPartitions) {
      return key.intValue() % numPartitions;
    }
  }, 1) // partition by rating
  .setParallelism(5)
  .sortPartition(1, Order.DESCENDING) // locally sort by rating
  .writeAsText("..."); // write the partitioned dataset to disk

This will store the tuples in sorted files with names 5, 4, 3, ... that contain
ratings in the ranges (5, 4], (4, 3], and so on. Then I can read the sorted data
from disk and take the N elements with the highest rating.
Is there a way to do the same but without writing a partitioned dataset to
disk?

I tried to use "first(10)", but it seems to give the top 10 items from a random
partition. Is there a way to get the top N elements from every partition? Then
I could locally sort the top values from every partition and find the top 10 global
values.

Best regards,
Ivan.


Re: How to get top N elements in a DataSet?

2017-01-24 Thread Ivan Mushketyk
Hi @Fabian, @Gabor, and @Aljoscha,

Thank you for your help! It works as expected.

Best regards,
Ivan.

On Tue, 24 Jan 2017 at 17:04 Fabian Hueske  wrote:

> Aljoscha, you are right.
> The second mapPartition() needs to have parallelism(1), but the
> sortPartition() as well:
>
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING).parallelism(1)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> Anyway, as Gabor pointed out, this solution is very inefficient.
>
> 2017-01-24 17:52 GMT+01:00 Aljoscha Krettek :
>
> @Fabian, I think there's a typo in your code, shouldn't it be
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen())
>   .sortPartition(1, Order.DESCENDING)
>   .mapPartition(new ReturnFirstTen()).parallelism(1)
>
> i.e. the second MapPartition has to be parallelism=1
>
>
> On Tue, 24 Jan 2017 at 11:57 Fabian Hueske  wrote:
>
> You are of course right Gabor.
> @Ivan, you can use a heap in the MapPartitionFunction to collect the top
> 10 elements (note that you need to create deep-copies if object reuse is
> enabled [1]).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#operating-on-data-objects-in-functions
>
>
> 2017-01-24 11:49 GMT+01:00 Gábor Gévay :
>
> Hello,
>
> Btw. there is a Jira about this:
> https://issues.apache.org/jira/browse/FLINK-2549
> Note that the discussion there suggests a more efficient approach,
> which doesn't involve sorting the entire partitions.
>
> And if I remember correctly, this question comes up from time to time
> on the mailing list.
>
> Best,
> Gábor
>
>
>
> 2017-01-24 11:35 GMT+01:00 Fabian Hueske :
> > Hi Ivan,
> >
> > I think you can use MapPartition for that.
> > So basically:
> >
> > dataset // assuming some partitioning that can be reused to avoid a
> shuffle
> >   .sortPartition(1, Order.DESCENDING)
> >   .mapPartition(new ReturnFirstTen())
> >   .sortPartition(1, Order.DESCENDING).parallelism(1)
> >   .mapPartition(new ReturnFirstTen())
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk :
> >>
> >> Hi,
> >>
> >> I have a dataset of tuples with two fields ids and ratings and I need to
> >> find 10 elements with the highest rating in this dataset. I found a
> >> solution, but I think it's suboptimal and I think there should be a
> better
> >> way to do it.
> >>
> >> The best thing that I came up with is to partition dataset by rating,
> sort
> >> locally and write the partitioned dataset to disk:
> >>
> >> dataset
> >> .partitionCustom(new Partitioner() {
> >>   @Override
> >>   public int partition(Double key, int numPartitions) {
> >> return key.intValue() % numPartitions;
> >>   }
> >> }, 1) . // partition by rating
> >> .setParallelism(5)
> >> .sortPartition(1, Order.DESCENDING) // locally sort by rating
> >> .writeAsText("..."); // write the partitioned dataset to disk
> >>
> >> This will store tuples in sorted files with names 5, 4, 3, ... that
> >> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read
> sorted
> >> data from disk and and N elements with the highest rating.
> >> Is there a way to do the same but without writing a partitioned dataset
> to
> >> a disk?
> >>
> >> I tried to use "first(10)" but it seems to give top 10 items from a
> random
> >> partition. Is there a way to get top N elements from every partition?
> Then I
> >> could locally sort top values from every partition and find top 10
> global
> >> values.
> >>
> >> Best regards,
> >> Ivan.
> >>
> >>
> >
>
>
>
>
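
For reference, a minimal sketch of the ReturnFirstTen step Fabian mentions, using a heap 
inside a MapPartitionFunction as he suggests (Tuple2<Long, Double> with the rating in 
field f1 is an assumption for illustration; this is not code from the thread). It would be 
applied as in Fabian's snippet: once per partition, then once more with parallelism 1 to 
merge the per-partition results.

```java
import java.util.PriorityQueue;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Emits the 10 tuples with the highest rating (field f1) seen in one partition.
public class ReturnFirstTen implements MapPartitionFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

    @Override
    public void mapPartition(Iterable<Tuple2<Long, Double>> values,
                             Collector<Tuple2<Long, Double>> out) {
        // Min-heap ordered by rating, so the smallest of the current top 10 is evicted first.
        PriorityQueue<Tuple2<Long, Double>> heap =
                new PriorityQueue<>((a, b) -> Double.compare(a.f1, b.f1));

        for (Tuple2<Long, Double> value : values) {
            // Copy the tuple in case object reuse is enabled (see [1] in the thread).
            heap.add(Tuple2.of(value.f0, value.f1));
            if (heap.size() > 10) {
                heap.poll();
            }
        }
        heap.forEach(out::collect);
    }
}
```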


AvroParquetWriter may cause task managers to get lost

2017-11-07 Thread Ivan Budincevic
Hi all,

We recently implemented a feature in our streaming Flink job in which we build an 
AvroParquetWriter every time the overridden "write" method from 
org.apache.flink.streaming.connectors.fs.Writer gets called. We had to do this 
because the schema of each record is potentially different, and we have to get 
the schema for the AvroParquetWriter out of the record itself first. Previously 
this builder was built only once in the "open" method, and from then on only 
the write method was called per record.

Since implementing this, our job crashes with "Connection unexpectedly closed by 
remote task manager 'internal company url'. This might indicate that the remote 
task manager was lost."

We did not run into any issues in our test environments, so we suspect 
this problem occurs only under higher load, such as we have in our production 
environment. Unfortunately we still don't have a proper means of reproducing 
this much load in our test environment to debug.

Could building the AvroParquetWriter on every write be causing the 
problem, and if so, why would that be the case?

Any help in getting to the bottom of the issue would be really appreciated. 
Bellow there is a code snippet of the class which uses the AvroParquetWriter.

Best regards,
Ivan Budincevic
Software engineer, bol.com
Netherlands

package com.bol.measure.timeblocks.files;

import com.bol.measure.timeblocks.measurement.SlottedMeasurements;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

public class SlottedMeasurementsWriter implements Writer<SlottedMeasurements> {
  private transient ParquetWriter<GenericRecord> parquetWriter;
  private boolean overwrite;
  private Path path;

  public SlottedMeasurementsWriter(boolean overwrite) {
    this.overwrite = overwrite;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    this.path = path;
  }

  @Override
  public long flush() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public long getPos() throws IOException {
    return parquetWriter.getDataSize();
  }

  @Override
  public void close() throws IOException {
    parquetWriter.close();
  }

  @Override
  public void write(SlottedMeasurements slot) throws IOException {

    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
      AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(slot.getMeasurements().get(0).getSchema())
        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
        .withDictionaryEncoding(true)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }

    parquetWriter = writerBuilder.build();

    for (GenericRecord measurement : slot.getMeasurements()) {
      parquetWriter.write(measurement);
    }
  }


  @Override
  public Writer<SlottedMeasurements> duplicate() {
    return new SlottedMeasurementsWriter(this.overwrite);
  }
}
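
One mitigation sometimes tried for this pattern (a sketch under assumptions, not a verified 
fix for the crash above) is to rebuild the writer only when the record schema actually 
changes, rather than on every write. It assumes a new field "private transient Schema 
currentSchema;" (org.apache.avro.Schema) is added to the class above.

```java
// Sketch: a modified write() for the class above that caches the writer per schema
// instead of rebuilding it for every record batch.
@Override
public void write(SlottedMeasurements slot) throws IOException {
  Schema schema = slot.getMeasurements().get(0).getSchema();

  // Only rebuild (and close the old writer) when the schema really changed.
  if (parquetWriter == null || !schema.equals(currentSchema)) {
    if (parquetWriter != null) {
      parquetWriter.close();
    }
    final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
        AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
            .withDictionaryEncoding(true)
            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0);
    if (overwrite) {
      writerBuilder.withWriteMode(ParquetFileWriter.Mode.OVERWRITE);
    }
    parquetWriter = writerBuilder.build();
    currentSchema = schema;
  }

  for (GenericRecord measurement : slot.getMeasurements()) {
    parquetWriter.write(measurement);
  }
}
```

Whether the per-write rebuild is really what exhausts resources on the task managers is 
still the open question of this thread.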




assign time attribute after first window group when using Flink SQL

2018-04-08 Thread Ivan Wang
Hi all,



I'd like to use two window groups in a chain in my program, as below.



Table myTable = cTable
    .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
    .groupBy("symbol, w1")
    .select("w1.start as start, w1.end as end, symbol, price.max as p_max, price.min as p_min")
    .window(Slide.over("150.rows").every("1.rows").on("start").as("w2"))
    .groupBy("symbol, w2")
    .select("w2.start, w2.end, symbol, p_max.max, p_min.min");





However, it throws an error: SlidingGroupWindow('w2, 'start, 150.rows, 1.rows)
is invalid: Sliding window expects a time attribute for grouping in a
stream environment.

 at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:149)
 at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:658)
 at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1159)
 at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1179)
 at minno.gundam.ReadPattern.main(ReadPattern.java:156)



Is there any way to assign a time attribute after the first groupBy (w1)?



Thanks

Ivan


Is Flink able to do real time stock market analysis?

2018-04-10 Thread Ivan Wang
Hi all,

I've spent nearly 2 weeks trying to figure out a solution to the requirement
below. If anyone can advise, that would be great.

1. There are going to be 2000 transactions per second as StreamRaw. I'm
going to tumble-window them into StreamA, let's say every 15 seconds. Then I'm
going to count-window StreamA into StreamB, let's say every 20 events.

2. For every event E in StreamRaw, I need to find exactly one event in
StreamB which is earlier than E and closest to E. Then some comparison will
be performed. For example, if the timestamp in E is 9:46:38, there should be an
event in StreamB with timestamp 9:46:30 because I use a 15-second interval.

I tried CEP on StreamRaw; however, I didn't figure out how to involve
StreamB and get that exact event in the condition method.

I tried the Table API and SQL; they throw a time attribute error on the second
window method:

*window(Tumble).group().select().window(Slide).group().select()*

There seems to be no way to tell Flink the time attribute after the first
window().group(). I then tried to convert it into a table first and then
leftOuterJoin them, but Flink tells me that's not supported.

Is Flink able to do this? If not, I'll go for other alternatives. Thanks
again if someone can help.


Re: Is Flink able to do real time stock market analysis?

2018-04-12 Thread Ivan Wang
Thanks Michael very much, it helps a lot!

I tried what you suggested and now I can compare the smoothed data with the raw data in 
the coFlatMap method. However, it cannot be ensured that the smoothed data arrives in 
the expected order. Basically, for every raw event I'd like to refer to the earlier but 
closest event in the smoothed data, but that cannot be guaranteed by default. For 
example, when a raw event comes with event time 13:01:39, I'd like to refer to the 
smoothed event with event time 13:01:30 due to the 15-second interval, but the 
latter only arrives after the raw event at 13:01:58; this happens at least in batch 
processing when I did historical analysis.

I corrected the order by using keyed state in the coFlatMap method: I store the 
latest smoothed event and queue raw events if they arrive too early.

My question is: is there a better and more straightforward way to correct the 
order? The current approach makes the code hard to read. I'm thinking about watermarks, 
but I'm not sure how to do this.
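
For reference, a minimal sketch of the buffering approach described above, as a 
RichCoFlatMapFunction on two connected keyed streams (Tuple2<Long, Double> as 
(timestamp, price) is an assumption for illustration; this is not the actual code). A 
KeyedCoProcessFunction with event-time timers would be the more watermark-friendly 
variant hinted at in the question.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Events are Tuple2<timestamp, price>; output is (raw ts, raw price, smoothed price).
public class AlignRawWithSmoothed extends
        RichCoFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {

    private transient ValueState<Tuple2<Long, Double>> lastSmoothed; // latest smoothed event seen
    private transient ListState<Tuple2<Long, Double>> pendingRaw;    // raw events that arrived too early

    @Override
    public void open(Configuration parameters) {
        TypeInformation<Tuple2<Long, Double>> type =
                TypeInformation.of(new TypeHint<Tuple2<Long, Double>>() {});
        lastSmoothed = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSmoothed", type));
        pendingRaw = getRuntimeContext().getListState(new ListStateDescriptor<>("pendingRaw", type));
    }

    @Override // raw stream
    public void flatMap1(Tuple2<Long, Double> raw, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
        Tuple2<Long, Double> smoothed = lastSmoothed.value();
        if (smoothed != null && smoothed.f0 <= raw.f0) {
            out.collect(Tuple3.of(raw.f0, raw.f1, smoothed.f1));
        } else {
            pendingRaw.add(raw); // no usable smoothed value yet, buffer until one arrives
        }
    }

    @Override // smoothed (windowed) stream
    public void flatMap2(Tuple2<Long, Double> smoothed, Collector<Tuple3<Long, Double, Double>> out) throws Exception {
        lastSmoothed.update(smoothed);
        List<Tuple2<Long, Double>> stillPending = new ArrayList<>();
        for (Tuple2<Long, Double> raw : pendingRaw.get()) {
            if (smoothed.f0 <= raw.f0) {
                out.collect(Tuple3.of(raw.f0, raw.f1, smoothed.f1));
            } else {
                stillPending.add(raw);
            }
        }
        pendingRaw.update(stillPending);
    }
}
```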


--
Thanks
Ivan
From: TechnoMage 
Date: Thursday, 12 April 2018 at 3:21 AM
To: Ivan Wang 
Cc: "user@flink.apache.org" 
Subject: Re: Is Flink able to do real time stock market analysis?

I am new to Flink so others may have more complete answer or correct me.

If you are counting the events in a tumbling window you will get output at the 
end of each tumbling window, so a running count of events/window.  It sounds 
like you want to compare the raw data to the smoothed data?  You can use a 
CoFlatMap to receive both streams and output any records you like, say a Tuple 
with the raw and smoothed value.  If you use a RichCoFlatMap you can track 
state, so you could keep a list of the last 20 or so raw and smoothed values so 
you can align them.

Michael














Re: assign time attribute after first window group when using Flink SQL

2018-04-17 Thread Ivan Wang
Thanks Fabian. I tried to use "rowtime" and Flink gives me the exception below:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
SlidingGroupWindow('w2, 'end, 150.rows, 1.rows) is invalid: Event-time
grouping windows on row intervals in a stream environment are currently not
supported.

I then tried OverWindows; luckily they can serve my requirement as well.
Now my table query looks like this:

.window(Tumble.over("15.seconds").on("timeMill").as("w1"))
.groupBy("symbol, w1").select("(w1.rowtime) as end, symbol, price.max as p_max, price.min as p_min")
.window(Over.partitionBy("symbol").orderBy("end").preceding("149.rows").as("w2"))
.select("symbol as symbol_, end, p_max.max over w2 as max, p_min.min over w2 as min");


It works and I can get what I want. However, the result is not ordered by
the rowtime (here I use "end" as an alias). Is this the default behavior, and is there
anything I can do to get it ordered?

Here is the entire requirement.

Basically there's one raw stream (r1), and I group it first by time as w1 and
then by window count as w2. I'd like to compare the "price" field of every
raw event with the same field in the closest preceding event in w2.
If the condition is met, I'd like to use the price value and timestamp of that
event to get one matching event from another raw stream (r2).

CEP sounds like a good idea, but I would need to refer to events from the other stream
(r2) in the pattern condition on r1. Is it possible to do this using CEP?
Thanks
Ivan



On Mon, Apr 16, 2018 at 4:01 PM, Fabian Hueske  wrote:

> Sorry, I forgot to CC the user mailing list in my reply.
>
> 2018-04-12 17:27 GMT+02:00 Fabian Hueske :
>
>> Hi,
>>
>> Assuming you are using event time, the right function to generate a row
>> time attribute from a window would be "w1.rowtime" instead of "w1.start".
>>
>> The reason why Flink is picky about this is that we must ensure that the
>> result rows of the windows are aligned with the watermarks of the stream.
>>
>> Best, Fabian
>>
>>


TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
work. I have reproduced the exact same code in Java and it works!

Is this a pyflink bug? If so - how can I report it? If not - what can I try to 
do?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

Code to reproduce. I expect this code to print the timestamp and `None` all
the time. But it prints the timestamp and the state value.

```python
import time

from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, 
run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        current_state = self.state.value()

        print(datetime.now(), current_state)

        if current_state is None:
            self.state.update(str(datetime.now()))

        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = 
StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Setup pipeline

    (
        environment.set_parallelism(1)
        .from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())



    )

    # - Execute pipeline

    environment.execute("ttl_test")



```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        var currentState = state.value();
        System.out.println("State: " + currentState);

        if (currentState == null) {
            state.update(LocalDateTime.now().toString());
        }

        Thread.sleep(1500);
    }
}
```


Re: TTL in pyflink does not seem to work

2024-03-07 Thread Ivan Petrarka
Note that the Java code prints `State: Null`, `State: Null`, as I was
expecting, unlike the pyflink code.
On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , wrote:
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to 
> work. I have reproduced the exact same code in Java and it works!
>
> Is this a pyflink bug? If so - how can I report it? If not - what can I try 
> to do?
>
> Flink: 1.18.0
> image: flink:1.18.0-scala_2.12-java11
>
> Code to reproduce. I expect this code to print the timestamp and `None` all
> the time. But it prints the timestamp and the state value.
>
> ```python
> import time
>
> from datetime import datetime
>
> from pyflink.common import Time, Types
> from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
> StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .cleanup_incrementally(cleanup_size=10, 
> run_cleanup_for_every_record=True)
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             
> .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         current_state = self.state.value()
>
>         print(datetime.now(), current_state)
>
>         if current_state is None:
>             self.state.update(str(datetime.now()))
>
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # - Init environment
>
>     environment = 
> StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Setup pipeline
>
>     (
>         environment.set_parallelism(1)
>         .from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>
>
>
>     )
>
>     # - Execute pipeline
>
>     environment.execute("ttl_test")
>
>
>
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Histogram;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {
>
>
>     private transient ValueState<String> state;
>
>
>     @Override
>     public void open(Configuration parameters) {
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
> //                .cleanupFullSnapshot()
>                 .cleanupIncrementally(10, true)
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         var stateDescriptor = new ValueStateDescriptor<>("state", 
> String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>
>         state = getRuntimeContext().getState(stateDescriptor);
>
>     }
>
>     @Override
>     public void processElement(String event, Context context, Collector<String> collector) throws IOException, InterruptedException {
>         var currentState = state.value();
>         System.out.println("State: " + currentState);
>
>         if (currentState == null) {
>             state.update(LocalDateTime.now().toString());
>         }
>
>         Thread.sleep(1500);
>     }
> }```


Re: TTL in pyflink does not seem to work

2024-03-11 Thread Ivan Petrarka
Thanks! We’ve created an issue for that: 
https://issues.apache.org/jira/browse/FLINK-34625

Yep, we're planning to use timers as a workaround for now (a minimal sketch of that approach is below).
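
For reference, a minimal sketch of that timer-based workaround in Java, mirroring the Java reproduction above. The one-second expiration and class names are illustrative, not from the original job, and the same pattern should also work in PyFlink via `ctx.timer_service()`:

```java
import java.time.LocalDateTime;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ManualTtlProcessor extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        // Plain state descriptor, no StateTtlConfig: expiration is handled
        // by the processing-time timer registered below.
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state", String.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        if (state.value() == null) {
            state.update(LocalDateTime.now().toString());
            // Expire this key's state one second from now.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 1000L);
        }
        out.collect(event);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Fired per key when the timer goes off; drop the value as if the TTL had expired.
        state.clear();
    }
}
```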
On Mar 10, 2024 at 02:59 +0400, David Anderson , wrote:
> My guess is that this only fails when pyflink is used with the heap state 
> backend, in which case one possible workaround is to use the RocksDB state 
> backend instead. Another workaround would be to rely on timers in the process 
> function, and clear the state yourself.
>
> David
>
> > On Fri, Mar 8, 2024 at 12:29 AM lorenzo.affetti.ververica.com via user 
> >  wrote:
> > > Hello Ivan!
> > >
> > > Could you please create a JIRA issue out of this?
> > > That seem the proper place where to discuss this.
> > >
> > > It seems a bug as the two versions of the code you posted look identical, 
> > > and the behavior should be consistent.
> > > On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka , 
> > > wrote:
> > > > Note that the Java code prints `State: Null`, `State: Null`, as I
> > > > was expecting, unlike the pyflink code.
> > > > On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka , 
> > > > wrote:
> > > > > Hi! I’ve created a basic pyflink pipeline with ttl and it does not 
> > > > > seem to work. I have reproduced the exact same code in Java and it 
> > > > > works!
> > > > >
> > > > > Is this a pyflink bug? If so - how can I report it? If not - what can 
> > > > > I try to do?
> > > > >
> > > > > Flink: 1.18.0
> > > > > image: flink:1.18.0-scala_2.12-java11
> > > > >
> > > > > Code to reproduce. I expect this code to print the timestamp and `None` all the time. But it prints the timestamp and the state value.
> > > > >
> > > > > ```python
> > > > > import time
> > > > >
> > > > > from datetime import datetime
> > > > >
> > > > > from pyflink.common import Time, Types
> > > > > from pyflink.datastream import KeyedProcessFunction, RuntimeContext, 
> > > > > StreamExecutionEnvironment, TimeCharacteristic
> > > > > from pyflink.datastream.state import StateTtlConfig, 
> > > > > ValueStateDescriptor
> > > > >
> > > > >
> > > > > class Processor(KeyedProcessFunction):
> > > > >     def open(self, runtime_context: RuntimeContext):
> > > > >         state_descriptor = ValueStateDescriptor(
> > > > >             name="my_state",
> > > > >             value_type_info=Types.STRING(),
> > > > >         )
> > > > >
> > > > >         state_descriptor.enable_time_to_live(
> > > > >             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
> > > > >             .cleanup_incrementally(cleanup_size=10, 
> > > > > run_cleanup_for_every_record=True)
> > > > >             
> > > > > .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > > > >             
> > > > > .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > > > >             .build()
> > > > >         )
> > > > >
> > > > >         self.state = runtime_context.get_state(state_descriptor)
> > > > >
> > > > >     def process_element(self, value: int, ctx: 
> > > > > KeyedProcessFunction.Context):
> > > > >         current_state = self.state.value()
> > > > >
> > > > >         print(datetime.now(), current_state)
> > > > >
> > > > >         if current_state is None:
> > > > >             self.state.update(str(datetime.now()))
> > > > >
> > > > >         time.sleep(1.5)
> > > > >
> > > > >
> > > > > if __name__ == "__main__":
> > > > >     # - Init environment
> > > > >
> > > > >     environment = 
> > > > > StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
> > > > >
> > > > >     # - Setup pipeline
> > > > >
> > > > >     (
> > > > >         environment.set_parallelism(1)
> > > > >         .from_collection(
> > > > >             collection=list(range(10)),
> > > > >         )
> > > > >         .key_by

Developing Beam applications using Flink checkpoints

2020-05-15 Thread Ivan San Jose
Hi, we are starting to use Beam with Flink as the runner for our
applications, and recently we would like to get the advantages that Flink
checkpointing provides, but it seems we are not understanding it
clearly.

Simplifying, our application does the following:
  - Read messages from a couple of Kafka topics
  - Combine them
  - Write the combination result to a sink (Exasol DB)

As the application processes messages using event time, and one of the
topics is almost idle, the first time the application is started messages
get stuck in the combiner because the watermark doesn't advance until
messages arrive on the idle topic (we know this and it is not a problem
for us though).

The problem is that we've observed that, if a checkpoint is triggered while
messages are still stuck in the combiner, surprisingly for us the
checkpoint finishes successfully (and offsets are committed to Kafka) even
though the messages haven't reached the sink yet. Is this expected?

The thing is that, if in the future we make state-incompatible
changes to the application source code, the checkpoint that was taken couldn't be
restored. So we would like to start the application without using any
checkpoint, but without losing data.
The problem there is that data loss would happen, because messages
stuck in the combiner are already committed to Kafka, and if we don't use any
checkpoint the application would start to read from the latest committed
offset in Kafka, so those messages would never be read from the
source again.

So, I guess our question is: how do you avoid losing data
when developing applications, because sooner or later you are going to
introduce breaking changes...

For example, we've seen those two errors so far:
  - After changing an operator name:

2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
leadership with session id ----.
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986.
Cannot map checkpoint/savepoint state for operator
f476451c6210bd2783f36fa331b9da5e to the new program, because the
operator is not available in the new program. If you want to allow to
skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
...

  - After modifying a Java model class involved in a combine:
org.apache.flink.runtime.state.BackendBuildingException: Failed when
trying to restore heap backend
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException:
internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local
class incompatible: stream classdesc serialVersionUID =
8366890161513008789, local class serialVersionUID = 174312384610985998
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)


Apologies in advance as we are new to Flink, so may be w

Re: Developing Beam applications using Flink checkpoints

2020-05-19 Thread Ivan San Jose
Thanks for your complete answer Arvid, we will try to tackle all the
things you mentioned, but take into account that we are using Beam on top of
Flink, so, to be honest, I don't know how we could implement the custom
serialization thing (
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
) there. Could you please give us some hints? Thanks

On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> First let's address the issue with idle partitions. The solution is
> to use a watermark assigner that also emits a watermark with some
> idle timeout [1].
>
> Now the second question, on why Kafka commits are committed for in-
> flight, checkpointed data. The basic idea is that you are not losing
> data while avoiding replicated output.
> So if you commit offsets only after data has been fully processed,
> upon crash the same data point would be reprocessed jointly with the
> restored in-flight data, so you get duplicate messages in your
> system.
> To avoid duplicates data needs to be more or less completely flushed
> out the system before a checkpoint is performed. That would produce a
> huge downtime.
> Instead, we assume that we can always resume from the checkpoints.
>
> Which leads to the last question on what to do when your pipeline has
> breaking changes.
> First strategy is to avoid breaking changes as much as possible.
> State could for example also be stored as Avro to allow schema
> evolution. Minor things like renamed operators will not happen with a
> bit more expertise.
> Second strategy is to use state migration [2]. Alternatively, you can
> manually convert state with state processor API [3].
> Last option is to do a full reprocessing of data. This can be done on
> a non-production cluster and then a savepoint can be used to
> bootstrap the production cluster quickly. This option needs to be
> available anyways for the case that you find any logic error. But of
> course, this option has the highest implications (may need to purge
> sink beforehand).
>
> [1]
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> isanj...@theworkshop.com> wrote:
> > Hi, we are starting to use Beam with Flink as runner on our
> > applications, and recently we would like to get advantages that
> > Flink
> > checkpoiting provides, but it seems we are not understanding it
> > clearly.
> >
> > Simplifying, our application does the following:
> >   - Read meesages from a couple of Kafka topics
> >   - Combine them
> >   - Write combination result to a sink (Exasol DB)
> >
> > As application is processing messages using event time, and one of
> > the
> > topics is almost idle, the first time application is started
> > messages
> > are stuck in the combiner because watermark don't advance until we
> > have
> > messages arriving onto idled topic (we know this and is not a
> > problem
> > for us though).
> >
> > The problem is that we've observed, if a checkpoint is triggered
> > when
> > messages are still stuck in the combiner, surprisingly for us, the
> > checkpoint finishes successfully (and offsets committed to Kafka)
> > even
> > messages haven't progressed to the sink yet. Is this expected?
> >
> > The thing is that, if in the future, we make not state compatible
> > changes in application source code, checkpoint taken couldn't be
> > restored. So we would like to start the application without using
> > any
> > checkpoint but without losing data.
> > Problem here would be that data loss would happen because messages
> > stuck in combiner are already committed to Kafka and application
> > would
> > start to read from latest commited offset in Kafka if we don't use
> > any
> > checkpoint, thus those messages are not going to be read from the
> > source again.
> >
> > So, I guess our question is how are you doing in order to not lose
> > data
> > when developing applications, because sooner or later you are going
> > to
> > add breaking changes...
> >
> > For example, we've seen those two errors so far:
> >   - After changing an opera
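
For reference, a minimal sketch of the idle-source handling Arvid mentions above, written against Flink's DataStream WatermarkStrategy API (available since Flink 1.11; the linked troubleshooting example shows the pre-1.11 equivalent, and a Beam pipeline would configure this through its own timestamp/watermark policy). `MyEvent` and its timestamp getter are illustrative names, not from the original job:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Marks the source as idle after one minute without records, so the overall
// watermark can keep advancing past the quiet Kafka topic and the downstream
// combiner is not held back forever.
WatermarkStrategy<MyEvent> watermarks =
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withIdleness(Duration.ofMinutes(1))
                .withTimestampAssigner((event, previousTimestamp) -> event.getEventTimeMillis());

// Applied either directly on the Kafka source or right after it:
// stream.assignTimestampsAndWatermarks(watermarks);
```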

Re: Developing Beam applications using Flink checkpoints

2020-05-19 Thread Ivan San Jose
Yep, sorry if I'm bothering you, but I think I'm still not getting this:
how could I tell Beam to tell Flink to use that serializer instead of the
standard Java one? I think Beam is abstracting us away from Flink's
checkpointing mechanism, so I'm afraid that if we use the Flink API
directly we might break other things that Beam is hiding from us...

On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> The easiest way is to use some implementation that's already there
> [1]. I already mentioned Avro and would strongly recommend giving it
> a go. If you make sure to provide a default value for as many fields
> as possible, you can always remove them later giving you great
> flexibility. I can give you more hints if you decide to go this
> route.
>
> If you want to have a custom implementation, I'd start at looking of
> one of the simpler implementations like MapSerializerSnapshot [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> (see known implementing classes).
> [2]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
>
> On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> isanj...@theworkshop.com> wrote:
> > Thanks for your complete answer Arvid, we will try to approach all
> > things you mentioned, but take into account we are using Beam on
> > top of
> > Flink, so, to be honest, I don't know how could we implement the
> > custom
> > serialization thing (
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > ) there. Could you please give us some hints? Thanks
> >
> > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > Hi Ivan,
> > >
> > > First let's address the issue with idle partitions. The solution
> > is
> > > to use a watermark assigner that also emits a watermark with some
> > > idle timeout [1].
> > >
> > > Now the second question, on why Kafka commits are committed for
> > in-
> > > flight, checkpointed data. The basic idea is that you are not
> > losing
> > > data while avoiding replicated output.
> > > So if you commit offsets only after data has been fully
> > processed,
> > > upon crash the same data point would be reprocessed jointly with
> > the
> > > restored in-flight data, so you get duplicate messages in your
> > > system.
> > > To avoid duplicates data needs to be more or less completely
> > flushed
> > > out the system before a checkpoint is performed. That would
> > produce a
> > > huge downtime.
> > > Instead, we assume that we can always resume from the
> > checkpoints.
> > >
> > > Which leads to the last question on what to do when your pipeline
> > has
> > > breaking changes.
> > > First strategy is to avoid breaking changes as much as possible.
> > > State could for example also be stored as Avro to allow schema
> > > evolution. Minor things like renamed operators will not happen
> > with a
> > > bit more expertise.
> > > Second strategy is to use state migration [2]. Alternatively, you
> > can
> > > manually convert state with state processor API [3].
> > > Last option is to do a full reprocessing of data. This can be
> > done on
> > > a non-production cluster and then a savepoint can be used to
> > > bootstrap the production cluster quickly. This option needs to be
> > > available anyways for the case that you find any logic error. But
> > of
> > > course, this option has the highest implications (may need to
> > purge
> > > sink beforehand).
> > >
> > > [1]
> > >
> > https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > [2]
> > >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > [3]
> > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> > >
> > > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> > > isanj...@theworkshop.com> wrote:
> > > > Hi, we are starting to use Beam with Flink as runner on our
> > > > applications, and recently we would li
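
As a concrete illustration of the "store state as Avro" suggestion quoted above, a minimal raw-Flink sketch is below (in a Beam pipeline the analogous lever is the coder registered for the type, so treat this only as the shape of the idea). `WalletMetadata` stands in for any class generated from an .avsc schema; the name is borrowed from the stack trace earlier in the thread and is purely illustrative.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class WalletStateFunction extends KeyedProcessFunction<String, String, String> {

    // Assumes WalletMetadata is generated from an Avro schema (it extends
    // SpecificRecordBase) and flink-avro is on the classpath, so Flink uses
    // its Avro serializer, which supports schema evolution on restore as
    // long as new fields have defaults.
    private transient ValueState<WalletMetadata> walletState;

    @Override
    public void open(Configuration parameters) {
        walletState = getRuntimeContext().getState(
                new ValueStateDescriptor<>(
                        "wallet-metadata",
                        new AvroTypeInfo<>(WalletMetadata.class)));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        // Read and update walletState here; business logic elided.
        out.collect(event);
    }
}
```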

Re: Developing Beam applications using Flink checkpoints

2020-05-19 Thread Ivan San Jose
Actually I'm also thinking about how Beam coders relate to the
runner's serialization... I mean, on Beam you specify a coder for each
Java type so that Beam can serialize/deserialize that type,
but then is the runner used under the hood serializing/deserializing
the result again, so that we end up doing a double serialization? Does that
make sense? Or how does it work?

On Tue, 2020-05-19 at 08:54 +, Ivan San Jose wrote:
> Yep, sorry if I'm bothering you but I think I'm still not getting
> this,
> how could I tell Beam to tell Flink to use that serializer instead of
> Java standard one, because I think Beam is abstracting us from Flink
> checkpointing mechanism, so I'm afraid that if we use Flink API
> directly we might break other things that Beam is hidding for us...
>
> On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > Hi Ivan,
> >
> > The easiest way is to use some implementation that's already there
> > [1]. I already mentioned Avro and would strongly recommend giving
> > it
> > a go. If you make sure to provide a default value for as many
> > fields
> > as possible, you can always remove them later giving you great
> > flexibility. I can give you more hints if you decide to go this
> > route.
> >
> > If you want to have a custom implementation, I'd start at looking
> > of
> > one of the simpler implementations like MapSerializerSnapshot [2].
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > (see known implementing classes).
> > [2]
> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> >
> > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> > isanj...@theworkshop.com> wrote:
> > > Thanks for your complete answer Arvid, we will try to approach
> > > all
> > > things you mentioned, but take into account we are using Beam on
> > > top of
> > > Flink, so, to be honest, I don't know how could we implement the
> > > custom
> > > serialization thing (
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > ) there. Could you please give us some hints? Thanks
> > >
> > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > Hi Ivan,
> > > >
> > > > First let's address the issue with idle partitions. The
> > > > solution
> > > is
> > > > to use a watermark assigner that also emits a watermark with
> > > > some
> > > > idle timeout [1].
> > > >
> > > > Now the second question, on why Kafka commits are committed for
> > > in-
> > > > flight, checkpointed data. The basic idea is that you are not
> > > losing
> > > > data while avoiding replicated output.
> > > > So if you commit offsets only after data has been fully
> > > processed,
> > > > upon crash the same data point would be reprocessed jointly
> > > > with
> > > the
> > > > restored in-flight data, so you get duplicate messages in your
> > > > system.
> > > > To avoid duplicates data needs to be more or less completely
> > > flushed
> > > > out the system before a checkpoint is performed. That would
> > > produce a
> > > > huge downtime.
> > > > Instead, we assume that we can always resume from the
> > > checkpoints.
> > > > Which leads to the last question on what to do when your
> > > > pipeline
> > > has
> > > > breaking changes.
> > > > First strategy is to avoid breaking changes as much as
> > > > possible.
> > > > State could for example also be stored as Avro to allow schema
> > > > evolution. Minor things like renamed operators will not happen
> > > with a
> > > > bit more expertise.
> > > > Second strategy is to use state migration [2]. Alternatively,
> > > > you
> > > can
> > > > manually convert state with state processor API [3].
> > > > Last option is to do a full reprocessing of data. This can be
> > > done on
> > > > a non-production cluster and then a savepoint can be used to
> > > > bootstrap the production cluster quickly. This option needs to
> > > > be
> > > > available anyways for the ca

Re: Developing Beam applications using Flink checkpoints

2020-05-19 Thread Ivan San Jose
Perfect, thank you so much Arvid. I'd expect more people to be using Beam on
top of Flink, but it seems it is not that popular.

On Tue, 2020-05-19 at 12:46 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> I'm fearing that only a few mailing list users have actually deeper
> Beam experience. I only used it briefly 3 years ago. Most users here
> are using Flink directly to avoid these kinds of double-abstraction
> issues.
>
> It might be better to switch to the Beam mailing list if you have
> Beam-specific questions including how the Flink runner actually
> translates the Beam program to Flink.
>
> On Tue, May 19, 2020 at 11:38 AM Ivan San Jose <
> isanj...@theworkshop.com> wrote:
> > Actually I'm also thinking about how Beam coders are related with
> > runner's serialization... I mean, on Beam you specify a coder per
> > each
> > Java type in order to Beam be able to serialize/deserialize that
> > type,
> > but then, is the runner used under the hood
> > serializing/deserializing
> > again the result, so that is doing a double serialization, does it
> > make
> > sense? Or how does it work?
> >
> > On Tue, 2020-05-19 at 08:54 +, Ivan San Jose wrote:
> > > Yep, sorry if I'm bothering you but I think I'm still not getting
> > > this,
> > > how could I tell Beam to tell Flink to use that serializer
> > instead of
> > > Java standard one, because I think Beam is abstracting us from
> > Flink
> > > checkpointing mechanism, so I'm afraid that if we use Flink API
> > > directly we might break other things that Beam is hidding for
> > us...
> > >
> > > On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > > > Hi Ivan,
> > > >
> > > > The easiest way is to use some implementation that's already
> > there
> > > > [1]. I already mentioned Avro and would strongly recommend
> > giving
> > > > it
> > > > a go. If you make sure to provide a default value for as many
> > > > fields
> > > > as possible, you can always remove them later giving you great
> > > > flexibility. I can give you more hints if you decide to go this
> > > > route.
> > > >
> > > > If you want to have a custom implementation, I'd start at
> > looking
> > > > of
> > > > one of the simpler implementations like MapSerializerSnapshot
> > [2].
> > > >
> > > > [1]
> > > >
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > > > (see known implementing classes).
> > > > [2]
> > > >
> > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> > > >
> > > > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> > > > isanj...@theworkshop.com> wrote:
> > > > > Thanks for your complete answer Arvid, we will try to
> > approach
> > > > > all
> > > > > things you mentioned, but take into account we are using Beam
> > on
> > > > > top of
> > > > > Flink, so, to be honest, I don't know how could we implement
> > the
> > > > > custom
> > > > > serialization thing (
> > > > >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > ) there. Could you please give us some hints? Thanks
> > > > >
> > > > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > > > Hi Ivan,
> > > > > >
> > > > > > First let's address the issue with idle partitions. The
> > > > > > solution
> > > > > is
> > > > > > to use a watermark assigner that also emits a watermark
> > with
> > > > > > some
> > > > > > idle timeout [1].
> > > > > >
> > > > > > Now the second question, on why Kafka commits are committed
> > for
> > > > > in-
> > > > > > flight, checkpointed data. The basic idea is that you are
> > not
> > > > > losing
> > > > > > data while avoiding replicated output.
> > > > > > So if you commit offsets only after data has been fully
> > > > > processed,
> > > > > 

Secure Azure Credential Configuration

2023-03-02 Thread Ivan Webber via user
TLDR: I will buy you a coffee if you can help me understand how to securely 
configure Azure credentials (doc 
page<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/>
 for reference).

I am a junior developer tasked with being the first person to learn the Apache 
Flink framework. I know that storing secrets in flink-conf.yaml in a container 
is a bad idea. I’ve tried exposing Azure storage keys as env vars and using 
`config.setString`, but those properties seem to get overridden. I plan on 
using Flink operator, so if you can show me in that context that’d be ideal.

Thanks, and sorry for bothering everyone. I’ve just exhausted myself and am 
hopeful someone will have mercy on me. I really will Venmo you $5 for coffee 
if you want.

Thanks,

Ivan


Larger code examples:

Setting dynamic properties before executing the job doesn’t work because the 
values seem to get overridden or never forwarded.
```
val config = new Configuration()
config.setString("fs.azure.account.key.mystore1.blob.core.windows.net", 
System.getenv("KEY_1"))
config.setString("fs.azure.account.key.mystore2.blob.core.windows.net", 
System.getenv("KEY_2"))
config.setString("fs.azure.account.key.mystore3.blob.core.windows.net", 
System.getenv("KEY_3"))
val env = 
environment.StreamExecutionEnvironment.getExecutionEnvironment(config)
```

In the Flink operator, configuration fields can be provided as follows, but then I 
can’t commit the file with a secret inside. Ideally there would be a way to 
reference a secret, but the values must be literal strings.
```
spec:
  flinkConfiguration:
    fs.azure.account.key.mystore1.blob.core.windows.net: SECRET_STRING
    fs.azure.account.key.mystore2.blob.core.windows.net: SECRET_STRING
    fs.azure.account.key.mystore3.blob.core.windows.net: SECRET_STRING
```

The last possible solution I can think of, which I’ll be trying, is putting the 
entire flink-conf.yaml into a secret, or having a different container that adds 
secrets to the flink-operator-job.yaml and then does `kubectl create -f 
flink-operator-job.yaml` (if that’s even possible).


Re: [EXTERNAL] Re: Secure Azure Credential Configuration

2023-03-03 Thread Ivan Webber via user
Thanks Alexis,

I will be trying that out today. If it works I will share back and try adding 
it to the docs.



From: Alexis Sarda-Espinosa 
Sent: Thursday, March 2, 2023 3:33:03 PM
To: Ivan Webber 
Cc: user 
Subject: Re: [EXTERNAL] Re: Secure Azure Credential Configuration

Hi Ivan,

please always include the whole distribution list since answers may help others 
as well.

I would also think about implementing your own provider(s), but some things I 
know:

- There are 2 different KeyProvider interfaces (which isn't explicitly 
documented from what I can tell):
  * org.apache.hadoop.fs.azure.KeyProvider - WASB
  * org.apache.hadoop.fs.azurebfs.services.KeyProvider - ABFS (I think)
- Flink shades the hadoop classes under org.apache.flink.fs.shaded.hadoop3... 
so you would need to implement your providers against the shaded interfaces.
- The documentation for Flink plugins [1] shows an s3 folder with multiple 
jars, so I imagine you could add a jar with your key providers to a folder with 
the azure-fs jar, but I've never tested this.

However, I believe this whole shading and plugin details are only relevant if 
you want Flink to access the azure FS for its checkpoints and/or savepoints, if 
you need to access the FS directly in your code, I imagine you're better off 
including the relevant hadoop jars in your fat jar without going through 
Flink's plugin system.

This is my impression, but maybe someone else can correct me if I'm wrong.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/<https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fnightlies.apache.org%2Fflink%2Fflink-docs-master%2Fdocs%2Fdeployment%2Ffilesystems%2Fplugins%2F&data=05%7C01%7CIvan.Webber%40microsoft.com%7C302a8781f35f4b64cfaf08db1b767f30%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C638133968331853429%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000%7C%7C%7C&sdata=8W361g8DGOeDIipbVIHZC4TyzxFs8XiQIMFoW7zLUbQ%3D&reserved=0>

Regards,
Alexis.

Am Do., 2. März 2023 um 23:46 Uhr schrieb Ivan Webber 
mailto:ivan.web...@microsoft.com>>:

Hello Alexis,



I was actually thinking I’d use both WASB and ABFS, but I looked at the source 
for EnvironmentVariableKeyProvider and it only reads a single specific 
environment variable where my pipeline actually needs to bring together data 
stored in different blob and ADLS accounts. I couldn’t find anything about 
providing my own KeyProvider but I considered trying it as an experiment at one 
point.



From: Alexis Sarda-Espinosa<mailto:sarda.espin...@gmail.com>
Sent: Thursday, March 2, 2023 2:38 PM
To: Ivan Webber<mailto:ivan.web...@microsoft.com>
Cc: user<mailto:user@flink.apache.org>
Subject: [EXTERNAL] Re: Secure Azure Credential Configuration




Hi Ivan,



Mercy is always free. Are you using WASB or ABFS? I presume it's the latter, 
since that's the one that can't use EnvironmentVariableKeyProvider, but just to 
be sure.



Regards,

Alexis.



On Thu, 2 Mar 2023, 23:07 Ivan Webber via user, 
mailto:user@flink.apache.org>> wrote:

TLDR: I will buy your coffee if you can help me understand to securely 
configure Azure credentials (doc 
page<https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fnightlies.apache.org%2Fflink%2Fflink-docs-master%2Fdocs%2Fdeployment%2Ffilesystems%2Fazure%2F&data=05%7C01%7CIvan.Webber%40microsoft.com%7C302a8781f35f4b64cfaf08db1b767f30%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C638133968331853429%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000%7C%7C%7C&sdata=C3898nZaxzivVsSaN2jMqME93PR%2BicS%2By9MFJKV%2B%2Fyk%3D&reserved=0>
 for reference).



I am a junior developer tasked with being the first person to learn the Apache 
Flink framework. I know that storing secrets in flink-conf.yaml in a container 
is a bad idea. I’ve tried exposing Azure storage keys as env vars and using 
`config.setString`, but those properties seem to get overridden. I plan on 
using Flink operator, so if you can show me in that context that’d be ideal.



Thanks, and sorry for bothering everyone. I’ve just exhausted myself and am 
hopeful someone will have mercy for me. I really will Venmo you $5 for coffee 
if you want.



Thanks,



Ivan





Larger code examples:



Setting dynamic properties before executing the job doesn’t work because the 
values seem to get overridden or never forwarded.

```

val config = new Configuration()

   

RE: [EXTERNAL] Re: Secure Azure Credential Configuration

2023-03-06 Thread Ivan Webber via user
Thanks for the pointers Alexis!

Implementing `org.apache.hadoop.fs.azure.KeyProvider` has helped me make 
progress, but I’m running into a new error:
```
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException:
 org.tryflink.wrappers.TrafficForecastEnvKeyProviderWrapper specified in config 
is not a valid KeyProvider class.
```

I get this error whether I implement the class in Scala or Java, and whether I use 
`org.apache.hadoop.fs.azure.KeyProvider` or 
`org.apache.hadoop.fs.azurebfs.services.KeyProvider`. My best guess is that 
it’s something to do with not building against the shaded interface, which you 
indicated I should do, or possibly with different class loaders. To build against the 
shaded interfaces, would I import a package that has them?

This is the dependency I added with `org.apache.hadoop.fs.azure.KeyProvider`.
```

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>3.3.2</version>
</dependency>
```

What I’ve learned so far is that this configuration has more to do with 
configuring Hadoop than Flink as the configuration is 
forwarded<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/#:~:text=Flink%20forwards%20all%20Flink%20configurations%20with%20a%20key%20prefix%20of%20fs.azure%20to%20the%20Hadoop%20configuration%20of%20the%20filesystem>.
 Thus, I tried setting the properties to use Azure Managed 
Identity<https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity>,
 but got an error [1]. If anyone has gotten that to work I’d be interested in 
hearing about it.

Thanks for the help so far; please, anyone who can give pointers send them.

Thanks,

Ivan


[1] - 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException: 
No credentials found for account myblob.blob.core.windows.net in the 
configuration, and its container flink-forecast is not accessible using 
anonymous credentials. Please check if the container exists first. If it is not 
publicly available, you have to provide account credentials.

From: Ivan Webber<mailto:ivan.web...@microsoft.com>
Sent: Friday, March 3, 2023 10:38 AM
To: Alexis Sarda-Espinosa<mailto:sarda.espin...@gmail.com>
Cc: user<mailto:user@flink.apache.org>
Subject: Re: [EXTERNAL] Re: Secure Azure Credential Configuration

Thanks Alexis,

I will be trying that out today. If it works I will share back and try adding 
it to the docs.



From: Alexis Sarda-Espinosa 
Sent: Thursday, March 2, 2023 3:33:03 PM
To: Ivan Webber 
Cc: user 
Subject: Re: [EXTERNAL] Re: Secure Azure Credential Configuration

Hi Ivan,

please always include the whole distribution list since answers may help others 
as well.

I would also think about implementing your own provider(s), but some things I 
know:

- There are 2 different KeyProvider interfaces (which isn't explicitly 
documented from what I can tell):
  * org.apache.hadoop.fs.azure.KeyProvider - WASB
  * org.apache.hadoop.fs.azurebfs.services.KeyProvider - ABFS (I think)
- Flink shades the hadoop classes under org.apache.flink.fs.shaded.hadoop3... 
so you would need to implement your providers against the shaded interfaces.
- The documentation for Flink plugins [1] shows an s3 folder with multiple 
jars, so I imagine you could add a jar with your key providers to a folder with 
the azure-fs jar, but I've never tested this.

However, I believe this whole shading and plugin details are only relevant if 
you want Flink to access the azure FS for its checkpoints and/or savepoints, if 
you need to access the FS directly in your code, I imagine you're better off 
including the relevant hadoop jars in your fat jar without going through 
Flink's plugin system.

This is my impression, but maybe someone else can correct me if I'm wrong.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/<https://nam06.safelinks.protection.outlook.com/?url=https%3A%2F%2Fnightlies.apache.org%2Fflink%2Fflink-docs-master%2Fdocs%2Fdeployment%2Ffilesystems%2Fplugins%2F&data=05%7C01%7CIvan.Webber%40microsoft.com%7C302a8781f35f4b64cfaf08db1b767f30%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C638133968331853429%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000%7C%7C%7C&sdata=8W361g8DGOeDIipbVIHZC4TyzxFs8XiQIMFoW7zLUbQ%3D&reserved=0>

Regards,
Alexis.

Am Do., 2. März 2023 um 23:46 Uhr schrieb Ivan Webber 
mailto:ivan.web...@microsoft.com>>:

Hello Alexis,



I was actually thinking I’d use both WASB and ABFS, but I looked at the source 
for EnvironmentVariableKeyProvider and it only reads a single specific 
environment variable where my pipeline actually needs to bring together data 
stored in different blob and ADLS accounts. I couldn’t find anyth

Unable to Use spec.flinkVersion v1_17 with Flink Operator

2023-03-28 Thread Ivan Webber via user
Hello Flink Users:

I'm trying to upgrade to use Flink 1.17.0 with my pipeline in order to have 
support for writing to Azure Data Lake Storage. However when I change the 
`spec.flinkVersion` to v1_17 I get an error message:

```bash
The FlinkDeployment "test-replay-streaming-run" is invalid: spec.flinkVersion: 
Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15", "v1_16"
```

The documentation 
(here<https://flink.apache.org/downloads/#:~:text=Flink%20version(s)%3A-,1.17.0,-1.16.1>)
 says the latest version of the Flink Operator (1.4) should support 1.17.0, and I made 
sure to update by running the commands below. I’m wondering if an enum needs to be 
updated or if the latest stable Flink Operator doesn’t actually support 1.17.0 
yet. Any pointers would be appreciated.

```bash
helm uninstall flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
helm repo rm flink-operator-repo
helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
```

Thanks,

Ivan Webber



RE: [EXTERNAL] Re: Unable to Use spec.flinkVersion v1_17 with Flink Operator

2023-03-29 Thread Ivan Webber via user
The commands for updating CRD seem to assume they are being run from the 
flink-operator-repo. Accordingly, to run them in my environment I should run 
them as:
```bash
# upgrade CRD
kubectl replace -f 
https://github.com/apache/flink-kubernetes-operator/tree/release-1.4/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
kubectl replace -f 
https://github.com/apache/flink-kubernetes-operator/tree/release-1.4/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
```

However, I got the following error for both commands:
```bash
error: error parsing 
https://github.com/apache/flink-kubernetes-operator/tree/release-1.4/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml:
 error converting YAML to JSON: yaml: line 27: mapping values are not allowed 
in this context
```

I tried the following based on the documentation:
```bash

helm uninstall flink-kubernetes-operator

kubectl delete crd flinkdeployments.flink.apache.org

helm repo update flink-operator-repo
helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
```

Now I do not get the error about `v1_17` and my pipeline is starting. I will 
follow-up if I find any new problems.

Thanks for the help!

Best,

Ivan


From: Gyula Fóra<mailto:gyula.f...@gmail.com>
Sent: Tuesday, March 28, 2023 10:25 PM
To: Ivan Webber<mailto:ivan.web...@microsoft.com>
Cc: Ivan Webber via user<mailto:user@flink.apache.org>
Subject: [EXTERNAL] Re: Unable to Use spec.flinkVersion v1_17 with Flink 
Operator

I think you forgot to upgrade the CRD during the upgrade process on your 
cluster.

As you can see here: 
https://github.com/apache/flink-kubernetes-operator/blob/release-1.4/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml#L38-L44
The newer version already contains support for 1.17.
For docs you can refer to: 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/#1-upgrading-the-crd


Cheers,
Gyula

On Tue, Mar 28, 2023 at 10:29 PM Ivan Webber via user 
mailto:user@flink.apache.org>> wrote:
Hello Flink Users:

I'm trying to upgrade to use Flink 1.17.0 with my pipeline in order to have 
support for writing to Azure Data Lake Storage. However when I change the 
`spec.flinkVersion` to v1_17 I get an error message:

```bash
The FlinkDeployment "test-replay-streaming-run" is invalid: spec.flinkVersion: 
Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15", "v1_16"
```

The documentation 
(here<https://flink.apache.org/downloads/#:~:text=Flink%20version(s)%3A-,1.17.0,-1.16.1>)
 says latest version of Flink Operator (1.4) should support 1.17.0 and I made 
sure to update by running the below commands. I’m wondering if an Enum needs 
updated or if the latest stable Flink Operator doesn’t actually support 1.17.0 
yet. Any pointers would be appreciated.

```bash
helm uninstall flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
helm repo rm flink-operator-repo
helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
```

Thanks,

Ivan Webber




Failed docker-entrypoint.sh in Flink-Operator Logs

2023-04-04 Thread Ivan Webber via user

I’ve noticed that all jobs I start with `flink-operator` have the following 
message at the top of their logs for both jobmanager and taskmanager pods:

```
sed: couldn't open temporary file /opt/flink/conf/sedRTwsr1: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedcDS30D: Read-only file 
system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Starting kubernetes-application as a console application on host 
test-replay-run-b6458d699-nmfvf.
```

It seems these failures are due to the Flink Docker image’s entrypoint being 
run by a user without permission to write to `/opt/flink/conf` (as part of 
`sed -i`) or to pipe to files in that folder. However, I’ve built my own container 
based on the Docker scripts, and even when I ensure that all files are owned by 
`flink:flink` with full read-write permissions, these messages still show up when 
running with flink-operator. Accordingly, I’m wondering if this is a bug or just 
something to ignore (e.g. flink-operator initialized the files and locked them 
to prevent further changes). If they are just something to ignore, it might be 
good to add an argument to `/docker-entrypoint.sh` to skip that step so there 
aren’t confusing error messages.

Thanks,

Ivan


RE: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-16 Thread Ivan Webber via user
When you create your cluster you probably need to ensure the following settings 
are set. I briefly looked into MSI but ended up using Azure Key Vault with the 
CSI storage driver for an initial prototype 
(https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/aks/csi-secrets-store-driver.md#upgrade-an-existing-aks-cluster-with-azure-key-vault-provider-for-secrets-store-csi-driver-support).

For me it helped to think about it as Hadoop configuration.

If you do get MSI working I would be interested in hearing what made it work 
for you, so be sure to update the docs or put it on this thread.

To create from scratch:
Create an AKS cluster with the required settings.
```bash
# create an AKS cluster with pod-managed identity and Azure CNI
az aks create --resource-group $RESOURCE_GROUP --name $CLUSTER 
--enable-managed-identity --network-plugin azure --enable-pod-identity
```

I hope that is somehow helpful.

Best of luck,

Ivan

From: DEROCCO, CHRISTOPHER<mailto:cd9...@att.com>
Sent: Monday, May 8, 2023 3:40 PM
To: Shammon FY<mailto:zjur...@gmail.com>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: [EXTERNAL] RE: MSI Auth to Azure Storage Account with Flink Apache 
Operator not working


Shammon,



I’m still having trouble getting the package into my cluster environment. I have 
added these lines to my Dockerfile:

mkdir ./plugins/azure-fs-hadoop

cp ./opt/flink-azure-fs-hadoop-1.16.0.jar ./plugins/azure-fs-hadoop/

according to the Flink docs here 
(https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/). 
This should enable the flink-azure-fs-hadoop jar in the environment, which has 
the classes needed for ADLS Gen2 MSI authentication.
I also have the following dependency in my pom to add it to the FAT Jar.


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-azure-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>


However, I still get the class-not-found error and the Flink job is not able to 
authenticate to the Azure storage account to store its checkpoints. I’m not 
sure what other configuration pieces I’m missing. Has anyone had success 
writing checkpoints to Azure ADLS Gen2 storage with managed service 
identity (MSI) authentication?



From: Shammon FY 
Sent: Friday, May 5, 2023 8:38 PM
To: DEROCCO, CHRISTOPHER 
Cc: user@flink.apache.org
Subject: Re: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

Hi DEROCCO,

I think you can check the startup command of the job on k8s to see if the jar 
file is in the classpath.

If your job is DataStream, you need to add hadoop azure dependency in your 
project, and if it is an SQL job, you need to include this jar file in your 
Flink release package. Or you can also add this package in your cluster 
environment.

Best,
Shammon FY


On Fri, May 5, 2023 at 10:21 PM DEROCCO, CHRISTOPHER 
mailto:cd9...@att.com>> wrote:
How can I add the package to the flink job or check if it is there?

From: Shammon FY mailto:zjur...@gmail.com>>
Sent: Thursday, May 4, 2023 9:59 PM
To: DEROCCO, CHRISTOPHER mailto:cd9...@att.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

Hi DEROCCO,

I think you need to check whether there is a hadoop-azure jar file in the 
classpath of your flink job. From an error message 'Caused by: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.', your flink 
job may be missing this package.

Best,
Shammon FY


On Fri, May 5, 2023 at 4:40 AM DEROCCO, CHRISTOPHER 
mailto:cd9...@att.com>> wrote:

I receive the error:  Caused by: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.
I’m using flink 1.16 running in Azure Kubernetes using the Flink Apache 
Kubernetes Operator.
I have the following specified in the spec.flinkConfiguration: as per the 
Apache Kubernetes operator documentation.

fs.azure.createRemoteFileSystemDuringInitialization: "true"
fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth
fs.azure.account.oauth.provider.type..dfs.core.windows.net: org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
fs.azure.account.oauth2.msi.tenant..dfs.core.windows.net:

RE: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-19 Thread Ivan Webber via user
I will provide more details as to how I was able to use AKV with CSI. Also, I 
looked in the Flink source at the ADLS FileSystem factory, and I think that, 
despite what the docs say, configuration options prefixed with flink.hadoop won’t 
get forwarded.

You can expose the key vault as Kubernetes secrets that can be exposed as 
environment variables (see the docs I previously sent). Then you can provide a 
KeyProvider class (org.apache.hadoop.fs.azure.KeyProvider) that reads from the 
environment variables. In the flink-conf.yaml you can configure Hadoop to use 
the KeyProvider (all keys with fs.azure prefix are forwarded to Hadoop). A jar 
with the KeyProvider should be included in the same directory as the ADLS 
plugin.

```flink-conf.yaml
fs.azure.account.keyprovider..dfs.core.windows.net: 
fs.azure.account.keyprovider..blob.core.windows.net: 
```
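
For anyone following along, a minimal sketch of such a KeyProvider might look like the following. It assumes the account key is exposed to the pod as an environment variable; the package, class, and variable names are placeholders I made up, and depending on whether your path goes through the wasb (blob) or abfs (dfs) driver, the expected KeyProvider interface may live in a different hadoop-azure package, so double-check against your Hadoop version.

```java
package com.example.flink.azure; // hypothetical package name

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.KeyProvider;
import org.apache.hadoop.fs.azure.KeyProviderException;

/**
 * Minimal sketch: resolves the storage account key from an environment
 * variable that the Key Vault CSI driver / Kubernetes secret injects.
 */
public class EnvVarKeyProvider implements KeyProvider {

    // Placeholder name; must match whatever your secret setup actually exposes.
    private static final String ENV_VAR = "AZURE_STORAGE_ACCOUNT_KEY";

    @Override
    public String getStorageAccountKey(String accountName, Configuration conf)
            throws KeyProviderException {
        String key = System.getenv(ENV_VAR);
        if (key == null || key.isEmpty()) {
            throw new KeyProviderException(
                    "Environment variable " + ENV_VAR + " is not set for account " + accountName);
        }
        return key;
    }
}
```

The fully qualified class name of the provider is what goes after the colon in the fs.azure.account.keyprovider.* lines above, and the jar containing it goes in the same directory as the ADLS plugin, as mentioned.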

Keep in mind that all the available releases of Flink have one of two bugs 
causing problems reading and/or writing to ADLS, so you will need to re-build 
the ADLS plugin from source by checking out the release commit (probably 
1.17.0) and cherry-picking the bug fix (or wait for 1.17.1 or 1.18.0 which will 
have the fixes).
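
In case it saves someone time, the rebuild is roughly the following; the fix commit hash is a placeholder you would look up in the corresponding JIRA ticket:

```bash
# Rough sketch of rebuilding the Azure filesystem plugin from source.
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.17.0        # the release you are running
git cherry-pick <fix-commit-hash>  # placeholder for the ADLS bug-fix commit
# Build just the Azure filesystem module (and what it depends on):
mvn clean install -DskipTests -pl flink-filesystems/flink-azure-fs-hadoop -am
# Copy the resulting jar into plugins/azure-fs-hadoop/ in your image.
```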

I’m new to using Flink, and it took me a while to figure this out, but 
hopefully it is helpful to you. I get the sense that few people are using ADLS 
with newer Flink versions, because the docs and support seem half-baked.

Let me know if you make progress using MSI.

Best of luck,

Ivan

From: DEROCCO, CHRISTOPHER<mailto:cd9...@att.com>
Sent: Wednesday, May 17, 2023 6:20 AM
To: Ivan Webber<mailto:ivan.web...@microsoft.com>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>; Shammon 
FY<mailto:zjur...@gmail.com>
Subject: [EXTERNAL] RE: MSI Auth to Azure Storage Account with Flink Apache 
Operator not working

Ivan,

How did you use Azure Key Vault with CSI, given that the Flink operator uses a 
ConfigMap and not a Kubernetes Secret to create the flink-conf file? I have 
also tried using pod-identities as well as the new workload identity 
(https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview), to no 
avail. It seems to be an issue with configuring 
flink-azure-fs-hadoop-1.16.0.jar when using the Flink operator.

From: Ivan Webber 
Sent: Tuesday, May 16, 2023 8:01 PM
To: DEROCCO, CHRISTOPHER ; Shammon FY 
Cc: user@flink.apache.org
Subject: RE: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

When you create your cluster you probably need to ensure the following settings 
are set. I briefly looked into MSI but ended up using Azure Key Vault with the 
CSI storage driver for an initial prototype 
(https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/aks/csi-secrets-store-driver.md#upgrade-an-existing-aks-cluster-with-azure-key-vault-provider-for-secrets-store-csi-driver-support).

For me it helped to think about it as Hadoop configuration.

If you do get MSI working I would be interested in hearing what made it work 
for you, so be sure to update the docs or put it on this thread.

 To create from scratch
Create an AKS cluster with the required settings.
```bash
# create an AKS cluster with pod-managed identity and Azure CNI
az aks create --resource-group $RESOURCE_GROUP --name $CLUSTER \
  --enable-managed-identity --network-plugin azure --enable-pod-identity
```
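
On the Flink side, the secret synced from Key Vault can then be surfaced to the JobManager/TaskManager pods through the operator's podTemplate, roughly like this (the secret name, key, and environment variable name are placeholders):

```yaml
# Sketch only; names are placeholders for whatever your CSI/secret setup creates.
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container   # merged by the operator into the main container
          env:
            - name: AZURE_STORAGE_ACCOUNT_KEY
              valueFrom:
                secretKeyRef:
                  name: flink-storage-secret
                  key: storage-account-key
```

The variable name here is the one the KeyProvider sketch earlier in the thread reads.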

I hope that is somehow helpful.

Best of luck,

Ivan

From: DEROCCO, CHRISTOPHER<mailto:cd9...@att.com>
Sent: Monday, May 8, 2023 3:40 PM
To: Shammon FY<mailto:zjur...@gmail.com>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: [EXTERNAL] RE: MSI Auth to Azure Storage Account with Flink Apache 
Operator not working


Shammon,



I’m still having trouble setting the package in my cluster environment. I have 
these lines added to my dockerfile

mkdir ./plugins/azure-fs-hadoop

cp ./opt/flink-azure-fs-hadoop-1.16.0.jar ./plugins/azure-fs-hadoop/

according to the flink docs here 
(https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/

Re: [EXTERNAL] TaskManagers Crushing

2023-11-29 Thread Ivan Webber via user
Were you ever able to find a workaround for this? I also have transient 
failures due to
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException.

From: Kenan Kılıçtepe 
Sent: Saturday, August 19, 2023 5:50 PM
To: user@flink.apache.org 
Subject: [EXTERNAL] TaskManagers Crushing

Hi,

I have 4 task managers working on 4 servers.
They all crash at the same time without any useful error logs.
The only log I can see is some disconnections from Kafka for both consumers and 
producers.
Any idea or help is appreciated.

Some logs from all taskmanagers:

I think server 4 is crashing first, and that causes a crash for all task managers.

JobManager:

2023-08-18 15:16:46,528 INFO  org.apache.kafka.clients.NetworkClient
   [] - [AdminClient clientId=47539-enumerator-admin-client] Node 2 
disconnected.
2023-08-18 15:19:00,303 INFO  org.apache.kafka.clients.NetworkClient
   [] - [AdminClient clientId=tf_25464-enumerator-admin-client] Node 4 
disconnected.
2023-08-18 15:19:16,668 INFO  org.apache.kafka.clients.NetworkClient
   [] - [AdminClient clientId=cpu_59942-enumerator-admin-client] Node 1 
disconnected.
2023-08-18 15:19:16,764 INFO  org.apache.kafka.clients.NetworkClient
   [] - [AdminClient clientId=cpu_55128-enumerator-admin-client] Node 3 
disconnected.
2023-08-18 15:19:27,913 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to 
[/10.11.0.51:42778] failed with java.io.IOException: 
Connection reset by peer
2023-08-18 15:19:27,963 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@tef-prod-flink-04:38835] has failed, address is now gated for 
[50] ms. Reason: [Disassociated]
2023-08-18 15:19:27,967 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink-metrics@tef-prod-flink-04:46491] has failed, address is now 
gated for [50] ms. Reason: [Disassociated]
2023-08-18 15:19:29,225 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - 
RouterReplacementAlgorithm -> kafkaSink_sinkFaultyRouter_windowMode: Writer -> 
kafkaSink_sinkFaultyRouter_windowMode: Committer (3/4) 
(f6fd65e3fc049bd9021093d8f532bbaf_a47f4a3b960228021159de8de51dbb1f_2_0) 
switched from RUNNING to FAILED on 
injection-assia-3-pro-cloud-tef-gcp-europe-west1:39011-b24b1d @ 
injection-assia-3-pro-cloud-tef-gcp-europe-west1 (dataPort=35223).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'tef-prod-flink-04/10.11.0.51:37505 [ 
tef-prod-flink-04:38835-e3ca4d ] '. This might indicate that the remote task 
manager was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
 ~[flink-dist-1.16.2.jar:1.16.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 ~