Hi Chris,

your observation is right. By `new Sensor() {}` instead of just `new Sensor()` you are creating an anonymous non-static class that references the outer method and class.

If you check your logs, there might be also a reason why your POJO is used as a generic type. I assume because you declared `implements Serializable` which forces Flink to think "user wants to deal with serialization of the POJO himself".

Regards,
Timo


On 19.02.20 14:24, Chris Stevens wrote:
Thanks again Timo, I hope I replied correctly this time.

As per my previous message the Sensor class is a very simple POJO type (I think).

When the serialization trace talks about PGSql stuff it makes me think that something from my operator is being included in serialization. Not just the Sensor object itself which I am explicitly including in state.

packagesensingfeeling.functions.mapping;

publicfinalclassArbJoinFunctionextendsRichJoinFunction<TypeB, TypeC>, TypeA> {

privatestaticfinallongserialVersionUID= 8582433437601788991L;

privatetransientValueState<Sensor> sensorState;

@Override
publicTypeAjoin(TypeBframe, TypeCactiveMotionPaths) throwsJsonProcessingException{

Sensorsensor= sensorState.value();
if(sensor == null) {
LOG.debug("Sensor was not in state, getting sensor: "+ frame.sensorId);
sensor = getSensor(frame);
sensorState.update(sensor);
}

returnnewTypeA();
}

@Override
publicvoidopen(Configurationconfig) {
LOG.debug("Sensor open method called", config);

StateTtlConfigsensorTtlConfig= StateTtlConfig.newBuilder(Time.minutes(1))
.cleanupInBackground()
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

ValueStateDescriptor<Sensor> sensorStateDescriptor= newValueStateDescriptor<>( "sensor", TypeInformation.of(newTypeHint<Sensor>(){}));
// sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
sensorState = getRuntimeContext().getState(sensorStateDescriptor);

}

privateSensorgetSensor(TypeBframe) throwsException{

Class.forName("org.postgresql.Driver");
try(Connectioncon= DriverManager.getConnection(dbURL, dbUser, dbPassword);
Statementst= con.createStatement();
ResultSetrs= st.executeQuery("SELECT * from sensor where sensorid = '"+ frame.sensorId+ "'")) {

if(rs.next()) {
Sensorsensor= newSensor() {};

LOG.debug("Got sensor"+ sensor);

returnsensor;
}

} catch(SQLExceptionex) {
LOG.error("Error when connection postgres", ex);
throwex;
}

returnnull;
}

}

