[jira] [Commented] (FLINK-4391) Provide support for asynchronous operations over streams

2016-08-14 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420638#comment-15420638
 ] 

Gyula Fora commented on FLINK-4391:
---

We have also done something similar at King for handling heavy DB operations, in 
the following way:

We had a parallel processing operator that could process records either in the 
main thread (as it does now) or in one background thread.
There are dedicated processing methods for both, and the user code decides for 
every record where to process it: either in the main thread in a blocking way, 
or by sending it to the background thread's queue. 

We also set a size limit on the number of queued elements.

State access, and knowing which keys to set in the backend, becomes a little 
tricky if we want to keep good performance.
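
For illustration, here is a minimal sketch of the dual-mode pattern described 
above (the class and method names are hypothetical, not an existing Flink or 
King API; the bounded queue corresponds to the size limit mentioned):

{code}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: user code either processes a record on the calling
// (main) thread, or hands it to a single background worker thread through a
// bounded queue.
public class DualModeRunner<T> {

    private final BlockingQueue<T> queue;
    private final Consumer<T> work;

    public DualModeRunner(int maxQueued, Consumer<T> work) {
        this.queue = new ArrayBlockingQueue<>(maxQueued);
        this.work = work;

        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    work.accept(queue.take()); // background processing
                }
            } catch (InterruptedException e) {
                // shutting down
            }
        }, "dual-mode-background");
        worker.setDaemon(true);
        worker.start();
    }

    /** Process the record in the main thread, blocking the caller. */
    public void processBlocking(T record) {
        work.accept(record);
    }

    /** Queue the record for the background thread; blocks while the queue is full. */
    public void processInBackground(T record) throws InterruptedException {
        queue.put(record);
    }
}
{code}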

> Provide support for asynchronous operations over streams
> 
>
> Key: FLINK-4391
> URL: https://issues.apache.org/jira/browse/FLINK-4391
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Jamie Grier
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps a bounded number of them in flight, and then emits 
> results to downstream operators as the async operations complete.
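
As a rough, purely illustrative sketch of the enrichment pattern such an 
operator would manage (the class below is a self-contained stand-in, not a 
proposed Flink API), each record triggers an asynchronous lookup whose result 
is emitted once the future completes:

{code}
import java.util.concurrent.CompletableFuture;

// Purely illustrative: each input record triggers an asynchronous lookup; the
// proposed operator would keep a bounded number of such futures in flight and
// emit results downstream as they complete.
public class AsyncEnrichmentExample {

    /** Stand-in for a non-blocking client of the external database (hypothetical). */
    static CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> "extra-info-for-" + key);
    }

    /** One in-flight async operation: input record -> enriched record. */
    static CompletableFuture<String> enrich(String record) {
        return lookup(record).thenApply(extra -> record + " | " + extra);
    }

    public static void main(String[] args) {
        enrich("event-42").thenAccept(System.out::println).join();
    }
}
{code}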



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4413) Improve savepoint restore error messages

2016-08-17 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4413:
-

 Summary: Improve savepoint restore error messages
 Key: FLINK-4413
 URL: https://issues.apache.org/jira/browse/FLINK-4413
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.1.0
Reporter: Gyula Fora
Priority: Minor


Currently when savepoint restore fails due to some problems with parallelism or 
the assigned uids the error messages contain all the job vertex id of the 
problematic task.

This makes these kinds of problems very difficult to debug for more complex 
topologies.

I propose to add the user assigned task names to these error messages to make 
this much easier for users.
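
As a rough illustration of the proposal (the method and the example values 
below are hypothetical, not Flink's actual restore code), the error message 
would carry the user-assigned task name next to the job vertex id:

{code}
// Hypothetical sketch: include the user-assigned task name next to the job
// vertex id so users can locate the problematic operator in complex topologies.
public class SavepointRestoreErrors {

    static String unmatchedParallelismMessage(String jobVertexId, String taskName,
                                              int savepointParallelism, int jobParallelism) {
        return String.format(
            "Cannot restore savepoint state for task '%s' (vertex id %s): "
                + "parallelism changed from %d to %d.",
            taskName, jobVertexId, savepointParallelism, jobParallelism);
    }

    public static void main(String[] args) {
        // Example values only.
        System.out.println(unmatchedParallelismMessage(
            "cbc357ccb763df2852fee8c4fc7d55f2", "KeyedEnrichment -> Sink", 4, 8));
    }
}
{code}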



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4413) Improve savepoint restore error messages

2016-08-17 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-4413:
--
Description: 
Currently when savepoint restore fails due to some problems with parallelism or 
the assigned uids the error messages contain only the job vertex id of the 
problematic task.

This makes these kinds of problems very difficult to debug for more complex 
topologies.

I propose to add the user assigned task names to these error messages to make 
this much easier for users.

  was:
Currently when savepoint restore fails due to some problems with parallelism or 
the assigned uids the error messages contain all the job vertex id of the 
problematic task.

This makes these kinds of problems very difficult to debug for more complex 
topologies.

I propose to add the user assigned task names to these error messages to make 
this much easier for users.


> Improve savepoint restore error messages
> 
>
> Key: FLINK-4413
> URL: https://issues.apache.org/jira/browse/FLINK-4413
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.1.0
>Reporter: Gyula Fora
>Priority: Minor
>
> Currently when savepoint restore fails due to some problems with parallelism 
> or the assigned uids the error messages contain only the job vertex id of the 
> problematic task.
> This makes these kinds of problems very difficult to debug for more complex 
> topologies.
> I propose to add the user assigned task names to these error messages to make 
> this much easier for users.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4441) RocksDB statebackend makes all operators appear stateful

2016-08-21 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4441:
-

 Summary: RocksDB statebackend makes all operators appear stateful
 Key: FLINK-4441
 URL: https://issues.apache.org/jira/browse/FLINK-4441
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.1.1
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Blocker


When the state is empty, the RocksDB state backend returns an empty hash map 
instead of null in the snapshotPartitionedState method.

This means that these operators always appear stateful to the Flink runtime, 
which makes it impossible, for instance, to remove a stateless operator using 
savepoints.
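
A minimal sketch of the kind of fix this implies, assuming a snapshot method 
that should return null when there is nothing to checkpoint (the class and 
fields below are illustrative, not the actual RocksDB backend code):

{code}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: return null instead of an empty map when there is
// no partitioned state, so the runtime does not treat the operator as stateful.
public class SnapshotExample {

    private final Map<String, byte[]> partitionedState = new HashMap<>();

    public Map<String, byte[]> snapshotPartitionedState() {
        if (partitionedState.isEmpty()) {
            // Signal "no state" rather than handing back an empty map.
            return null;
        }
        return new HashMap<>(partitionedState);
    }
}
{code}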



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4445) Ignore unmatched state when restoring from savepoint

2016-08-22 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430336#comment-15430336
 ] 

Gyula Fora commented on FLINK-4445:
---

Hi Ufuk,

My personal experience is that it's very easy to make mistakes when dealing 
with more complex stateful jobs, such as forgetting uids on the Kafka 
source/sink and other built-in stateful operators.

Ignoring the unmatched state by default would be super dangerous and would have 
caused me serious issues in the past. I think adding a force-ignore flag 
(option 1) would be the right way to go, and it would also be very useful :)

Cheers,
Gyula 

> Ignore unmatched state when restoring from savepoint
> 
>
> Key: FLINK-4445
> URL: https://issues.apache.org/jira/browse/FLINK-4445
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.1
>Reporter: Ufuk Celebi
>
> When currently submitting a job with a savepoint, we require that all state 
> is matched to the new job. Many users have noted that this is overly strict. 
> I would like to loosen this and allow savepoints to be restored without 
> matching all state.
> The following options come to mind:
> (1) Keep the current behaviour, but add a flag to allow ignoring state when 
> restoring, e.g. {{bin/flink -s  --ignoreUnmatchedState}}. This 
> would be non-API breaking.
> (2) Ignore unmatched state and continue. Additionally add a flag to be strict 
> about checking the state, e.g. {{bin/flink -s  --strict}}. This 
> would be API-breaking as the default behaviour would change. Users might be 
> confused by this because there is no straightforward way to notice that 
> nothing has been restored.
> I'm not sure what's the best thing here. [~gyfora], [~aljoscha] What do you 
> think?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4441) RocksDB statebackend makes all operators appear stateful

2016-08-24 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora resolved FLINK-4441.
---
Resolution: Fixed

> RocksDB statebackend makes all operators appear stateful
> 
>
> Key: FLINK-4441
> URL: https://issues.apache.org/jira/browse/FLINK-4441
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.1.1
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
>
> When the state is empty, the RocksDB state backend returns an empty hash map 
> instead of null in the snapshotPartitionedState method.
> This means that these operators always appear stateful to the Flink runtime, 
> which makes it impossible, for instance, to remove a stateless operator using 
> savepoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4471) Rocks backend fully async snapshot restore fails with custom serializers

2016-08-24 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4471:
-

 Summary: Rocks backend fully async snapshot restore fails with 
custom serializers
 Key: FLINK-4471
 URL: https://issues.apache.org/jira/browse/FLINK-4471
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.1.0
Reporter: Gyula Fora
Priority: Critical


The StateDescriptor is not deserialized with the user-code classloader. This 
makes restore fail with a NoClassDefFoundError if the user uses custom type 
serializers:

https://github.com/apache/flink/blob/master/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L583
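
A generic sketch of the kind of fix this implies: deserialize with an explicit 
(user-code) classloader by resolving classes through it. This is plain Java, 
not the actual RocksDBStateBackend code:

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Generic illustration: resolve classes against a given (user-code) classloader
// while deserializing, so user-defined serializer classes can be found.
public final class ClassLoaderObjectInputStream extends ObjectInputStream {

    private final ClassLoader userCodeClassLoader;

    public ClassLoaderObjectInputStream(byte[] bytes, ClassLoader userCodeClassLoader)
            throws IOException {
        super(new ByteArrayInputStream(bytes));
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
        // Look the class up in the user-code classloader instead of the default one.
        return Class.forName(desc.getName(), false, userCodeClassLoader);
    }
}
{code}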



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4471) Rocks backend fully async snapshot restore fails with custom serializers

2016-08-24 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-4471:
-

Assignee: Gyula Fora

> Rocks backend fully async snapshot restore fails with custom serializers
> 
>
> Key: FLINK-4471
> URL: https://issues.apache.org/jira/browse/FLINK-4471
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.1.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>
> The StateDescriptor is not deserialized with the user-code classloader. This 
> makes restore fail with a NoClassDefFoundError if the user uses custom type 
> serializers:
> https://github.com/apache/flink/blob/master/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L583



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4471) Rocks backend fully async snapshot restore fails with custom serializers

2016-08-25 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-4471.
-
Resolution: Won't Fix

> Rocks backend fully async snapshot restore fails with custom serializers
> 
>
> Key: FLINK-4471
> URL: https://issues.apache.org/jira/browse/FLINK-4471
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.1.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Critical
>
> The StateDescriptor is not deserialized with the user-code classloader. This 
> makes restore fail with a NoClassDefFoundError if the user uses custom type 
> serializers:
> https://github.com/apache/flink/blob/master/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L583



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-23 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938170#comment-15938170
 ] 

Gyula Fora commented on FLINK-6006:
---

I have a slight feeling that this didn't completely solve the issue.

I saw this occur again today using the 0.8 connector from the 1.2 branch built 
with the fix.

> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.
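
A schematic sketch of the restore behaviour suggested above, i.e. seeding the 
fetcher from every restored partition offset without filtering against the 
(possibly incomplete) queried partition list; the types here are simplified 
stand-ins, not the actual connector code:

{code}
import java.util.HashMap;
import java.util.Map;

// Schematic illustration: seed the fetcher's offset holders from *all* restored
// partition offsets, instead of intersecting them with a freshly queried
// (possibly incomplete) partition list.
public class RestoreAllPartitions {

    /** key = "topic-partition", value = restored offset. */
    static Map<String, Long> initializeFetcherState(Map<String, Long> restoredOffsets) {
        // No filtering against the currently visible partitions: a temporarily
        // unreachable partition keeps its restored offset and is picked up again
        // once the broker is back.
        return new HashMap<>(restoredOffsets);
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("event.bifrost.log-10", 123_456L);
        System.out.println(initializeFetcherState(restored));
    }
}
{code}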



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-23 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938297#comment-15938297
 ] 

Gyula Fora commented on FLINK-6006:
---

I tried restarting the consumer when I was sure that the brokers had 
completely recovered, and that still didn't help.

> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-23 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938567#comment-15938567
 ] 

Gyula Fora commented on FLINK-6006:
---

Unfortunately I only have the JM log, and that doesn't seem to contain any 
relevant information :(

> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-24 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940010#comment-15940010
 ] 

Gyula Fora commented on FLINK-6006:
---

Don't be sorry, there is a high probability that I did something bad, or our 
Kafka is screwed up in weird ways :) Adding some logging for this (maybe at 
INFO level) would probably help if this comes up again. Thanks for all the work!

> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6262) UnknownTopicOrPartitionException Kafka consumer error on broker restart/failure

2017-04-04 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6262:
-

 Summary: UnknownTopicOrPartitionException Kafka consumer error on 
broker restart/failure
 Key: FLINK-6262
 URL: https://issues.apache.org/jira/browse/FLINK-6262
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.1.4, 1.2.0
Reporter: Gyula Fora


The Kafka consumer fails on broker restarts/failures with the following error:

java.io.IOException: Error while fetching from broker 'Node(22, 
kafka22.sto.midasplayer.com, 9092)': 
Exception for event.bifrost.log:10: 
kafka.common.UnknownTopicOrPartitionException
at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
Source)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.lang.Class.newInstance(Class.java:442)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
at 
org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:292)

at 
org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:313)

We should have some retry/restart logic around this.
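
A sketch of the kind of retry logic meant here, wrapping a generic fetch call; 
this is only an illustration of retrying transient broker errors with backoff, 
not the SimpleConsumerThread implementation:

{code}
import java.io.IOException;
import java.util.concurrent.Callable;

// Illustrative retry wrapper: retry a fetch a bounded number of times with a
// fixed backoff when the broker reports a transient error (e.g. an unknown
// topic/partition right after a broker restart), instead of failing the task.
public class RetryingFetch {

    static <T> T fetchWithRetries(Callable<T> fetch, int maxAttempts, long backoffMillis)
            throws Exception {
        IOException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.call();
            } catch (IOException e) {
                lastError = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis); // give the broker/metadata time to recover
                }
            }
        }
        throw lastError != null ? lastError : new IOException("fetch never attempted");
    }
}
{code}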



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failure)

2017-04-04 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6263:
-

 Summary: Leader error in Kafka producer on leader change (broker 
restart/failure)
 Key: FLINK-6263
 URL: https://issues.apache.org/jira/browse/FLINK-6263
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.2.0
Reporter: Gyula Fora


We have observed the following error in the Kafka producer:

java.lang.Exception: Failed to send data to Kafka: This server is not the 
leader for that topic-partition.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
server is not the leader for that topic-partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6264) Kafka consumer fails if can't find leader for partition

2017-04-04 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6264:
-

 Summary: Kafka consumer fails if can't find leader for partition
 Key: FLINK-6264
 URL: https://issues.apache.org/jira/browse/FLINK-6264
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.2.0
Reporter: Gyula Fora


We have observed the following error many times when brokers failed/were 
restarted:

java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: 
KafkaTopicPartition{topic='mytopic', partition=10}, 
KafkaPartitionHandle=[mytopic,10], offset=-1]
at 
org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
at 
org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5068) YARN HA: Job scheduled before TM is up leading to "Not enough free slots" error

2016-11-15 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-5068:
-

 Summary: YARN HA: Job scheduled before TM is up leading to "Not 
enough free slots" error
 Key: FLINK-5068
 URL: https://issues.apache.org/jira/browse/FLINK-5068
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.1.3
Reporter: Gyula Fora


On one occasion after a job manager failure, the job could not be recovered, 
as it was scheduled before the TM was up, causing an unrecoverable failure.

So I ended up with a running YARN JM + TM with enough slots, but with a failed 
job (and not restarting).

Please drop me a mail for the logs :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-15 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-5071:
-

 Summary: YARN: yarn.containers.vcores config not respected when 
checking for vcores
 Key: FLINK-5071
 URL: https://issues.apache.org/jira/browse/FLINK-5071
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Reporter: Gyula Fora
 Fix For: 1.1.3


The YarnClient validates whether the number of task slots is less than the max 
vcores setting of YARN, but it seems to ignore the yarn.containers.vcores Flink 
config, which should be used instead of the slot count.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-11-15 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667283#comment-15667283
 ] 

Gyula Fora commented on FLINK-4182:
---

Ah sorry, those jobs were on 1.1.3 (I am not sure if the error has been fixed since then)

> HA recovery not working properly under ApplicationMaster failures.
> --
>
> Key: FLINK-4182
> URL: https://issues.apache.org/jira/browse/FLINK-4182
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Priority: Blocker
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Cannot set up the user code libraries: Cannot get library 
> with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>   at 
> da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
> up the user code libraries: Cannot get library with hash 
> 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch

[jira] [Commented] (FLINK-4182) HA recovery not working properly under ApplicationMaster failures.

2016-11-15 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667277#comment-15667277
 ] 

Gyula Fora commented on FLINK-4182:
---

Hi, 

I have observed the same scenario in many of my jobs under AM failures, and I 
have saved some logs.

I am running 1.1.4

> HA recovery not working properly under ApplicationMaster failures.
> --
>
> Key: FLINK-4182
> URL: https://issues.apache.org/jira/browse/FLINK-4182
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stefan Richter
>Priority: Blocker
>
> When randomly killing TaskManager and ApplicationMaster, a job sometimes does 
> not properly recover in HA mode.
> There can be different symptoms for this. For example, in one case the job is 
> dying with the following exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Cannot set up the user code libraries: Cannot get library 
> with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>   at 
> da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
> up the user code libraries: Cannot get library with hash 
> 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(M

[jira] [Commented] (FLINK-5071) YARN: yarn.containers.vcores config not respected when checking for vcores

2016-11-21 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683135#comment-15683135
 ] 

Gyula Fora commented on FLINK-5071:
---

Both actually.

The problem is here:
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L308

which only compares the allowed number of vcores (from the YARN config) with 
the number of slots, assuming that requested vcores == number of slots.
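
A minimal sketch of the corrected check, assuming the relevant values have 
already been read from the Flink and YARN configurations (the method and 
variable names are illustrative, not the AbstractYarnClusterDescriptor code):

{code}
// Illustrative sketch: use the configured yarn.containers.vcores value when
// validating against YARN's maximum, and only fall back to the slot count if
// no explicit vcores setting was provided.
public class VcoresCheck {

    static void validateVcores(int configuredVcores, int slotsPerTaskManager, int yarnMaxVcores) {
        // Fall back to slots only if yarn.containers.vcores was not set (<= 0 here).
        int requestedVcores = configuredVcores > 0 ? configuredVcores : slotsPerTaskManager;

        if (requestedVcores > yarnMaxVcores) {
            throw new IllegalArgumentException(
                "The number of requested virtual cores per node " + requestedVcores
                    + " exceeds the maximum number of virtual cores " + yarnMaxVcores
                    + " available in the YARN cluster.");
        }
    }
}
{code}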

> YARN: yarn.containers.vcores config not respected when checking for vcores
> --
>
> Key: FLINK-5071
> URL: https://issues.apache.org/jira/browse/FLINK-5071
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Gyula Fora
> Fix For: 1.1.3
>
>
> The YarnClient validates whether the number of task slots is less than the 
> max vcores setting of YARN, but it seems to ignore the yarn.containers.vcores 
> Flink config, which should be used instead of the slot count.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3397) Failed streaming jobs should fall back to the most recent checkpoint/savepoint

2016-06-23 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346080#comment-15346080
 ] 

Gyula Fora commented on FLINK-3397:
---

You are always free to give it a shot; to be honest, I am not perfectly sure 
whether other people are working in this direction. Maybe [~uce] can help me 
out here, as he knows well what's going on with savepoints...

> Failed streaming jobs should fall back to the most recent checkpoint/savepoint
> --
>
> Key: FLINK-3397
> URL: https://issues.apache.org/jira/browse/FLINK-3397
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Gyula Fora
>Priority: Minor
>
> The current fallback behaviour in case of a streaming job failure is slightly 
> counterintuitive:
> If a job fails, it will fall back to the most recent checkpoint (if any), even 
> if a more recent savepoint was taken. This means that savepoints are not 
> regarded as checkpoints by the system, only as points from which a job can be 
> manually restarted.
> I suggest changing this so that savepoints are also regarded as checkpoints 
> in case of a failure, and that they will also be used to automatically restore 
> the streaming job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4120) Lightweight fault tolerance through recomputing lost state

2016-06-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350565#comment-15350565
 ] 

Gyula Fora commented on FLINK-4120:
---

Hi,

Thanks for the interesting proposal. I have several questions that popped into 
my mind while trying to understand this:

How would you know which states can be "easily recomputed"? I think in many 
cases the state is computed over a long period of time (weeks/months), and the 
overhead of checkpointing is negligible compared to re-streaming the records.

Let's assume you can recompute the state for some operators, but others would 
restore from their checkpoints. As you described, these would wait for the 
latest checkpoint barrier before continuing to process records; however, the 
barriers are not deterministic, so this would not work.

This also assumes a lot about the persistent storage, as in many cases Kafka, 
for instance, does not hold the data forever (or long enough for the null-state 
checkpoint).

Cheers,
Gyula

> Lightweight fault tolerance through recomputing lost state
> --
>
> Key: FLINK-4120
> URL: https://issues.apache.org/jira/browse/FLINK-4120
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Dénes Vadász
>
> The current fault tolerance mechanism requires that stateful operators write 
> their internal state to stable storage during a checkpoint. 
> This proposal aims at optimizing out this operation in the cases where the 
> operator state can be recomputed from a finite (practically: small) set of 
> source records, and those records are already on checkpoint-aware persistent 
> storage (e.g. in Kafka). 
> The rationale behind the proposal is that the cost of reprocessing is paid 
> only on recovery from (presumably rare) failures, whereas the cost of 
> persisting the state is paid on every checkpoint. Eliminating the need for 
> persistent storage will also simplify system setup and operation.
> In the cases where this optimisation is applicable, the state of the 
> operators can be restored by restarting the pipeline from a checkpoint taken 
> before the pipeline ingested any of the records required for the state 
> re-computation of the operators (call this the "null-state checkpoint"), as 
> opposed to a restart from the "latest checkpoint". 
> The "latest checkpoint" is still relevant for the recovery: the barriers 
> belonging to that checkpoint must be inserted into the source streams in the 
> position they were originally inserted. Sinks must discard all records until 
> this barrier reaches them.
> Note the inherent relationship between the "latest" and the "null-state" 
> checkpoints: the pipeline must be restarted from the latter to restore the 
> state at the former.
> For the stateful operators for which this optimization is applicable we can 
> define the notion of "current null-state watermark" as the watermark such 
> that the operator can correctly (re)compute its current state merely from 
> records after this watermark. 
>  
> For the checkpoint-coordinator to be able to compute the null-state 
> checkpoint, each stateful operator should report its "current null-state 
> watermark" as part of acknowledging the ongoing checkpoint. The null-state 
> checkpoint of the ongoing checkpoint is the most recent checkpoint preceding 
> all the received null-state watermarks (assuming the pipeline preserves the 
> relative order of barriers and watermarks).
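
As a rough, hypothetical illustration of the coordinator-side computation in 
the last paragraph (assuming checkpoints and null-state watermarks can be 
compared on a common stream position; none of these names are Flink's actual 
CheckpointCoordinator API):

{code}
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration of computing the "null-state checkpoint": given the
// null-state watermarks reported by the stateful operators while acknowledging
// a checkpoint, pick the most recent completed checkpoint preceding all of them.
public class NullStateCheckpointSelector {

    /** Completed checkpoints keyed by the stream position at which they were taken. */
    private final NavigableMap<Long, Long> checkpointsByPosition = new TreeMap<>();

    public void recordCompletedCheckpoint(long streamPosition, long checkpointId) {
        checkpointsByPosition.put(streamPosition, checkpointId);
    }

    /** Returns the checkpoint id to restart from, or null if none qualifies. */
    public Long nullStateCheckpoint(Collection<Long> reportedNullStateWatermarks) {
        long earliest = reportedNullStateWatermarks.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
        // Most recent completed checkpoint taken strictly before every reported watermark.
        Map.Entry<Long, Long> entry = checkpointsByPosition.lowerEntry(earliest);
        return entry == null ? null : entry.getValue();
    }
}
{code}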



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4120) Lightweight fault tolerance through recomputing lost state

2016-06-27 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-4120:
--
Priority: Minor  (was: Major)

> Lightweight fault tolerance through recomputing lost state
> --
>
> Key: FLINK-4120
> URL: https://issues.apache.org/jira/browse/FLINK-4120
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Dénes Vadász
>Priority: Minor
>
> The current fault tolerance mechanism requires that stateful operators write 
> their internal state to stable storage during a checkpoint. 
> This proposal aims at optimizing out this operation in the cases where the 
> operator state can be recomputed from a finite (practically: small) set of 
> source records, and those records are already on checkpoint-aware persistent 
> storage (e.g. in Kafka). 
> The rationale behind the proposal is that the cost of reprocessing is paid 
> only on recovery from (presumably rare) failures, whereas the cost of 
> persisting the state is paid on every checkpoint. Eliminating the need for 
> persistent storage will also simplify system setup and operation.
> In the cases where this optimisation is applicable, the state of the 
> operators can be restored by restarting the pipeline from a checkpoint taken 
> before the pipeline ingested any of the records required for the state 
> re-computation of the operators (call this the "null-state checkpoint"), as 
> opposed to a restart from the "latest checkpoint". 
> The "latest checkpoint" is still relevant for the recovery: the barriers 
> belonging to that checkpoint must be inserted into the source streams in the 
> position they were originally inserted. Sinks must discard all records until 
> this barrier reaches them.
> Note the inherent relationship between the "latest" and the "null-state" 
> checkpoints: the pipeline must be restarted from the latter to restore the 
> state at the former.
> For the stateful operators for which this optimization is applicable we can 
> define the notion of "current null-state watermark" as the watermark such 
> that the operator can correctly (re)compute its current state merely from 
> records after this watermark. 
>  
> For the checkpoint-coordinator to be able to compute the null-state 
> checkpoint, each stateful operator should report its "current null-state 
> watermark" as part of acknowledging the ongoing checkpoint. The null-state 
> checkpoint of the ongoing checkpoint is the most recent checkpoint preceding 
> all the received null-state watermarks (assuming the pipeline preserves the 
> relative order of barriers and watermarks).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4120) Lightweight fault tolerance through recomputing lost state

2016-06-27 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-4120:
--
Component/s: (was: Core)
 State Backends, Checkpointing

> Lightweight fault tolerance through recomputing lost state
> --
>
> Key: FLINK-4120
> URL: https://issues.apache.org/jira/browse/FLINK-4120
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Dénes Vadász
>
> The current fault tolerance mechanism requires that stateful operators write 
> their internal state to stable storage during a checkpoint. 
> This proposal aims at optimizing out this operation in the cases where the 
> operator state can be recomputed from a finite (practically: small) set of 
> source records, and those records are already on checkpoint-aware persistent 
> storage (e.g. in Kafka). 
> The rationale behind the proposal is that the cost of reprocessing is paid 
> only on recovery from (presumably rare) failures, whereas the cost of 
> persisting the state is paid on every checkpoint. Eliminating the need for 
> persistent storage will also simplify system setup and operation.
> In the cases where this optimisation is applicable, the state of the 
> operators can be restored by restarting the pipeline from a checkpoint taken 
> before the pipeline ingested any of the records required for the state 
> re-computation of the operators (call this the "null-state checkpoint"), as 
> opposed to a restart from the "latest checkpoint". 
> The "latest checkpoint" is still relevant for the recovery: the barriers 
> belonging to that checkpoint must be inserted into the source streams in the 
> position they were originally inserted. Sinks must discard all records until 
> this barrier reaches them.
> Note the inherent relationship between the "latest" and the "null-state" 
> checkpoints: the pipeline must be restarted from the latter to restore the 
> state at the former.
> For the stateful operators for which this optimization is applicable we can 
> define the notion of "current null-state watermark" as the watermark such 
> that the operator can correctly (re)compute its current state merely from 
> records after this watermark. 
>  
> For the checkpoint-coordinator to be able to compute the null-state 
> checkpoint, each stateful operator should report its "current null-state 
> watermark" as part of acknowledging the ongoing checkpoint. The null-state 
> checkpoint of the ongoing checkpoint is the most recent checkpoint preceding 
> all the received null-state watermarks (assuming the pipeline preserves the 
> relative order of barriers and watermarks).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4155) Get Kafka producer partition info in open method instead of constructor

2016-07-05 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4155:
-

 Summary: Get Kafka producer partition info in open method instead 
of constructor
 Key: FLINK-4155
 URL: https://issues.apache.org/jira/browse/FLINK-4155
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.0.3, 1.1.0
Reporter: Gyula Fora


Currently the Flink Kafka producer does not really do any error handling if 
something is wrong with the partition metadata, as it is serialized with the 
user function.

This means that in some cases the job can go into an error loop when using 
checkpoints. Getting the partition info in the open method would solve this 
problem (just like restarting from a savepoint, which re-runs the constructor, 
does).
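
A sketch of moving the partition lookup into open(), using the standard Kafka 
producer's partitionsFor call; the surrounding class is a simplified stand-in 
for the producer sink, not the actual FlinkKafkaProducer:

{code}
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

// Simplified stand-in for a sink that defers the partition-metadata lookup to
// open() instead of doing it in the constructor, so a restore re-fetches fresh
// metadata and can fail fast with a clear error if the topic is unavailable.
public class LazyPartitionLookupSink {

    private final Properties kafkaProps; // expected to contain serializer settings etc.
    private final String topic;

    private transient KafkaProducer<byte[], byte[]> producer;
    private transient List<PartitionInfo> partitions;

    public LazyPartitionLookupSink(Properties kafkaProps, String topic) {
        // No Kafka access here: the constructor result is serialized with the job.
        this.kafkaProps = kafkaProps;
        this.topic = topic;
    }

    public void open() {
        producer = new KafkaProducer<>(kafkaProps);
        partitions = producer.partitionsFor(topic);
        if (partitions == null || partitions.isEmpty()) {
            throw new RuntimeException("No partition metadata available for topic " + topic);
        }
    }
}
{code}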



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4193) Task manager JVM crashes while deploying/cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4193:
-

 Summary: Task manager JVM crashes while deploying/cancelling jobs
 Key: FLINK-4193
 URL: https://issues.apache.org/jira/browse/FLINK-4193
 Project: Flink
  Issue Type: Bug
  Components: Streaming, TaskManager
Reporter: Gyula Fora
Priority: Critical


We have observed several TM crashes while deploying larger stateful streaming 
jobs that use the RocksDB state backend.

As the JVMs crash the logs don't show anything but I have uploaded all the info 
I have got from the standard output.

This indicates some GC and possibly some RocksDB issues underneath but we could 
not really figure out much more.

GC segfault
https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125

Other crashes (maybe rocks related)
https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818

The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying/cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371139#comment-15371139
 ] 

Gyula Fora commented on FLINK-4193:
---

I know that RocksDB is bundled within the application jar, but I also built 
that yesterday or two days ago, and assuming that Maven has a pretty much 
up-to-date snapshot version, this should not be a problem.

> Task manager JVM crashes while deploying/cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying/cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371134#comment-15371134
 ] 

Gyula Fora commented on FLINK-4193:
---

We are using a pretty recent version, just a couple of days old. The server is 
at commit 3ab9e36 (Jul 7).

> Task manager JVM crashes while deploying/cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying/cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371524#comment-15371524
 ] 

Gyula Fora commented on FLINK-4193:
---

It happened multiple times in a row on the same cluster, but on different 
machines. It does not always occur, though. The total parallelism that we were 
deploying was about 200.

Basically, what we did was run a script that deploys 5 jobs one after the 
other, with 10 seconds in between. It usually crashed somewhere in the middle.

> Task manager JVM crashes while deploying/cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying/cancelling jobs

2016-07-12 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372638#comment-15372638
 ] 

Gyula Fora commented on FLINK-4193:
---

I can probably try a different GC later, but I have only seen that error once.

Now I have got another error that happened after some jobs crashed on a Kafka 
issue:
https://gist.github.com/gyfora/ea80631abdf59b6eb2e198e9bfa5c520

> Task manager JVM crashes while deploying/cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3006) TypeExtractor fails on custom type

2016-07-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395198#comment-15395198
 ] 

Gyula Fora commented on FLINK-3006:
---

I agree, Stephan, closing it.

> TypeExtractor fails on custom type
> --
>
> Key: FLINK-3006
> URL: https://issues.apache.org/jira/browse/FLINK-3006
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.10.0, 1.0.0
>Reporter: Gyula Fora
>
> I get a weird error when I try to execute my job on the cluster. Locally this 
> works fine, but running it from the command line fails during type extraction:
> input1.union(input2, input3).map(Either::Left).returns(eventOrLongType);
> The UserEvent type is a subclass of Tuple4 with 
> no extra fields, and the Either type is a regular POJO with 2 public nullable 
> fields and a default constructor.
> This fails when trying to extract the output type from the mapper, but I 
> wouldn't actually care about that, because I am providing my custom type 
> implementation for this Either type.
> The error:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:250)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:971)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1021)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(ArrayList.java:418)
>   at java.util.ArrayList.get(ArrayList.java:431)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:599)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:493)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1392)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1273)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:560)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:389)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:273)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3006) TypeExtractor fails on custom type

2016-07-27 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-3006.
-
Resolution: Not A Problem

> TypeExtractor fails on custom type
> --
>
> Key: FLINK-3006
> URL: https://issues.apache.org/jira/browse/FLINK-3006
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.10.0, 1.0.0
>Reporter: Gyula Fora
>
> I get a weird error when I try to execute my job on the cluster. Locally this 
> works fine but running it from the command line fails during type extraction:
> input1.union(input2, input3).map(Either::Left).returns(eventOrLongType);
> The UserEvent type is a subclass of Tuple4 with 
> no extra fields. And the Either type is a regular pojo with 2 public nullable 
> fields and a default constructor.
> This fails when trying to extract the output type from the mapper, but I 
> wouldn't actually care about that because I am providing my custom type 
> implementation for this Either type.
> The error:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:250)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:320)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:971)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1021)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>   at java.util.ArrayList.elementData(ArrayList.java:418)
>   at java.util.ArrayList.get(ArrayList.java:431)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromInputs(TypeExtractor.java:599)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:493)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1392)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1273)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:560)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:389)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:273)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4275) Generic Folding, Reducing and List states behave differently from other state backends

2016-07-28 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4275:
-

 Summary: Generic Folding, Reducing and List states behave 
differently from other state backends
 Key: FLINK-4275
 URL: https://issues.apache.org/jira/browse/FLINK-4275
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Streaming
Reporter: Gyula Fora
Priority: Critical


In 
https://github.com/apache/flink/commit/12bf7c1a0b81d199085fe874c64763c51a93b3bf 
the expected behaviour of Folding/Reducing/List states has been changed to 
return null instead of empty collections/default values.

This was adapted for the included state backends (Memory, FS, Rocks) but not 
for the Generic state wrappers. As there are no tests for the Generic backend 
using the StateBackendTestBase, this didn't show.
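
For reference, a minimal sketch of the changed contract (not code from the linked 
commit; it assumes a keyed RichFlatMapFunction so getRuntimeContext() gives keyed 
state access):

{code}
// Sketch only: illustrates the contract described above, inside a keyed function.
ListState<Long> seen = getRuntimeContext().getListState(
        new ListStateDescriptor<>("seen", Long.class));

Iterable<Long> values = seen.get();
if (values == null) {
    // no entries for the current key: the backends now return null here
    // instead of an empty Iterable / the default value
}
{code}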



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4275) Generic Folding, Reducing and List states behave differently from other state backends

2016-07-29 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399143#comment-15399143
 ] 

Gyula Fora commented on FLINK-4275:
---

I would not remove them in any case. They provide a very nice way to use custom 
state implementations without having to implement everything you don't care 
about.

(We also use them extensively)

> Generic Folding, Reducing and List states behave differently from other state 
> backends
> --
>
> Key: FLINK-4275
> URL: https://issues.apache.org/jira/browse/FLINK-4275
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Gyula Fora
>Priority: Critical
>
> In 
> https://github.com/apache/flink/commit/12bf7c1a0b81d199085fe874c64763c51a93b3bf
> the expected behaviour of Folding/Reducing/List states has been changed to 
> return null instead of empty collections/default values.
> This was adapted for the included state backends (Memory, FS, Rocks) but not 
> for the Generic state wrappers. As there are no tests for the Generic backend 
> using the StateBackendTestBase, this didn't show.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-6291) Internal Timer service cannot be "removed"

2017-04-11 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6291:
-

 Summary: Internal Timer service cannot be "removed"
 Key: FLINK-6291
 URL: https://issues.apache.org/jira/browse/FLINK-6291
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.2.0
Reporter: Gyula Fora


Currently it is not possible to register an internal timer service in one job 
and remove it after a savepoint, as a NullPointerException is thrown on the 
next savepoint:

Caused by: java.lang.Exception: Could not write timer service of MyOperator 
(17/60) to checkpoint state stream.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:418)
at 
com.king.rbea.backend.operators.scriptexecution.RBEAOperator.snapshotState(RBEAOperator.java:327)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 13 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:294)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:414)
... 15 more

The timer serializer is null in this case as the timer service has never been 
started properly.

We should probably discard the timers for the services that are not 
reregistered after restore so we can get rid of the state completely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6291) Internal Timer service cannot be "removed"

2017-04-18 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972650#comment-15972650
 ] 

Gyula Fora commented on FLINK-6291:
---

Sounds good :)

> Internal Timer service cannot be "removed"
> --
>
> Key: FLINK-6291
> URL: https://issues.apache.org/jira/browse/FLINK-6291
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> Currently it is not possible to register an internal timer service in one job 
> and remove it after a savepoint as a nullpointer exception is thrown in the 
> next savepoint:
> Caused by: java.lang.Exception: Could not write timer service of MyOperator 
> (17/60) to checkpoint state stream.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:418)
>   at 
> com.king.rbea.backend.operators.scriptexecution.RBEAOperator.snapshotState(RBEAOperator.java:327)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 13 more
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.snapshotTimersForKeyGroup(HeapInternalTimerService.java:294)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:414)
>   ... 15 more
> The timer serializer is null in this case as the timer service has never been 
> started properly.
> We should probably discard the timers for the services that are not 
> reregistered after restore so we can get rid of the state completely.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition

2017-04-20 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976253#comment-15976253
 ] 

Gyula Fora commented on FLINK-6264:
---

I am not sure what causes the exception, probably [~tzulitai] is right.

In any case I think we should recover from this problem without crashing the 
job, because this seems to be something occurring relatively frequently.
This probably means we have to retry fetching the metadata (for the affected 
partitions or all of them) a couple of times, maybe with some backoff to give 
Kafka time to recover as well (if that is what causes the problem).

In many cases we have noticed that after a problematic leader change/broker 
death it takes some time (seconds or minutes) until Kafka goes back to a state 
in which it operates normally; we should try to bridge these gaps without 
crashing, because crashing is much worse.
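
Roughly the kind of retry helper I have in mind; this is a sketch only, nothing 
here is existing consumer code and "fetchLeaders" is just a placeholder:

{code}
import java.util.concurrent.Callable;

// Sketch of the retry-with-backoff idea; "fetchLeaders" stands in for whatever
// metadata lookup the consumer performs and is not an existing Flink method.
public final class RetryWithBackoff {

    public static <T> T retry(Callable<T> fetchLeaders, int maxRetries, long initialBackoffMs)
            throws Exception {
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return fetchLeaders.call();
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    throw e; // give up only after the configured number of retries
                }
                Thread.sleep(backoff);
                backoff = Math.min(backoff * 2, 60_000L); // cap the exponential backoff
            }
        }
        throw new IllegalStateException("unreachable");
    }
}
{code}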

> Kafka consumer fails if can't find leader for partition
> ---
>
> Key: FLINK-6264
> URL: https://issues.apache.org/jira/browse/FLINK-6264
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed/were 
> restarted:
> java.lang.RuntimeException: Unable to find a leader for partitions: 
> [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, 
> KafkaPartitionHandle=[mytopic,10], offset=-1]
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985119#comment-15985119
 ] 

Gyula Fora commented on FLINK-6390:
---

Hi Stephan,

This looks pretty useful. One thing that came to my mind: would it make sense to 
also add a hook for when all tasks have completed their local snapshots, but 
before the full snapshot is finalized? (This could be used to implement a 
2-phase commit logic, for instance by backends that present the data 
externally.)
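
As a rough sketch of what I mean (the name and placement are made up here, not 
part of the proposed interface above):

{code}
// Illustrative only: a possible extra callback next to triggerCheckpoint/restoreCheckpoint.
public interface CheckpointCompletionHook {

    /**
     * Called once all tasks have acknowledged their local snapshots, but before
     * the coordinator finalizes the checkpoint. A backend that presents data
     * externally could use this as the commit step of a 2-phase commit.
     */
    void completeCheckpoint(long checkpointId) throws Exception;
}
{code}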

Gyula

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems require to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param  The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator prior when 
> triggering a checkpoint, prior
>* to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its complet

[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985122#comment-15985122
 ] 

Gyula Fora commented on FLINK-6390:
---

We could call it completeCheckpoint, for example.

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems require to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param  The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator prior when 
> triggering a checkpoint, prior
>* to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
>* @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>* @param timestamp The wall clock timest

[jira] [Commented] (FLINK-6390) Add Trigger Hooks to the Checkpoint Coordinator

2017-04-26 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986084#comment-15986084
 ] 

Gyula Fora commented on FLINK-6390:
---

Not a big deal, but Stephan, I think you missed my comment :)

> Add Trigger Hooks to the Checkpoint Coordinator
> ---
>
> Key: FLINK-6390
> URL: https://issues.apache.org/jira/browse/FLINK-6390
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> Some source systems require to be notified prior to starting a checkpoint, in 
> order to do preparatory work for the checkpoint.
> I propose to add an interface to allow sources to register hooks that are 
> called by the checkpoint coordinator when triggering / restoring a checkpoint.
> These hooks may produce state that is stored with the checkpoint metadata.
> Envisioned interface for the hooks
> {code}
> /**
>  * The interface for hooks that can be called by the checkpoint coordinator 
> when triggering or
>  * restoring a checkpoint. Such a hook is useful for example when preparing 
> external systems for
>  * taking or restoring checkpoints.
>  * 
>  * The {@link #triggerCheckpoint(long, long, Executor)} method (called 
> when triggering a checkpoint)
>  * can return a result (via a future) that will be stored as part of the 
> checkpoint metadata.
>  * When restoring a checkpoint, that stored result will be given to the 
> {@link #restoreCheckpoint(long, Object)}
>  * method. The hook's {@link #getIdentifier() identifier} is used to map data 
> to hook in the presence
>  * of multiple hooks, and when resuming a savepoint that was potentially 
> created by a different job.
>  * The identifier has a similar role as for example the operator UID in the 
> streaming API.
>  * 
>  * The MasterTriggerRestoreHook is defined when creating the streaming 
> dataflow graph. It is attached
>  * to the job graph, which gets sent to the cluster for execution. To avoid 
> having to make the hook
>  * itself serializable, these hooks are attached to the job graph via a 
> {@link MasterTriggerRestoreHook.Factory}.
>  * 
>  * @param  The type of the data produced by the hook and stored as part of 
> the checkpoint metadata.
>  *If the hook never stores any data, this can be typed to {@code 
> Void}.
>  */
> public interface MasterTriggerRestoreHook {
>   /**
>* Gets the identifier of this hook. The identifier is used to identify 
> a specific hook in the
>* presence of multiple hooks and to give it the correct checkpointed 
> data upon checkpoint restoration.
>* 
>* The identifier should be unique between different hooks of a job, 
> but deterministic/constant
>* so that upon resuming a savepoint, the hook will get the correct 
> data.
>* For example, if the hook calls into another storage system and 
> persists namespace/schema specific
>* information, then the name of the storage system, together with the 
> namespace/schema name could
>* be an appropriate identifier.
>* 
>* When multiple hooks of the same name are created and attached to 
> a job graph, only the first
>* one is actually used. This can be exploited to deduplicate hooks 
> that would do the same thing.
>* 
>* @return The identifier of the hook. 
>*/
>   String getIdentifier();
>   /**
>* This method is called by the checkpoint coordinator prior when 
> triggering a checkpoint, prior
>* to sending the "trigger checkpoint" messages to the source tasks.
>* 
>* If the hook implementation wants to store data as part of the 
> checkpoint, it may return
>* that data via a future, otherwise it should return null. The data is 
> stored as part of
>* the checkpoint metadata under the hooks identifier (see {@link 
> #getIdentifier()}).
>* 
>* If the action by this hook needs to be executed synchronously, 
> then this method should
>* directly execute the action synchronously and block until it is 
> complete. The returned future
>* (if any) would typically be a completed future.
>* 
>* If the action should be executed asynchronously and only needs to 
> complete before the
>* checkpoint is considered completed, then the method may use the 
> given executor to execute the
>* actual action and would signal its completion by completing the 
> future. For hooks that do not
>* need to store data, the future would be completed with null.
>* 
>* @param checkpointId The ID (logical timestamp, monotonously 
> increasing) of the checkpoint
>* @param timestamp The wall 

[jira] [Created] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing

2016-10-20 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4873:
-

 Summary: Add config option to specify "home directory" for YARN 
client resource sharing
 Key: FLINK-4873
 URL: https://issues.apache.org/jira/browse/FLINK-4873
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Affects Versions: 1.2.0, 1.1.3
Reporter: Gyula Fora


The YARN client currently uses FileSystem.getHomeDirectory() to store the jar 
files that need to be shared on the cluster. This pretty much forces users to 
run HDFS or something compatible with the Hadoop FS API on the cluster.

In some production environments, file systems (distributed or simply shared) are 
mounted under the same path on every node and do not require going through the 
Hadoop API at all. If we want to run Flink on YARN in these cases, we need to be 
able to define the "home directory" where Flink should copy the files for 
sharing.

It could be something like a yarn.resource.upload.dir option in the 
flink-conf.yaml.
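
For example (the key name is only the one proposed here, not an existing option, 
and the path is just an illustration):

{code}
# flink-conf.yaml -- proposed option, not an existing one; the path is an example
yarn.resource.upload.dir: /mnt/shared/flink/staging
{code}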



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4948) Consider using checksums or similar to detect bad checkpoints

2016-10-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15612892#comment-15612892
 ] 

Gyula Fora commented on FLINK-4948:
---

I think in general this would be very nice, but what about tools that might want 
to alter the checkpoints intentionally for some reason, for example to clean or 
transform the state? So I think we should aim for some flexible checking logic 
here.

> Consider using checksums or similar to detect bad checkpoints
> -
>
> Key: FLINK-4948
> URL: https://issues.apache.org/jira/browse/FLINK-4948
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> We should consider proactively checking to verify that checkpoints are valid 
> when reading (and maybe writing).  This should help prevent any possible 
> state corruption issues that might otherwise go undetected.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs

2016-10-31 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15622787#comment-15622787
 ] 

Gyula Fora commented on FLINK-4193:
---

Yes, this is still an issue that we could not figure out. We are now moving to 
YARN so the occasional TM crashes don't cause chain failures of jobs.
One thing to note is that I also observed this happen on Flink 1.0.3 once. It 
seems to be RocksDB related but mostly manifests as errors during GC.

> Task manager JVM crashes while deploying cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs

2016-11-01 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624754#comment-15624754
 ] 

Gyula Fora commented on FLINK-4193:
---

These issues usually happened inside the RocksDB.open(...) method during 
initialization of the state backend. If you think that the refactoring can 
affect this then we might get lucky :)

We are running this in production applications and haven't ported them to 1.2 
but in a week or two I will start working on that.

> Task manager JVM crashes while deploying cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation

2016-11-01 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15625086#comment-15625086
 ] 

Gyula Fora commented on FLINK-3659:
---

Hi,
Would this work the way connected streams work now, but without the 
partitioning limitation, with value state access "blocked" by some flag while 
we are in the method that processes the broadcast input? Or do we assume the 
broadcast state to be the same on all nodes for checkpointing?

> Add ConnectWithBroadcast Operation
> --
>
> Key: FLINK-3659
> URL: https://issues.apache.org/jira/browse/FLINK-3659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but 
> doesn't have to be) and a second input that is always broadcast. This is 
> similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to 
> be keyed or non-keyed.
> This builds on FLINK-4940 which aims at adding broadcast/global state. When 
> processing an element from the broadcast input only access to broadcast state 
> is allowed. When processing an element from the main input access both the 
> regular keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will 
> probably take a while until we add support for side-inputs in the API. This 
> new operation would allow expressing new patterns that cannot be expressed 
> with the currently expressed operations.
> This is the new proposed API (names are non-final): 
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
> {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}/
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
> functions.
> The API names, function names are a bit verbose and we have to add two new 
> different ones but I don't see a way around this with the current way the 
> Flink API works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation

2016-11-01 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15625089#comment-15625089
 ] 

Gyula Fora commented on FLINK-3659:
---

(I have a workshop after lunch so I might be slow to respond :()

> Add ConnectWithBroadcast Operation
> --
>
> Key: FLINK-3659
> URL: https://issues.apache.org/jira/browse/FLINK-3659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but 
> doesn't have to be) and a second input that is always broadcast. This is 
> similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to 
> be keyed or non-keyed.
> This builds on FLINK-4940 which aims at adding broadcast/global state. When 
> processing an element from the broadcast input only access to broadcast state 
> is allowed. When processing an element from the main input access both the 
> regular keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will 
> probably take a while until we add support for side-inputs in the API. This 
> new operation would allow expressing new patterns that cannot be expressed 
> with the currently expressed operations.
> This is the new proposed API (names are non-final): 
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
> {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}/
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
> functions.
> The API names, function names are a bit verbose and we have to add two new 
> different ones but I don't see a way around this with the current way the 
> Flink API works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3659) Add ConnectWithBroadcast Operation

2016-11-02 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628205#comment-15628205
 ] 

Gyula Fora commented on FLINK-3659:
---

I agree that we should try to keep the API as it is now (if possible), and just 
do runtime checks whether the user is reading/updating the correct types of 
states from the correct inputs.

This goes back to the direction of your original PR with some additional 
runtime checks.

> Add ConnectWithBroadcast Operation
> --
>
> Key: FLINK-3659
> URL: https://issues.apache.org/jira/browse/FLINK-3659
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> We should add a new operation that has a main input that can be keyed (but 
> doesn't have to be) and a second input that is always broadcast. This is 
> similar to a {{CoFlatMap}} or {{CoMap}} but there either both inputs have to 
> be keyed or non-keyed.
> This builds on FLINK-4940 which aims at adding broadcast/global state. When 
> processing an element from the broadcast input only access to broadcast state 
> is allowed. When processing an element from the main input access both the 
> regular keyed state and the broadcast state can be accessed.
> I'm proposing this as an intermediate/low-level operation because it will 
> probably take a while until we add support for side-inputs in the API. This 
> new operation would allow expressing new patterns that cannot be expressed 
> with the currently expressed operations.
> This is the new proposed API (names are non-final): 
> 1) Add {{DataStream.connectWithBroadcast(DataStream)}} and 
> {{KeyedStream.connectWithBroadcast(DataStream)}}
> 2) Add {{ConnectedWithBroadcastStream}}, akin to {{ConnectedStreams}}/
> 3) Add {{BroadcastFlatMap}} and {{TimelyBroadcastFlatMap}} as the user 
> functions.
> Sketch of the user function:
> {code}
> interface BroadcastFlatMapFunction {
>   public void flatMap(IN in, Collector out);
>   public void processBroadcastInput(BIN in);
> }
> {code}
> The API names, function names are a bit verbose and we have to add two new 
> different ones but I don't see a way around this with the current way the 
> Flink API works.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4992) Expose String parameter for timers in Timely functions and TimerService

2016-11-02 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4992:
-

 Summary: Expose String parameter for timers in Timely functions 
and TimerService
 Key: FLINK-4992
 URL: https://issues.apache.org/jira/browse/FLINK-4992
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.2.0
Reporter: Gyula Fora
Priority: Minor


Currently it is very hard to register and execute multiple different types of 
timers from the same user function because timers don't carry any metadata.

We propose to extend the timer registration and onTimer logic by attaching a 
String argument so users of these features can implement functionality that 
depends on this additional metadata.

The proposed new methods:

In the TimerService:
void registerProcessingTimeTimer(long time, String label);
void registerEventTimeTimer(long time, String label);

In the TimelyFunctions:
void onTimer(long timestamp, String label, TimeDomain timeDomain, TimerService 
timerService...);

This extended functionality can be mapped to a String namespace for the 
internal timer service. I suggest we don't use the term "namespace" here 
because it just complicates things for the users; I think "label", "id" or 
"name" is much simpler to understand.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4992) Expose String parameter for timers in Timely functions and TimerService

2016-11-02 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15630678#comment-15630678
 ] 

Gyula Fora commented on FLINK-4992:
---

Well, it doesn't have to be Strings, but there should be a way of attaching some 
sort of metadata to the registered timer (like the namespace internally). Can 
you please explain what you meant by different timer services? Do you mean 
attaching metadata to the timer service instead of the actual registered timer?

> Expose String parameter for timers in Timely functions and TimerService
> ---
>
> Key: FLINK-4992
> URL: https://issues.apache.org/jira/browse/FLINK-4992
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Minor
>
> Currently it is very hard to register and execute multiple different types of 
> timers from the same user function because timers don't carry any metadata.
> We propose to extend the timer registration and onTimer logic by attaching a 
> String argument so users of these features can implement functionality that 
> depends on this additional metadata.
> The proposed new methods:
> In the TimerService:
> void registerProcessingTimeTimer(long time, String label);
> void registerEventTimeTimer(long time, String label);
> In the TimelyFunctions:
> void onTimer(long timestamp, String label, TimeDomain timeDomain, 
> TimerService timerService...);
> This extended functionality can be mapped to a String namespace for the 
> internal timer service. I suggest we don't use the term "namespace" here 
> because it just complicates things for the users, I think "label" or "id" or 
> "name" is much simpler to understand.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-07 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-5985:
-

 Summary: Flink treats every task as stateful (making topology 
changes impossible)
 Key: FLINK-5985
 URL: https://issues.apache.org/jira/browse/FLINK-5985
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Gyula Fora
Priority: Critical


It seems that Flink treats every task as stateful, so changing the topology is 
not possible without setting a uid on every single operator.

If the topology has an iteration this is virtually impossible (or at least gets 
super hacky).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-08 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900980#comment-15900980
 ] 

Gyula Fora commented on FLINK-5985:
---

It should be possible to set it for *only* the stateful ones, but it doesn't work.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-08 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900995#comment-15900995
 ] 

Gyula Fora commented on FLINK-5985:
---

Even something as simple as stream.filter(event -> true) requires a uid. The 
same goes for the iteration tasks, which are impossible to assign a uid to in 
the API.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5996) Jobmanager HA should not crash on lost ZK node

2017-03-08 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-5996:
-

 Summary: Jobmanager HA should not crash on lost ZK node
 Key: FLINK-5996
 URL: https://issues.apache.org/jira/browse/FLINK-5996
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.2.0
Reporter: Gyula Fora


Even if there are multiple ZK hosts configured, the jobmanager crashes if one of 
them is lost:

org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: 
KeeperErrorCode = ConnectionLoss
at 
org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
at 
org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
at 
org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:477)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:302)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:291)
at 
org.apache.flink.shaded.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.pathInForeground(GetDataBuilderImpl.java:288)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:279)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:41)
at 
org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.readValue(SharedValue.java:244)
at 
org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.access$100(SharedValue.java:44)
at 
org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue$1.process(SharedValue.java:61)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
at 
org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)

We should have some retry logic there.
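
For reference, the kind of retry behaviour I mean, shown against the plain 
Curator API (a sketch only, not the shaded client Flink actually builds; the 
addresses and values are just examples):

{code}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkRetryExample {
    public static void main(String[] args) {
        // retry ZooKeeper operations with exponential backoff instead of failing
        // the jobmanager on the first ConnectionLoss
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181",
                new ExponentialBackoffRetry(1000, 10)); // 1s base sleep, up to 10 retries
        client.start();
    }
}
{code}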



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-08 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901105#comment-15901105
 ] 

Gyula Fora commented on FLINK-5985:
---

Hi Ufuk,

I did exactly this yesterday:

First I got an error like: Cannot map savepoint state for operator xxx to the new 
program, because the operator is not available in the new program. Then I added 
the debugging to see which actual operator was misbehaving.

It showed a stateless map, so I was like whatever, set the uidHash(xxx) and 
tried again. Now it gave the same error for a different stateless operator 
(let's say a filter). I kept adding the hashes for the stateless operators until 
I reached the point where it complained about not being able to map back the 
Iteration sink; that's when I gave up.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5996) Jobmanager HA should not crash on lost ZK node

2017-03-08 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901108#comment-15901108
 ] 

Gyula Fora commented on FLINK-5996:
---

That is correct. The jobmanager fails and recovers successfully but the running 
jobs are restarted.

> Jobmanager HA should not crash on lost ZK node
> --
>
> Key: FLINK-5996
> URL: https://issues.apache.org/jira/browse/FLINK-5996
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> Even if there are multiple zk hosts configured the jobmanager crashes if one 
> of them is lost:
> org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: 
> KeeperErrorCode = ConnectionLoss
>   at 
> org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
>   at 
> org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
>   at 
> org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:477)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:302)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl$4.call(GetDataBuilderImpl.java:291)
>   at 
> org.apache.flink.shaded.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.pathInForeground(GetDataBuilderImpl.java:288)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:279)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.GetDataBuilderImpl.forPath(GetDataBuilderImpl.java:41)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.readValue(SharedValue.java:244)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue.access$100(SharedValue.java:44)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.recipes.shared.SharedValue$1.process(SharedValue.java:61)
>   at 
> org.apache.flink.shaded.org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
>   at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
>   at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
> We should have some retry logic there



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-08 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15901424#comment-15901424
 ] 

Gyula Fora commented on FLINK-5985:
---

I was somehow assuming that this affects all jobs. Unfortunately I cannot send 
the program but I can try to reproduce it in a minimal example tomorrow. I know 
this used to work in 1.1 with pretty much the same job.

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-09 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902973#comment-15902973
 ] 

Gyula Fora commented on FLINK-6006:
---

Thank you for looking into this!

> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exists in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> is affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when its assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-09 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903110#comment-15903110
 ] 

Gyula Fora commented on FLINK-5985:
---

You can easily reproduce the bug locally by running the following simple 
program:

https://gist.github.com/gyfora/b5ccf836dc9e95bab2b1f5f483cb8bf6

Take a savepoint, then uncomment the stateless map before the sink to change the 
sink hash, and restore.
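
The shape of the job is roughly the following (this is not the gist itself; 
LongSource and StatefulCounter are placeholders for a simple source and the only 
stateful operator):

{code}
// Rough shape of the reproduction, not the linked gist.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new LongSource())
   .keyBy(value -> value % 10)
   .flatMap(new StatefulCounter())   // the only operator that actually keeps state
   // .map(value -> value)           // uncomment after taking the savepoint: this
   //                                // stateless map shifts the generated sink hash
   .print();

env.execute("FLINK-5985 repro");
{code}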

> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots

2017-05-15 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010393#comment-16010393
 ] 

Gyula Fora commented on FLINK-6537:
---

Hi Stefan!

Thank you for all the work :) 
I have tested your changes today and some of the issues have been resolved.

Incremental checkpoints now succeed, but we cannot restore from them (either 
after cancellation or under normal failure scenarios).
Savepoints still fail with the previous error. There also seems to be some 
error while discarding subsumed checkpoints. I am attaching the logs containing 
the errors:

In the beginning you can see the failed savepoint, then a couple of successful 
checkpoints with the warnings, and at the end I killed the TM to trigger a 
restore, which led to file-not-found errors in HDFS:

https://gist.github.com/gyfora/2bb5569fb703bbd7e47ba60352f90086

I hope this helps :)
Gyula

> Umbrella issue for fixes to incremental snapshots
> -
>
> Key: FLINK-6537
> URL: https://issues.apache.org/jira/browse/FLINK-6537
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> This issue tracks ongoing fixes in the incremental checkpointing feature for 
> the 1.3 release.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6633) Register with shared state registry before adding to CompletedCheckpointStore

2017-05-19 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017374#comment-16017374
 ] 

Gyula Fora commented on FLINK-6633:
---

I have tried your fix; restore now works for incremental checkpoints but fails 
on the first checkpoint afterwards:

org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the 
pending checkpoint 4.
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:853)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:772)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: 
class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164)
at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:287)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:843)

full log:
https://gist.github.com/gyfora/cf1894158ddbc5bbba2c0cc70d69b505

> Register with shared state registry before adding to CompletedCheckpointStore
> -
>
> Key: FLINK-6633
> URL: https://issues.apache.org/jira/browse/FLINK-6633
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Introducing placeholders for previously existing shared state requires a 
> change that shared state is first registering with {{SharedStateregistry}} 
> (thereby being consolidated) and only after that added to a 
> {{CompletedCheckpointStore}}, so that the consolidated checkpoint is written 
> to stable storage. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots

2017-05-22 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019611#comment-16019611
 ] 

Gyula Fora commented on FLINK-6537:
---

Hi Stefan!
I can confirm that incremental checkpointing and recovery now seem to work 
as expected :) Great job on the fixes!

Savepointing still gives the same error regardless of whether incremental 
checkpointing is used or not.
Is there a JIRA for tracking that issue? I have no idea what might cause it :(

Thank you!
Gyula


> Umbrella issue for fixes to incremental snapshots
> -
>
> Key: FLINK-6537
> URL: https://issues.apache.org/jira/browse/FLINK-6537
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> This issue tracks ongoing fixes in the incremental checkpointing feature for 
> the 1.3 release.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots

2017-05-22 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1601#comment-1601
 ] 

Gyula Fora commented on FLINK-6537:
---

You can look at all the logs I have posted. For example: 
https://gist.github.com/gyfora/2bb5569fb703bbd7e47ba60352f90086#file-gistfile1-txt-L197

This exact error happens at every single savepoint attempt.

> Umbrella issue for fixes to incremental snapshots
> -
>
> Key: FLINK-6537
> URL: https://issues.apache.org/jira/browse/FLINK-6537
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> This issue tracks ongoing fixes in the incremental checkpointing feature for 
> the 1.3 release.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6537) Umbrella issue for fixes to incremental snapshots

2017-05-23 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1602#comment-1602
 ] 

Gyula Fora commented on FLINK-6537:
---

Thanks a lot for looking into this. I won't be able to verify any fix today as I 
am travelling to London, but I can try to do it tomorrow!

> Umbrella issue for fixes to incremental snapshots
> -
>
> Key: FLINK-6537
> URL: https://issues.apache.org/jira/browse/FLINK-6537
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> This issue tracks ongoing fixes in the incremental checkpointing feature for 
> the 1.3 release.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6685) SafetyNetCloseableRegistry is closed prematurely in Task::triggerCheckpointBarrier

2017-05-24 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023140#comment-16023140
 ] 

Gyula Fora commented on FLINK-6685:
---

Thanks Stefan, awesome job! I can confirm that this solves our savepoint issues 
:)


> SafetyNetCloseableRegistry is closed prematurely in 
> Task::triggerCheckpointBarrier
> --
>
> Key: FLINK-6685
> URL: https://issues.apache.org/jira/browse/FLINK-6685
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
>
> The {{SafetyNetCloseableRegistry}} is closed to early in 
> {{triggerCheckpointBarrier(...)}}. Right now, it seems like the code assumes 
> that {{statefulTask.triggerCheckpoint(...)}} is blocking - which it is not. 
> Like this, the registry can be closed while the checkpoint is still running.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6742) Savepoint conversion might fail if task ids change

2017-05-27 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6742:
-

 Summary: Savepoint conversion might fail if task ids change
 Key: FLINK-6742
 URL: https://issues.apache.org/jira/browse/FLINK-6742
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Gyula Fora
Priority: Critical


Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6742) Savepoint conversion might fail if operators change

2017-05-27 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-6742:
--
Summary: Savepoint conversion might fail if operators change  (was: 
Savepoint conversion might fail if task ids change)

> Savepoint conversion might fail if operators change
> ---
>
> Key: FLINK-6742
> URL: https://issues.apache.org/jira/browse/FLINK-6742
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Critical
>
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change

2017-05-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027477#comment-16027477
 ] 

Gyula Fora commented on FLINK-6742:
---

What's the reason for not allowing removing operators / adding new operators here?

> Savepoint conversion might fail if operators change
> ---
>
> Key: FLINK-6742
> URL: https://issues.apache.org/jira/browse/FLINK-6742
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Critical
>
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change

2017-05-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027495#comment-16027495
 ] 

Gyula Fora commented on FLINK-6742:
---

In my case I would like to remove 2 operators while migrating because the state 
for those is not compatible (basically just changing the uids for those). In 
this case it actually becomes a technical hurdle :D

> Savepoint conversion might fail if operators change
> ---
>
> Key: FLINK-6742
> URL: https://issues.apache.org/jira/browse/FLINK-6742
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Critical
>
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change

2017-05-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027498#comment-16027498
 ] 

Gyula Fora commented on FLINK-6742:
---

They are incompatible due to some custom state backend code; that's really my 
problem :)

I like option 2, but for now I just went with adding an extra null check to the 
conversion step to avoid the NullPointerException.
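
For context, a minimal sketch of the kind of null check meant here (illustrative 
only; names do not match the real SavepointV2 conversion code): skip restored 
task states whose job vertex can no longer be found instead of dereferencing a 
null lookup result.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the workaround described above.
final class ConversionSketch {

    static <ID, TASK, OP> List<OP> convertTaskStates(
            Map<ID, TASK> tasksById, List<ID> restoredTaskIds, Function<TASK, OP> toOperatorState) {

        List<OP> converted = new ArrayList<>();
        for (ID id : restoredTaskIds) {
            TASK task = tasksById.get(id);
            if (task == null) {
                // The operator was removed or its uid changed: skip instead of
                // failing with a NullPointerException during conversion.
                continue;
            }
            converted.add(toOperatorState.apply(task));
        }
        return converted;
    }

    private ConversionSketch() {}
}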

> Savepoint conversion might fail if operators change
> ---
>
> Key: FLINK-6742
> URL: https://issues.apache.org/jira/browse/FLINK-6742
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Critical
>
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6742) Savepoint conversion might fail if operators change

2017-05-27 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-6742:
--
Priority: Minor  (was: Critical)

> Savepoint conversion might fail if operators change
> ---
>
> Key: FLINK-6742
> URL: https://issues.apache.org/jira/browse/FLINK-6742
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Minor
>
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6742) Savepoint conversion might fail if operators change

2017-05-27 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027499#comment-16027499
 ] 

Gyula Fora commented on FLINK-6742:
---

Thank you for the help Chesnay! Should we close this JIRA?

> Savepoint conversion might fail if operators change
> ---
>
> Key: FLINK-6742
> URL: https://issues.apache.org/jira/browse/FLINK-6742
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Priority: Critical
>
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
>   at 
> org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6754) Savepoints don't respect client timeout config

2017-05-29 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6754:
-

 Summary: Savepoints don't respect client timeout config
 Key: FLINK-6754
 URL: https://issues.apache.org/jira/browse/FLINK-6754
 Project: Flink
  Issue Type: Bug
  Components: Client, Configuration
Reporter: Gyula Fora
Priority: Trivial


Savepoints have a hardcoded timeout:

Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, 
Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS));
...
result = Await.result(response, FiniteDuration.Inf());
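
A sketch of the direction this issue asks for (the option key "akka.client.timeout" 
and the default value are assumptions here, not a confirmed fix): read the client 
timeout from the configuration instead of hardcoding one hour.

import org.apache.flink.configuration.Configuration;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

// Hypothetical helper: resolves the ask-timeout for savepoint requests from the
// client configuration rather than a hardcoded FiniteDuration(1, HOURS).
final class SavepointTimeoutSketch {

    static FiniteDuration clientTimeout(Configuration config) {
        // Assumed option key and default; the real plumbing would reuse the
        // existing client timeout configuration.
        String timeout = config.getString("akka.client.timeout", "60 s");
        return (FiniteDuration) Duration.create(timeout);
    }

    private SavepointTimeoutSketch() {}
}

The trigger call could then pass clientTimeout(config) instead of the fixed 
one-hour duration.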






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-1313) Add support for out-of-place aggregations for streaming

2017-05-29 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-1313.
-
Resolution: Won't Fix

> Add support for out-of-place aggregations for streaming
> ---
>
> Key: FLINK-1313
> URL: https://issues.apache.org/jira/browse/FLINK-1313
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Minor
>
> There is an ongoing effort to implement a new aggregation api for batch 
> processing: https://issues.apache.org/jira/browse/FLINK-1293
> The streaming api should implement the same aggregation logic as well to keep 
> the two apis as close as possible.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6755) Allow triggering Checkpoints through command line client

2017-05-29 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6755:
-

 Summary: Allow triggering Checkpoints through command line client
 Key: FLINK-6755
 URL: https://issues.apache.org/jira/browse/FLINK-6755
 Project: Flink
  Issue Type: New Feature
  Components: Client, State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Gyula Fora


The command line client currently only allows triggering savepoints (and 
cancelling a job with a savepoint).

While this is good if we want to fork or modify the pipelines in a 
non-checkpoint-compatible way, with incremental checkpoints this becomes 
wasteful for simple job restarts/pipeline updates.

I suggest we add a new command: 
./bin/flink checkpoint <jobId> [checkpointDirectory]

and a new flag -c for the cancel command to indicate we want to trigger a 
checkpoint:
./bin/flink cancel -c [targetDirectory] <jobId>

Otherwise this can work similarly to the current savepoint-taking logic; we could 
probably even piggyback on the current messages by adding a boolean flag 
indicating whether it should be a savepoint or a checkpoint.
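
A rough sketch of the piggybacking idea (a hypothetical message shape for 
illustration only; the real change would extend the existing TriggerSavepoint 
message and its JobManager handling instead):

import java.io.Serializable;

// Hypothetical message class, not part of the actual JobManager protocol.
final class TriggerSnapshot implements Serializable {

    private static final long serialVersionUID = 1L;

    final String jobId;           // job to snapshot
    final String targetDirectory; // may be null, meaning "use the configured default"
    final boolean savepoint;      // true: savepoint semantics, false: (incremental) checkpoint

    TriggerSnapshot(String jobId, String targetDirectory, boolean savepoint) {
        this.jobId = jobId;
        this.targetDirectory = targetDirectory;
        this.savepoint = savepoint;
    }
}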



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6754) Savepoints don't respect client timeout config

2017-05-29 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-6754:
--
Affects Version/s: 1.3.0
   1.2.1

> Savepoints don't respect client timeout config
> --
>
> Key: FLINK-6754
> URL: https://issues.apache.org/jira/browse/FLINK-6754
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Configuration
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Gyula Fora
>Priority: Trivial
>
> Savepoints have a hardcoded timeout:
> Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, 
> Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS));
> ...
> result = Await.result(response, FiniteDuration.Inf());



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6762) Cannot rescale externalized incremental checkpoints

2017-05-29 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-6762:
-

 Summary: Cannot rescale externalized incremental checkpoints
 Key: FLINK-6762
 URL: https://issues.apache.org/jira/browse/FLINK-6762
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Gyula Fora
Priority: Critical


When a job is rescaled from an externalized incremental checkpoint, the 
subsequent checkpoints fail with the following error:

org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the 
pending checkpoint 3205.
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:861)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:776)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: 
class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandle(SavepointV2Serializer.java:484)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeStreamStateHandleMap(SavepointV2Serializer.java:342)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeKeyedStateHandle(SavepointV2Serializer.java:329)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serializeSubtaskState(SavepointV2Serializer.java:270)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:122)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.serialize(SavepointV2Serializer.java:66)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:199)
at 
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeExternalizedCheckpointToHandle(SavepointStore.java:164)
at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:286)
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:851)

Full log:
https://gist.github.com/gyfora/693b9a720aace843ff4570e504c4a242

Rescaling with savepoints works.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-10387) StateBackend create methods should return interface not abstract KeyedStateBackend classes

2018-09-21 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-10387:
--

 Summary: StateBackend create methods should return interface not 
abstract KeyedStateBackend classes
 Key: FLINK-10387
 URL: https://issues.apache.org/jira/browse/FLINK-10387
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Gyula Fora


Currently the createKeyedStateBackend(...) methods return 
AbstractKeyedStateBackend instead of an interface.

This makes it virtually impossible to write nice extensions to StateBackends 
that add additional functionality to existing backends while delegating other 
method calls.

It should be easy enough to add a new interface that covers everything 
AbstractKeyedStateBackend implements, and have the create method return that interface.
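
A sketch of the proposed shape (the interface name and the commented supertype 
list are placeholders, not concrete proposed types):

// Hypothetical sketch; the real interface would list exactly the supertypes that
// AbstractKeyedStateBackend implements (KeyedStateBackend, the snapshot interface,
// CheckpointListener, ...) with their proper generics.
public interface CheckpointableKeyedStateBackend<K>
        extends /* KeyedStateBackend<K>, Snapshotable<...>, CheckpointListener, */ java.io.Closeable {
    // No new methods: the interface only exposes the union of what the abstract class
    // already provides, so createKeyedStateBackend(...) can return it and wrapper
    // backends can delegate to an existing instance without extending the abstract class.
}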



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-1003) Spread out scheduling strategy

2018-10-02 Thread Gyula Fora (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-1003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-1003.
-
Resolution: Won't Fix

> Spread out scheduling strategy
> --
>
> Key: FLINK-1003
> URL: https://issues.apache.org/jira/browse/FLINK-1003
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Gyula Fora
>Priority: Major
>
> Currently the Flink scheduler tries to fill one instance completely before 
> the tasks are deployed to another instance. This is a good behaviour in 
> multi-user and multi-job scenarios but it wastes resources if one wants to 
> use the complete cluster. Therefore, another scheduling strategy where the 
> load among the different instances is kept balanced might be useful. This 
> spread out strategy will deploy the tasks such that the overall work is 
> equally distributed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-02 Thread Gyula Fora (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635168#comment-16635168
 ] 

Gyula Fora commented on FLINK-9635:
---

Should we consider this issue a blocker? I know the proper fix is very hard and 
takes a lot of effort, but the current state is very unsafe as well.

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that it tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new or already used. This behaviour can cause 
> that every tasks gets deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-28 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917819#comment-16917819
 ] 

Gyula Fora commented on FLINK-13874:


I added some extra logging to figure out what the problem is.
What seems to happen is that the lease revoke timeout is reached before 
we move on to the truncate, so the file is never closed before truncating:

[https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L341]

 

I wonder what might cause it to take so long, and what we can do in these 
cases. We should probably throw an error or at least log a warning.
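
The kind of change meant here, as a sketch (simplified names, not the actual 
HadoopRecoverableFsDataOutputStream code): make a failed lease revocation visible 
instead of silently proceeding to truncate.

import java.io.IOException;

// Hypothetical sketch: the caller would pass in the result of the lease-revocation
// wait (e.g. whether the file was reported closed before the deadline).
final class LeaseRecoverySketch {

    static void ensureLeaseRevoked(boolean leaseRevoked, String path) throws IOException {
        if (!leaseRevoked) {
            // Fail fast (or at least log a warning) rather than truncating a file
            // that is still open under another lease.
            throw new IOException("Lease on " + path
                    + " was not revoked within the configured timeout; refusing to truncate an open file.");
        }
    }

    private LeaseRecoverySketch() {}
}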

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.ap

[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-28 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16917833#comment-16917833
 ] 

Gyula Fora commented on FLINK-13874:


And while the lease revoking process seems to time out for certain files, for 
others it runs in 1-2 seconds, within the same taskmanager.

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.secur

[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-28 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918043#comment-16918043
 ] 

Gyula Fora commented on FLINK-13874:


I increased the lease timeout to 5 minutes; it still failed sometimes, but it 
could recover afterwards. The namenode logs don't seem to contain any additional 
information on what happened or why it was slow. It just looks normal...

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>   

[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918526#comment-16918526
 ] 

Gyula Fora commented on FLINK-13874:


This seems like an issue with HDFS, as the lease cannot be recovered for certain 
files for a long period of time. Something with the block recovery process 
might be off. These files cannot even be closed manually using the HDFS client. 
In many cases only a restart of HDFS solves the problem.

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
> 

[jira] [Commented] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16918528#comment-16918528
 ] 

Gyula Fora commented on FLINK-13874:


We will investigate this further to see what in HDFS might cause 
it; I will close this issue for now.

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(

[jira] [Resolved] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

2019-08-29 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora resolved FLINK-13874.

Resolution: Won't Fix

This seems to be an issue with HDFS or the HDFS configuration; I will close this 
issue until further investigation is done on the HDFS side.

> StreamingFileSink fails to recover (truncate) properly
> --
>
> Key: FLINK-13874
> URL: https://issues.apache.org/jira/browse/FLINK-13874
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Gyula Fora
>Priority: Blocker
>
> It seems that there might be some problem with the truncate / recovery logic 
> for the HadoopRecoverableFsDataOutputStream.
> I keep hitting the following error:
>  
> {noformat}
> java.io.IOException: Problem while truncating file: 
> hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.(HadoopRecoverableFsDataOutputStream.java:89)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>  Failed to TRUNCATE_FILE 
> /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d
>  for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because 
> DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder.
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>   at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.

[jira] [Commented] (FLINK-13566) Support checkpoint configuration through flink-conf.yaml

2019-09-05 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923145#comment-16923145
 ] 

Gyula Fora commented on FLINK-13566:


Let's wait until the discussion of FLIP-59 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration]

is over :)

> Support checkpoint configuration through flink-conf.yaml
> 
>
> Key: FLINK-13566
> URL: https://issues.apache.org/jira/browse/FLINK-13566
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / Configuration
>Reporter: Gyula Fora
>Priority: Major
>
> Currently basic checkpointing configuration happens through the 
> StreamExecutionEnvironment and the CheckpointConfig class.
> There is no way to configure checkpointing behaviour purely from the 
> flink-conf.yaml file (or provide a default checkpointing behaviour) as it 
> always needs to happen programmatically through the environment.
> The checkpoint config settings are then translated down to the 
> CheckpointCoordinatorConfiguration which will control the runtime behaviour.
> As checkpointing-related settings are operational features that should not 
> affect the application logic, I think we need to support configuring these 
> params through the flink-conf.yaml.
> In order to do this we probably need to rework the CheckpointConfig class so 
> that it distinguishes parameters that the user actually set from the defaults 
> (to support overriding what was set in the conf).
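
For reference, a minimal sketch of how this currently has to be done programmatically 
through the environment (the concrete settings and values below are only illustrative):

{code:java}
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // All of this lives in application code today; none of it can be
        // supplied as a cluster-wide default via flink-conf.yaml.
        env.enableCheckpointing(60_000); // checkpoint interval in ms

        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        config.setMinPauseBetweenCheckpoints(30_000);
        config.setCheckpointTimeout(120_000);
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
{code}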



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13979) Translate new streamfilesink docs to chinese

2019-09-05 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-13979:
--

 Summary: Translate new streamfilesink docs to chinese
 Key: FLINK-13979
 URL: https://issues.apache.org/jira/browse/FLINK-13979
 Project: Flink
  Issue Type: New Feature
  Components: chinese-translation, Documentation
Reporter: Gyula Fora


The StreamFileSink docs have been reworked as part of FLINK-13842



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Closed] (FLINK-13842) Improve Javadocs and web documentation of the StreamingFileSink

2019-09-05 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-13842.
--
Resolution: Fixed

> Improve Javadocs and web documentation of the StreamingFileSink
> ---
>
> Key: FLINK-13842
> URL: https://issues.apache.org/jira/browse/FLINK-13842
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem, Documentation
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Both the javadocs and the web docs of the StreamingFileSink and associated 
> components should be improved for a smooth user experience with more detailed 
> explanations and examples.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic

2019-09-07 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-14002:
--

 Summary: FlinkKafkaProducer constructor that takes 
KafkaSerializationSchema shouldnt take default topic
 Key: FLINK-14002
 URL: https://issues.apache.org/jira/browse/FLINK-14002
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Gyula Fora


When the KafkaSerializationSchema is used, the user always has to provide the 
topic when creating the ProducerRecord.

The defaultTopic specified in the constructor (and enforced not to be null) 
will always be ignored, which is very misleading.

We should deprecate these constructors and create new ones without defaultTopic.
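
A minimal sketch to illustrate the point (the class and topic names are made up for 
the example): with a KafkaSerializationSchema the topic is chosen per record inside 
serialize(), so the defaultTopic passed to the constructor never comes into play.

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventSerializationSchema implements KafkaSerializationSchema<String> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // The target topic is decided here, per record; the defaultTopic
        // constructor argument of the producer is never consulted.
        return new ProducerRecord<>("events", element.getBytes(StandardCharsets.UTF_8));
    }
}
{code}

The constructor then still forces the caller to pass a topic that is effectively dead 
weight, e.g. new FlinkKafkaProducer<>("ignored-topic", new EventSerializationSchema(), 
props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE).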



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic

2019-09-08 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16925398#comment-16925398
 ] 

Gyula Fora commented on FLINK-14002:


Actually, the default topic is used in the case of KafkaContextAware serialization 
schemas, which might cause further problems.

> FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt 
> take default topic
> --
>
> Key: FLINK-14002
> URL: https://issues.apache.org/jira/browse/FLINK-14002
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Gyula Fora
>Priority: Major
>
> When the KafkaSerializationSchema is used, the user always has to provide the 
> topic when creating the ProducerRecord.
> The defaultTopic specified in the constructor (and enforced not to be null) 
> will always be ignored, which is very misleading.
> We should deprecate these constructors and create new ones without 
> defaultTopic.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing

2019-09-10 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926473#comment-16926473
 ] 

Gyula Fora commented on FLINK-4873:
---

Hm, looks like I had the change committed on my master branch, which I have 
overwritten recently... but we can definitely add this feature. Should be 
simple enough :)

> Add config option to specify "home directory" for YARN client resource sharing
> --
>
> Key: FLINK-4873
> URL: https://issues.apache.org/jira/browse/FLINK-4873
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.1.3, 1.2.0
>Reporter: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The YARN client currently uses FileSystem.getHomeDirectory() to store the jar 
> files that need to be shared on the cluster. This pretty much forces users 
> to run HDFS or something compatible with the Hadoop FS API on the cluster.
> In some production environments, file systems (distributed or simply shared) 
> are simply mounted under the same path for convenience and do not require the 
> use of the Hadoop API. If we want to run Flink on YARN in these cases, we 
> would need to be able to define the "home directory" where Flink should copy 
> the files for sharing.
> It could be something like:
> yarn.resource.upload.dir in the flink-conf.yaml
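
A hypothetical flink-conf.yaml entry using the option name suggested above (both the 
key and the path are illustrative only; nothing here exists yet):

{code}
# Shared directory, mounted identically on all nodes, where the YARN client
# should upload the Flink jars instead of using FileSystem.getHomeDirectory()
yarn.resource.upload.dir: file:///mnt/shared/flink
{code}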



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing

2019-09-10 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-4873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-4873:
-

Assignee: Matyas Orhidi

> Add config option to specify "home directory" for YARN client resource sharing
> --
>
> Key: FLINK-4873
> URL: https://issues.apache.org/jira/browse/FLINK-4873
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.1.3, 1.2.0
>Reporter: Gyula Fora
>Assignee: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The YARN client currently uses FileSystem.getHomeDirectory() to store the jar 
> files that need to be shared on the cluster. This pretty much forces users 
> to run HDFS or something compatible with the Hadoop FS API on the cluster.
> In some production environments, file systems (distributed or simply shared) 
> are simply mounted under the same path for convenience and do not require the 
> use of the Hadoop API. If we want to run Flink on YARN in these cases, we 
> would need to be able to define the "home directory" where Flink should copy 
> the files for sharing.
> It could be something like:
> yarn.resource.upload.dir in the flink-conf.yaml



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment

2019-09-10 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-14048:
--

 Summary: Flink client hangs after trying to kill Yarn Job during 
deployment
 Key: FLINK-14048
 URL: https://issues.apache.org/jira/browse/FLINK-14048
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission, Deployment / YARN
Reporter: Gyula Fora


If we kill the flink client run command from the terminal while deploying to 
YARN (let's say we realize we used the wrong parameters), the YARN application 
will be killed immediately but the client won't shut down.

We get the following messages over and over:

19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The 
client is stopped, while invoking 
ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 
failover attempts. Trying to failover after sleeping for 16296ms.

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14048) Flink client hangs after trying to kill Yarn Job during deployment

2019-09-11 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927313#comment-16927313
 ] 

Gyula Fora commented on FLINK-14048:


Yes, it was in per-job mode.

> Flink client hangs after trying to kill Yarn Job during deployment
> --
>
> Key: FLINK-14048
> URL: https://issues.apache.org/jira/browse/FLINK-14048
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Deployment / YARN
>Reporter: Gyula Fora
>Priority: Major
>
> If we kill the flink client run command from the terminal while deploying to 
> YARN (let's say we realize we used the wrong parameters), the YARN 
> application will be killed immediately but the client won't shut down.
> We get the following messages over and over:
> 19/09/10 23:35:55 INFO retry.RetryInvocationHandler: java.io.IOException: The 
> client is stopped, while invoking 
> ApplicationClientProtocolPBClientImpl.forceKillApplication over null after 14 
> failover attempts. Trying to failover after sleeping for 16296ms.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic

2019-09-12 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928513#comment-16928513
 ] 

Gyula Fora commented on FLINK-14002:


I am a bit busy in the next few days, but I can definitely fix this before the 
next release if no one picks it up earlier :)

> FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt 
> take default topic
> --
>
> Key: FLINK-14002
> URL: https://issues.apache.org/jira/browse/FLINK-14002
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Gyula Fora
>Priority: Major
>
> When the KafkaSerializationSchema is used, the user always has to provide the 
> topic when creating the ProducerRecord.
> The defaultTopic specified in the constructor (and enforced not to be null) 
> will always be ignored, which is very misleading.
> We should deprecate these constructors and create new ones without 
> defaultTopic.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (FLINK-14145) getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-14145:
--

Assignee: Gyula Fora

> getLatestCheckpoint(true) returns wrong checkpoint
> --
>
> Key: FLINK-14145
> URL: https://issues.apache.org/jira/browse/FLINK-14145
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Ufuk Celebi
>Assignee: Gyula Fora
>Priority: Major
> Attachments: 
> 0001-FLINK-14145-runtime-Add-getLatestCheckpoint-test.patch
>
>
> The flag to prefer checkpoints for recovery introduced in FLINK-11159 returns 
> the wrong checkpoint if:
> * checkpoints are preferred  ({{getLatestCheckpoint(true)}}),
> * the latest checkpoint is *not* a savepoint,
> * more than a single checkpoint is retained.
> The current implementation assumes that the latest checkpoint is a savepoint 
> and skips over it. I attached a patch for 
> {{StandaloneCompletedCheckpointStoreTest}} that demonstrates the issue.
> You can apply the patch via {{git am -3 < *.patch}}.
>  
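
A minimal sketch of the intended selection logic (not the actual patch; method and 
variable names are illustrative, and the store is assumed to keep its completed 
checkpoints ordered oldest to newest):

{code:java}
import java.util.List;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;

final class LatestCheckpointSelection {

    static CompletedCheckpoint getLatest(
            List<CompletedCheckpoint> checkpoints, boolean preferCheckpoints) {
        if (checkpoints.isEmpty()) {
            return null;
        }
        CompletedCheckpoint latest = checkpoints.get(checkpoints.size() - 1);
        if (preferCheckpoints && latest.getProperties().isSavepoint()) {
            // Only walk backwards when the newest entry really is a savepoint;
            // if the newest entry is already a checkpoint it is returned as-is.
            for (int i = checkpoints.size() - 2; i >= 0; i--) {
                CompletedCheckpoint candidate = checkpoints.get(i);
                if (!candidate.getProperties().isSavepoint()) {
                    return candidate;
                }
            }
        }
        return latest;
    }
}
{code}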



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14145) getLatestCheckpoint(true) returns wrong checkpoint

2019-09-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16934222#comment-16934222
 ] 

Gyula Fora commented on FLINK-14145:


Thanks [~uce] for catching this; I should have tested this feature more 
thoroughly.

> getLatestCheckpoint(true) returns wrong checkpoint
> --
>
> Key: FLINK-14145
> URL: https://issues.apache.org/jira/browse/FLINK-14145
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Ufuk Celebi
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-14145-runtime-Add-getLatestCheckpoint-test.patch
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The flag to prefer checkpoints for recovery introduced in FLINK-11159 returns 
> the wrong checkpoint if:
> * checkpoints are preferred  ({{getLatestCheckpoint(true)}}),
> * the latest checkpoint is *not* a savepoint,
> * more than a single checkpoint is retained.
> The current implementation assumes that the latest checkpoint is a savepoint 
> and skips over it. I attached a patch for 
> {{StandaloneCompletedCheckpointStoreTest}} that demonstrates the issue.
> You can apply the patch via {{git am -3 < *.patch}}.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-14145) getLatestCheckpoint(true) returns wrong checkpoint

2019-09-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora resolved FLINK-14145.

Resolution: Fixed

> getLatestCheckpoint(true) returns wrong checkpoint
> --
>
> Key: FLINK-14145
> URL: https://issues.apache.org/jira/browse/FLINK-14145
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.9.0
>Reporter: Ufuk Celebi
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> 0001-FLINK-14145-runtime-Add-getLatestCheckpoint-test.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The flag to prefer checkpoints for recovery introduced in FLINK-11159 returns 
> the wrong checkpoint if:
> * checkpoints are preferred  ({{getLatestCheckpoint(true)}}),
> * the latest checkpoint is *not* a savepoint,
> * more than a single checkpoint is retained.
> The current implementation assumes that the latest checkpoint is a savepoint 
> and skips over it. I attached a patch for 
> {{StandaloneCompletedCheckpointStoreTest}} that demonstrates the issue.
> You can apply the patch via {{git am -3 < *.patch}}.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6755) Allow triggering Checkpoints through command line client

2019-05-15 Thread Gyula Fora (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840198#comment-16840198
 ] 

Gyula Fora commented on FLINK-6755:
---

I agree that it leaks an abstraction, but I don't see this as a problem.

[~yanghua] I think selecting either automatic or manual is a bit too restrictive. 
The periodic checkpoints are nice, but sometimes manually triggering one right 
before a restart is just a great feature.

> Allow triggering Checkpoints through command line client
> 
>
> Key: FLINK-6755
> URL: https://issues.apache.org/jira/browse/FLINK-6755
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Affects Versions: 1.3.0
>Reporter: Gyula Fora
>Assignee: vinoyang
>Priority: Major
>
> The command line client currently only allows triggering (and canceling with) 
> Savepoints. 
> While this is good if we want to fork or modify the pipelines in a 
> non-checkpoint-compatible way, with incremental checkpoints this now becomes 
> wasteful for simple job restarts/pipeline updates. 
> I suggest we add a new command: 
> ./bin/flink checkpoint  [checkpointDirectory]
> and a new flag -c for the cancel command to indicate we want to trigger a 
> checkpoint:
> ./bin/flink cancel -c [targetDirectory] 
> Otherwise this can work similarly to the current savepoint-taking logic; we 
> could probably even piggyback on the current messages by adding a boolean flag 
> indicating whether it should be a savepoint or a checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

