Thanks so much Timo, got it working now. All down to my lack of Java skill.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 15:12, Timo Walther <twal...@apache.org> wrote:

> Hi Chris,
>
> your observation is right. By `new Sensor() {}` instead of just `new
> Sensor()` you are creating an anonymous non-static class that references
> the outer method and class.
>
> If you check your logs, there might be also a reason why your POJO is
> used as a generic type. I assume because you declared `implements
> Serializable` which forces Flink to think "user wants to deal with
> serialization of the POJO himself".
>
> Regards,
> Timo
>
>
> On 19.02.20 14:24, Chris Stevens wrote:
> > Thanks again Timo, I hope I replied correctly this time.
> >
> > As per my previous message the Sensor class is a very simple POJO type
> > (I think).
> >
> > When the serialization trace talks about PGSql stuff it makes me think
> > that something from my operator is being included in serialization. Not
> > just the Sensor object itself which I am explicitly including in state.
> >
> > packagesensingfeeling.functions.mapping;
> >
> > publicfinalclassArbJoinFunctionextendsRichJoinFunction<TypeB, TypeC>,
> > TypeA> {
> >
> > privatestaticfinallongserialVersionUID= 8582433437601788991L;
> >
> > privatetransientValueState<Sensor> sensorState;
> >
> > @Override
> > publicTypeAjoin(TypeBframe, TypeCactiveMotionPaths)
> > throwsJsonProcessingException{
> >
> > Sensorsensor= sensorState.value();
> > if(sensor == null) {
> > LOG.debug("Sensor was not in state, getting sensor: "+ frame.sensorId);
> > sensor = getSensor(frame);
> > sensorState.update(sensor);
> > }
> >
> > returnnewTypeA();
> > }
> >
> > @Override
> > publicvoidopen(Configurationconfig) {
> > LOG.debug("Sensor open method called", config);
> >
> > StateTtlConfigsensorTtlConfig= StateTtlConfig.newBuilder(Time.minutes(1))
> > .cleanupInBackground()
> > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >
> .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();
> >
> > ValueStateDescriptor<Sensor> sensorStateDescriptor=
> > newValueStateDescriptor<>( "sensor",
> > TypeInformation.of(newTypeHint<Sensor>(){}));
> > // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
> > sensorState = getRuntimeContext().getState(sensorStateDescriptor);
> >
> > }
> >
> > privateSensorgetSensor(TypeBframe) throwsException{
> >
> > Class.forName("org.postgresql.Driver");
> > try(Connectioncon= DriverManager.getConnection(dbURL, dbUser,
> dbPassword);
> > Statementst= con.createStatement();
> > ResultSetrs= st.executeQuery("SELECT * from sensor where sensorid = '"+
> > frame.sensorId+ "'")) {
> >
> > if(rs.next()) {
> > Sensorsensor= newSensor() {};
> >
> > LOG.debug("Got sensor"+ sensor);
> >
> > returnsensor;
> > }
> >
> > } catch(SQLExceptionex) {
> > LOG.error("Error when connection postgres", ex);
> > throwex;
> > }
> >
> > returnnull;
> > }
> >
> > }
> >
> > Above is a cut down version of my operator, I'm guessing it is the
> > ResultSet rs that is getting serialized. How do I prevent this
> > undesirable behaviour? I'm quite happy for my solution to serialize only
> > what I explicitly tell it to, I don't need exactly once or anything.
> >
> > Many thanks,
> > Chris Stevens
> > Head of Research & Development
> > +44 7565 034 595
> >
> >
> > On Wed, 19 Feb 2020 at 12:19, Timo Walther <twal...@apache.org
> > <mailto:twal...@apache.org>> wrote:
> >
> >     Hi Chris,
> >
> >     [forwarding the private discussion to the mailing list again]
> >
> >     first of all, are you sure that your Sensor class is either a
> top-level
> >     class or a static inner class. Because it seems there is way more
> stuff
> >     in it (maybe included by accident transitively?). Such as:
> >
> >     org.apache.logging.log4j.core.layout.AbstractCsvLayout
> >            > Serialization trace:
> >            > classes (sun.misc.Launcher$AppClassLoader)
> >            > classloader (java.security.ProtectionDomain)
> >            > cachedPDs (javax.security.auth.SubjectDomainCombiner)
> >            > combiner (java.security.AccessControlContext)
> >            > acc (sun.security.ssl.SSLSocketImpl)
> >            > connection (org.postgresql.core.PGStream)
> >            > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> >            > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> >            > commitQuery (org.postgresql.jdbc.PgConnection)
> >            > connection (org.postgresql.jdbc.PgResultSet)
> >            > val$rs
> >
> >     When declaring state you can use
> >
>  `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
> >
> >     check if your state is a POJO type.
> >
> >     Regards,
> >     Timo
> >
> >
> >     -------- Forwarded Message --------
> >     Subject:        Re: Updating ValueState not working in hosted Kinesis
> >     Date:   Wed, 19 Feb 2020 12:02:16 +0000
> >     From:   Chris Stevens <ch...@sensingfeeling.com
> >     <mailto:ch...@sensingfeeling.com>>
> >     To:     Timo Walther <twal...@apache.org <mailto:twal...@apache.org
> >>
> >
> >
> >
> >     Hi Timo,
> >
> >     Thanks for your reply. This makes sense to me, how do I treat
> something
> >     as a POJO instead of a generic serialized BB type? Sorry relatively
> new
> >     to Java and Flink.
> >
> >     This is my full class def:
> >
> >     package sensingfeeling.models;
> >     import java.io.Serializable;
> >
> >     public class Sensor implements Serializable {
> >
> >            private static final long serialVersionUID =
> >     8582433437601788991L;
> >            public String sensorId;
> >            public String companyId;
> >            public String label;
> >            // public Date createdAt;
> >            // public Date updatedAt;
> >            public Integer uncomfortableFaceLimit;
> >            public Boolean online;
> >            public String capabilityId;
> >            // public Date lastOnlineAt;
> >            // public Date lastOfflineAt;
> >            public Integer onlineVersionNumber;
> >            public int status;
> >            @Override
> >            public String toString(){
> >                return this.sensorId + " - " + this.label;
> >            }
> >     }
> >
> >     Super simple really.
> >
> >     I'm not trying to upgrade anything as far as I know. Just making an
> >     operator state aware.
> >
> >     Many thanks,
> >     Chris Stevens
> >     Head of Research & Development
> >     +44 7565 034 595
> >
> >
> >     On Wed, 19 Feb 2020 at 11:55, Timo Walther <twal...@apache.org
> >     <mailto:twal...@apache.org>
> >     <mailto:twal...@apache.org <mailto:twal...@apache.org>>> wrote:
> >
> >           Hi Chris,
> >
> >           it seems there are field serialized into state that actually
> don't
> >           belong there. You should aim to treat Sensor as a POJO instead
> >     of a
> >           Kryo
> >           generic serialized black-box type.
> >
> >           Furthermore, it seems that field such as
> >           "org.apache.logging.log4j.core.layout.AbstractCsvLayout"
> >     should not be
> >           state. Is there a "transient" keyword missing?
> >
> >           Are you trying to upgrade your job or the Flink version?
> >
> >           Regards,
> >           Timo
> >
> >
> >
> >           On 18.02.20 18:59, Chris Stevens wrote:
> >            > Hi there,
> >            >
> >            > I'm trying to update state in one of my applications hosted
> in
> >           Kinesis
> >            > Data Analytics.
> >            >
> >            > private transient ValueState<Sensor> sensorState;
> >            > using sensorState.update(sensor);
> >            >
> >            > Get error:
> >            >
> >            > An error occurred:
> org.apache.flink.util.FlinkRuntimeException:
> >           Error
> >            > while adding data to RocksDB
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
> >            > at
> >            >
> >
> >
>  
> sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
> >            > at
> >            >
> >
> >
>  
> sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
> >            > at
> >            > org.apache.flink.streaming.runtime.io
> >     <http://org.apache.flink.streaming.runtime.io>
> >
> >     <http://runtime.io
> >.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> >            > at
> >            > org.apache.flink.streaming.runtime.io
> >     <http://org.apache.flink.streaming.runtime.io>
> >
> >     <http://runtime.io
> >.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> >            > at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
> >            > at java.lang.Thread.run(Thread.java:748)
> >            > Caused by: com.esotericsoftware.kryo.KryoException:
> >            > java.lang.IllegalArgumentException: Unable to create
> serializer
> >            > "com.esotericsoftware.kryo.serializers.FieldSerializer" for
> >     class:
> >            > org.apache.logging.log4j.core.layout.AbstractCsvLayout
> >            > Serialization trace:
> >            > classes (sun.misc.Launcher$AppClassLoader)
> >            > classloader (java.security.ProtectionDomain)
> >            > cachedPDs (javax.security.auth.SubjectDomainCombiner)
> >            > combiner (java.security.AccessControlContext)
> >            > acc (sun.security.ssl.SSLSocketImpl)
> >            > connection (org.postgresql.core.PGStream)
> >            > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> >            > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> >            > commitQuery (org.postgresql.jdbc.PgConnection)
> >            > connection (org.postgresql.jdbc.PgResultSet)
> >            > val$rs
> >            >
> >
> >
>  (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
> >            > at
> >            >
> >
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
> >            > ... 20 more
> >            > Caused by: java.lang.IllegalArgumentException: Unable to
> create
> >            > serializer
> >           "com.esotericsoftware.kryo.serializers.FieldSerializer" for
> >            > class:
> org.apache.logging.log4j.core.layout.AbstractCsvLayout
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
> >            > at
> >     com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
> >            > at
> >     com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> >            > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
> >            > at
> >     com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> >            > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> >            > at
> >            >
> >
> >
>  com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >            > ... 62 more
> >            > Caused by: java.lang.reflect.InvocationTargetException
> >            > at
> >     sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown
> >           Source)
> >            > at
> >            >
> >
> >
>  
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >            > at
> >     java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
> >            > ... 78 more
> >            > Caused by: java.lang.NoClassDefFoundError:
> >            > Lorg/apache/commons/csv/CSVFormat;
> >            > at java.lang.Class.getDeclaredFields0(Native Method)
> >            > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> >            > at java.lang.Class.getDeclaredFields(Class.java:1916)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
> >            > at
> >            >
> >
> >
>  
> com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
> >            > ... 82 more
> >            > Caused by: java.lang.ClassNotFoundException:
> >            > org.apache.commons.csv.CSVFormat
> >            > at
> java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> >            > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> >            > at
> >     sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> >            > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> >            > ... 88 more
> >            >
> >            > Any help would be great. I tried manually including
> >     CSVFormat from
> >            > apache commons but didn't change anything.
> >            >
> >            > Many thanks,
> >            > Chris Stevens
> >            > Head of Research & Development
> >            > +44 7565 034 595
> >
>
>

Reply via email to