Consuming messages on the same backend
Hi. I have the following problem: we are building a system that generates any number of different events, published to different topics. Events have an associated client, and a client can express interest in these events. When they do, then for each event we execute a callback to a remote (HTTP) endpoint.

It is important that we do not overload the remote endpoint, say after a crash when there is a metric ton of unprocessed events waiting to be processed. So I'd like all events for a client to end up on the same consumer and do the throttling there. Is it possible to do this in Kafka?

I know that I can specify the client as a key when producing the events, so that all events for a client on that topic end up on the same partition. But because we have a topic per event type and lots of events, the messages will still be processed by many backends.

Creating a new topic that contains all the events from all the other topics would work, I guess, but this feels inelegant, and then you are storing data twice.

Can anyone offer any guidance?
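To make the keying part concrete, this is roughly what I mean by producing with the client as the key (a minimal sketch; the topic name, client id, and broker address are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // made up
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String clientId = "client-42"; // the client the event belongs to
            // The default partitioner hashes the key, so every record with the
            // same key lands on the same partition of this one topic.
            producer.send(new ProducerRecord<>("order-events", clientId, "{\"event\":\"created\"}"));
        }
    }
}

This pins a client to a partition within a single topic; my question is how to get the same per-client affinity across many topics.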
How to make savepoints more robust in the face of refactorings?
Changing the class that operators are nested in can break compatibility with existing savepoints. The following piece of code demonstrates this: https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a

If I change Operators in this file to Operators2, I will not be able to recover from a savepoint that was made when this class still had its old name. The error in the Flink UI will be:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
    ... 6 more

But the real reason is found in the task manager logs:

2018-01-28 17:03:58,830 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
    at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: failed to read class descriptor
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flin
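For what it's worth, the only general mitigation I am aware of is to give every stateful operator an explicit uid, so savepoint state is matched by that id rather than an auto-generated one, and to keep state and function classes out of classes that might be renamed. A minimal sketch of that pattern (names made up); as far as I can tell it does not help once the serializer class itself has become unloadable, since the generated serializer's binary name still changes with the enclosing class:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String value) {
                   return value;
               }
           })
           .flatMap(new CountingFlatMap())
           .uid("line-counter") // stable id: state is matched by this, not by topology position
           .print();

        env.execute("uid-sketch");
    }

    // In a real job, keep this as a top-level class: anonymous or nested
    // classes get their enclosing class's name baked into their binary name,
    // which is exactly what breaks on a rename.
    public static class CountingFlatMap extends RichFlatMapFunction<String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            Long current = count.value();
            long next = (current == null ? 0L : current) + 1;
            count.update(next);
            out.collect(value + " seen " + next + " times");
        }
    }
}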
"Attempt to heart beat failed since the group is rebalancing, try to re-join group" errors
Hi. We run Kafka 0.10.0.1 on the server, and recently upgraded a client that used the Kafka 0.8.2 Java client to the 0.9.0.1 client.

We previously had fetch.message.max.bytes set to 11534336. This ended up crashing in production with:

org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {conversations-3=80824497} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.

We realized this property had been renamed to max.partition.fetch.bytes. However, as soon as I set that property, the log would be full of "Attempt to heart beat failed since the group is rebalancing, try to re-join group." errors, and no consumer would ever end up owning a partition. They were all unassigned.

Can anyone offer any suggestions on how to solve this?

--jelmer
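For reference, this is roughly how the consumer is configured now (a minimal sketch; the broker address and group id are made up). My working theory is that the larger fetch makes a single poll() return so much data that processing it takes longer than session.timeout.ms, and since, as far as I understand, the 0.9 client only sends heartbeats from poll(), the group then rebalances forever, so I am also experimenting with a higher session timeout:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // made up
        props.put("group.id", "conversations-consumer");   // made up
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Renamed from fetch.message.max.bytes in the old 0.8 consumer.
        props.put("max.partition.fetch.bytes", "11534336");

        // Assumption: give the consumer more time to work through the larger
        // batches a single poll() can now return before the coordinator
        // considers it dead. Must stay below the broker's
        // group.max.session.timeout.ms.
        props.put("session.timeout.ms", "60000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("conversations"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }
}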