Hi there,
I'm trying to update state in one of my applications hosted in Kinesis
Data Analytics. The state is declared as:
private transient ValueState<Sensor> sensorState;
and I update it with:
sensorState.update(sensor);
The update fails with the following error:
An error occurred: org.apache.flink.util.FlinkRuntimeException: Error
while adding data to RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
at
org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
at
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
at
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
at
org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
at
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Unable to create serializer
"com.esotericsoftware.kryo.serializers.FieldSerializer" for class:
org.apache.logging.log4j.core.layout.AbstractCsvLayout
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
connection (org.postgresql.core.PGStream)
pgStream (org.postgresql.core.v3.QueryExecutorImpl)
transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
commitQuery (org.postgresql.jdbc.PgConnection)
connection (org.postgresql.jdbc.PgResultSet)
val$rs
(sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
at
org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
at
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
at
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
at
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
... 20 more
Caused by: java.lang.IllegalArgumentException: Unable to create
serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for
class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
at
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
at
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
... 62 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
... 78 more
Caused by: java.lang.NoClassDefFoundError:
Lorg/apache/commons/csv/CSVFormat;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredFields(Class.java:1916)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
... 82 more
Caused by: java.lang.ClassNotFoundException:
org.apache.commons.csv.CSVFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 88 more
Any help would be great. I tried manually including CSVFormat from
Apache Commons as a dependency, but that didn't change anything.
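
For reference, the Kryo serialization trace above lists fields innermost
first, so its last entry, val$rs in FrameMotionPathsToTelemetryJoinFunction$4,
is the outermost field being written. A name of the form val$<name> is what
javac typically generates when an anonymous class captures a local variable,
here one called rs. The following is only a minimal sketch using the standard
library; CapturedFieldDemo, FakeResultSet and makeSupplier are hypothetical
names for illustration and are not taken from my application:

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CapturedFieldDemo {

    /** Hypothetical stand-in for a live resource such as a JDBC ResultSet. */
    static class FakeResultSet {
        final String row;

        FakeResultSet(String row) {
            this.row = row;
        }
    }

    /** Returns an anonymous class instance that captures the local variable rs. */
    static Supplier<String> makeSupplier() {
        final FakeResultSet rs = new FakeResultSet("sensor-1");
        return new Supplier<String>() {
            @Override
            public String get() {
                // Using rs here makes the compiler keep it in a synthetic field.
                return rs.row;
            }
        };
    }

    /** Lists the declared field names of an object's class, synthetic ones included. */
    static List<String> fieldNames(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            names.add(f.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        Supplier<String> s = makeSupplier();
        // Prints the anonymous class name and its fields, including the captured one.
        System.out.println(s.getClass().getName() + " fields: " + fieldNames(s));
    }
}
```

A field-by-field serializer that is handed such an object would follow the
captured reference into whatever it points at, which matches the chain from
PgResultSet down to the class loader shown in the trace.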
Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595