Above is a cut down version of my operator, I'm guessing it is the ResultSet rs that is getting serialized. How do I prevent this undesirable behaviour? I'm quite happy for my solution to serialize only what I explicitly tell it to, I don't need exactly once or anything.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 12:19, Timo Walther <twal...@apache.org <mailto:twal...@apache.org>> wrote:

    Hi Chris,

    [forwarding the private discussion to the mailing list again]

    first of all, are you sure that your Sensor class is either a top-level
    class or a static inner class. Because it seems there is way more stuff
    in it (maybe included by accident transitively?). Such as:

    org.apache.logging.log4j.core.layout.AbstractCsvLayout
           > Serialization trace:
           > classes (sun.misc.Launcher$AppClassLoader)
           > classloader (java.security.ProtectionDomain)
           > cachedPDs (javax.security.auth.SubjectDomainCombiner)
           > combiner (java.security.AccessControlContext)
           > acc (sun.security.ssl.SSLSocketImpl)
           > connection (org.postgresql.core.PGStream)
           > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
           > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
           > commitQuery (org.postgresql.jdbc.PgConnection)
           > connection (org.postgresql.jdbc.PgResultSet)
           > val$rs

    When declaring state you can use
    `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to

    check if your state is a POJO type.

    Regards,
    Timo


    -------- Forwarded Message --------
    Subject:        Re: Updating ValueState not working in hosted Kinesis
    Date:   Wed, 19 Feb 2020 12:02:16 +0000
    From:   Chris Stevens <ch...@sensingfeeling.com
    <mailto:ch...@sensingfeeling.com>>
    To:     Timo Walther <twal...@apache.org <mailto:twal...@apache.org>>



    Hi Timo,

    Thanks for your reply. This makes sense to me, how do I treat something
    as a POJO instead of a generic serialized BB type? Sorry relatively new
    to Java and Flink.

    This is my full class def:

    package sensingfeeling.models;
    import java.io.Serializable;

    public class Sensor implements Serializable {

           private static final long serialVersionUID =
    8582433437601788991L;
           public String sensorId;
           public String companyId;
           public String label;
           // public Date createdAt;
           // public Date updatedAt;
           public Integer uncomfortableFaceLimit;
           public Boolean online;
           public String capabilityId;
           // public Date lastOnlineAt;
           // public Date lastOfflineAt;
           public Integer onlineVersionNumber;
           public int status;
           @Override
           public String toString(){
               return this.sensorId + " - " + this.label;
           }
    }

    Super simple really.

    I'm not trying to upgrade anything as far as I know. Just making an
    operator state aware.

    Many thanks,
    Chris Stevens
    Head of Research & Development
    +44 7565 034 595


    On Wed, 19 Feb 2020 at 11:55, Timo Walther <twal...@apache.org
    <mailto:twal...@apache.org>
    <mailto:twal...@apache.org <mailto:twal...@apache.org>>> wrote:

          Hi Chris,

          it seems there are field serialized into state that actually don't
          belong there. You should aim to treat Sensor as a POJO instead
    of a
          Kryo
          generic serialized black-box type.

          Furthermore, it seems that field such as
          "org.apache.logging.log4j.core.layout.AbstractCsvLayout"
    should not be
          state. Is there a "transient" keyword missing?

          Are you trying to upgrade your job or the Flink version?

          Regards,
          Timo



          On 18.02.20 18:59, Chris Stevens wrote:
           > Hi there,
           >
           > I'm trying to update state in one of my applications hosted in
          Kinesis
           > Data Analytics.
           >
           > private transient ValueState<Sensor> sensorState;
           > using sensorState.update(sensor);
           >
           > Get error:
           >
           > An error occurred: org.apache.flink.util.FlinkRuntimeException:
          Error
           > while adding data to RocksDB
           > at
           >

    
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
           > at
           >

    
org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
           > at
           >

    
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
           > at
           >

    
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
           > at
           >

    
org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
           > at
           >

    
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
           > at
           >

    
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
           > at
           >

    
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
           > at
           >

    
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
           > at
           >

    
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
           > at
           >

    
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
           > at
           >

    
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
           > at
           >

    
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
           > at
           > org.apache.flink.streaming.runtime.io
    <http://org.apache.flink.streaming.runtime.io>

    
<http://runtime.io>.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
           > at
           >

    
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
           > at
           >

    
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
           > at
           > org.apache.flink.streaming.runtime.io
    <http://org.apache.flink.streaming.runtime.io>

    
<http://runtime.io>.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
           > at
           >

    
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
           > at
           >

    
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
           > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
           > at java.lang.Thread.run(Thread.java:748)
           > Caused by: com.esotericsoftware.kryo.KryoException:
           > java.lang.IllegalArgumentException: Unable to create serializer
           > "com.esotericsoftware.kryo.serializers.FieldSerializer" for
    class:
           > org.apache.logging.log4j.core.layout.AbstractCsvLayout
           > Serialization trace:
           > classes (sun.misc.Launcher$AppClassLoader)
           > classloader (java.security.ProtectionDomain)
           > cachedPDs (javax.security.auth.SubjectDomainCombiner)
           > combiner (java.security.AccessControlContext)
           > acc (sun.security.ssl.SSLSocketImpl)
           > connection (org.postgresql.core.PGStream)
           > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
           > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
           > commitQuery (org.postgresql.jdbc.PgConnection)
           > connection (org.postgresql.jdbc.PgResultSet)
           > val$rs
           >

    (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at
    com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
           > at
           >

    
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
           > at
           >

    
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at
    com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
           > at
    com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
           > at
           >

    
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
           > at
           >

    
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
           > at
           >

    
org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
           > at
           >

    
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
           > at
           >

    
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
           > at
           >

    
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
           > at
           >

    
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
           > ... 20 more
           > Caused by: java.lang.IllegalArgumentException: Unable to create
           > serializer
          "com.esotericsoftware.kryo.serializers.FieldSerializer" for
           > class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
           > at
           >

    
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
           > at
           >

    
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
           > at
    com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
           > at
    com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
           > at
    com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
           > at
           >

    
com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
           > at
    com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
           > at
    com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
           > at
           >

    
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
           > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
           > at
           >

    
com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
           > at
           >

    
com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
           > at
    com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
           > at
           >

    
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
           > at
           >

    
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
           > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
           > at
           >

    com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
           > ... 62 more
           > Caused by: java.lang.reflect.InvocationTargetException
           > at
    sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown
          Source)
           > at
           >

    
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           > at
    java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           > at
           >

    
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
           > ... 78 more
           > Caused by: java.lang.NoClassDefFoundError:
           > Lorg/apache/commons/csv/CSVFormat;
           > at java.lang.Class.getDeclaredFields0(Native Method)
           > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
           > at java.lang.Class.getDeclaredFields(Class.java:1916)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
           > at
           >

    
com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
           > ... 82 more
           > Caused by: java.lang.ClassNotFoundException:
           > org.apache.commons.csv.CSVFormat
           > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
           > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
           > at
    sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
           > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
           > ... 88 more
           >
           > Any help would be great. I tried manually including
    CSVFormat from
           > apache commons but didn't change anything.
           >
           > Many thanks,
           > Chris Stevens
           > Head of Research & Development
           > +44 7565 034 595


Reply via email to