[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams
[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420638#comment-15420638 ] Gyula Fora commented on FLINK-4391: --- We have also done something similar at King for handling heavy DB operations, in the following way: we had a parallel processing operator that could process records either in the main thread (as now) or in one background thread. There are dedicated processing methods for both, and the user code decides for every record where to process it: either in the main thread in a blocking way, or by sending it to the background thread's queue. We also set a size limit on the number of queued elements. State access, and knowing which keys to set in the backend, becomes a little tricky if we want to keep good performance. > Provide support for asynchronous operations over streams > > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Jamie Grier > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
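The background-thread design described in the comment above can be sketched in plain Java. This is a minimal, hypothetical sketch, not Flink code: `HybridProcessor` and its parameters are invented for illustration. A user-supplied predicate picks the thread for each record, and a bounded `ArrayBlockingQueue` enforces the limit on queued elements by blocking the caller when it is full. Keyed state access from the background thread, which the comment flags as the tricky part, is deliberately left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch (not a Flink API): each record is processed either
// inline on the calling ("main") thread or on one background thread that
// drains a bounded queue.
class HybridProcessor<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final Predicate<T> runInBackground; // user code decides per record
    private final Consumer<T> mainThreadFn;
    private final Consumer<T> backgroundFn;
    private final Thread worker;
    private volatile boolean running = true;

    HybridProcessor(int maxQueued, Predicate<T> runInBackground,
                    Consumer<T> mainThreadFn, Consumer<T> backgroundFn) {
        this.queue = new ArrayBlockingQueue<>(maxQueued); // size limit on queued elements
        this.runInBackground = runInBackground;
        this.mainThreadFn = mainThreadFn;
        this.backgroundFn = backgroundFn;
        this.worker = new Thread(this::drain, "background-processor");
        this.worker.start();
    }

    void process(T record) {
        if (!runInBackground.test(record)) {
            mainThreadFn.accept(record); // blocking, in the main thread
            return;
        }
        try {
            queue.put(record); // blocks while the queue is full (backpressure)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while enqueuing", e);
        }
    }

    private void drain() {
        try {
            // Keep going until close() is called and the queue is empty.
            while (running || !queue.isEmpty()) {
                T next = queue.poll(10, TimeUnit.MILLISECONDS);
                if (next != null) {
                    backgroundFn.accept(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        running = false;
        try {
            worker.join(); // wait until all queued records are processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because `put` blocks when the queue is full, the main thread slows down instead of buffering without bound, which is one way to realize the size limit mentioned in the comment.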
[jira] [Created] (FLINK-4413) Improve savepoint restore error messages
Gyula Fora created FLINK-4413: - Summary: Improve savepoint restore error messages Key: FLINK-4413 URL: https://issues.apache.org/jira/browse/FLINK-4413 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing, Streaming Affects Versions: 1.1.0 Reporter: Gyula Fora Priority: Minor Currently, when savepoint restore fails due to problems with parallelism or the assigned uids, the error messages contain all the job vertex id of the problematic task. This makes these kinds of problems very difficult to debug for more complex topologies. I propose adding the user-assigned task names to these error messages to make this much easier for users.
[jira] [Updated] (FLINK-4413) Improve savepoint restore error messages
[ https://issues.apache.org/jira/browse/FLINK-4413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-4413: -- Description: Currently, when savepoint restore fails due to problems with parallelism or the assigned uids, the error messages contain only the job vertex id of the problematic task. This makes these kinds of problems very difficult to debug for more complex topologies. I propose adding the user-assigned task names to these error messages to make this much easier for users. was: Currently, when savepoint restore fails due to problems with parallelism or the assigned uids, the error messages contain all the job vertex id of the problematic task. This makes these kinds of problems very difficult to debug for more complex topologies. I propose adding the user-assigned task names to these error messages to make this much easier for users.
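The proposal (adding the user-assigned task names to the restore error messages) could look roughly like this. `RestoreErrorMessages` and the message wording are hypothetical, and a map from job vertex ID to user-assigned name is assumed to be available where the error is raised.

```java
import java.util.Map;

// Hypothetical helper: describe a task by its user-assigned name when one is
// known, and always keep the job vertex ID for cross-referencing.
final class RestoreErrorMessages {
    private RestoreErrorMessages() {}

    static String describeTask(String vertexId, Map<String, String> namesByVertexId) {
        String name = namesByVertexId.get(vertexId);
        return (name == null)
                ? "vertex ID " + vertexId
                : "'" + name + "' (vertex ID " + vertexId + ")";
    }

    static String unmatchedState(String vertexId, Map<String, String> namesByVertexId) {
        return "Cannot restore savepoint state for task "
                + describeTask(vertexId, namesByVertexId)
                + ": check the operator uids and parallelism.";
    }
}
```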
[jira] [Created] (FLINK-4441) RocksDB statebackend makes all operators appear stateful
Gyula Fora created FLINK-4441: - Summary: RocksDB statebackend makes all operators appear stateful Key: FLINK-4441 URL: https://issues.apache.org/jira/browse/FLINK-4441 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Streaming Affects Versions: 1.1.1 Reporter: Gyula Fora Assignee: Gyula Fora Priority: Blocker When the state is empty, the RocksDB state backend returns an empty HashMap instead of null in the snapshotPartitionedState method. This means that these operators always appear stateful to the Flink runtime, which makes it impossible, for instance, to remove a stateless operator using savepoints.
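The convention the report relies on, where "no state" is signalled with `null` rather than an empty map, can be illustrated with a small standalone sketch. The class and method names are hypothetical; in the actual backend the relevant method is `snapshotPartitionedState`, whose real signature is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the FLINK-4441 convention: a snapshot signals
// "no state" with null instead of an empty map, so the runtime can tell
// stateless operators apart from stateful ones.
final class KeyedStateSnapshots {
    private KeyedStateSnapshots() {}

    static <K, V> Map<K, V> snapshotOrNull(Map<K, V> liveState) {
        if (liveState == null || liveState.isEmpty()) {
            return null; // nothing to checkpoint: the operator looks stateless
        }
        return new HashMap<>(liveState); // defensive copy of the current state
    }

    static boolean isStateful(Map<?, ?> snapshot) {
        return snapshot != null;
    }
}
```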
[jira] [Commented] (FLINK-4445) Ignore unmatched state when restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-4445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430336#comment-15430336 ] Gyula Fora commented on FLINK-4445: --- Hi Ufuk, My personal experience is that it's very easy to make mistakes when dealing with more complex stateful jobs, such as forgetting the uids on Kafka sources/sinks and other built-in stateful operators. Ignoring unmatched state by default would be super dangerous and would have caused me serious issues in the past. I think adding a force-ignore flag (option 1) would be a good way to go and is also very useful :) Cheers, Gyula > Ignore unmatched state when restoring from savepoint > > > Key: FLINK-4445 > URL: https://issues.apache.org/jira/browse/FLINK-4445 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.1.1 >Reporter: Ufuk Celebi > > Currently, when submitting a job with a savepoint, we require that all state > is matched to the new job. Many users have noted that this is overly strict. > I would like to loosen this and allow savepoints to be restored without > matching all state. > The following options come to mind: > (1) Keep the current behaviour, but add a flag to allow ignoring state when > restoring, e.g. {{bin/flink -s --ignoreUnmatchedState}}. This > would be non-API breaking. > (2) Ignore unmatched state and continue. Additionally add a flag to be strict > about checking the state, e.g. {{bin/flink -s --strict}}. This > would be API-breaking as the default behaviour would change. Users might be > confused by this because there is no straightforward way to notice that > nothing has been restored. > I'm not sure what's the best thing here. [~gyfora], [~aljoscha] What do you > think?
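Option (1), which the comment favours, keeps strict matching as the default and makes ignoring unmatched state an explicit opt-in. A hypothetical sketch of that check follows: `SavepointStateMatcher` is invented for illustration, state is identified by plain string operator IDs for simplicity, and the `ignoreUnmatched` parameter stands in for a CLI flag such as the proposed `--ignoreUnmatchedState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of option (1): strict by default, opt-in to ignore.
final class SavepointStateMatcher {
    private SavepointStateMatcher() {}

    // Returns the savepoint state IDs that will be skipped. Fails if any state
    // has no matching operator in the new job and ignoring was not requested.
    static List<String> check(Set<String> savepointStateIds,
                              Set<String> jobOperatorIds,
                              boolean ignoreUnmatched) {
        List<String> unmatched = new ArrayList<>();
        for (String id : savepointStateIds) {
            if (!jobOperatorIds.contains(id)) {
                unmatched.add(id);
            }
        }
        if (!unmatched.isEmpty() && !ignoreUnmatched) {
            throw new IllegalStateException("Savepoint contains state for operators "
                    + "not present in the job: " + unmatched
                    + ". Restore with the ignore flag to skip it.");
        }
        return unmatched;
    }
}
```

Failing loudly by default addresses the concern raised in the comment: a forgotten uid on a built-in stateful operator surfaces as an error instead of silently restoring nothing.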
[jira] [Resolved] (FLINK-4441) RocksDB statebackend makes all operators appear stateful
[ https://issues.apache.org/jira/browse/FLINK-4441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora resolved FLINK-4441. --- Resolution: Fixed
[jira] [Created] (FLINK-4471) Rocks backend fully async snapshot restore fails with custom serializers
Gyula Fora created FLINK-4471: - Summary: Rocks backend fully async snapshot restore fails with custom serializers Key: FLINK-4471 URL: https://issues.apache.org/jira/browse/FLINK-4471 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Streaming Affects Versions: 1.1.0 Reporter: Gyula Fora Priority: Critical The StateDescriptor is not deserialized with the user-code classloader. This makes restore fail with a NoClassDefFound error if the user uses custom type serializers: https://github.com/apache/flink/blob/master/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L583
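The usual remedy for this class of problem is to deserialize with an `ObjectInputStream` that resolves classes through the user-code class loader. The sketch below shows that technique in isolation; it is not the RocksDB backend's code, and the class names `ClassLoaderObjectInputStream` and `UserCodeSerialization` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// ObjectInputStream that resolves classes through a given class loader
// (for example the user-code class loader) instead of the default one.
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        try {
            // 'false': resolve the class without running static initializers
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default lookup (also handles primitive types)
            return super.resolveClass(desc);
        }
    }
}

// Hypothetical helpers wrapping checked exceptions for brevity.
final class UserCodeSerialization {
    private UserCodeSerialization() {}

    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException("Serialization failed", e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data, ClassLoader userCodeClassLoader) {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(data), userCodeClassLoader)) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Deserialization failed", e);
        }
    }
}
```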
[jira] [Assigned] (FLINK-4471) Rocks backend fully async snapshot restore fails with custom serializers
[ https://issues.apache.org/jira/browse/FLINK-4471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-4471: - Assignee: Gyula Fora
[jira] [Closed] (FLINK-4471) Rocks backend fully async snapshot restore fails with custom serializers
[ https://issues.apache.org/jira/browse/FLINK-4471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-4471. - Resolution: Won't Fix
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938170#comment-15938170 ] Gyula Fora commented on FLINK-6006: --- I have a slight feeling that this didn't completely solve the issue. I saw this occur again today using the 0.8 connector from the 1.2 branch, built with the fix. > Kafka Consumer can lose state if queried partition list is incomplete on > restore > > > Key: FLINK-6006 > URL: https://issues.apache.org/jira/browse/FLINK-6006 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.1.5, 1.2.1 > > > In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying > on restore. Then, only restored state of partitions that exist in the > queried list is used to initialize the fetcher's state holders. > If in any case the returned partition list is incomplete (i.e. missing > partitions that existed before, perhaps due to temporary ZK / broker > downtime), then the state of the missing partitions is dropped and cannot be > recovered anymore. > In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 > are affected. > We can backport some of the behavioural changes there to 1.1 and 1.2. > Generally, we should not depend on the current partition list in Kafka when > restoring, but just restore all previous state into the fetcher's state > holders. > This would therefore also require some checking on how the consumer threads / > Kafka clients behave when their assigned partitions cannot be reached. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
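The difference between the behaviour described in the issue and the proposed fix ("just restore all previous state into the fetcher's state holders") can be sketched with partitions modelled as strings and offsets as longs. This is a hypothetical simplification, not connector code; `missingFromQuery` is an added helper that reports restored partitions absent from the queried list, the kind of information that could be logged when a restore looks suspicious.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: partition name (e.g. "mytopic-10") -> restored offset.
final class PartitionStateRestore {
    private PartitionStateRestore() {}

    // Behaviour described in FLINK-6006: state of partitions missing from the
    // (possibly incomplete) queried list is silently dropped.
    static Map<String, Long> restoreFiltered(Map<String, Long> restored, Set<String> queried) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : restored.entrySet()) {
            if (queried.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    // Proposed behaviour: keep every restored partition, independent of the query.
    static Map<String, Long> restoreAll(Map<String, Long> restored) {
        return new HashMap<>(restored);
    }

    // Restored partitions the query did not return (candidates for a log line).
    static Set<String> missingFromQuery(Map<String, Long> restored, Set<String> queried) {
        Set<String> missing = new HashSet<>(restored.keySet());
        missing.removeAll(queried);
        return missing;
    }
}
```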
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938297#comment-15938297 ] Gyula Fora commented on FLINK-6006: --- I tried restarting the consumer when I was sure that the brokers had completely recovered, and that still didn't help.
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938567#comment-15938567 ] Gyula Fora commented on FLINK-6006: --- Unfortunately I only have the JM log, and that doesn't seem to contain any relevant information :(
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940010#comment-15940010 ] Gyula Fora commented on FLINK-6006: --- Don't be sorry, there is a high probability that I did something wrong, or that our Kafka is screwed up in weird ways :) Adding some logging for this (maybe at INFO level) would probably help if this comes up again. Thanks for all the work!
[jira] [Created] (FLINK-6262) UnknownTopicOrPartitionException Kafka consumer error on broker restart/failure
Gyula Fora created FLINK-6262: - Summary: UnknownTopicOrPartitionException Kafka consumer error on broker restart/failure Key: FLINK-6262 URL: https://issues.apache.org/jira/browse/FLINK-6262 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.1.4, 1.2.0 Reporter: Gyula Fora The Kafka consumer fails on broker restarts/failures with the following error:
java.io.IOException: Error while fetching from broker 'Node(22, kafka22.sto.midasplayer.com, 9092)': Exception for event.bifrost.log:10: kafka.common.UnknownTopicOrPartitionException
    at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:292)
    at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:313)
We should have some restart logic around this.
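One generic shape for the restart logic suggested above is a retry loop with exponential backoff around the failing call. The sketch below is hypothetical and independent of the connector internals: `RetryWithBackoff` is invented, and which exceptions count as retriable (for example an `IOException` wrapping `UnknownTopicOrPartitionException` during a broker restart) is left to the caller as an assumption.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical retry helper: re-run an action with exponential backoff while
// its failures are classified as retriable, up to maxAttempts attempts.
final class RetryWithBackoff {
    private RetryWithBackoff() {}

    static <T> T call(Callable<T> action, int maxAttempts, long initialBackoffMs,
                      Predicate<Exception> isRetriable) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !isRetriable.test(e)) {
                    throw new IllegalStateException("Giving up after " + attempt + " attempt(s)", e);
                }
                try {
                    Thread.sleep(backoffMs); // give the broker time to recover
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
                backoffMs *= 2; // exponential backoff between attempts
            }
        }
    }
}
```

Capping the number of attempts keeps a persistent failure visible, so the job can still fail over normally if the broker does not come back.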
[jira] [Created] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failure)
Gyula Fora created FLINK-6263: - Summary: Leader error in Kafka producer on leader change (broker restart/failure) Key: FLINK-6263 URL: https://issues.apache.org/jira/browse/FLINK-6263 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.2.0 Reporter: Gyula Fora We have observed the following error in the Kafka producer:
java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
[jira] [Created] (FLINK-6264) Kafka consumer fails if can't find leader for partition
Gyula Fora created FLINK-6264: - Summary: Kafka consumer fails if can't find leader for partition Key: FLINK-6264 URL: https://issues.apache.org/jira/browse/FLINK-6264 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.2.0 Reporter: Gyula Fora We have observed the following error many times when brokers failed/were restarted:
java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, KafkaPartitionHandle=[mytopic,10], offset=-1]
    at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
    at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
    at java.lang.Thread.run(Thread.java:745)
[jira] [Created] (FLINK-5068) YARN HA: Job scheduled before TM is up leading to "Not enough free slots" error
Gyula Fora created FLINK-5068: - Summary: YARN HA: Job scheduled before TM is up leading to "Not enough free slots" error Key: FLINK-5068 URL: https://issues.apache.org/jira/browse/FLINK-5068 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.1.3 Reporter: Gyula Fora On one occasion after a JobManager failure, the job could not be recovered because it was scheduled before the TM was up, causing an unrecoverable failure. So I ended up with a running YARN JM + TM with enough slots, but with the job failed (and not restarting). Please drop me a mail for the logs :)
[jira] [Created] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
Gyula Fora created FLINK-5071: - Summary: YARN: yarn.containers.vcores config not respected when checking for vcores Key: FLINK-5071 URL: https://issues.apache.org/jira/browse/FLINK-5071 Project: Flink Issue Type: Bug Components: YARN Client Reporter: Gyula Fora Fix For: 1.1.3 The YarnClient validates whether the number of task slots is less than the max vcores setting of YARN, but seems to ignore the yarn.containers.vcores Flink config, which should be used instead of the slots.
[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.
[ https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667283#comment-15667283 ] Gyula Fora commented on FLINK-4182: --- Ah sorry those jobs were 1.1.3 (I am not sure if the error was fixed since then) > HA recovery not working properly under ApplicationMaster failures. > -- > > Key: FLINK-4182 > URL: https://issues.apache.org/jira/browse/FLINK-4182 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, State Backends, Checkpointing >Affects Versions: 1.0.3 >Reporter: Stefan Richter >Priority: Blocker > > When randomly killing TaskManager and ApplicationMaster, a job sometimes does > not properly recover in HA mode. > There can be different symptoms for this. For example, in one case the job is > dying with the following exception: > {code} > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Cannot set up the user code libraries: Cannot get library > with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413) > at > org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381) > at > da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009) > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set > up the user code libraries: Cannot get library with hash > 7fafffe9595cd06aff213b81b5da7b1682e1d6b0 > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch
[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.
[ https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667277#comment-15667277 ] Gyula Fora commented on FLINK-4182: --- Hi, I have observed the same scenario in many of my jobs under AM failures. I saved some logs. I am running 1.1.4.
[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores
[ https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683135#comment-15683135 ] Gyula Fora commented on FLINK-5071: --- Both, actually. The problem is here: https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L308 This check only compares the maximum number of vcores allowed by the YARN config with the number of slots, assuming that the requested vcores == number of slots. > YARN: yarn.containers.vcores config not respected when checking for vcores > -- > > Key: FLINK-5071 > URL: https://issues.apache.org/jira/browse/FLINK-5071 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Gyula Fora > Fix For: 1.1.3 > > > The YarnClient validates whether the number of task slots is less than the > max vcores setting of YARN, but seems to ignore the yarn.containers.vcores > Flink config, which should be used instead of the slots. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
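The check the comment argues for can be sketched in plain Java. It compares the vcores that will actually be requested per container against YARN's per-container maximum (`yarn.scheduler.maximum-allocation-vcores`), instead of comparing the slot count. The class and method names are hypothetical. Treating a non-positive `yarn.containers.vcores` value as "unset, fall back to the number of slots" is an assumption, not a quote of Flink's code.

```java
// Hypothetical sketch of the vcore validation argued for in FLINK-5071 (not Flink's code).
final class VcoreCheck {

    /**
     * Vcores actually requested per TaskManager container. Assumption: a non-positive
     * yarn.containers.vcores value means "not set", so the number of slots is used instead.
     */
    static int requestedVcores(int configuredContainerVcores, int numSlots) {
        return configuredContainerVcores > 0 ? configuredContainerVcores : numSlots;
    }

    /**
     * Compares the requested vcores (not the slot count) with YARN's per-container
     * maximum, i.e. the value of yarn.scheduler.maximum-allocation-vcores.
     */
    static void validate(int configuredContainerVcores, int numSlots, int yarnMaxVcores) {
        int requested = requestedVcores(configuredContainerVcores, numSlots);
        if (requested > yarnMaxVcores) {
            throw new IllegalArgumentException("Requested " + requested
                + " vcores per container, but YARN allows at most " + yarnMaxVcores);
        }
    }

    public static void main(String[] args) {
        // 8 slots but only 2 vcores configured: passes on a cluster capped at 4 vcores,
        // whereas a slots-only check (8 > 4) would reject it.
        validate(2, 8, 4);
        System.out.println("ok");
    }
}
```

With this shape, users can run many slots per TaskManager on clusters with a low per-container vcore limit, as long as they set `yarn.containers.vcores` explicitly.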
[jira] [Commented] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint
[ https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346080#comment-15346080 ] Gyula Fora commented on FLINK-3397: --- You are always free to give it a shot. To be honest, I am not entirely sure whether other people are working in this direction. Maybe [~uce] can help out here, as he knows well what's going on with savepoints... > Failed streaming jobs should fall back to the most recent checkpoint/savepoint > -- > > Key: FLINK-3397 > URL: https://issues.apache.org/jira/browse/FLINK-3397 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Gyula Fora >Priority: Minor > > The current fallback behaviour in case of a streaming job failure is slightly > counterintuitive: > If a job fails, it will fall back to the most recent checkpoint (if any) even > if a more recent savepoint was taken. This means that the system does not regard savepoints as > checkpoints, only as points from which a job can be manually restarted. > I suggest changing this so that savepoints are also regarded as checkpoints > in case of a failure and are also used to automatically restore the > streaming job. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
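Here is a small sketch of the fallback rule the issue proposes. It assumes completed snapshots can be ordered by their completion timestamp. `Snapshot`, `latestCheckpointOnly` and `latestOfAny` are illustrative names, not Flink classes or methods. The first method mirrors the current behaviour described above, and the second lets a newer savepoint win.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of FLINK-3397's proposal; Snapshot is illustrative, not a Flink type.
final class RecoverySelection {

    static final class Snapshot {
        final long id;
        final long timestampMillis;
        final boolean savepoint;

        Snapshot(long id, long timestampMillis, boolean savepoint) {
            this.id = id;
            this.timestampMillis = timestampMillis;
            this.savepoint = savepoint;
        }
    }

    /** Current behaviour as described in the issue: savepoints are ignored on failure. */
    static Optional<Snapshot> latestCheckpointOnly(List<Snapshot> completed) {
        return completed.stream()
            .filter(s -> !s.savepoint)
            .max(Comparator.comparingLong(s -> s.timestampMillis));
    }

    /** Proposed behaviour: a savepoint counts as a restore point too. */
    static Optional<Snapshot> latestOfAny(List<Snapshot> completed) {
        return completed.stream().max(Comparator.comparingLong(s -> s.timestampMillis));
    }

    public static void main(String[] args) {
        List<Snapshot> completed = Arrays.asList(
            new Snapshot(41, 1_000L, false),
            new Snapshot(42, 2_000L, true)); // savepoint taken after the last checkpoint
        System.out.println(latestCheckpointOnly(completed).get().id); // 41
        System.out.println(latestOfAny(completed).get().id);          // 42
    }
}
```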
[jira] [Commented] (FLINK-4120) Lightweight fault tolerance through recomputing lost state
[ https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350565#comment-15350565 ] Gyula Fora commented on FLINK-4120: --- Hi, Thanks for the interesting proposal. Several questions came to mind while I was trying to understand it: How would you know which states can be "easily recomputed"? I think in many cases the state is computed over a long period of time (weeks/months), and the overhead of checkpointing is negligible compared to re-streaming the records. Let's assume you can recompute the state for some operators while others restore from their checkpoints. As you described, these would wait for the latest checkpoint barrier before continuing to process records; however, barrier positions are not deterministic, so this would not work. This also assumes a lot about the persistent storage: in many cases Kafka, for instance, does not retain the data forever (or long enough to reach back to the null-state checkpoint). Cheers, Gyula > Lightweight fault tolerance through recomputing lost state > -- > > Key: FLINK-4120 > URL: https://issues.apache.org/jira/browse/FLINK-4120 > Project: Flink > Issue Type: New Feature > Components: Core >Reporter: Dénes Vadász > > The current fault tolerance mechanism requires that stateful operators write > their internal state to stable storage during a checkpoint. > This proposal aims at optimizing out this operation in the cases where the > operator state can be recomputed from a finite (practically: small) set of > source records, and those records are already on checkpoint-aware persistent > storage (e.g. in Kafka). > The rationale behind the proposal is that the cost of reprocessing is paid > only on recovery from (presumably rare) failures, whereas the cost of > persisting the state is paid on every checkpoint. Eliminating the need for > persistent storage will also simplify system setup and operation.
> In the cases where this optimisation is applicable, the state of the > operators can be restored by restarting the pipeline from a checkpoint taken > before the pipeline ingested any of the records required for the state > re-computation of the operators (call this the "null-state checkpoint"), as > opposed to a restart from the "latest checkpoint". > The "latest checkpoint" is still relevant for the recovery: the barriers > belonging to that checkpoint must be inserted into the source streams in the > position they were originally inserted. Sinks must discard all records until > this barrier reaches them. > Note the inherent relationship between the "latest" and the "null-state" > checkpoints: the pipeline must be restarted from the latter to restore the > state at the former. > For the stateful operators for which this optimization is applicable we can > define the notion of "current null-state watermark" as the watermark such > that the operator can correctly (re)compute its current state merely from > records after this watermark. > > For the checkpoint-coordinator to be able to compute the null-state > checkpoint, each stateful operator should report its "current null-state > watermark" as part of acknowledging the ongoing checkpoint. The null-state > checkpoint of the ongoing checkpoint is the most recent checkpoint preceding > all the received null-state watermarks (assuming the pipeline preserves the > relative order of barriers and watermarks). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
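The null-state checkpoint selection described in the proposal can be sketched in a few lines of Java. This is a hypothetical illustration, not Flink code. It assumes the coordinator records, for each completed checkpoint, the watermark that was current when its barriers were injected, and that this watermark never decreases from one checkpoint to the next. Under those assumptions, "the most recent checkpoint preceding all the received null-state watermarks" becomes a floor lookup on the smallest reported watermark.

```java
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical sketch of null-state checkpoint selection for FLINK-4120 (not Flink code).
final class NullStateCheckpointSelector {

    // watermark at barrier injection -> checkpoint ID; a later checkpoint with the same
    // watermark overwrites an earlier one, so the most recent checkpoint is kept.
    private final NavigableMap<Long, Long> checkpointsByWatermark = new TreeMap<>();

    void onCheckpointCompleted(long checkpointId, long watermarkAtBarrier) {
        checkpointsByWatermark.put(watermarkAtBarrier, checkpointId);
    }

    /**
     * Returns the most recent checkpoint whose watermark is not after any reported
     * null-state watermark, or empty if no such checkpoint exists (or nothing was reported).
     */
    OptionalLong nullStateCheckpoint(Collection<Long> reportedNullStateWatermarks) {
        if (reportedNullStateWatermarks.isEmpty()) {
            return OptionalLong.empty();
        }
        long earliest = reportedNullStateWatermarks.stream()
            .mapToLong(Long::longValue)
            .min()
            .getAsLong();
        Map.Entry<Long, Long> entry = checkpointsByWatermark.floorEntry(earliest);
        return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
    }
}
```

The empty result corresponds to the case raised in the comment: no retained checkpoint reaches back far enough to rebuild the state.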
[jira] [Updated] (FLINK-4120) Lightweight fault tolerance through recomputing lost state
[ https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-4120: -- Priority: Minor (was: Major) > Lightweight fault tolerance through recomputing lost state > -- > > Key: FLINK-4120 > URL: https://issues.apache.org/jira/browse/FLINK-4120 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Dénes Vadász >Priority: Minor > > The current fault tolerance mechanism requires that stateful operators write > their internal state to stable storage during a checkpoint. > This proposal aims at optimizing out this operation in the cases where the > operator state can be recomputed from a finite (practically: small) set of > source records, and those records are already on checkpoint-aware persistent > storage (e.g. in Kafka). > The rationale behind the proposal is that the cost of reprocessing is paid > only on recovery from (presumably rare) failures, whereas the cost of > persisting the state is paid on every checkpoint. Eliminating the need for > persistent storage will also simplify system setup and operation. > In the cases where this optimisation is applicable, the state of the > operators can be restored by restarting the pipeline from a checkpoint taken > before the pipeline ingested any of the records required for the state > re-computation of the operators (call this the "null-state checkpoint"), as > opposed to a restart from the "latest checkpoint". > The "latest checkpoint" is still relevant for the recovery: the barriers > belonging to that checkpoint must be inserted into the source streams in the > position they were originally inserted. Sinks must discard all records until > this barrier reaches them. > Note the inherent relationship between the "latest" and the "null-state" > checkpoints: the pipeline must be restarted from the latter to restore the > state at the former. 
> For the stateful operators for which this optimization is applicable we can > define the notion of "current null-state watermark" as the watermark such > that the operator can correctly (re)compute its current state merely from > records after this watermark. > > For the checkpoint-coordinator to be able to compute the null-state > checkpoint, each stateful operator should report its "current null-state > watermark" as part of acknowledging the ongoing checkpoint. The null-state > checkpoint of the ongoing checkpoint is the most recent checkpoint preceding > all the received null-state watermarks (assuming the pipeline preserves the > relative order of barriers and watermarks). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4120) Lightweight fault tolerance through recomputing lost state
[ https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-4120: -- Component/s: (was: Core) State Backends, Checkpointing > Lightweight fault tolerance through recomputing lost state > -- > > Key: FLINK-4120 > URL: https://issues.apache.org/jira/browse/FLINK-4120 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Dénes Vadász > > The current fault tolerance mechanism requires that stateful operators write > their internal state to stable storage during a checkpoint. > This proposal aims at optimizing out this operation in the cases where the > operator state can be recomputed from a finite (practically: small) set of > source records, and those records are already on checkpoint-aware persistent > storage (e.g. in Kafka). > The rationale behind the proposal is that the cost of reprocessing is paid > only on recovery from (presumably rare) failures, whereas the cost of > persisting the state is paid on every checkpoint. Eliminating the need for > persistent storage will also simplify system setup and operation. > In the cases where this optimisation is applicable, the state of the > operators can be restored by restarting the pipeline from a checkpoint taken > before the pipeline ingested any of the records required for the state > re-computation of the operators (call this the "null-state checkpoint"), as > opposed to a restart from the "latest checkpoint". > The "latest checkpoint" is still relevant for the recovery: the barriers > belonging to that checkpoint must be inserted into the source streams in the > position they were originally inserted. Sinks must discard all records until > this barrier reaches them. > Note the inherent relationship between the "latest" and the "null-state" > checkpoints: the pipeline must be restarted from the latter to restore the > state at the former. 
> For the stateful operators for which this optimization is applicable we can > define the notion of "current null-state watermark" as the watermark such > that the operator can correctly (re)compute its current state merely from > records after this watermark. > > For the checkpoint-coordinator to be able to compute the null-state > checkpoint, each stateful operator should report its "current null-state > watermark" as part of acknowledging the ongoing checkpoint. The null-state > checkpoint of the ongoing checkpoint is the most recent checkpoint preceding > all the received null-state watermarks (assuming the pipeline preserves the > relative order of barriers and watermarks). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor
Gyula Fora created FLINK-4155: - Summary: Get Kafka producer partition info in open method instead of constructor Key: FLINK-4155 URL: https://issues.apache.org/jira/browse/FLINK-4155 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.0.3, 1.1.0 Reporter: Gyula Fora Currently the Flink Kafka producer does not really do any error handling if something is wrong with the partition metadata, because the metadata is fetched in the constructor and serialized with the user function. This means that in some cases the job can go into an error loop when restoring from checkpoints. Getting the partition info in the open method would solve this problem (just as restarting from a savepoint, which re-runs the constructor, does). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
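Here is a minimal plain-Java sketch of the proposed change. It assumes nothing about the actual connector internals. `PartitionLookup` is a hypothetical stand-in for the Kafka metadata request, and `open()` plays the role of the rich function lifecycle hook. The partition info is fetched on every task (re)start, instead of once in the constructor and then carried along in the serialized function.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch for FLINK-4155, written without Flink or Kafka dependencies.
final class LazyPartitionSink {

    /** Stand-in for the Kafka partition metadata request (hypothetical). */
    interface PartitionLookup {
        List<Integer> partitionsFor(String topic);
    }

    private final String topic;
    private final PartitionLookup lookup;
    // Deliberately not fetched in the constructor, so it is never shipped with the function.
    private transient List<Integer> partitions;

    LazyPartitionSink(String topic, PartitionLookup lookup) {
        this.topic = Objects.requireNonNull(topic);
        this.lookup = Objects.requireNonNull(lookup);
    }

    /** Runs on every (re)start of the task, so each restore sees fresh metadata. */
    void open() {
        List<Integer> fetched = lookup.partitionsFor(topic);
        if (fetched == null || fetched.isEmpty()) {
            throw new IllegalStateException("No partition metadata for topic " + topic);
        }
        partitions = fetched;
    }

    /** Picks a partition for a record hash; only valid after open(). */
    int partitionFor(int recordHash) {
        if (partitions == null) {
            throw new IllegalStateException("open() has not been called");
        }
        return partitions.get(Math.floorMod(recordHash, partitions.size()));
    }
}
```

In a real Flink function, the lookup itself would also need to be serializable, or be created inside `open()`.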
[jira] [Created] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
Gyula Fora created FLINK-4193: - Summary: Task manager JVM crashes while deploying cancelling jobs Key: FLINK-4193 URL: https://issues.apache.org/jira/browse/FLINK-4193 Project: Flink Issue Type: Bug Components: Streaming, TaskManager Reporter: Gyula Fora Priority: Critical We have observed several TM crashes while deploying larger stateful streaming jobs that use the RocksDB state backend. As the JVMs crash the logs don't show anything but I have uploaded all the info I have got from the standard output. This indicates some GC and possibly some RocksDB issues underneath but we could not really figure out much more. GC segfault https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 Other crashes (maybe rocks related) https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371139#comment-15371139 ] Gyula Fora commented on FLINK-4193: --- I know that RocksDB is bundled in the application jar, but I also built that jar only a day or two ago, so assuming Maven has a fairly up-to-date snapshot version, this should not be a problem. > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371134#comment-15371134 ] Gyula Fora commented on FLINK-4193: --- We are using a pretty recent version, just a couple of days old. The server is at commit 3ab9e36 (Jul 7). > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371524#comment-15371524 ] Gyula Fora commented on FLINK-4193: --- It happened multiple times in a row on the same cluster, but on different machines, and it does not always occur. The total parallelism that we were deploying was about 200. Basically, we ran a script that deploys 5 jobs one after the other, with 10 seconds in between. It usually crashed somewhere in the middle. > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372638#comment-15372638 ] Gyula Fora commented on FLINK-4193: --- I can probably try a different GC later, but I have only seen that error once. Now I got another error, which happened after some jobs crashed due to a Kafka issue: https://gist.github.com/gyfora/ea80631abdf59b6eb2e198e9bfa5c520 > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3006) TypeExtractor fails on custom type
[ https://issues.apache.org/jira/browse/FLINK-3006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395198#comment-15395198 ] Gyula Fora commented on FLINK-3006: --- I agree, Stephan, closing it. > TypeExtractor fails on custom type > -- > > Key: FLINK-3006 > URL: https://issues.apache.org/jira/browse/FLINK-3006 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 0.10.0, 1.0.0 >Reporter: Gyula Fora > > I get a weird error when I try to execute my job on the cluster. Locally this > works fine, but running it from the command line fails during type extraction: > input1.union(input2, input3).map(Either::Left).returns(eventOrLongType); > The UserEvent type is a subclass of Tuple4 with > no extra fields, and the Either type is a regular POJO with 2 public nullable > fields and a default constructor. > This fails when trying to extract the output type from the mapper, but I > wouldn't actually care about that, because I am providing my custom type > implementation for this Either type. > The error: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error.
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395) > at org.apache.flink.client.program.Client.runBlocking(Client.java:250) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:971) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1021) > Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 > at java.util.ArrayList.elementData(ArrayList.java:418) > at java.util.ArrayList.get(ArrayList.java:431) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:599) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:493) > at > org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1392) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1273) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:560) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:389) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:273) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110) > at > org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
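For reference, here is a plain-Java sketch of the kind of user-defined Either POJO the issue describes. It has two public nullable fields, a public no-argument constructor, and a static `Left` factory that can be passed as `Either::Left`. Field and method names other than `Left` are hypothetical. The class is nested in a holder only so that the sketch compiles as one file, and it is distinct from any Either type shipped with Flink. It illustrates the shape of the type only; it does not reproduce or explain the TypeExtractor failure.

```java
// Hypothetical reconstruction of the user-defined Either POJO described in FLINK-3006.
final class EitherExample {

    public static class Either<L, R> {
        // Exactly one of the two public fields is expected to be non-null.
        public L left;
        public R right;

        // Public no-argument constructor, as required for POJO types.
        public Either() {}

        // Static factory usable as a method reference, e.g. map(Either::Left).
        public static <L, R> Either<L, R> Left(L value) {
            Either<L, R> e = new Either<>();
            e.left = value;
            return e;
        }

        public static <L, R> Either<L, R> Right(R value) {
            Either<L, R> e = new Either<>();
            e.right = value;
            return e;
        }

        public boolean isLeft() {
            return left != null;
        }
    }
}
```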
[jira] [Closed] (FLINK-3006) TypeExtractor fails on custom type
[ https://issues.apache.org/jira/browse/FLINK-3006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-3006. - Resolution: Not A Problem > TypeExtractor fails on custom type > -- > > Key: FLINK-3006 > URL: https://issues.apache.org/jira/browse/FLINK-3006 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 0.10.0, 1.0.0 >Reporter: Gyula Fora > > I get a weird error when I try to execute my job on the cluster. Locally this > works fine, but running it from the command line fails during type extraction: > input1.union(input2, input3).map(Either::Left).returns(eventOrLongType); > The UserEvent type is a subclass of Tuple4 with > no extra fields, and the Either type is a regular POJO with 2 public nullable > fields and a default constructor. > This fails when trying to extract the output type from the mapper, but I > wouldn't actually care about that, because I am providing my custom type > implementation for this Either type. > The error: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error.
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395) > at org.apache.flink.client.program.Client.runBlocking(Client.java:250) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:971) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1021) > Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 > at java.util.ArrayList.elementData(ArrayList.java:418) > at java.util.ArrayList.get(ArrayList.java:431) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:599) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:493) > at > org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1392) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1273) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:560) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:389) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:273) > at > org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110) > at > org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4275) Generic Folding, Reducing and List states behave differently from other state backends
Gyula Fora created FLINK-4275: - Summary: Generic Folding, Reducing and List states behave differently from other state backends Key: FLINK-4275 URL: https://issues.apache.org/jira/browse/FLINK-4275 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Streaming Reporter: Gyula Fora Priority: Critical In https://github.com/apache/flink/commit/12bf7c1a0b81d199085fe874c64763c51a93b3bf the expected behaviour of Folding/Reducing/List states has been changed to return null instead of empty collections/default values. This was adapted for the included state backends (Memory, FS, Rocks) but not for the Generic state wrappers. As there are no tests for the Generic backend using the StateBackendTestBase, this didn't show up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
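To illustrate the expected semantics, here is a hypothetical, dependency-free sketch of a "generic" list state. It keeps each key's whole list in one key-value slot, with a `HashMap` standing in for the wrapped value state. Following the behaviour described in the issue, `get()` returns null when nothing has been added for the current key, instead of an empty collection. The class and its methods are illustrative only; they are not the Flink wrapper classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the null-on-empty contract from FLINK-4275 (not Flink code).
final class GenericListStateSketch<K, V> {

    // Stand-in for the wrapped value state: one list per key.
    private final Map<K, List<V>> slots = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        currentKey = key;
    }

    /** Returns the elements for the current key, or null if the state is empty. */
    Iterable<V> get() {
        List<V> values = slots.get(currentKey);
        return values == null ? null : Collections.unmodifiableList(values);
    }

    void add(V value) {
        slots.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    /** After clear(), get() returns null again, like the built-in backends. */
    void clear() {
        slots.remove(currentKey);
    }
}
```

A shared test base, like the StateBackendTestBase mentioned above, would catch a wrapper whose `get()` returns an empty list here instead of null.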
[jira] [Commented] (FLINK-4275) Generic Folding, Reducing and List states behave differently from other state backends
[ https://issues.apache.org/jira/browse/FLINK-4275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399143#comment-15399143 ] Gyula Fora commented on FLINK-4275: --- I would not remove them in any case. They provide a very nice way to use custom state implementations without having to implement everything you don't care about. (We also use them extensively.) > Generic Folding, Reducing and List states behave differently from other state > backends > -- > > Key: FLINK-4275 > URL: https://issues.apache.org/jira/browse/FLINK-4275 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Streaming >Reporter: Gyula Fora >Priority: Critical > > In > https://github.com/apache/flink/commit/12bf7c1a0b81d199085fe874c64763c51a93b3bf > the expected behaviour of Folding/Reducing/List states has been changed to > return null instead of empty collections/default values. > This was adapted for the included state backends (Memory, FS, Rocks) but not > for the Generic state wrappers. As there are no tests for the Generic backend > using the StateBackendTestBase, this didn't show up. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-6291) Internal Timer service cannot be "removed"
Gyula Fora created FLINK-6291: - Summary: Internal Timer service cannot be "removed" Key: FLINK-6291 URL: https://issues.apache.org/jira/browse/FLINK-6291 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Streaming Affects Versions: 1.2.0 Reporter: Gyula Fora Currently it is not possible to register an internal timer service in one job and remove it after a savepoint, because a NullPointerException is thrown when taking the next savepoint: Caused by: java.lang.Exception: Could not write timer service of MyOperator (17/60) to checkpoint state stream. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:418) at com.king.rbea.backend.operators.scriptexecution.RBEAOperator.snapshotState(RBEAOperator.java:327) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) ... 13 more Caused by: java.lang.NullPointerException at org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:294) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:414) ... 15 more The timer serializer is null in this case because the timer service has never been started properly. We should probably discard the timers of services that are not re-registered after restore, so that we can get rid of their state completely. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
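Here is one way to read the suggested fix, sketched in plain Java with hypothetical names; this is not Flink's timer service code. The sketch keeps the timers restored from the savepoint, remembers which services the new job actually registers, and writes only those services on the next snapshot. Restored services that were never re-registered are dropped. Those are the services whose serializer would be null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of discarding un-registered timer services (FLINK-6291); not Flink code.
final class TimerServiceRegistry {

    // Timer timestamps restored from the savepoint, per service name.
    private final Map<String, List<Long>> restoredTimers = new HashMap<>();
    // Services the running job has (re-)registered; only these have a serializer.
    private final Set<String> registeredServices = new HashSet<>();

    void restore(String serviceName, List<Long> timerTimestamps) {
        restoredTimers.put(serviceName, new ArrayList<>(timerTimestamps));
    }

    void register(String serviceName) {
        registeredServices.add(serviceName);
    }

    /**
     * Writes only registered services. Restored services the job never re-registered
     * are skipped, so their timers disappear from the next snapshot instead of failing it.
     */
    Map<String, List<Long>> snapshot() {
        Map<String, List<Long>> out = new HashMap<>();
        for (String name : registeredServices) {
            out.put(name, restoredTimers.getOrDefault(name, new ArrayList<>()));
        }
        return out;
    }
}
```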
[jira] [Commented] (FLINK-6291) Internal Timer service cannot be "removed"
[ https://issues.apache.org/jira/browse/FLINK-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972650#comment-15972650 ] Gyula Fora commented on FLINK-6291: --- Sounds good :) > Internal Timer service cannot be "removed" > -- > > Key: FLINK-6291 > URL: https://issues.apache.org/jira/browse/FLINK-6291 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.2.0 >Reporter: Gyula Fora > > Currently it is not possible to register an internal timer service in one job > and remove it after a savepoint, because a NullPointerException is thrown when taking the > next savepoint: > Caused by: java.lang.Exception: Could not write timer service of MyOperator > (17/60) to checkpoint state stream. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:418) > at > com.king.rbea.backend.operators.scriptexecution.RBEAOperator.snapshotState(RBEAOperator.java:327) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 13 more > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:294) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:414) > ... 15 more > The timer serializer is null in this case because the timer service has never been > started properly. > We should probably discard the timers of services that are not > re-registered after restore, so that we can get rid of their state completely. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976253#comment-15976253 ] Gyula Fora commented on FLINK-6264: --- I am not sure what causes the exception; probably [~tzulitai] is right. In any case, I think we should recover from this problem without crashing the job, because it seems to occur relatively frequently. This probably means we have to retry fetching the metadata (for the affected partitions or for all of them) a couple of times, maybe with some backoff, to give Kafka time to recover as well (if that is what causes the problem). In many cases we have noticed that after a problematic leader change or broker death, it takes some time (seconds or minutes) until Kafka returns to a state where it operates normally. We should try to bridge these gaps without crashing, because crashing is much worse. > Kafka consumer fails if can't find leader for partition > --- > > Key: FLINK-6264 > URL: https://issues.apache.org/jira/browse/FLINK-6264 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Gyula Fora > > We have observed the following error many times when brokers failed/were > restarted: > java.lang.RuntimeException: Unable to find a leader for partitions: > [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, > KafkaPartitionHandle=[mytopic,10], offset=-1] > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at >
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
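Gyula's suggestion, retrying the metadata fetch with backoff instead of failing the source, could look roughly like the Java sketch below. `LeaderLookup` and `findLeaderWithRetry` are hypothetical names and this is not the `Kafka08Fetcher` code: the metadata request is abstracted as a `Supplier` that returns an empty `Optional` while no leader is known, and the sleep is injected so the logic can be tested without waiting.

```java
import java.util.Optional;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical helper: retries a partition-leader lookup with capped exponential backoff. */
final class LeaderLookup {

    /** Real sleeper; an interrupt is restored and surfaced as an unchecked exception. */
    static final LongConsumer REAL_SLEEP = ms -> {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a partition leader", e);
        }
    };

    private LeaderLookup() {}

    /**
     * Calls {@code lookup} up to {@code maxAttempts} times. An empty result means
     * "no leader known yet", e.g. while a leader election after a broker restart is running.
     */
    static <T> T findLeaderWithRetry(
            Supplier<Optional<T>> lookup,
            int maxAttempts,
            long initialBackoffMs,
            long maxBackoffMs,
            LongConsumer sleeper) {
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<T> leader = lookup.get();
            if (leader.isPresent()) {
                return leader.get();
            }
            if (attempt < maxAttempts) {
                // Give Kafka time to recover before asking again, doubling the wait up to the cap.
                sleeper.accept(backoff);
                backoff = Math.min(backoff * 2, maxBackoffMs);
            }
        }
        // Only fail (and restart the job) once the whole retry budget is used up.
        throw new RuntimeException("Unable to find a leader after " + maxAttempts + " attempts");
    }
}
```

In a real fetcher the supplier would wrap the metadata request for the affected partitions; the attempt count and backoff cap would presumably need to be configurable so that gaps of seconds to minutes can be bridged.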
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985119#comment-15985119 ] Gyula Fora commented on FLINK-6390: --- Hi Stephan, This looks pretty useful. One thing that came to mind is whether it makes sense to add a hook that is called when all tasks have completed their local snapshots but before the full checkpoint is completed. (For instance, to implement two-phase commit logic, which could be used by backends that expose the data externally.) Gyula > Add Trigger Hooks to the Checkpoint Coordinator > --- > > Key: FLINK-6390 > URL: https://issues.apache.org/jira/browse/FLINK-6390 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Some source systems need to be notified prior to starting a checkpoint, in > order to do preparatory work for the checkpoint. > I propose to add an interface to allow sources to register hooks that are > called by the checkpoint coordinator when triggering / restoring a checkpoint. > These hooks may produce state that is stored with the checkpoint metadata. > Envisioned interface for the hooks > {code} > /** > * The interface for hooks that can be called by the checkpoint coordinator > when triggering or > * restoring a checkpoint. Such a hook is useful for example when preparing > external systems for > * taking or restoring checkpoints. > * > * The {@link #triggerCheckpoint(long, long, Executor)} method (called > when triggering a checkpoint) > * can return a result (via a future) that will be stored as part of the > checkpoint metadata. > * When restoring a checkpoint, that stored result will be given to the > {@link #restoreCheckpoint(long, Object)} > * method.
The hook's {@link #getIdentifier() identifier} is used to map data > to hook in the presence > * of multiple hooks, and when resuming a savepoint that was potentially > created by a different job. > * The identifier has a similar role as for example the operator UID in the > streaming API. > * > * The MasterTriggerRestoreHook is defined when creating the streaming > dataflow graph. It is attached > * to the job graph, which gets sent to the cluster for execution. To avoid > having to make the hook > * itself serializable, these hooks are attached to the job graph via a > {@link MasterTriggerRestoreHook.Factory}. > * > * @param <T> The type of the data produced by the hook and stored as part of > the checkpoint metadata. > * If the hook never stores any data, this can be typed to {@code > Void}. > */ > public interface MasterTriggerRestoreHook<T> { > /** >* Gets the identifier of this hook. The identifier is used to identify > a specific hook in the >* presence of multiple hooks and to give it the correct checkpointed > data upon checkpoint restoration. >* >* The identifier should be unique between different hooks of a job, > but deterministic/constant >* so that upon resuming a savepoint, the hook will get the correct > data. >* For example, if the hook calls into another storage system and > persists namespace/schema specific >* information, then the name of the storage system, together with the > namespace/schema name could >* be an appropriate identifier. >* >* When multiple hooks of the same name are created and attached to > a job graph, only the first >* one is actually used. This can be exploited to deduplicate hooks > that would do the same thing. >* >* @return The identifier of the hook. >*/ > String getIdentifier(); > /** >* This method is called by the checkpoint coordinator when > triggering a checkpoint, prior >* to sending the "trigger checkpoint" messages to the source tasks.
>* >* If the hook implementation wants to store data as part of the > checkpoint, it may return >* that data via a future, otherwise it should return null. The data is > stored as part of >* the checkpoint metadata under the hook's identifier (see {@link > #getIdentifier()}). >* >* If the action by this hook needs to be executed synchronously, > then this method should >* directly execute the action synchronously and block until it is > complete. The returned future >* (if any) would typically be a completed future. >* >* If the action should be executed asynchronously and only needs to > complete before the >* checkpoint is considered completed, then the method may use the > given executor to execute the >* actual action and would signal its completion by completing the > future. For hooks that do not >* need to store data, the future would be completed with null. >* >* @param checkpointId The ID (logical timestamp, monotonically > increasing) of the checkpoint >* @param timestamp The wall clock timest
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985122#comment-15985122 ] Gyula Fora commented on FLINK-6390: --- we could call it completeCheckpoint for example
[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator
[ https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986084#comment-15986084 ] Gyula Fora commented on FLINK-6390: --- Not a big deal but Stephan I think you missed my comment :)
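The `completeCheckpoint` idea from this thread could be modelled as a second callback that the coordinator invokes once every task has acknowledged its local snapshot, but before the checkpoint is declared complete. The Java sketch below is self-contained and hypothetical: `TwoPhaseHook`, `ToyCoordinator` and `completeCheckpoint` are illustrative names, not the `MasterTriggerRestoreHook` interface proposed in the issue and not a released Flink API. Futures and executors are left out to keep it short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical hook with an extra "all local snapshots are done" callback. */
interface TwoPhaseHook {
    String getIdentifier();

    /** Called before the sources are asked to snapshot; may return data to store (or null). */
    Object triggerCheckpoint(long checkpointId, long timestamp);

    /** Proposed extra phase: every task has acknowledged, e.g. pre-commit externally visible data. */
    void completeCheckpoint(long checkpointId);
}

/** Toy coordinator that tracks acknowledgements for one checkpoint at a time. */
final class ToyCoordinator {
    private final List<TwoPhaseHook> hooks = new ArrayList<>();
    private final Set<String> pendingTasks = new HashSet<>();
    private final Map<String, Object> hookState = new HashMap<>();
    private long currentCheckpoint = -1;
    private boolean completed;

    void addHook(TwoPhaseHook hook) {
        hooks.add(hook);
    }

    void trigger(long checkpointId, long timestamp, Set<String> tasks) {
        currentCheckpoint = checkpointId;
        completed = false;
        pendingTasks.clear();
        pendingTasks.addAll(tasks);
        hookState.clear();
        for (TwoPhaseHook hook : hooks) {
            // Store the hook's result under its identifier, like the proposed checkpoint metadata.
            hookState.put(hook.getIdentifier(), hook.triggerCheckpoint(checkpointId, timestamp));
        }
        // A real coordinator would now send "trigger checkpoint" messages to the source tasks.
    }

    void acknowledge(long checkpointId, String task) {
        // Ignore stale or duplicate acks, and wait until the last pending task has acknowledged.
        if (checkpointId != currentCheckpoint || !pendingTasks.remove(task) || !pendingTasks.isEmpty()) {
            return;
        }
        // All local snapshots are done: run the extra phase before finalizing the checkpoint.
        for (TwoPhaseHook hook : hooks) {
            hook.completeCheckpoint(checkpointId);
        }
        completed = true;
    }

    boolean isCompleted() {
        return completed;
    }

    Map<String, Object> storedHookState() {
        return hookState;
    }
}
```

Whether a failure inside `completeCheckpoint` should abort the checkpoint is a design question this sketch leaves open.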
[jira] [Created] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing
Gyula Fora created FLINK-4873: - Summary: Add config option to specify "home directory" for YARN client resource sharing Key: FLINK-4873 URL: https://issues.apache.org/jira/browse/FLINK-4873 Project: Flink Issue Type: Improvement Components: YARN Client Affects Versions: 1.2.0, 1.1.3 Reporter: Gyula Fora The YARN client currently uses FileSystem.getHomeDirectory() to store the jar files that need to be shared on the cluster. This effectively forces users to run HDFS, or something compatible with the Hadoop FS API, on the cluster. In some production environments, file systems (distributed or simply shared) are mounted under the same path on every node and, for convenience, are not accessed through the Hadoop API. To run Flink on YARN in these cases, we need to be able to define the "home directory" where Flink copies the files for sharing. It could be something like yarn.resource.upload.dir in the flink-conf.yaml. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
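A minimal sketch of how the YARN client might pick the upload location once such an option exists. The key `yarn.resource.upload.dir` is only the name proposed above, `UploadDirResolver` is a hypothetical helper, and the `.flink/<appId>` sub-directory layout is an assumption for illustration. The configuration is modelled as a plain `Map` and the Hadoop home directory as a string, so the example runs without Hadoop or Flink on the classpath.

```java
import java.util.Map;

/** Hypothetical: picks the directory the YARN client copies shared jars into. */
final class UploadDirResolver {
    /** Option name proposed in FLINK-4873; not an existing Flink setting. */
    static final String UPLOAD_DIR_KEY = "yarn.resource.upload.dir";

    private UploadDirResolver() {}

    /**
     * @param flinkConf       parsed flink-conf.yaml entries
     * @param fsHomeDirectory what FileSystem.getHomeDirectory() returns today
     * @param appId           YARN application id, keeps uploads of different jobs apart
     */
    static String resolve(Map<String, String> flinkConf, String fsHomeDirectory, String appId) {
        String configured = flinkConf.get(UPLOAD_DIR_KEY);
        String base = (configured != null && !configured.trim().isEmpty())
                ? configured.trim()   // e.g. a shared mount visible under the same path on every node
                : fsHomeDirectory;    // current behaviour: the file system's home directory
        return stripTrailingSlashes(base) + "/.flink/" + appId;
    }

    /** Avoids "//" when the configured directory ends with a slash. */
    private static String stripTrailingSlashes(String path) {
        String p = path;
        while (p.length() > 1 && p.endsWith("/")) {
            p = p.substring(0, p.length() - 1);
        }
        return p;
    }
}
```

Keeping the home directory as the fallback means existing HDFS-based setups would behave as before when the option is unset.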
[jira] [Commented] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints
[ https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612892#comment-15612892 ] Gyula Fora commented on FLINK-4948: --- I think in general this would be very nice, but what about tools that might want to alter checkpoints intentionally, for example to clean or transform the state? So I think we should aim for some flexible checking logic here. > Consider using checksums or similar to detect bad checkpoints > - > > Key: FLINK-4948 > URL: https://issues.apache.org/jira/browse/FLINK-4948 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Jamie Grier > Fix For: 1.2.0 > > > We should consider proactively verifying that checkpoints are valid > when reading (and maybe writing). This should help prevent any possible > state corruption issues that might otherwise go undetected. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
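One way to get both properties is to store a checksum next to each piece of state, verify it on read by default, and let tools that intentionally rewrite state simply recompute it. The sketch below uses `java.util.zip.CRC32`; the `ChecksummedState` class, its layout (an 8-byte checksum followed by the payload) and the `verify` switch are hypothetical and are not Flink's checkpoint format.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

/** Hypothetical state blob layout: [8-byte CRC32 value][payload bytes]. */
final class ChecksummedState {
    private ChecksummedState() {}

    static long crc(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    /** Used by the normal writer, and by any tool that deliberately transforms the state. */
    static byte[] seal(byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(crc(payload))
                .put(payload)
                .array();
    }

    /**
     * Returns the payload. With verify == false, a repair or migration tool can still
     * read data whose checksum no longer matches (the "flexible" part of the check).
     */
    static byte[] open(byte[] sealed, boolean verify) {
        long expected = ByteBuffer.wrap(sealed).getLong();
        byte[] payload = Arrays.copyOfRange(sealed, Long.BYTES, sealed.length);
        if (verify && crc(payload) != expected) {
            throw new IllegalStateException("Checksum mismatch: state may be corrupted");
        }
        return payload;
    }
}
```

A tool that cleans or transforms state would call `open(..., false)`, modify the payload and `seal` it again, so the next regular restore passes verification.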
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15622787#comment-15622787 ] Gyula Fora commented on FLINK-4193: --- Yes, it is still an issue that we could not figure out. We are now moving to YARN, so the occasional TM crashes don't cause chain failures of jobs. One thing to note is that I also observed this happen once on Flink 1.0.3. It seems to be RocksDB related, but it mostly manifests as errors during GC. > Task manager JVM crashes while deploying cancelling jobs > > > Key: FLINK-4193 > URL: https://issues.apache.org/jira/browse/FLINK-4193 > Project: Flink > Issue Type: Bug > Components: Streaming, TaskManager >Reporter: Gyula Fora >Priority: Critical > > We have observed several TM crashes while deploying larger stateful streaming > jobs that use the RocksDB state backend. > As the JVMs crash the logs don't show anything but I have uploaded all the > info I have got from the standard output. > This indicates some GC and possibly some RocksDB issues underneath but we > could not really figure out much more. > GC segfault > https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125 > Other crashes (maybe rocks related) > https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa > https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818 > The third link shows 2 issues that happened in parallel... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs
[ https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624754#comment-15624754 ] Gyula Fora commented on FLINK-4193: --- These issues usually happened inside the RocksDB.open(...) method during initialization of the state backend. If you think the refactoring could affect this, then we might get lucky :) We are running this in production applications and haven't ported them to 1.2 yet, but I will start working on that in a week or two. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15625086#comment-15625086 ] Gyula Fora commented on FLINK-3659: --- Hi, Would this work the way connected streams work now, but without the partitioning limitation, with Value state access "blocked" by some flag while we are in the method that processes the broadcast input? Or do we assume the broadcast state to be the same on all nodes for checkpointing? > Add ConnectWithBroadcast Operation > -- > > Key: FLINK-3659 > URL: https://issues.apache.org/jira/browse/FLINK-3659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > We should add a new operation that has a main input that can be keyed (but > doesn't have to be) and a second input that is always broadcast. This is > similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to > be keyed or non-keyed. > This builds on FLINK-4940 which aims at adding broadcast/global state. When > processing an element from the broadcast input, only access to broadcast state > is allowed. When processing an element from the main input, both the > regular keyed state and the broadcast state can be accessed. > I'm proposing this as an intermediate/low-level operation because it will > probably take a while until we add support for side-inputs in the API. This > new operation would allow expressing new patterns that cannot be expressed > with the currently available operations. > This is the new proposed API (names are non-final): > 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and > {{KeyedStream.connectWithBroadcast(DataStream)}} > 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}. > 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user > functions.
> The API names, function names are a bit verbose and we have to add two new > different ones but I don't see a way around this with the current way the > Flink API works. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15625089#comment-15625089 ] Gyula Fora commented on FLINK-3659: --- (I have a workshop after lunch, so I might be slow to respond :() -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation
[ https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628205#comment-15628205 ] Gyula Fora commented on FLINK-3659: --- I agree that we should try to keep the API as it is now (if possible) and just do runtime checks on whether the user is reading/updating the correct types of state from the correct inputs. This goes back to the direction of your original PR, with some additional runtime checks. > Add ConnectWithBroadcast Operation > -- > > Key: FLINK-3659 > URL: https://issues.apache.org/jira/browse/FLINK-3659 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > We should add a new operation that has a main input that can be keyed (but > doesn't have to be) and a second input that is always broadcast. This is > similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to > be keyed or non-keyed. > This builds on FLINK-4940 which aims at adding broadcast/global state. When > processing an element from the broadcast input, only access to broadcast state > is allowed. When processing an element from the main input, both the > regular keyed state and the broadcast state can be accessed. > I'm proposing this as an intermediate/low-level operation because it will > probably take a while until we add support for side-inputs in the API. This > new operation would allow expressing new patterns that cannot be expressed > with the currently available operations. > This is the new proposed API (names are non-final): > 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and > {{KeyedStream.connectWithBroadcast(DataStream)}} > 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}. > 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user > functions.
> Sketch of the user function: > {code} > interface BroadcastFlatMapFunction<IN, BIN, OUT> { > public void flatMap(IN in, Collector<OUT> out); > public void processBroadcastInput(BIN in); > } > {code} > The API names, function names are a bit verbose and we have to add two new > different ones but I don't see a way around this with the current way the > Flink API works. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
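The runtime-check approach discussed in this thread can be illustrated with a small, framework-free Java model in which keyed state is blocked by a flag while the broadcast input is being processed, and broadcast state is available to both inputs. `BroadcastAwareState` and its methods are hypothetical names; a real operator would sit on Flink's state backends and would also have to checkpoint the broadcast state, which this sketch ignores.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical operator-side state holder enforcing which state each input may use. */
final class BroadcastAwareState<K, V, B> {
    private final Map<K, V> keyedState = new HashMap<>();
    private final Map<String, B> broadcastState = new HashMap<>();
    private K currentKey;
    private boolean inBroadcastInput; // the flag that blocks keyed state access

    /** Called by the runtime before a main-input element reaches the user function. */
    void enterMainInput(K key) {
        inBroadcastInput = false;
        currentKey = key;
    }

    /** Called by the runtime before a broadcast element reaches the user function. */
    void enterBroadcastInput() {
        inBroadcastInput = true;
        currentKey = null;
    }

    V getKeyed() {
        checkKeyedAccess();
        return keyedState.get(currentKey);
    }

    void putKeyed(V value) {
        checkKeyedAccess();
        keyedState.put(currentKey, value);
    }

    B getBroadcast(String name) {
        return broadcastState.get(name);
    }

    void putBroadcast(String name, B value) {
        broadcastState.put(name, value);
    }

    private void checkKeyedAccess() {
        if (inBroadcastInput) {
            // A broadcast element has no meaningful "current key".
            throw new IllegalStateException("Keyed state is not accessible while processing the broadcast input");
        }
        if (currentKey == null) {
            throw new IllegalStateException("No current key set");
        }
    }
}
```

For example, a broadcast element could update a "threshold" rule, and main-input elements could count per key and emit once their count exceeds the current threshold.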
[jira] [Created] (FLINK-4992) Expose String parameter for timers in Timely functions and TimerService
Gyula Fora created FLINK-4992: - Summary: Expose String parameter for timers in Timely functions and TimerService Key: FLINK-4992 URL: https://issues.apache.org/jira/browse/FLINK-4992 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.2.0 Reporter: Gyula Fora Priority: Minor Currently it is very hard to register and execute multiple different types of timers from the same user function, because timers don't carry any metadata. We propose to extend the timer registration and onTimer logic by attaching a String argument, so users of these features can implement functionality that depends on this additional metadata. The proposed new methods: In the TimerService: void registerProcessingTimeTimer(long time, String label); void registerEventTimeTimer(long time, String label); In the TimelyFunctions: void onTimer(long timestamp, String label, TimeDomain timeDomain, TimerService timerService...); This extended functionality can be mapped to a String namespace for the internal timer service. I suggest we don't use the term "namespace" here because it just complicates things for users; I think "label", "id" or "name" is much simpler to understand. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
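To make the proposal concrete, here is a framework-free Java sketch of an event-time timer service whose timers carry a label that is handed back to the callback. Timers are identified by (timestamp, label), which mirrors the idea of mapping the label to an internal timer namespace; keys are ignored for brevity. The label type is generic, since the follow-up comment notes it need not be a `String`. `LabeledTimerService`, `advanceWatermark` and the callback shape are hypothetical, not Flink's `TimerService` API.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical event-time timer service whose timers carry a label of type L. */
final class LabeledTimerService<L> {

    /** A registered timer: firing time plus the user-supplied label. */
    private static final class Timer<T> {
        final long time;
        final T label;

        Timer(long time, T label) {
            this.time = time;
            this.label = label;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer<?> other = (Timer<?>) o;
            return time == other.time && Objects.equals(label, other.label);
        }

        @Override
        public int hashCode() {
            return Objects.hash(time, label);
        }
    }

    private final PriorityQueue<Timer<L>> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer<L> t) -> t.time));
    private final Set<Timer<L>> registered = new HashSet<>();

    /** Counterpart of the proposed registerEventTimeTimer(long time, String label). */
    void registerEventTimeTimer(long time, L label) {
        Timer<L> timer = new Timer<>(time, label);
        // Registering the same (time, label) pair twice yields a single firing.
        if (registered.add(timer)) {
            queue.add(timer);
        }
    }

    /** Fires all timers with time <= watermark in time order, passing each label to onTimer. */
    void advanceWatermark(long watermark, BiConsumer<Long, L> onTimer) {
        while (!queue.isEmpty() && queue.peek().time <= watermark) {
            Timer<L> timer = queue.poll();
            registered.remove(timer);
            onTimer.accept(timer.time, timer.label);
        }
    }
}
```

With this shape, one user function could register an "emit" timer and a "cleanup" timer and branch on the label in its callback, instead of inferring the timer's purpose from its timestamp.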
[jira] [Commented] (FLINK-4992) Expose String parameter for timers in Timely functions and TimerService
[ https://issues.apache.org/jira/browse/FLINK-4992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630678#comment-15630678 ] Gyula Fora commented on FLINK-4992: --- Well, it doesn't have to be a String, but there should be a way of attaching some sort of metadata to the registered timer (like the namespace internally). Can you please explain what you meant by different timer services? Do you mean attaching metadata to the TimerService instead of to the actual registered timer? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
Gyula Fora created FLINK-5985: - Summary: Flink treats every task as stateful (making topology changes impossible) Key: FLINK-5985 URL: https://issues.apache.org/jira/browse/FLINK-5985 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Gyula Fora Priority: Critical It seems that Flink treats every task as stateful, so changing the topology is not possible without setting a uid on every single operator. If the topology has an iteration, this is virtually impossible (or at least gets super hacky). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900980#comment-15900980 ] Gyula Fora commented on FLINK-5985: --- It should be possible to set it for *only* the stateful ones, but it doesn't work. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900995#comment-15900995 ] Gyula Fora commented on FLINK-5985: --- Even something as simple as stream.filter(event -> true) requires a uid. The same goes for the iteration tasks, which cannot be assigned a uid through the API at all. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
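The restore behaviour this thread asks for can be sketched as a matching check that only fails for savepoint entries that actually hold state. In the Java model below, a savepoint is a map from operator ID to serialized state (an empty array meaning a stateless operator), and IDs like `generated-3f2a` stand in for auto-generated IDs that change when the topology changes. `SavepointMatcher` and its rule are assumptions for illustration, not Flink's restore code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical restore check: unmatched operators only matter if they carried state. */
final class SavepointMatcher {
    private SavepointMatcher() {}

    /**
     * @param savepoint    operator ID -> serialized state (empty array = stateless operator)
     * @param newOperators operator IDs present in the modified job
     * @return IDs whose state would be lost; a restore should fail only if this is non-empty
     */
    static List<String> unmatchedStatefulOperators(Map<String, byte[]> savepoint, Set<String> newOperators) {
        List<String> lost = new ArrayList<>();
        for (Map.Entry<String, byte[]> entry : savepoint.entrySet()) {
            boolean stateless = entry.getValue().length == 0;
            // A stateless operator whose generated ID changed has nothing to restore, so skip it.
            if (!newOperators.contains(entry.getKey()) && !stateless) {
                lost.add(entry.getKey());
            }
        }
        return lost;
    }
}
```

Under such a rule only operators that actually carry state would need a stable uid, which is what the comment above expects to be possible.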
[jira] [Created] (FLINK-5996) Jobmanager HA should not crash on lost ZK node
Gyula Fora created FLINK-5996: - Summary: Jobmanager HA should not crash on lost ZK node Key: FLINK-5996 URL: https://issues.apache.org/jira/browse/FLINK-5996 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.2.0 Reporter: Gyula Fora Even if there are multiple zk hosts configured the jobmanager crashes if one of them is lost: org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss at org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) at org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87) at org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115) at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:477) at org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:302) at org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:291) at org.apache.flink.shaded.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) at org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.pathInForeground(GetDataBuilderImpl.java:288) at org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:279) at org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:41) at org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.readValue(SharedValue.java:244) at org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.access$100(SharedValue.java:44) at org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue$1.process(SharedValue.java:61) at 
org.apache.flink.shaded.org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498) We should have some retry logic there -- This message was sent by Atlassian JIRA (v6.3.15#6346)
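The issue only asks for "some retry logic". A minimal Java sketch of what that could look like, assuming the ZooKeeper read can be wrapped in a Callable and that connection-loss errors are transient: retry with exponential backoff and give up only after a bounded number of attempts. The Retry class and its parameters are hypothetical and not part of Flink or Curator; the trace shows Curator's own RetryLoop was already involved, so a real fix may rather belong in how the JobManager reacts once those retries are exhausted.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a call with exponential backoff before giving up. */
final class Retry {
    private Retry() {}

    static <T> T withBackoff(Callable<T> call, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Every exception is treated as transient here; a real implementation
                // would only retry connection-loss style errors.
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2; // exponential backoff between attempts
                }
            }
        }
        throw last; // all attempts failed: surface the last error to the caller
    }
}
```

Bounding the attempts keeps a permanently lost quorum from hanging the caller forever, while a single lost host among several would typically be bridged by one or two retries.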
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901105#comment-15901105 ] Gyula Fora commented on FLINK-5985: --- Hi Ufuk, I did exactly this yesterday. First I got an error like: Cannot map savepoint state for operator xxx to the new program, because the operator is not available in the new program. Then I added debugging output to see which operator was actually misbehaving. It showed a stateless map, so I simply set uidHash(xxx) on it and tried again. Now it gave the same error for a different stateless operator (let's say a filter). I kept adding hashes for the stateless operators until I reached the point where it complained about not being able to map back the Iteration sink; that's when I gave up. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
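A simplified Java model of the check behind the "Cannot map savepoint state for operator xxx to the new program" error, assuming the savepoint holds one entry per operator ID (stateless operators included) and that an entry without state is recorded with size 0. It shows why every removed or re-hashed stateless operator trips the check, and how skipping entries that carry no state would avoid it. SavepointMapping and its method are hypothetical names; this is not Flink's actual restore code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of mapping savepoint entries onto the operators of a changed program. */
final class SavepointMapping {
    private SavepointMapping() {}

    /**
     * @param savepoint      operator ID -> size of its stored state (0 means no state)
     * @param newOperatorIds operator IDs present in the new program
     * @param skipEmptyState whether entries without any state are ignored
     * @return the operator IDs whose savepoint entry cannot be mapped
     */
    static List<String> unmappedOperators(Map<String, Integer> savepoint,
                                          Set<String> newOperatorIds,
                                          boolean skipEmptyState) {
        List<String> unmapped = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : savepoint.entrySet()) {
            boolean hasState = entry.getValue() > 0;
            if (skipEmptyState && !hasState) {
                continue; // nothing to restore, so a missing operator is harmless
            }
            if (!newOperatorIds.contains(entry.getKey())) {
                unmapped.add(entry.getKey()); // would trigger "Cannot map savepoint state ..."
            }
        }
        return unmapped;
    }
}
```

Pinning IDs with uid/uidHash, as described in the comment, corresponds in this model to making newOperatorIds contain the old IDs again, which has to be repeated for every stateless operator when empty entries are not skipped.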
[jira] [Commented] (FLINK-5996) Jobmanager HA should not crash on lost ZK node
[ https://issues.apache.org/jira/browse/FLINK-5996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901108#comment-15901108 ] Gyula Fora commented on FLINK-5996: --- That is correct. The jobmanager fails and recovers successfully but the running jobs are restarted. > Jobmanager HA should not crash on lost ZK node > -- > > Key: FLINK-5996 > URL: https://issues.apache.org/jira/browse/FLINK-5996 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.2.0 >Reporter: Gyula Fora > > Even if there are multiple zk hosts configured the jobmanager crashes if one > of them is lost: > org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: > KeeperErrorCode = ConnectionLoss > at > org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) > at > org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87) > at > org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:477) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:302) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:291) > at > org.apache.flink.shaded.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.pathInForeground(GetDataBuilderImpl.java:288) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:279) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:41) > at > 
org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.readValue(SharedValue.java:244) > at > org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.access$100(SharedValue.java:44) > at > org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue$1.process(SharedValue.java:61) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67) > at > org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522) > at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498) > We should have some retry logic there -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901424#comment-15901424 ] Gyula Fora commented on FLINK-5985: --- I was somehow assuming that this affects all jobs. Unfortunately I cannot send the program, but I can try to reproduce it in a minimal example tomorrow. I know this used to work in 1.1 with pretty much the same job. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902973#comment-15902973 ] Gyula Fora commented on FLINK-6006: --- Thank you for looking into this! > Kafka Consumer can lose state if queried partition list is incomplete on > restore > > > Key: FLINK-6006 > URL: https://issues.apache.org/jira/browse/FLINK-6006 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.1.5, 1.2.1 > > > In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying > on restore. Then, only restored state of partitions that exists in the > queried list is used to initialize the fetcher's state holders. > If in any case the returned partition list is incomplete (i.e. missing > partitions that existed before, perhaps due to temporary ZK / broker > downtime), then the state of the missing partitions is dropped and cannot be > recovered anymore. > In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 > is affected. > We can backport some of the behavioural changes there to 1.1 and 1.2. > Generally, we should not depend on the current partition list in Kafka when > restoring, but just restore all previous state into the fetcher's state > holders. > This would therefore also require some checking on how the consumer threads / > Kafka clients behave when its assigned partitions cannot be reached. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
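The quoted description contrasts two restore strategies: filtering restored state by the freshly queried partition list, and restoring all previous state. A minimal Java sketch of both, assuming partitions are identified by strings and offsets are longs. PartitionRestore, its methods and the startOffsetForNew sentinel are hypothetical names, not FlinkKafkaConsumer internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: building the fetcher's initial offsets on restore. */
final class PartitionRestore {
    private PartitionRestore() {}

    /** Behaviour described in the issue: restored state is filtered by the queried list. */
    static Map<String, Long> filteredRestore(Map<String, Long> restoredOffsets,
                                             Set<String> queriedPartitions) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : restoredOffsets.entrySet()) {
            if (queriedPartitions.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result; // partitions missing from the query silently lose their offsets
    }

    /** Proposed behaviour: keep every restored partition and only add newly discovered ones. */
    static Map<String, Long> fullRestore(Map<String, Long> restoredOffsets,
                                         Set<String> queriedPartitions,
                                         long startOffsetForNew) {
        Map<String, Long> result = new HashMap<>(restoredOffsets);
        for (String partition : queriedPartitions) {
            result.putIfAbsent(partition, startOffsetForNew); // never overwrites restored state
        }
        return result;
    }
}
```

With the second strategy a temporarily unreachable partition keeps its offset in the state holders, which is why the description also asks how the consumer threads behave when such a partition cannot be reached.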
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903110#comment-15903110 ] Gyula Fora commented on FLINK-5985: --- You can easily reproduce the bug locally by running the following simple program: https://gist.github.com/gyfora/b5ccf836dc9e95bab2b1f5f483cb8bf6 Take a savepoint, uncomment the stateless map before the sink (which changes the sink's hash), then restore from the savepoint. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
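Why does uncommenting a stateless map change the sink's hash? A toy Java model, assuming each auto-generated operator ID is derived from the operator itself plus its upstream ID (Flink's real generator hashes more properties, but the effect in this reproduction is the same): inserting an operator changes the ID of everything downstream of it, while an explicitly assigned uid stays fixed. The IDs are readable strings rather than hashes to keep the example deterministic; ChainedIds is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy model: auto-generated operator IDs that depend on the upstream chain. */
final class ChainedIds {
    private ChainedIds() {}

    /**
     * Computes one ID per operator of a linear pipeline. Operators with an explicit uid keep it;
     * all others get an ID derived from their upstream ID and their own name.
     */
    static List<String> ids(List<String> pipeline, Map<String, String> explicitUids) {
        List<String> ids = new ArrayList<>();
        String upstream = null; // the first operator has no upstream
        for (String operator : pipeline) {
            String generated = upstream == null ? operator : upstream + ">" + operator;
            String id = explicitUids.getOrDefault(operator, generated);
            ids.add(id);
            upstream = id; // downstream IDs inherit any change made here
        }
        return ids;
    }
}
```

For example, the sink of source, window, sink gets "source>window>sink", but after inserting a map it becomes "source>window>map>sink", so its savepoint entry no longer matches.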
[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots
[ https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010393#comment-16010393 ] Gyula Fora commented on FLINK-6537: --- Hi Stefan! Thank you for all the work :) I tested your changes today and some of the issues have been resolved. Incremental checkpoints now succeed, but the job cannot be restored from them (either after cancellation or under normal failure scenarios). Savepoints still fail with the previous error. There also seems to be an error while discarding subsumed checkpoints. I am attaching the logs containing the errors: at the beginning you can see the failed savepoint, then a couple of successful checkpoints with the warnings, and at the end I killed the TM to trigger a restore, which led to file-not-found errors in HDFS: https://gist.github.com/gyfora/2bb5569fb703bbd7e47ba60352f90086 I hope this helps :) Gyula > Umbrella issue for fixes to incremental snapshots > - > > Key: FLINK-6537 > URL: https://issues.apache.org/jira/browse/FLINK-6537 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > This issue tracks ongoing fixes in the incremental checkpointing feature for > the 1.3 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore
[ https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017374#comment-16017374 ] Gyula Fora commented on FLINK-6633: --- I have tried your fix: restore now works for incremental checkpoints, but the first checkpoint afterwards fails: org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 4. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:853) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:772) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class org.apache.flink.runtime.state.PlaceholderStreamStateHandle at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66) at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199) at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:287) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:843) full log: https://gist.github.com/gyfora/cf1894158ddbc5bbba2c0cc70d69b505 > Register with shared state registry before adding to CompletedCheckpointStore > - > > Key: FLINK-6633 > URL: https://issues.apache.org/jira/browse/FLINK-6633 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Blocker > Fix For: 1.3.0 > > > Introducing placeholders for previously existing shared state requires a > change that shared state is first registering with {{SharedStateregistry}} > (thereby being consolidated) and only after that added to a > {{CompletedCheckpointStore}}, so that the consolidated checkpoint is written > to stable storage. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
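The ordering the issue asks for can be illustrated with a toy Java model, assuming a checkpoint maps stable keys to file handles and that a file reused from an earlier checkpoint is sent as a placeholder. Like the IOException in the trace, a serializer that only understands real files fails on a placeholder; registering with the registry first swaps each placeholder for the already-known file, so the consolidated checkpoint can be written. SharedStateModel and all its members are hypothetical names, not Flink's SharedStateRegistry or SavepointV2Serializer.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of incremental-checkpoint shared state; all names are hypothetical. */
final class SharedStateModel {
    private SharedStateModel() {}

    /** Either a real file (path != null) or a placeholder for a file from an earlier checkpoint. */
    static final class Handle {
        final String key;
        final String path;

        Handle(String key, String path) {
            this.key = key;
            this.path = path;
        }

        boolean isPlaceholder() {
            return path == null;
        }
    }

    /** Registry of shared files, keyed by a stable identifier. */
    static final class Registry {
        private final Map<String, Handle> byKey = new HashMap<>();

        /** Registers a handle and returns the consolidated (real) handle for it. */
        Handle register(Handle handle) {
            if (handle.isPlaceholder()) {
                Handle existing = byKey.get(handle.key);
                if (existing == null) {
                    throw new IllegalStateException("No shared state registered for " + handle.key);
                }
                return existing; // replace the placeholder with the real file
            }
            byKey.put(handle.key, handle);
            return handle;
        }
    }

    /** A serializer that, like the one in the stack trace, only knows about real files. */
    static String serialize(Map<String, Handle> checkpoint) {
        StringBuilder out = new StringBuilder();
        for (Handle handle : checkpoint.values()) {
            if (handle.isPlaceholder()) {
                throw new IllegalArgumentException("Unknown handle type: placeholder " + handle.key);
            }
            out.append(handle.key).append('=').append(handle.path).append(';');
        }
        return out.toString();
    }

    /** Register first, then serialize the consolidated checkpoint. */
    static String registerThenSerialize(Registry registry, Map<String, Handle> checkpoint) {
        Map<String, Handle> consolidated = new LinkedHashMap<>();
        for (Map.Entry<String, Handle> entry : checkpoint.entrySet()) {
            consolidated.put(entry.getKey(), registry.register(entry.getValue()));
        }
        return serialize(consolidated);
    }
}
```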
[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots
[ https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019611#comment-16019611 ] Gyula Fora commented on FLINK-6537: --- Hi Stefan! I can confirm that incremental checkpointing and recovery now seem to work as expected :) Great job on the fixes! Savepointing still gives the same error regardless of whether incremental checkpointing is used or not. Is there a JIRA for tracking that issue? I have no idea what might cause it :( Thank you! Gyula > Umbrella issue for fixes to incremental snapshots > - > > Key: FLINK-6537 > URL: https://issues.apache.org/jira/browse/FLINK-6537 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > This issue tracks ongoing fixes in the incremental checkpointing feature for > the 1.3 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots
[ https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1601#comment-1601 ] Gyula Fora commented on FLINK-6537: --- You can look at all the logs I have posted. For example: https://gist.github.com/gyfora/2bb5569fb703bbd7e47ba60352f90086#file-gistfile1-txt-L197 This exact error happens at every single savepoint attempt. > Umbrella issue for fixes to incremental snapshots > - > > Key: FLINK-6537 > URL: https://issues.apache.org/jira/browse/FLINK-6537 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > This issue tracks ongoing fixes in the incremental checkpointing feature for > the 1.3 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots
[ https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1602#comment-1602 ] Gyula Fora commented on FLINK-6537: --- Thanks a lot for looking into this. I won't be able to verify any fix today as I am travelling to London, but I can try to do it tomorrow! > Umbrella issue for fixes to incremental snapshots > - > > Key: FLINK-6537 > URL: https://issues.apache.org/jira/browse/FLINK-6537 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > This issue tracks ongoing fixes in the incremental checkpointing feature for > the 1.3 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6685) SafetyNetCloseableRegistry is closed prematurely in Task::triggerCheckpointBarrier
[ https://issues.apache.org/jira/browse/FLINK-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023140#comment-16023140 ] Gyula Fora commented on FLINK-6685: --- Thanks Stefan, awesome job! I can confirm that this solves our savepoint issues :) > SafetyNetCloseableRegistry is closed prematurely in > Task::triggerCheckpointBarrier > -- > > Key: FLINK-6685 > URL: https://issues.apache.org/jira/browse/FLINK-6685 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Blocker > > The {{SafetyNetCloseableRegistry}} is closed to early in > {{triggerCheckpointBarrier(...)}}. Right now, it seems like the code assumes > that {{statefulTask.triggerCheckpoint(...)}} is blocking - which it is not. > Like this, the registry can be closed while the checkpoint is still running. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
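The quoted description boils down to a lifetime problem: a resource is closed when a non-blocking call returns rather than when the work it started finishes. A minimal Java sketch of the wrong and the right ordering, assuming the asynchronous checkpoint phase is represented by a CompletableFuture. Registry and AsyncCheckpointScope are hypothetical stand-ins, not SafetyNetCloseableRegistry or the actual Task code.

```java
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: tie a resource's lifetime to an asynchronous checkpoint. */
final class AsyncCheckpointScope {
    private AsyncCheckpointScope() {}

    /** Stand-in for a registry that must stay open while the checkpoint is running. */
    static final class Registry implements Closeable {
        private volatile boolean closed;

        void use() {
            if (closed) {
                throw new IllegalStateException("registry closed while checkpoint still running");
            }
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Buggy pattern: treats the trigger call as blocking and closes right after it returns. */
    static CompletableFuture<Void> triggerAndCloseEagerly(Registry registry,
                                                          CompletableFuture<Void> asyncPhase) {
        CompletableFuture<Void> checkpoint = asyncPhase.thenRun(registry::use);
        registry.close(); // runs before the asynchronous phase has finished
        return checkpoint;
    }

    /** Fixed pattern: close only once the asynchronous phase completes, successfully or not. */
    static CompletableFuture<Void> triggerAndCloseOnCompletion(Registry registry,
                                                               CompletableFuture<Void> asyncPhase) {
        return asyncPhase.thenRun(registry::use)
                .whenComplete((ignored, error) -> registry.close());
    }
}
```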
[jira] [Created] (FLINK-6742) Savepoint conversion might fail if task ids change
Gyula Fora created FLINK-6742: - Summary: Savepoint conversion might fail if task ids change Key: FLINK-6742 URL: https://issues.apache.org/jira/browse/FLINK-6742 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Gyula Fora Priority: Critical Caused by: java.lang.NullPointerException at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6742) Savepoint conversion might fail if operators change
[ https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-6742: -- Summary: Savepoint conversion might fail if operators change (was: Savepoint conversion might fail if task ids change) > Savepoint conversion might fail if operators change > --- > > Key: FLINK-6742 > URL: https://issues.apache.org/jira/browse/FLINK-6742 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Priority: Critical > > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change
[ https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027477#comment-16027477 ] Gyula Fora commented on FLINK-6742: --- What's the reason for not allowing removed or new operators here? > Savepoint conversion might fail if operators change > --- > > Key: FLINK-6742 > URL: https://issues.apache.org/jira/browse/FLINK-6742 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Priority: Critical > > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change
[ https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027495#comment-16027495 ] Gyula Fora commented on FLINK-6742: --- In my case I would like to remove 2 operators while migrating because their state is not compatible (basically just by changing their uids). In this case it actually becomes a technical hurdle :D > Savepoint conversion might fail if operators change > --- > > Key: FLINK-6742 > URL: https://issues.apache.org/jira/browse/FLINK-6742 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Priority: Critical > > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change
[ https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027498#comment-16027498 ] Gyula Fora commented on FLINK-6742: --- They are incompatible due to some custom state backend code, that's my problem really :) I like option 2, but for now I just went with adding an extra null check to the conversion step to avoid the NullPointerException. > Savepoint conversion might fail if operators change > --- > > Key: FLINK-6742 > URL: https://issues.apache.org/jira/browse/FLINK-6742 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Priority: Critical > > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
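A sketch of the kind of null check mentioned in the comment, assuming the NullPointerException comes from looking up a task that no longer exists in the new program (the trace alone does not confirm this). It models conversion from task-level to operator-level state with two maps. SavepointConversion is a hypothetical name, and skipping is opt-in, in the spirit of Flink's existing allowNonRestoredState restore option, so state is never dropped silently by default.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of converting task-level savepoint state to operator-level state. */
final class SavepointConversion {
    private SavepointConversion() {}

    /**
     * @param taskState          task ID -> state stored in the old savepoint
     * @param headOperatorByTask task ID -> operator ID in the new program (absent if removed)
     * @param skipMissingTasks   opt-in: drop the state of tasks that no longer exist
     */
    static Map<String, String> toOperatorState(Map<String, String> taskState,
                                               Map<String, String> headOperatorByTask,
                                               boolean skipMissingTasks) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : taskState.entrySet()) {
            String operatorId = headOperatorByTask.get(entry.getKey());
            if (operatorId == null) { // the extra null check instead of a NullPointerException
                if (skipMissingTasks) {
                    continue;
                }
                throw new IllegalStateException(
                        "Task " + entry.getKey() + " has state but is not part of the new program");
            }
            result.put(operatorId, entry.getValue());
        }
        return result;
    }
}
```

Failing with a descriptive message when skipping is not requested keeps the behaviour safe while still replacing the opaque NullPointerException.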
[jira] [Updated] (FLINK-6742) Savepoint conversion might fail if operators change
[ https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-6742: -- Priority: Minor (was: Critical) > Savepoint conversion might fail if operators change > --- > > Key: FLINK-6742 > URL: https://issues.apache.org/jira/browse/FLINK-6742 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Priority: Minor > > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change
[ https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027499#comment-16027499 ] Gyula Fora commented on FLINK-6742: --- Thank you for the help, Chesnay! Should we close this JIRA? > Savepoint conversion might fail if operators change > --- > > Key: FLINK-6742 > URL: https://issues.apache.org/jira/browse/FLINK-6742 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Priority: Critical > > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6754) Savepoints don't respect client timeout config
Gyula Fora created FLINK-6754: - Summary: Savepoints don't respect client timeout config Key: FLINK-6754 URL: https://issues.apache.org/jira/browse/FLINK-6754 Project: Flink Issue Type: Bug Components: Client, Configuration Reporter: Gyula Fora Priority: Trivial Savepoints have a hardcoded timeout: Future response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS)); . . . result = Await.result(response, FiniteDuration.Inf()); -- This message was sent by Atlassian JIRA (v6.3.15#6346)
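A minimal Java sketch of honouring a configured timeout instead of the hardcoded one hour for the request and the infinite wait for the result, assuming the response is exposed as a CompletableFuture. The configuration key client.savepoint.timeout and its ISO-8601 value format are hypothetical; Flink's actual client timeout option may be named and formatted differently.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: bound the wait for a savepoint response by a configured timeout. */
final class SavepointClient {
    /** Hypothetical configuration key; the value is an ISO-8601 duration such as "PT10M". */
    static final String TIMEOUT_KEY = "client.savepoint.timeout";

    private SavepointClient() {}

    /** Reads the timeout from the configuration, falling back to a default. */
    static Duration timeoutFrom(Map<String, String> config, Duration defaultTimeout) {
        String value = config.get(TIMEOUT_KEY);
        return value == null ? defaultTimeout : Duration.parse(value);
    }

    /** Waits for the response, but never longer than the configured timeout. */
    static <T> T awaitResponse(CompletableFuture<T> response, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        return response.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Using one configured value for both the request and the wait avoids the mismatch in the snippet, where the ask is bounded by an hour but the result is awaited forever.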
[jira] [Closed] (FLINK-1313) Add support for out-of-place aggregations for streaming
[ https://issues.apache.org/jira/browse/FLINK-1313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-1313. - Resolution: Won't Fix > Add support for out-of-place aggregations for streaming > --- > > Key: FLINK-1313 > URL: https://issues.apache.org/jira/browse/FLINK-1313 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Minor > > There is an ongoing effort to implement a new aggregation api for batch > processing: https://issues.apache.org/jira/browse/FLINK-1293 > The streaming api should implement the same aggregation logic as well to keep > the two apis as close as possible. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6755) Allow triggering Checkpoints through command line client
Gyula Fora created FLINK-6755: - Summary: Allow triggering Checkpoints through command line client Key: FLINK-6755 URL: https://issues.apache.org/jira/browse/FLINK-6755 Project: Flink Issue Type: New Feature Components: Client, State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Gyula Fora The command line client currently only allows triggering (and canceling with) Savepoints. While this is fine if we want to fork or modify the pipelines in a non-checkpoint-compatible way, with incremental checkpoints it becomes wasteful for simple job restarts/pipeline updates. I suggest we add a new command: ./bin/flink checkpoint [checkpointDirectory] and a new flag -c for the cancel command to indicate we want to trigger a checkpoint: ./bin/flink cancel -c [targetDirectory] Otherwise this can work similarly to the current savepoint-taking logic; we could probably even piggyback on the current messages by adding a boolean flag indicating whether it should be a savepoint or a checkpoint. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
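The proposal can be modelled as one trigger message carrying a boolean flag. A hypothetical Java sketch of parsing the proposed commands, assuming (as with the existing savepoint and cancel -s commands) that the job ID is passed as well: checkpoint <jobId> [targetDirectory] and cancel -c [targetDirectory] <jobId>. SnapshotCommand is not part of Flink's CLI, and a plain cancel <jobId> is deliberately out of scope.

```java
import java.util.List;

/** Hypothetical model of the proposed snapshot commands; not part of Flink's CLI. */
final class SnapshotCommand {
    final String jobId;
    final String targetDirectory; // null means "use the configured default directory"
    final boolean isSavepoint;    // the boolean flag piggybacked on the trigger message
    final boolean cancelAfter;

    private SnapshotCommand(String jobId, String targetDirectory,
                            boolean isSavepoint, boolean cancelAfter) {
        this.jobId = jobId;
        this.targetDirectory = targetDirectory;
        this.isSavepoint = isSavepoint;
        this.cancelAfter = cancelAfter;
    }

    /** Parses "savepoint|checkpoint <jobId> [dir]" and "cancel -s|-c [dir] <jobId>". */
    static SnapshotCommand parse(List<String> args) {
        if (args.isEmpty()) {
            throw new IllegalArgumentException("missing command");
        }
        String action = args.get(0);
        switch (action) {
            case "savepoint":
            case "checkpoint": {
                if (args.size() < 2 || args.size() > 3) {
                    throw new IllegalArgumentException("usage: " + action + " <jobId> [targetDirectory]");
                }
                String dir = args.size() == 3 ? args.get(2) : null;
                return new SnapshotCommand(args.get(1), dir, action.equals("savepoint"), false);
            }
            case "cancel": {
                if (args.size() < 3 || args.size() > 4
                        || !(args.get(1).equals("-s") || args.get(1).equals("-c"))) {
                    throw new IllegalArgumentException("usage: cancel (-s|-c) [targetDirectory] <jobId>");
                }
                String dir = args.size() == 4 ? args.get(2) : null;
                String jobId = args.get(args.size() - 1);
                return new SnapshotCommand(jobId, dir, args.get(1).equals("-s"), true);
            }
            default:
                throw new IllegalArgumentException("unknown command: " + action);
        }
    }
}
```

Because savepoints and checkpoints differ only in the flag, the job-manager side could keep a single code path and decide the snapshot type from it.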
[jira] [Updated] (FLINK-6754) Savepoints don't respect client timeout config
[ https://issues.apache.org/jira/browse/FLINK-6754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-6754: -- Affects Version/s: 1.3.0 1.2.1 > Savepoints don't respect client timeout config > -- > > Key: FLINK-6754 > URL: https://issues.apache.org/jira/browse/FLINK-6754 > Project: Flink > Issue Type: Bug > Components: Client, Configuration >Affects Versions: 1.3.0, 1.2.1 >Reporter: Gyula Fora >Priority: Trivial > > Savepoints have a hardcoded timeout: > Future response = jobManager.ask(new TriggerSavepoint(jobId, > Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS)); > . > . > . > result = Await.result(response, FiniteDuration.Inf()); -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6762) Cannot rescale externalized incremental checkpoints
Gyula Fora created FLINK-6762: - Summary: Cannot rescale externalized incremental checkpoints Key: FLINK-6762 URL: https://issues.apache.org/jira/browse/FLINK-6762 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Gyula Fora Priority: Critical When a job is rescaled from an externalized incremental checkpoint, the subsequent checkpoints fail with the following error: org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 3205. at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:861) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:776) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class 
org.apache.flink.runtime.state.PlaceholderStreamStateHandle at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122) at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66) at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199) at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164) at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:286) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:851) Full log: https://gist.github.com/gyfora/693b9a720aace843ff4570e504c4a242 Rescaling with savepoints works. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
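The "Caused by" line points at a serializer that dispatches on the concrete handle type and rejects any subclass it does not know. The sketch below is a simplified, hypothetical model of that failure mode, not Flink's actual classes: it assumes, as the class name suggests, that a placeholder handle stands in for a file uploaded by an earlier incremental checkpoint and must be resolved to a concrete handle before it can be written out. `StateHandle`, `FileHandle`, `PlaceholderHandle`, `HandleSerializer`, and `resolvePlaceholders` are all illustrative names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified handle hierarchy (not Flink's real classes).
interface StateHandle {}

// A handle pointing at a concrete file: the only type the serializer knows.
final class FileHandle implements StateHandle {
    final String path;
    FileHandle(String path) { this.path = path; }
}

// Stands in for a file uploaded by an earlier checkpoint; carries only a lookup key.
final class PlaceholderHandle implements StateHandle {
    final String sharedKey;
    PlaceholderHandle(String sharedKey) { this.sharedKey = sharedKey; }
}

final class HandleSerializer {
    private static final byte FILE_HANDLE = 1;

    // Type-dispatching serialization: unknown subclasses are rejected,
    // mirroring the "Unknown implementation of StreamStateHandle" error.
    static byte[] serialize(StateHandle handle) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (handle instanceof FileHandle) {
            out.writeByte(FILE_HANDLE);
            out.writeUTF(((FileHandle) handle).path);
        } else {
            throw new IOException("Unknown implementation of StateHandle: " + handle.getClass());
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Replaces placeholders with the concrete handles registered under their key,
    // failing loudly if a key was never registered.
    static Map<String, StateHandle> resolvePlaceholders(
            Map<String, StateHandle> handles, Map<String, FileHandle> registry) throws IOException {
        Map<String, StateHandle> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, StateHandle> entry : handles.entrySet()) {
            StateHandle handle = entry.getValue();
            if (handle instanceof PlaceholderHandle) {
                String key = ((PlaceholderHandle) handle).sharedKey;
                FileHandle concrete = registry.get(key);
                if (concrete == null) {
                    throw new IOException("No registered state for placeholder " + key);
                }
                handle = concrete;
            }
            resolved.put(entry.getKey(), handle);
        }
        return resolved;
    }
}
```

Resolving placeholders against a registry before serializing is only one possible design; the issue itself does not say how the bug was fixed.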
[jira] [Created] (FLINK-10387) StateBackend create methods should return interface not abstract KeyedStateBackend classes
Gyula Fora created FLINK-10387: -- Summary: StateBackend create methods should return interface not abstract KeyedStateBackend classes Key: FLINK-10387 URL: https://issues.apache.org/jira/browse/FLINK-10387 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.6.0 Reporter: Gyula Fora Currently the createKeyedStateBackend(...) methods return AbstractKeyedStateBackend instead of an interface. This makes it virtually impossible to write clean extensions to StateBackends that add functionality to existing backends while delegating all other method calls. It should be easy enough to add a new interface that covers everything AbstractKeyedStateBackend implements, and have the method return that interface instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
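A minimal sketch of why an interface return type helps: a wrapper can implement the interface, forward every call to the backend it decorates, and override only what it extends. `KeyedBackend`, `HeapKeyedBackend`, `CountingKeyedBackend`, and `BackendFactory` are hypothetical stand-ins, not Flink types; a real keyed state backend interface would expose many more methods.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface standing in for the one proposed in the issue.
interface KeyedBackend<K> {
    void setCurrentKey(K key);
    K getCurrentKey();
    void put(String stateName, Object value);
    Object get(String stateName);
}

// A plain implementation, standing in for an existing backend.
final class HeapKeyedBackend<K> implements KeyedBackend<K> {
    private final Map<K, Map<String, Object>> state = new HashMap<>();
    private K currentKey;

    @Override public void setCurrentKey(K key) { currentKey = key; }
    @Override public K getCurrentKey() { return currentKey; }
    @Override public void put(String stateName, Object value) {
        state.computeIfAbsent(currentKey, k -> new HashMap<>()).put(stateName, value);
    }
    @Override public Object get(String stateName) {
        Map<String, Object> forKey = state.get(currentKey);
        return forKey == null ? null : forKey.get(stateName);
    }
}

// Decorator: counts state accesses and delegates everything else unchanged.
final class CountingKeyedBackend<K> implements KeyedBackend<K> {
    private final KeyedBackend<K> delegate;
    private long writes;
    private long reads;

    CountingKeyedBackend(KeyedBackend<K> delegate) { this.delegate = delegate; }

    @Override public void setCurrentKey(K key) { delegate.setCurrentKey(key); }
    @Override public K getCurrentKey() { return delegate.getCurrentKey(); }
    @Override public void put(String stateName, Object value) { writes++; delegate.put(stateName, value); }
    @Override public Object get(String stateName) { reads++; return delegate.get(stateName); }

    long writes() { return writes; }
    long reads() { return reads; }
}

// Hypothetical factory whose return type is the interface, so either object fits.
final class BackendFactory {
    static <K> KeyedBackend<K> createKeyedBackend(boolean withCounting) {
        KeyedBackend<K> base = new HeapKeyedBackend<>();
        if (withCounting) {
            return new CountingKeyedBackend<>(base);
        }
        return base;
    }
}
```

With an abstract class as the return type, the decorator would instead have to extend that class and inherit its internal state, which is what makes delegation awkward.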
[jira] [Closed] (FLINK-1003) Spread out scheduling strategy
[ https://issues.apache.org/jira/browse/FLINK-1003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-1003. - Resolution: Won't Fix > Spread out scheduling strategy > -- > > Key: FLINK-1003 > URL: https://issues.apache.org/jira/browse/FLINK-1003 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Gyula Fora >Priority: Major > > Currently the Flink scheduler tries to fill one instance completely before > deploying tasks to another instance. This behaviour works well in > multi-user and multi-job scenarios, but it wastes resources if one wants to > use the complete cluster. Therefore, another scheduling strategy that keeps the > load balanced among the different instances might be useful. This > spread-out strategy would deploy the tasks such that the overall work is > equally distributed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
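The two placement policies described in the issue can be contrasted with a small simulation. This is an illustrative model only: instances are reduced to a slot capacity, and `SlotPlacement`, `fillFirst`, and `spreadOut` are hypothetical helpers, not Flink scheduler code. Fill-first packs one instance before moving to the next; spread-out places each task on the instance with the fewest tasks so far.

```java
import java.util.Arrays;

final class SlotPlacement {
    // Fill-first: use instance 0 until it is full, then instance 1, and so on.
    static int[] fillFirst(int tasks, int instances, int slotsPerInstance) {
        requireCapacity(tasks, instances, slotsPerInstance);
        int[] load = new int[instances];
        int current = 0;
        for (int t = 0; t < tasks; t++) {
            while (load[current] == slotsPerInstance) {
                current++;
            }
            load[current]++;
        }
        return load;
    }

    // Spread-out: place each task on the least-loaded instance (lowest index on ties).
    static int[] spreadOut(int tasks, int instances, int slotsPerInstance) {
        requireCapacity(tasks, instances, slotsPerInstance);
        int[] load = new int[instances];
        for (int t = 0; t < tasks; t++) {
            int target = 0;
            for (int i = 1; i < instances; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            load[target]++;
        }
        return load;
    }

    private static void requireCapacity(int tasks, int instances, int slotsPerInstance) {
        if (tasks > instances * slotsPerInstance) {
            throw new IllegalArgumentException("not enough slots for " + tasks + " tasks");
        }
    }

    public static void main(String[] args) {
        // 6 tasks on 3 instances with 4 slots each.
        System.out.println(Arrays.toString(fillFirst(6, 3, 4))); // [4, 2, 0]
        System.out.println(Arrays.toString(spreadOut(6, 3, 4))); // [2, 2, 2]
    }
}
```

Fill-first leaves the third instance idle, which is the waste the issue describes when a single job should use the whole cluster.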
[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks
[ https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635168#comment-16635168 ] Gyula Fora commented on FLINK-9635: --- Should we consider this issue a blocker? I know the proper fix is very hard and requires a lot of effort, but the current state is very unsafe as well. > Local recovery scheduling can cause spread out of tasks > --- > > Key: FLINK-9635 > URL: https://issues.apache.org/jira/browse/FLINK-9635 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.7.0 > > > In order to make local recovery work, Flink's scheduling was changed such > that each task tries to be rescheduled to its previous location. In order not to > occupy slots which have state of other tasks cached, the strategy will > request a new slot if the old slot identified by the previous allocation id > is no longer present. This also applies to newly allocated slots because > there is no distinction between new and already used slots. This behaviour can cause > every task to be deployed to its own slot if, for example, the {{SlotPool}} has > released all slots in the meantime. The consequence could be > that a job can no longer be executed after a failure because it needs more > slots than before. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
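The slot blow-up can be reproduced with a toy model. It rests on assumptions that are not taken from Flink's `SlotPool` code: tasks that shared a slot before the failure (slot sharing) all remember that slot's allocation id; a task only rejoins a slot whose id equals its previous one; otherwise it requests a fresh slot, whose new id matches no task's previous id. `LocalRecoveryModel` and `slotsUsed` are hypothetical names.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class LocalRecoveryModel {
    /**
     * Counts the slots occupied after restoring tasks.
     *
     * @param previousAllocations previous allocation id of each task, in scheduling order
     * @param availableSlots      allocation ids the slot pool still holds
     */
    static int slotsUsed(List<String> previousAllocations, Set<String> availableSlots) {
        Set<String> reused = new HashSet<>();
        int fresh = 0;
        for (String previous : previousAllocations) {
            if (availableSlots.contains(previous)) {
                // Rejoin the old slot, possibly shared with other tasks that used it.
                reused.add(previous);
            } else {
                // Old slot is gone: request a new one. Its id is new, so no other
                // task will ever treat it as "its" slot, and it is never shared.
                fresh++;
            }
        }
        return reused.size() + fresh;
    }
}
```

For four tasks that previously shared two slots (`a, a, b, b`), the model uses 2 slots while the pool still holds `a` and `b`, 3 slots if only `a` survives, and 4 slots once the pool has released everything, matching the "needs more slots than before" consequence.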
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917819#comment-16917819 ] Gyula Fora commented on FLINK-13874: I added some extra logging to figure out what the problem is. What seems to happen is that the lease revoke timeout is reached before we move on to the truncate, so the file is never closed before truncating: [https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L341] I wonder what might cause it to take so long, and what we can do in these cases. Should we throw an error, or at least log a warning? > StreamingFileSink fails to recover (truncate) properly > -- > > Key: FLINK-13874 > URL: https://issues.apache.org/jira/browse/FLINK-13874 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.9.0 >Reporter: Gyula Fora >Priority: Blocker > > It seems that there might be some problem with the truncate / recovery logic > for the HadoopRecoverableFsDataOutputStream. 
> I keep hitting the following error: > > {noformat} > java.io.IOException: Problem while truncating file: > hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166) > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89) > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396) > at > org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): > Failed to TRUNCATE_FILE > /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d > for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because > DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder. > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522) > at > org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.ap
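One way to act on the suggestion in the comment (throw an error, or at least warn, when the lease revoke timeout is reached) is to make the wait report failure explicitly instead of falling through to the truncate. The sketch below is a generic, hypothetical helper rather than Flink's code: the "is the file closed?" check is modeled as a `BooleanSupplier` (against a real HDFS client it might wrap `DistributedFileSystem.isFileClosed(Path)`), and `LeaseWait`, the poll interval, and the timeouts are illustrative assumptions.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class LeaseWait {
    /**
     * Polls isClosed until it returns true or the timeout elapses.
     * Returns whether the file was observed closed in time.
     */
    static boolean waitUntilClosed(BooleanSupplier isClosed, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (isClosed.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    /** Fails loudly instead of truncating a file whose lease was never released. */
    static void ensureClosedBeforeTruncate(String path, BooleanSupplier isClosed, Duration timeout)
            throws IOException, InterruptedException {
        if (!waitUntilClosed(isClosed, timeout, Duration.ofMillis(10))) {
            throw new IOException("Lease for " + path + " was not released within " + timeout
                    + "; refusing to truncate");
        }
    }
}
```

Throwing lets the restore attempt fail with a clear message that names the stuck file, rather than surfacing later as the less obvious `AlreadyBeingCreatedException` from the NameNode.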
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917833#comment-16917833 ] Gyula Fora commented on FLINK-13874: And while the lease revocation process seems to time out for certain files, for others it completes in 1-2 seconds, within the same TaskManager.
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918043#comment-16918043 ] Gyula Fora commented on FLINK-13874: I increased the lease timeout to 5 minutes; it still failed sometimes, but it could recover afterwards. The NameNode logs don't seem to contain any additional information on what happened or why it was slow; they just look normal...
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918526#comment-16918526 ] Gyula Fora commented on FLINK-13874: This seems to be an issue with HDFS, as the lease cannot be recovered for certain files for a long period of time. Something in the block recovery process might be off. These files cannot even be closed manually using the HDFS client; in many cases only a restart of HDFS solves the problem.
[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918528#comment-16918528 ] Gyula Fora commented on FLINK-13874: We will investigate this further in the future to see what in HDFS might cause it; I will close this issue for now.
[jira] [Resolved] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly
[ https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora resolved FLINK-13874. Resolution: Won't Fix This seems to be an issue with HDFS or HDFS configuration; I will close this issue until further investigation is done on the HDFS side.
[jira] [Commented] (FLINK-13566) Support checkpoint configuration through flink-conf.yaml
[ https://issues.apache.org/jira/browse/FLINK-13566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923145#comment-16923145 ] Gyula Fora commented on FLINK-13566: Let's wait until the discussion of FLIP-59 [https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration] is over :) > Support checkpoint configuration through flink-conf.yaml > > > Key: FLINK-13566 > URL: https://issues.apache.org/jira/browse/FLINK-13566 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing, Runtime / Configuration >Reporter: Gyula Fora >Priority: Major > > Currently, basic checkpointing configuration happens through the > StreamExecutionEnvironment and the CheckpointConfig class. > There is no way to configure checkpointing behaviour purely from the > flink-conf.yaml file (or to provide a default checkpointing behaviour), as it > always needs to happen programmatically through the environment. > The checkpoint config settings are then translated down to the > CheckpointCoordinatorConfiguration, which controls the runtime behaviour. > As checkpointing-related settings are operational features that should not > affect the application logic, I think we need to support configuring these > parameters through flink-conf.yaml. > In order to do this, we probably need to rework the CheckpointConfig class so > that it distinguishes parameters that the user actually set from the defaults > (so that values set explicitly by the user can override what was set in the conf). -- This message was sent by Atlassian Jira (v8.3.2#803003)
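The rework suggested in the issue (telling values the user actually set apart from defaults, so that flink-conf.yaml can fill in whatever the program left unset) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink's CheckpointConfig: the class CheckpointSettings, its fields, the default numbers and the keys checkpoint.interval / checkpoint.min-pause are all hypothetical, and the configuration file is modelled as a Map<String, String>.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink's CheckpointConfig): checkpoint settings that remember
// which values the user set explicitly, so a config file can supply the rest.
class CheckpointSettings {
    // Built-in fallbacks; the numbers are illustrative assumptions.
    static final long DEFAULT_INTERVAL_MS = -1L; // -1 = checkpointing disabled
    static final long DEFAULT_MIN_PAUSE_MS = 0L;

    // Hypothetical config keys, standing in for entries in flink-conf.yaml.
    static final String INTERVAL_KEY = "checkpoint.interval";
    static final String MIN_PAUSE_KEY = "checkpoint.min-pause";

    // null means "never set by the program".
    private Long intervalMs;
    private Long minPauseMs;

    void setIntervalMs(long ms) { intervalMs = ms; }
    void setMinPauseMs(long ms) { minPauseMs = ms; }

    // Precedence: value set in the program > value from the config file > built-in default.
    long effectiveIntervalMs(Map<String, String> conf) {
        return resolve(intervalMs, conf, INTERVAL_KEY, DEFAULT_INTERVAL_MS);
    }

    long effectiveMinPauseMs(Map<String, String> conf) {
        return resolve(minPauseMs, conf, MIN_PAUSE_KEY, DEFAULT_MIN_PAUSE_MS);
    }

    private static long resolve(Long userValue, Map<String, String> conf, String key, long fallback) {
        if (userValue != null) {
            return userValue;
        }
        String fromConf = conf.get(key);
        return fromConf != null ? Long.parseLong(fromConf.trim()) : fallback;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(INTERVAL_KEY, "60000");
        conf.put(MIN_PAUSE_KEY, "5000");

        CheckpointSettings settings = new CheckpointSettings();
        settings.setMinPauseMs(1000); // set in the program, so it wins over the conf file

        System.out.println(settings.effectiveIntervalMs(conf)); // 60000 (from the conf file)
        System.out.println(settings.effectiveMinPauseMs(conf)); // 1000 (set by the user)
    }
}
```

Keeping "unset" as null (rather than pre-filling defaults in the constructor) is what makes the three-level precedence possible; a class that eagerly stores defaults cannot tell a deliberate setting from an untouched one.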
[jira] [Created] (FLINK-13979) Translate new streamfilesink docs to chinese
Gyula Fora created FLINK-13979: -- Summary: Translate new streamfilesink docs to chinese Key: FLINK-13979 URL: https://issues.apache.org/jira/browse/FLINK-13979 Project: Flink Issue Type: New Feature Components: chinese-translation, Documentation Reporter: Gyula Fora The StreamingFileSink docs have been reworked as part of FLINK-13842, and the new version should be translated to Chinese. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Closed] (FLINK-13842) Improve Javadocs and web documentation of the StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-13842. -- Resolution: Fixed > Improve Javadocs and web documentation of the StreamingFileSink > --- > > Key: FLINK-13842 > URL: https://issues.apache.org/jira/browse/FLINK-13842 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem, Documentation >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Both the javadocs and the web docs of the StreamingFileSink and associated > components should be improved for a smooth user experience with more detailed > explanations and examples. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic
Gyula Fora created FLINK-14002: -- Summary: FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic Key: FLINK-14002 URL: https://issues.apache.org/jira/browse/FLINK-14002 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Gyula Fora When the KafkaSerializationSchema is used, the user always has to provide the topic when they create the ProducerRecord. The defaultTopic specified in the constructor (and enforced not to be null) will always be ignored, which is very misleading. We should deprecate these constructors and create new ones without defaultTopic. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic
[ https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16925398#comment-16925398 ] Gyula Fora commented on FLINK-14002: Actually, the default topic is used with KafkaContextAware serialization schemas, which might cause further problems. > FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt > take default topic > -- > > Key: FLINK-14002 > URL: https://issues.apache.org/jira/browse/FLINK-14002 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Gyula Fora >Priority: Major > > When the KafkaSerializationSchema is used, the user always has to provide the > topic when they create the ProducerRecord. > The defaultTopic specified in the constructor (and enforced not to be null) > will always be ignored, which is very misleading. > We should deprecate these constructors and create new ones without > defaultTopic. -- This message was sent by Atlassian Jira (v8.3.2#803003)
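The problem described in the issue and the follow-up comment can be illustrated with a deliberately simplified model. This is not the FlinkKafkaProducer code: SimpleRecord, RecordSchema, ContextAwareSchema and SimpleProducer are hypothetical stand-ins for ProducerRecord, KafkaSerializationSchema, KafkaContextAware and the producer. The model assumes, following the comment, that the default topic is only consulted for context-aware schemas, as a fallback when no target topic is returned; using that fallback for a partition-metadata lookup is an assumption made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the producer/schema interaction; NOT the Flink code.
class DefaultTopicDemo {
    public static void main(String[] args) {
        // Plain schema: the schema alone decides the topic; the default topic is never read.
        RecordSchema<String> plain = s -> new SimpleRecord("events", s.getBytes(StandardCharsets.UTF_8));
        SimpleProducer<String> p1 = new SimpleProducer<>("my-default-topic", plain);
        p1.send("a");
        System.out.println(p1.sent.get(0).topic + " " + p1.metadataLookups); // events []

        // Context-aware schema without a target topic: metadata is fetched for the default
        // topic, while the record itself still goes to the topic chosen in serialize().
        ContextAwareSchema<String> aware = new ContextAwareSchema<String>() {
            @Override public SimpleRecord serialize(String s) {
                return new SimpleRecord("events", s.getBytes(StandardCharsets.UTF_8));
            }
            @Override public String getTargetTopic(String s) { return null; }
        };
        SimpleProducer<String> p2 = new SimpleProducer<>("my-default-topic", aware);
        p2.send("b");
        System.out.println(p2.sent.get(0).topic + " " + p2.metadataLookups); // events [my-default-topic]
    }
}

// Stand-in for ProducerRecord: the topic travels with every record.
final class SimpleRecord {
    final String topic;
    final byte[] value;

    SimpleRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Stand-in for a serialization schema that builds complete records.
interface RecordSchema<T> {
    SimpleRecord serialize(T element);
}

// Stand-in for a context-aware schema; getTargetTopic may return null.
interface ContextAwareSchema<T> extends RecordSchema<T> {
    String getTargetTopic(T element);
}

final class SimpleProducer<T> {
    private final String defaultTopic;
    private final RecordSchema<T> schema;
    final List<String> metadataLookups = new ArrayList<>(); // topics whose metadata was "fetched"
    final List<SimpleRecord> sent = new ArrayList<>();

    SimpleProducer(String defaultTopic, RecordSchema<T> schema) {
        this.defaultTopic = defaultTopic;
        this.schema = schema;
    }

    @SuppressWarnings("unchecked")
    void send(T element) {
        if (schema instanceof ContextAwareSchema) {
            String target = ((ContextAwareSchema<T>) schema).getTargetTopic(element);
            // Only here does the default topic matter in this model.
            metadataLookups.add(target != null ? target : defaultTopic);
        }
        sent.add(schema.serialize(element));
    }
}
```

The second case shows the kind of "further problem" the comment hints at: in this model the default topic and the record's real topic can silently diverge.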
[jira] [Commented] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing
[ https://issues.apache.org/jira/browse/FLINK-4873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926473#comment-16926473 ] Gyula Fora commented on FLINK-4873: --- Hm, looks like the commit with this change was on my master branch, which I overwrote recently... but we can definitely add this feature. It should be simple enough :) > Add config option to specify "home directory" for YARN client resource sharing > -- > > Key: FLINK-4873 > URL: https://issues.apache.org/jira/browse/FLINK-4873 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.1.3, 1.2.0 >Reporter: Gyula Fora >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The YARN client currently uses FileSystem.getHomeDirectory() to store the jar > files that need to be shared on the cluster. This pretty much forces users > to run HDFS or something compatible with the Hadoop FS API on the cluster. > In some production environments, file systems (distributed or simply shared) > are mounted under the same path and, for convenience, are accessed without the > Hadoop API. If we want to run Flink on YARN in these cases, we > would need to be able to define the "home directory" where Flink should copy > the files for sharing. > It could be something like: > yarn.resource.upload.dir in the flink-conf.yaml -- This message was sent by Atlassian Jira (v8.3.2#803003)
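The proposed option boils down to "use the configured shared directory if there is one, otherwise fall back to the home directory". A minimal sketch of that decision, assuming the key name suggested in the issue (yarn.resource.upload.dir) and modelling the cluster-wide mount as a local java.nio Path; the class UploadDirResolver and the .flink/<applicationId> layout are hypothetical and not taken from Flink's YARN client.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the proposed option; not Flink's YARN client code.
class UploadDirResolver {
    // Key name as suggested in the issue; treated here as an assumption, not an existing option.
    static final String UPLOAD_DIR_KEY = "yarn.resource.upload.dir";

    // Prefer the configured (e.g. locally mounted, shared) directory; otherwise fall back to
    // the home directory, standing in for what FileSystem.getHomeDirectory() would return.
    static Path resolveUploadDir(Map<String, String> conf, Path homeDirectory) {
        String configured = conf.get(UPLOAD_DIR_KEY);
        if (configured == null || configured.trim().isEmpty()) {
            return homeDirectory;
        }
        return Paths.get(configured.trim());
    }

    // Copies a jar into a per-application sub-directory (layout assumed for illustration).
    static Path upload(Path jar, Path uploadDir, String applicationId) throws IOException {
        Path target = uploadDir.resolve(".flink").resolve(applicationId);
        Files.createDirectories(target);
        return Files.copy(jar, target.resolve(jar.getFileName()), StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path sharedMount = Files.createTempDirectory("shared-mount"); // stands in for e.g. an NFS mount
        Path jar = Files.createTempFile("job", ".jar");
        Map<String, String> conf = Collections.singletonMap(UPLOAD_DIR_KEY, sharedMount.toString());

        Path uploadDir = resolveUploadDir(conf, Paths.get("/user/flink"));
        System.out.println("uploaded to " + upload(jar, uploadDir, "application_0001"));
    }
}
```

Because a mounted file system appears under the same path on every node, plain file copies are enough here; no Hadoop FileSystem client is involved.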
[jira] [Assigned] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing
[ https://issues.apache.org/jira/browse/FLINK-4873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-4873: - Assignee: Matyas Orhidi > Add config option to specify "home directory" for YARN client resource sharing > -- > > Key: FLINK-4873 > URL: https://issues.apache.org/jira/browse/FLINK-4873 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.1.3, 1.2.0 >Reporter: Gyula Fora >Assignee: Matyas Orhidi >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The YARN client currently uses FileSystem.getHomeDirectory() to store the jar > files that need to be shared on the cluster. This pretty much forces users > to run HDFS or something compatible with the Hadoop FS API on the cluster. > In some production environments, file systems (distributed or simply shared) > are mounted under the same path and, for convenience, are accessed without the > Hadoop API. If we want to run Flink on YARN in these cases, we > would need to be able to define the "home directory" where Flink should copy > the files for sharing. > It could be something like: > yarn.resource.upload.dir in the flink-conf.yaml -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment
Gyula Fora created FLINK-14048: -- Summary: Flink client hangs after trying to kill Yarn Job during deployment Key: FLINK-14048 URL: https://issues.apache.org/jira/browse/FLINK-14048 Project: Flink Issue Type: Improvement Components: Client / Job Submission, Deployment / YARN Reporter: Gyula Fora If we kill the Flink client run command from the terminal while deploying to YARN (let's say we realize we used the wrong parameters), the YARN application is killed immediately, but the client doesn't shut down. We get the following messages over and over: 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The client is stopped, while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 failover attempts. Trying to failover after sleeping for 16296ms. -- This message was sent by Atlassian Jira (v8.3.2#803003)
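The log line shows a retry loop that keeps failing over, with growing sleeps, even though the underlying client reports that it has been stopped. The hypothetical sketch below shows the general shape of a retry loop that gives up as soon as a "stopped" flag is set instead of retrying indefinitely. It is unrelated to Hadoop's actual RetryInvocationHandler; the names StoppableRetry and ClientStoppedException and the backoff parameters are assumptions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; unrelated to Hadoop's actual RetryInvocationHandler implementation.
class StoppableRetry {
    static final class ClientStoppedException extends RuntimeException {
        ClientStoppedException(String message) { super(message); }
    }

    // Retries `call` with exponential backoff, but gives up as soon as `stopped` is set,
    // because retrying through a client that has already been stopped can never succeed.
    static <T> T callWithRetry(Callable<T> call, AtomicBoolean stopped,
                               int maxAttempts, long initialBackoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoffMs = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (stopped.get()) {
                throw new ClientStoppedException("client stopped; giving up after "
                        + (attempt - 1) + " attempt(s)");
            }
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                    backoffMs *= 2; // exponential backoff between attempts
                }
            }
        }
        throw last; // all attempts failed while the client was still running
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean stopped = new AtomicBoolean(false);
        try {
            callWithRetry(() -> {
                stopped.set(true); // e.g. the user hit Ctrl+C and the client shut down
                throw new IllegalStateException("The client is stopped");
            }, stopped, 20, 10);
        } catch (ClientStoppedException e) {
            System.out.println(e.getMessage()); // client stopped; giving up after 1 attempt(s)
        }
    }
}
```

Checking the flag before each attempt (rather than only catching the failure) is what turns an open-ended failover loop into a bounded one once shutdown has begun.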
[jira] [Commented] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment
[ https://issues.apache.org/jira/browse/FLINK-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927313#comment-16927313 ] Gyula Fora commented on FLINK-14048: Yes, it was in per-job mode. > Flink client hangs after trying to kill Yarn Job during deployment > -- > > Key: FLINK-14048 > URL: https://issues.apache.org/jira/browse/FLINK-14048 > Project: Flink > Issue Type: Improvement > Components: Client / Job Submission, Deployment / YARN >Reporter: Gyula Fora >Priority: Major > > If we kill the Flink client run command from the terminal while deploying to > YARN (let's say we realize we used the wrong parameters), the YARN > application is killed immediately, but the client doesn't shut down. > We get the following messages over and over: > 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The > client is stopped, while invoking > ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 > failover attempts. Trying to failover after sleeping for 16296ms. > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic
[ https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928513#comment-16928513 ] Gyula Fora commented on FLINK-14002: I am a bit busy in the next few days, but I can definitely fix this before the next release if no one picks it up earlier :) > FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt > take default topic > -- > > Key: FLINK-14002 > URL: https://issues.apache.org/jira/browse/FLINK-14002 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Gyula Fora >Priority: Major > > When the KafkaSerializationSchema is used, the user always has to provide the > topic when they create the ProducerRecord. > The defaultTopic specified in the constructor (and enforced not to be null) > will always be ignored, which is very misleading. > We should deprecate these constructors and create new ones without > defaultTopic. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Assigned] (FLINK-14145) getLatestCheckpoint(true) returns wrong checkpoint
[ https://issues.apache.org/jira/browse/FLINK-14145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora reassigned FLINK-14145: -- Assignee: Gyula Fora > getLatestCheckpoint(true) returns wrong checkpoint > -- > > Key: FLINK-14145 > URL: https://issues.apache.org/jira/browse/FLINK-14145 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.0 >Reporter: Ufuk Celebi >Assignee: Gyula Fora >Priority: Major > Attachments: > 0001-FLINK-14145-runtime-Add-getLatestCheckpoint-test.patch > > > The flag to prefer checkpoints for recovery introduced in FLINK-11159 returns > the wrong checkpoint if: > * checkpoints are preferred ({{getLatestCheckpoint(true)}}), > * the latest checkpoint is *not* a savepoint, > * more than a single checkpoint is retained. > The current implementation assumes that the latest checkpoint is a savepoint > and skips over it. I attached a patch for > {{StandaloneCompletedCheckpointStoreTest}} that demonstrates the issue. > You can apply the patch via {{git am -3 < *.patch}}. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
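The intended semantics of getLatestCheckpoint(true), and one way to end up with the reported behaviour, can be modelled in a few lines. This is a hypothetical sketch, not Flink's CompletedCheckpointStore code: Snapshot, LatestCheckpoint and both method names are made up. The getLatestSkippingNewest variant only illustrates how an off-by-one in a backwards iteration reproduces the three conditions listed in the issue; it is not a claim about how the Flink code looks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical model of the intended behaviour; not Flink's CompletedCheckpointStore code.
class LatestCheckpoint {
    // Retained snapshots are ordered oldest -> newest. With preferCheckpoints, return the
    // newest snapshot that is NOT a savepoint; otherwise (or if all are savepoints) the newest.
    static Snapshot getLatest(List<Snapshot> retained, boolean preferCheckpoints) {
        if (retained.isEmpty()) {
            return null;
        }
        if (preferCheckpoints) {
            // Start the iterator after the last element so the newest snapshot is checked too.
            ListIterator<Snapshot> it = retained.listIterator(retained.size());
            while (it.hasPrevious()) {
                Snapshot candidate = it.previous();
                if (!candidate.isSavepoint) {
                    return candidate;
                }
            }
        }
        return retained.get(retained.size() - 1);
    }

    // Starting at size() - 1 silently skips the newest snapshot, as if it were always
    // a savepoint; with a single retained snapshot the loop is not entered at all.
    static Snapshot getLatestSkippingNewest(List<Snapshot> retained, boolean preferCheckpoints) {
        if (preferCheckpoints && retained.size() > 1) {
            ListIterator<Snapshot> it = retained.listIterator(retained.size() - 1);
            while (it.hasPrevious()) {
                Snapshot candidate = it.previous();
                if (!candidate.isSavepoint) {
                    return candidate;
                }
            }
        }
        return retained.isEmpty() ? null : retained.get(retained.size() - 1);
    }

    public static void main(String[] args) {
        List<Snapshot> retained = new ArrayList<>();
        retained.add(new Snapshot(1, false));
        retained.add(new Snapshot(2, false));
        retained.add(new Snapshot(3, false)); // latest is a regular checkpoint

        System.out.println(getLatest(retained, true).id);               // 3
        System.out.println(getLatestSkippingNewest(retained, true).id); // 2 (wrong checkpoint)
    }
}

// Minimal stand-in for a completed checkpoint (hypothetical).
final class Snapshot {
    final long id;
    final boolean isSavepoint;

    Snapshot(long id, boolean isSavepoint) {
        this.id = id;
        this.isSavepoint = isSavepoint;
    }
}
```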
[jira] [Commented] (FLINK-14145) getLatestCheckpoint(true) returns wrong checkpoint
[ https://issues.apache.org/jira/browse/FLINK-14145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934222#comment-16934222 ] Gyula Fora commented on FLINK-14145: Thanks [~uce] for catching this; I should have tested this feature more thoroughly. > getLatestCheckpoint(true) returns wrong checkpoint > -- > > Key: FLINK-14145 > URL: https://issues.apache.org/jira/browse/FLINK-14145 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.0 >Reporter: Ufuk Celebi >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Attachments: > 0001-FLINK-14145-runtime-Add-getLatestCheckpoint-test.patch > > Time Spent: 10m > Remaining Estimate: 0h > > The flag to prefer checkpoints for recovery introduced in FLINK-11159 returns > the wrong checkpoint if: > * checkpoints are preferred ({{getLatestCheckpoint(true)}}), > * the latest checkpoint is *not* a savepoint, > * more than a single checkpoint is retained. > The current implementation assumes that the latest checkpoint is a savepoint > and skips over it. I attached a patch for > {{StandaloneCompletedCheckpointStoreTest}} that demonstrates the issue. > You can apply the patch via {{git am -3 < *.patch}}. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (FLINK-14145) getLatestCheckpoint(true) returns wrong checkpoint
[ https://issues.apache.org/jira/browse/FLINK-14145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora resolved FLINK-14145. Resolution: Fixed > getLatestCheckpoint(true) returns wrong checkpoint > -- > > Key: FLINK-14145 > URL: https://issues.apache.org/jira/browse/FLINK-14145 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.0 >Reporter: Ufuk Celebi >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > Attachments: > 0001-FLINK-14145-runtime-Add-getLatestCheckpoint-test.patch > > Time Spent: 20m > Remaining Estimate: 0h > > The flag to prefer checkpoints for recovery introduced in FLINK-11159 returns > the wrong checkpoint if: > * checkpoints are preferred ({{getLatestCheckpoint(true)}}), > * the latest checkpoint is *not* a savepoint, > * more than a single checkpoint is retained. > The current implementation assumes that the latest checkpoint is a savepoint > and skips over it. I attached a patch for > {{StandaloneCompletedCheckpointStoreTest}} that demonstrates the issue. > You can apply the patch via {{git am -3 < *.patch}}. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-6755) Allow triggering Checkpoints through command line client
[ https://issues.apache.org/jira/browse/FLINK-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840198#comment-16840198 ] Gyula Fora commented on FLINK-6755: --- I agree about the leaked abstraction, but I don't see it as a problem. [~yanghua] I think having to choose between automatic and manual checkpoints is a bit too restrictive. Periodic checkpoints are nice, but being able to manually trigger one right before a restart is a great feature. > Allow triggering Checkpoints through command line client > > > Key: FLINK-6755 > URL: https://issues.apache.org/jira/browse/FLINK-6755 > Project: Flink > Issue Type: New Feature > Components: Command Line Client, Runtime / Checkpointing >Affects Versions: 1.3.0 >Reporter: Gyula Fora >Assignee: vinoyang >Priority: Major > > The command line client currently only allows triggering (and canceling with) > Savepoints. > While this is good if we want to fork or modify the pipelines in a > non-checkpoint-compatible way, now that incremental checkpoints exist it is > wasteful for simple job restarts/pipeline updates. > I suggest we add a new command: > ./bin/flink checkpoint [checkpointDirectory] > and a new flag -c for the cancel command to indicate we want to trigger a > checkpoint: > ./bin/flink cancel -c [targetDirectory] > Otherwise, this can work similarly to the current savepoint logic; we > could probably even piggyback on the current messages by adding a boolean flag > indicating whether it should be a savepoint or a checkpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
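The proposal (a checkpoint command, a -c flag for cancel, and one trigger message distinguished by a boolean flag) can be sketched as a small argument parser. This is a hypothetical sketch, not Flink's CliFrontend: SnapshotCli and TriggerRequest are made-up names, and the job ID argument is an assumption that mirrors the existing `flink savepoint <jobId> [targetDirectory]` and `flink cancel -s [targetDirectory] <jobId>` syntax, since the proposal above omits it.

```java
import java.util.Arrays;

// Hypothetical sketch of the proposed CLI; not the actual Flink CliFrontend.
class SnapshotCli {
    // One request type for both kinds of snapshot, distinguished by a boolean flag.
    static final class TriggerRequest {
        final String jobId;
        final String targetDirectory; // null = use the configured default directory
        final boolean isSavepoint;    // false = (possibly incremental) checkpoint
        final boolean cancelAfterwards;

        TriggerRequest(String jobId, String targetDirectory, boolean isSavepoint, boolean cancelAfterwards) {
            this.jobId = jobId;
            this.targetDirectory = targetDirectory;
            this.isSavepoint = isSavepoint;
            this.cancelAfterwards = cancelAfterwards;
        }
    }

    // Accepts: savepoint <jobId> [dir] | checkpoint <jobId> [dir] | cancel (-s|-c) [dir] <jobId>
    // (a plain "cancel <jobId>" without a snapshot is outside the scope of this sketch).
    static TriggerRequest parse(String... args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("too few arguments: " + Arrays.toString(args));
        }
        switch (args[0]) {
            case "savepoint":
            case "checkpoint":
                return new TriggerRequest(args[1], args.length > 2 ? args[2] : null,
                        args[0].equals("savepoint"), false);
            case "cancel": {
                boolean withCheckpoint = args[1].equals("-c");
                if (!withCheckpoint && !args[1].equals("-s")) {
                    throw new IllegalArgumentException("expected -s or -c after cancel");
                }
                if (args.length == 3) {
                    return new TriggerRequest(args[2], null, !withCheckpoint, true);
                }
                if (args.length == 4) {
                    return new TriggerRequest(args[3], args[2], !withCheckpoint, true);
                }
                throw new IllegalArgumentException("usage: cancel (-s|-c) [targetDirectory] <jobId>");
            }
            default:
                throw new IllegalArgumentException("unknown action: " + args[0]);
        }
    }

    public static void main(String[] args) {
        TriggerRequest r = parse("cancel", "-c", "hdfs:///flink/checkpoints", "a1b2c3");
        System.out.println(r.jobId + " savepoint=" + r.isSavepoint + " cancel=" + r.cancelAfterwards
                + " dir=" + r.targetDirectory);
        // a1b2c3 savepoint=false cancel=true dir=hdfs:///flink/checkpoints
    }
}
```

Reusing a single request type keeps the client and the message path identical for both snapshot kinds; only the runtime has to branch on isSavepoint when deciding how to take and retain the snapshot.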