Consuming messages on the same backend

2015-03-11 Thread jelmer
Hi. I have the following problem:

We are building a system that will generate any number of different events,
published to different topics.

Events have an associated client, and a client can express interest in these
events. When they do, we execute a callback to a remote (HTTP) endpoint for
each event.

It is important that we do not overload the remote endpoint, say after a
crash when there is a metric ton of unprocessed events waiting to be
processed.

So I'd like all events for a client to end up on the same consumer and do
the throttling there.

Is it possible to do this in Kafka? I know that I can specify the client
as the key when producing the events, so that all of a client's events for
that topic end up on the same partition.
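
For example, something along these lines (a minimal sketch; the broker
address, topic name and client id are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClientKeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String clientId = "client-42";            // hypothetical client id
            String payload = "{\"type\":\"signup\"}"; // hypothetical event payload
            // Same key -> same partition of this topic -> same consumer instance.
            producer.send(new ProducerRecord<>("signup-events", clientId, payload));
        }
    }
}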

But because we have a topic per event type, and lots of events, the
messages will still be processed by many backends.

Creating a new topic that contains all the events from all the other topics
would work, I guess, but this feels inelegant, and then you are storing the
data twice.
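
If we did go that route, the forwarding job could be a simple
consume-and-republish loop, roughly like this (a sketch only; the topic
names and the aggregate topic "all-events" are made up, and it assumes the
events are keyed by client id as above):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventAggregator {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "event-aggregator");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Arrays.asList("signup-events", "order-events", "login-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Reuse the client id key so the aggregate topic is also
                    // partitioned per client.
                    producer.send(new ProducerRecord<>("all-events", record.key(), record.value()));
                }
            }
        }
    }
}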

Can anyone offer any guidance?


How to make savepoints more robust in the face of refactorings?

2018-01-28 Thread jelmer
Changing the class that operators are nested in can break compatibility with
existing savepoints. The following piece of code demonstrates this:

https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a

If I change Operators in this file to Operators2, I will not be able to
recover from a savepoint that was made when this class still had its old
name.
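
For illustration, the setup is roughly of this shape (a hypothetical Java
sketch, not the gist itself): the keyed state's value type is nested inside
Operators, so the serializer metadata written into the savepoint is tied to
class names derived from "Operators", and after the rename the restore points
at classes that no longer exist.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class Operators {

    // State value type nested in Operators; in the savepoint it is recorded
    // under a name like "Operators$Count".
    public static class Count {
        public long value;
        public Count() {}
        public Count(long value) { this.value = value; }
    }

    // Used on a keyed stream, e.g. stream.keyBy(...).flatMap(new Operators.CountingFlatMap())
    public static class CountingFlatMap extends RichFlatMapFunction<String, Long> {
        private transient ValueState<Count> countState;

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Count.class));
        }

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Count current = countState.value();
            long updated = (current == null ? 0L : current.value) + 1;
            countState.update(new Count(updated));
            out.collect(updated);
        }
    }
}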

The error in the Flink UI will be:

java.lang.IllegalStateException: Could not initialize keyed state backend.
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
... 6 more

But the real reason is found in the task manager logs:


2018-01-28 17:03:58,830 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at org.apache.flin

"Attempt to heart beat failed since the group is rebalancing, try to re-join group" errors

2016-10-29 Thread jelmer
Hi

We run Kafka 0.10.0.1 on the server.

We recently upgraded a client that used the Kafka 0.8.2 Java client to the
Kafka 0.9.0.1 client.
We previously had fetch.message.max.bytes set to 11534336.

This ended up crashing in production with:

org.apache.kafka.common.errors.RecordTooLargeException: There are some
messages at [Partition=Offset]: {conversations-3=80824497} whose size
is larger than the fetch size 1048576 and hence cannot be ever
returned. Increase the fetch size, or decrease the maximum message
size the broker will allow.

We realized this property had been renamed to max.partition.fetch.bytes.
However, as soon as I set this property, the log would be full of

Attempt to heart beat failed since the group is rebalancing, try to re-join
group.

errors, and no consumer would ever end up owning a partition. They were all
unassigned.
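
For reference, the configuration change being described looks roughly like
this on the 0.9 consumer (a sketch; the broker address and group id are made
up, the topic name is inferred from the error above):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LargeMessageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "conversations-consumer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 0.8.x client: fetch.message.max.bytes=11534336
        // 0.9.x client: the per-partition equivalent is max.partition.fetch.bytes
        props.put("max.partition.fetch.bytes", "11534336");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("conversations"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // process records ...
            }
        }
    }
}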

Can anyone offer any suggestions on how to solve this?

--jelmer