Flink job hangs/deadlocks (possibly related to out of memory)

2018-06-29 Thread gerardg
Hello, We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to these ones: [Canceler/In

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-06-29 Thread gerardg
(fixed formatting) Hello, We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted the task manager gets killed and shows several errors similar to

When a jobmanager fails, it doesn't restart because it tries to restart non-existing tasks

2018-07-16 Thread gerardg
Hi, Our deployment consists of a standalone HA cluster of 8 machines with an external Zookeeper cluster. We have observed several times that when a jobmanager fails and a new one is elected, the new one tries to restart more jobs than the ones that were running and since it can't find some files,

Override CaseClassSerializer with custom serializer

2018-08-17 Thread gerardg
Hello, I can't seem to override the CaseClassSerializer with my custom serializer. I'm using env.getConfig.addDefaultKryoSerializer() to add the custom serializer but I don't see it being used. I guess it is because it only uses Kryo-based serializers if it can't find a Flink serializer
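
A minimal sketch of the kind of registration discussed here (the Event type and serializer are made-up examples, not from the thread). Since Flink picks its own CaseClassSerializer for case classes, the Kryo default registration only takes effect once the type actually goes through Kryo, e.g. by forcing it:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    case class Event(id: String, payload: String)

    // Hypothetical custom Kryo serializer for Event
    class EventKryoSerializer extends Serializer[Event] {
      override def write(kryo: Kryo, output: Output, event: Event): Unit = {
        output.writeString(event.id)
        output.writeString(event.payload)
      }
      override def read(kryo: Kryo, input: Input, clazz: Class[Event]): Event =
        Event(input.readString(), input.readString())
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Register the serializer as the Kryo default for Event
    env.getConfig.addDefaultKryoSerializer(classOf[Event], classOf[EventKryoSerializer])
    // Case classes normally use Flink's CaseClassSerializer, so the Kryo default
    // is ignored unless Kryo is forced for all types:
    env.getConfig.enableForceKryo()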

Unbalanced Kafka consumer consumption

2018-10-26 Thread gerardg
Hi, We are experiencing issues scaling our Flink application and we have observed that it may be because Kafka message consumption is not balanced across partitions. The attached image (lag per partition) shows how only one partition consumes messages (the blue one in the back) and it wasn't until

Timestamp synchronized message consumption across kafka partitions

2019-03-07 Thread gerardg
I'm wondering if there is a way to avoid consuming too fast from partitions that do not have as much data as the other ones in the same topic by keeping them more or less synchronized by their ingestion timestamp. Similar to what kafka streams does: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
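
For reference, much later Flink releases (1.15+) added watermark alignment, which pauses readers that run too far ahead of the rest of their alignment group (whether individual splits/partitions are paused depends on the Flink version). A rough sketch, assuming a source that accepts a WatermarkStrategy:

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.WatermarkStrategy

    // Readers sharing the alignment group "kafka-sources" pause when their
    // watermark drifts more than 20s ahead of the group's minimum watermark
    // (the drift is re-evaluated every second).
    val strategy: WatermarkStrategy[String] = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(5))
      .withWatermarkAlignment("kafka-sources", Duration.ofSeconds(20), Duration.ofSeconds(1))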

Status.JVM.Memory.Heap.Used metric shows only a few megabytes but profiling shows gigabytes (as expected)

2019-03-19 Thread gerardg
Hi, Before Flink 1.7.0 we were getting correct values in the Status.JVM.Memory.Heap.Used metric. Since updating we just see a constant small value (just a few megabytes), did something change? Gerard

Missing checkpoint when restarting failed job

2017-11-21 Thread gerardg
Hello, We have a task that fails to restart from a checkpoint with the following error: java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321) at

Re: Missing checkpoint when restarting failed job

2017-11-21 Thread gerardg
> where exactly did you read many times that incremental checkpoints cannot reference files from previous checkpoints, because we would have to correct that information. In fact, this is how incremental checkpoints work. My fault, I read it in some other posts in the mailing list but now tha

Re: Kafka topic partition skewness causes watermark not being emitted

2017-12-12 Thread gerardg
I'm also affected by this behavior. There are no updates in FLINK-5479 but did you manage to find a way to work around this? Thanks, Gerard
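
For reference, later Flink releases (1.11+) let the watermark strategy flag a reader as idle after a timeout, so an empty or skewed partition no longer holds back the watermark. A minimal sketch:

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.WatermarkStrategy

    // After one minute without records the reader is marked idle and is
    // excluded from the downstream watermark calculation until data arrives.
    val strategy: WatermarkStrategy[String] = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(5))
      .withIdleness(Duration.ofMinutes(1))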

Get which key groups are assigned to an operator

2018-02-20 Thread gerardg
Hello, To improve performance we have "keyed state" in the operator's memory, basically we keep a Map which contains the state for each of the keys. The problem comes when we want to restore the state after a failure or after rescaling the operator. What we are doing is sending the concatenation
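
A sketch of how the key-to-subtask assignment can be reproduced with Flink's KeyGroupRangeAssignment helpers (an internal utility class, not a guaranteed-stable API), assuming the job's max parallelism and the operator's parallelism are known; the concrete values below are just examples:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment

    val maxParallelism = 128   // the job's max parallelism (example value)
    val parallelism    = 8     // the operator's current parallelism (example value)

    // key -> key group -> subtask index
    val key          = "some-key"
    val keyGroup     = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
    val subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
      maxParallelism, parallelism, keyGroup)

    // and the other direction: the range of key groups a given subtask owns
    val ownedKeyGroups = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
      maxParallelism, parallelism, subtaskIndex)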

Re: This server is not the leader for that topic-partition

2018-05-22 Thread gerardg
I've seen the same error while upgrading Kafka. We are using FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka 1.1.0, each time a server was restarted, an already running Flink job failed with the same message. Gerard

Managed operator state treating state of all parallel operators as the same

2017-07-03 Thread gerardg
Hello, Is it possible to have managed operator state where all the parallel operators know that they have the same state? Therefore, it would only be snapshotted by one of them and in case of failure (or rescaling) all would use that snapshot. For the rescaling case I guess I could use union redi
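
A minimal sketch of the union-redistribution idea mentioned here (class, state name and element type are made up): with getUnionListState every parallel subtask receives the complete list on restore, and only one subtask needs to write it at checkpoint time:

    import scala.collection.JavaConverters._

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.util.Collector

    class SharedStateFunction extends RichFlatMapFunction[String, String] with CheckpointedFunction {

      @transient private var unionState: ListState[String] = _
      private var localView: Vector[String] = Vector.empty

      override def flatMap(value: String, out: Collector[String]): Unit = {
        localView = localView :+ value   // the "shared" view kept in memory
        out.collect(value)
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        val descriptor = new ListStateDescriptor[String]("shared-state", classOf[String])
        // union redistribution: on restore (or rescale) every parallel subtask
        // receives the complete list, not just a slice of it
        unionState = context.getOperatorStateStore.getUnionListState(descriptor)
        if (context.isRestored) {
          localView = unionState.get().asScala.toVector
        }
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        unionState.clear()
        // keep a single copy in the checkpoint by letting only subtask 0 write it
        if (getRuntimeContext.getIndexOfThisSubtask == 0) {
          localView.foreach(v => unionState.add(v))
        }
      }
    }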

Re: Managed operator state treating state of all parallel operators as the same

2017-07-04 Thread gerardg
Thanks Fabian, I'll keep an eye on that JIRA. I'm not sure I follow you Stefan. You mean that I could implement my own OperatorStateStore and override its methods (e.g. snapshot and restore) to achieve this functionality? I think I don't have enough knowledge about Flink's internals to implement t

Operator variables in memory scoped by key

2017-08-31 Thread gerardg
I'm using a trie tree to match prefixes efficiently in an operator applied to a KeyedStream. It can grow quite large so I'd like to store the contents of the tree in a keyed MapState so I benefit from incremental checkpoints. Then, I'd just need to recreate the tree in memory from the MapState in c
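
A rough sketch of the approach described here, with made-up names: keep the trie contents in a keyed MapState (prefix -> payload) so they are covered by RocksDB incremental checkpoints, and rebuild the in-memory trie from that state when needed:

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    class PrefixMatcher extends KeyedProcessFunction[String, String, String] {

      // keyed state: automatically scoped to the current key of the KeyedStream
      @transient private var prefixes: MapState[String, String] = _

      override def open(parameters: Configuration): Unit = {
        prefixes = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, String]("prefixes", classOf[String], classOf[String]))
      }

      override def processElement(
          value: String,
          ctx: KeyedProcessFunction[String, String, String]#Context,
          out: Collector[String]): Unit = {
        // Rebuilding the in-memory trie from `prefixes` (e.g. lazily, per key)
        // and doing the actual prefix lookup would go here; as a placeholder this
        // only performs an exact-match lookup against the MapState.
        if (prefixes.contains(value)) out.collect(prefixes.get(value))
      }
    }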

Re: Operator variables in memory scoped by key

2017-09-01 Thread gerardg
Thanks Aljoscha, So I can think of three possible solutions: * Use an instance dictionary to store the trie tree scoped by the same key as the KeyedStream. That should work if the lifetime of the operator object is tied to the keys that it processes. * Store the trie tree in a ValueState but so

Clean GlobalWindow state

2017-09-15 Thread gerardg
Hi, I have the following operator: mainStream.coGroup(coStream).where(_.uuid).equalTo(_.uuid).window(GlobalWindows.create()).trigger(triggerWhenAllReceived).apply(mergeElements) TL;DR: It seems that the checkpointed state of the operator keeps growing forever ev
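
For context, a trigger along these lines has to both FIRE_AND_PURGE (so the window contents are dropped) and clear its own per-window state, otherwise that state keeps accumulating. A hedged sketch with made-up names; the expected element count is an assumption, not taken from the thread:

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.state.ReducingStateDescriptor
    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

    class TriggerWhenAllReceived(expected: Long) extends Trigger[Any, GlobalWindow] {

      // per-window counter of how many elements have been seen so far
      private val countDesc = new ReducingStateDescriptor[java.lang.Long](
        "seen",
        new ReduceFunction[java.lang.Long] {
          override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
        },
        classOf[java.lang.Long])

      override def onElement(element: Any, timestamp: Long, window: GlobalWindow,
                             ctx: Trigger.TriggerContext): TriggerResult = {
        val seen = ctx.getPartitionedState(countDesc)
        seen.add(1L)
        if (seen.get() >= expected) {
          seen.clear()                  // drop the trigger's own per-window state
          TriggerResult.FIRE_AND_PURGE  // emit the result and purge the window contents
        } else {
          TriggerResult.CONTINUE
        }
      }

      override def onProcessingTime(time: Long, window: GlobalWindow,
                                    ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.CONTINUE

      override def onEventTime(time: Long, window: GlobalWindow,
                               ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.CONTINUE

      override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit =
        ctx.getPartitionedState(countDesc).clear()
    }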

Re: Clean GlobalWindow state

2017-09-15 Thread gerardg
Sure: The application is configured to use processing time. Thanks, Gerard

Re: Clean GlobalWindow state

2017-09-15 Thread gerardg
I'm using nabble and it seems that it has removed the code between raw tags. Here it is again: import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor} import org.a

Re: Clean GlobalWindow state

2017-09-18 Thread gerardg
I might be able to better understand what is happening if I could see what is being stored in the state. Is there any way to read the RocksDB state? Gerard

Re: Clean GlobalWindow state

2017-09-19 Thread gerardg
Thanks Fabian, I'll take a look at these improvements. I was wondering if the increasing state size could be due to the fact that the UUIDs used in the keyBy are randomly generated. Maybe even if I correctly delete all the state related to a given key there is still some metadata related to the key wanderin

Re: Clean GlobalWindow state

2017-09-19 Thread gerardg
The UUIDs are assigned. As far as I can see (inspecting the metrics and how the task behaves) the mergeElements apply function receives all the elements (the main element and the other elements that it expects) so it seems that the correlation is correct. Also, nothing indicates that there are el

Re: Clean GlobalWindow state

2017-09-20 Thread gerardg
I have prepared a repo that reproduces the issue: https://github.com/GerardGarcia/flink-global-window-growing-state Maybe this way it is easier to spot the error or we can determine if it is a bug. Gerard