Hi Chris,

[forwarding the private discussion to the mailing list again]

First of all, are you sure that your Sensor class is either a top-level class or a static inner class? It seems there is way more stuff in it (maybe included transitively by accident?), such as:

org.apache.logging.log4j.core.layout.AbstractCsvLayout
     > Serialization trace:
     > classes (sun.misc.Launcher$AppClassLoader)
     > classloader (java.security.ProtectionDomain)
     > cachedPDs (javax.security.auth.SubjectDomainCombiner)
     > combiner (java.security.AccessControlContext)
     > acc (sun.security.ssl.SSLSocketImpl)
     > connection (org.postgresql.core.PGStream)
     > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
     > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
     > commitQuery (org.postgresql.jdbc.PgConnection)
     > connection (org.postgresql.jdbc.PgResultSet)
     > val$rs

When declaring state you can use `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to check if your state is a POJO type.
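To illustrate why I am asking about top-level vs. static inner classes: the trace ends in `val$rs`, which is the kind of synthetic field a compiler generates when an anonymous class captures a local variable named `rs`. The following is only a minimal sketch of how that can happen, assuming (a guess on my side, since the join function is not shown in this thread) that the Sensor is created with something like double-brace initialization. `CapturedFieldDemo`, `FakeResultSet` and the trimmed-down `Sensor` are hypothetical stand-ins, and the sketch uses only the JDK, no Flink:

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class CapturedFieldDemo {

    /** Hypothetical, trimmed-down stand-in for the Sensor class. */
    public static class Sensor {
        public String sensorId;
        public String label;
    }

    /** Hypothetical stand-in for a heavyweight resource such as a JDBC result set. */
    public static class FakeResultSet {
        public String getString(String column) {
            return column + "-value";
        }
    }

    /** Plain construction: the runtime class of the result is Sensor itself. */
    public static Sensor plain(FakeResultSet rs) {
        Sensor s = new Sensor();
        s.sensorId = rs.getString("sensor_id");
        return s;
    }

    /** Double-brace initialization: the result is an anonymous subclass that captures rs. */
    public static Sensor doubleBrace(final FakeResultSet rs) {
        return new Sensor() {{
            sensorId = rs.getString("sensor_id");
        }};
    }

    /** Names of the fields declared directly on the object's runtime class. */
    public static List<String> declaredFieldNames(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            names.add(f.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        FakeResultSet rs = new FakeResultSet();
        Sensor a = plain(rs);
        Sensor b = doubleBrace(rs);
        // Prints the runtime class, whether it is anonymous, and its own declared fields.
        System.out.println(a.getClass().getName() + " anonymous=" + a.getClass().isAnonymousClass()
                + " fields=" + declaredFieldNames(a));
        System.out.println(b.getClass().getName() + " anonymous=" + b.getClass().isAnonymousClass()
                + " fields=" + declaredFieldNames(b));
    }
}
```

If the object you pass to `sensorState.update(...)` turns out to be such an anonymous subclass, a generic serializer would also walk the captured reference and everything reachable from it. Creating the object with a plain `new Sensor()` and assigning the fields afterwards, as in `plain(...)`, keeps the runtime class equal to `Sensor`.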

Regards,
Timo


-------- Forwarded Message --------
Subject:        Re: Updating ValueState not working in hosted Kinesis
Date:   Wed, 19 Feb 2020 12:02:16 +0000
From:   Chris Stevens <ch...@sensingfeeling.com>
To:     Timo Walther <twal...@apache.org>



Hi Timo,

Thanks for your reply. This makes sense to me. How do I treat something as a POJO instead of a generic serialized black-box type? Sorry, I'm relatively new to Java and Flink.

This is my full class def:

package sensingfeeling.models;
import java.io.Serializable;

public class Sensor implements Serializable {

     private static final long serialVersionUID = 8582433437601788991L;
     public String sensorId;
     public String companyId;
     public String label;
     // public Date createdAt;
     // public Date updatedAt;
     public Integer uncomfortableFaceLimit;
     public Boolean online;
     public String capabilityId;
     // public Date lastOnlineAt;
     // public Date lastOfflineAt;
     public Integer onlineVersionNumber;
     public int status;
     @Override
     public String toString(){
         return this.sensorId + " - " + this.label;
     }
}

Super simple really.

I'm not trying to upgrade anything as far as I know. I'm just making an operator state-aware.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 11:55, Timo Walther <twal...@apache.org> wrote:

    Hi Chris,

    it seems there are fields serialized into state that actually don't
    belong there. You should aim to treat Sensor as a POJO instead of a
    Kryo generic serialized black-box type.

    Furthermore, it seems that fields such as
    "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
    state. Is there a "transient" keyword missing?

    Are you trying to upgrade your job or the Flink version?

    Regards,
    Timo



    On 18.02.20 18:59, Chris Stevens wrote:
     > Hi there,
     >
     > I'm trying to update state in one of my applications hosted in Kinesis
     > Data Analytics.
     >
     > private transient ValueState<Sensor> sensorState;
     > using sensorState.update(sensor);
     >
     > Get error:
     >
     > An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
     > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
     > at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
     > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
     > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
     > at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
     > at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
     > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
     > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
     > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
     > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
     > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
     > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
     > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
     > at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
     > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
     > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
     > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
     > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
     > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
     > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
     > at java.lang.Thread.run(Thread.java:748)
     > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
     > Serialization trace:
     > classes (sun.misc.Launcher$AppClassLoader)
     > classloader (java.security.ProtectionDomain)
     > cachedPDs (javax.security.auth.SubjectDomainCombiner)
     > combiner (java.security.AccessControlContext)
     > acc (sun.security.ssl.SSLSocketImpl)
     > connection (org.postgresql.core.PGStream)
     > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
     > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
     > commitQuery (org.postgresql.jdbc.PgConnection)
     > connection (org.postgresql.jdbc.PgResultSet)
     > val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
     > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
     > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
     > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
     > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
     > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
     > at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
     > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
     > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
     > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
     > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
     > ... 20 more
     > Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
     > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
     > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
     > at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
     > at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
     > at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
     > at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
     > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
     > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
     > at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
     > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
     > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
     > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
     > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
     > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
     > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
     > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
     > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
     > ... 62 more
     > Caused by: java.lang.reflect.InvocationTargetException
     > at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
     > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
     > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
     > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
     > ... 78 more
     > Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
     > at java.lang.Class.getDeclaredFields0(Native Method)
     > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
     > at java.lang.Class.getDeclaredFields(Class.java:1916)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
     > at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
     > ... 82 more
     > Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
     > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
     > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
     > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
     > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
     > ... 88 more
     >
     > Any help would be great. I tried manually including CSVFormat from
     > Apache Commons, but that didn't change anything.
     >
     > Many thanks,
     > Chris Stevens
     > Head of Research & Development
     > +44 7565 034 595
