Thanks so much Timo, got it working now. All down to my lack of Java skill.
Many thanks, Chris Stevens Head of Research & Development +44 7565 034 595 On Wed, 19 Feb 2020 at 15:12, Timo Walther <twal...@apache.org> wrote: > Hi Chris, > > your observation is right. By `new Sensor() {}` instead of just `new > Sensor()` you are creating an anonymous non-static class that references > the outer method and class. > > If you check your logs, there might be also a reason why your POJO is > used as a generic type. I assume because you declared `implements > Serializable` which forces Flink to think "user wants to deal with > serialization of the POJO himself". > > Regards, > Timo > > > On 19.02.20 14:24, Chris Stevens wrote: > > Thanks again Timo, I hope I replied correctly this time. > > > > As per my previous message the Sensor class is a very simple POJO type > > (I think). > > > > When the serialization trace talks about PGSql stuff it makes me think > > that something from my operator is being included in serialization. Not > > just the Sensor object itself which I am explicitly including in state. > > > > packagesensingfeeling.functions.mapping; > > > > publicfinalclassArbJoinFunctionextendsRichJoinFunction<TypeB, TypeC>, > > TypeA> { > > > > privatestaticfinallongserialVersionUID= 8582433437601788991L; > > > > privatetransientValueState<Sensor> sensorState; > > > > @Override > > publicTypeAjoin(TypeBframe, TypeCactiveMotionPaths) > > throwsJsonProcessingException{ > > > > Sensorsensor= sensorState.value(); > > if(sensor == null) { > > LOG.debug("Sensor was not in state, getting sensor: "+ frame.sensorId); > > sensor = getSensor(frame); > > sensorState.update(sensor); > > } > > > > returnnewTypeA(); > > } > > > > @Override > > publicvoidopen(Configurationconfig) { > > LOG.debug("Sensor open method called", config); > > > > StateTtlConfigsensorTtlConfig= StateTtlConfig.newBuilder(Time.minutes(1)) > > .cleanupInBackground() > > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) > > > .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build(); > > > > ValueStateDescriptor<Sensor> sensorStateDescriptor= > > newValueStateDescriptor<>( "sensor", > > TypeInformation.of(newTypeHint<Sensor>(){})); > > // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig); > > sensorState = getRuntimeContext().getState(sensorStateDescriptor); > > > > } > > > > privateSensorgetSensor(TypeBframe) throwsException{ > > > > Class.forName("org.postgresql.Driver"); > > try(Connectioncon= DriverManager.getConnection(dbURL, dbUser, > dbPassword); > > Statementst= con.createStatement(); > > ResultSetrs= st.executeQuery("SELECT * from sensor where sensorid = '"+ > > frame.sensorId+ "'")) { > > > > if(rs.next()) { > > Sensorsensor= newSensor() {}; > > > > LOG.debug("Got sensor"+ sensor); > > > > returnsensor; > > } > > > > } catch(SQLExceptionex) { > > LOG.error("Error when connection postgres", ex); > > throwex; > > } > > > > returnnull; > > } > > > > } > > > > Above is a cut down version of my operator, I'm guessing it is the > > ResultSet rs that is getting serialized. How do I prevent this > > undesirable behaviour? I'm quite happy for my solution to serialize only > > what I explicitly tell it to, I don't need exactly once or anything. > > > > Many thanks, > > Chris Stevens > > Head of Research & Development > > +44 7565 034 595 > > > > > > On Wed, 19 Feb 2020 at 12:19, Timo Walther <twal...@apache.org > > <mailto:twal...@apache.org>> wrote: > > > > Hi Chris, > > > > [forwarding the private discussion to the mailing list again] > > > > first of all, are you sure that your Sensor class is either a > top-level > > class or a static inner class. Because it seems there is way more > stuff > > in it (maybe included by accident transitively?). Such as: > > > > org.apache.logging.log4j.core.layout.AbstractCsvLayout > > > Serialization trace: > > > classes (sun.misc.Launcher$AppClassLoader) > > > classloader (java.security.ProtectionDomain) > > > cachedPDs (javax.security.auth.SubjectDomainCombiner) > > > combiner (java.security.AccessControlContext) > > > acc (sun.security.ssl.SSLSocketImpl) > > > connection (org.postgresql.core.PGStream) > > > pgStream (org.postgresql.core.v3.QueryExecutorImpl) > > > transferModeRegistry (org.postgresql.core.v3.SimpleQuery) > > > commitQuery (org.postgresql.jdbc.PgConnection) > > > connection (org.postgresql.jdbc.PgResultSet) > > > val$rs > > > > When declaring state you can use > > > `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to > > > > check if your state is a POJO type. > > > > Regards, > > Timo > > > > > > -------- Forwarded Message -------- > > Subject: Re: Updating ValueState not working in hosted Kinesis > > Date: Wed, 19 Feb 2020 12:02:16 +0000 > > From: Chris Stevens <ch...@sensingfeeling.com > > <mailto:ch...@sensingfeeling.com>> > > To: Timo Walther <twal...@apache.org <mailto:twal...@apache.org > >> > > > > > > > > Hi Timo, > > > > Thanks for your reply. This makes sense to me, how do I treat > something > > as a POJO instead of a generic serialized BB type? Sorry relatively > new > > to Java and Flink. > > > > This is my full class def: > > > > package sensingfeeling.models; > > import java.io.Serializable; > > > > public class Sensor implements Serializable { > > > > private static final long serialVersionUID = > > 8582433437601788991L; > > public String sensorId; > > public String companyId; > > public String label; > > // public Date createdAt; > > // public Date updatedAt; > > public Integer uncomfortableFaceLimit; > > public Boolean online; > > public String capabilityId; > > // public Date lastOnlineAt; > > // public Date lastOfflineAt; > > public Integer onlineVersionNumber; > > public int status; > > @Override > > public String toString(){ > > return this.sensorId + " - " + this.label; > > } > > } > > > > Super simple really. > > > > I'm not trying to upgrade anything as far as I know. Just making an > > operator state aware. > > > > Many thanks, > > Chris Stevens > > Head of Research & Development > > +44 7565 034 595 > > > > > > On Wed, 19 Feb 2020 at 11:55, Timo Walther <twal...@apache.org > > <mailto:twal...@apache.org> > > <mailto:twal...@apache.org <mailto:twal...@apache.org>>> wrote: > > > > Hi Chris, > > > > it seems there are field serialized into state that actually > don't > > belong there. You should aim to treat Sensor as a POJO instead > > of a > > Kryo > > generic serialized black-box type. > > > > Furthermore, it seems that field such as > > "org.apache.logging.log4j.core.layout.AbstractCsvLayout" > > should not be > > state. Is there a "transient" keyword missing? > > > > Are you trying to upgrade your job or the Flink version? > > > > Regards, > > Timo > > > > > > > > On 18.02.20 18:59, Chris Stevens wrote: > > > Hi there, > > > > > > I'm trying to update state in one of my applications hosted > in > > Kinesis > > > Data Analytics. > > > > > > private transient ValueState<Sensor> sensorState; > > > using sensorState.update(sensor); > > > > > > Get error: > > > > > > An error occurred: > org.apache.flink.util.FlinkRuntimeException: > > Error > > > while adding data to RocksDB > > > at > > > > > > > > > org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108) > > > at > > > > > > > > > org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50) > > > at > > > > > > > > > sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97) > > > at > > > > > > > > > sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48) > > > at > > > > > > > > > org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460) > > > at > > > > > > > > > org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) > > > at > > > > > > > > > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255) > > > at > > > > > > > > > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) > > > at > > > > > > > > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775) > > > at > > > org.apache.flink.streaming.runtime.io > > <http://org.apache.flink.streaming.runtime.io> > > > > <http://runtime.io > >.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > > > at > > > org.apache.flink.streaming.runtime.io > > <http://org.apache.flink.streaming.runtime.io> > > > > <http://runtime.io > >.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308) > > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:714) > > > at java.lang.Thread.run(Thread.java:748) > > > Caused by: com.esotericsoftware.kryo.KryoException: > > > java.lang.IllegalArgumentException: Unable to create > serializer > > > "com.esotericsoftware.kryo.serializers.FieldSerializer" for > > class: > > > org.apache.logging.log4j.core.layout.AbstractCsvLayout > > > Serialization trace: > > > classes (sun.misc.Launcher$AppClassLoader) > > > classloader (java.security.ProtectionDomain) > > > cachedPDs (javax.security.auth.SubjectDomainCombiner) > > > combiner (java.security.AccessControlContext) > > > acc (sun.security.ssl.SSLSocketImpl) > > > connection (org.postgresql.core.PGStream) > > > pgStream (org.postgresql.core.v3.QueryExecutorImpl) > > > transferModeRegistry (org.postgresql.core.v3.SimpleQuery) > > > commitQuery (org.postgresql.jdbc.PgConnection) > > > connection (org.postgresql.jdbc.PgResultSet) > > > val$rs > > > > > > > > (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at > > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at > > com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) > > > at > > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > > > at > > > > > > > > > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305) > > > at > > > > > > > > > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362) > > > at > > > > > > > > > org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142) > > > at > > > > > > > > > org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158) > > > at > > > > > > > > > org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178) > > > at > > > > > > > > > org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167) > > > at > > > > > > > > > org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106) > > > ... 20 more > > > Caused by: java.lang.IllegalArgumentException: Unable to > create > > > serializer > > "com.esotericsoftware.kryo.serializers.FieldSerializer" for > > > class: > org.apache.logging.log4j.core.layout.AbstractCsvLayout > > > at > > > > > > > > > com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48) > > > at > > > > > > > > > com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26) > > > at > > com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351) > > > at > > com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58) > > > at > > com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344) > > > at > > > > > > > > > com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56) > > > at > > com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461) > > > at > > com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52) > > > at > > > > > > > > > com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) > > > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232) > > > at > > com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) > > > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) > > > at > > > > > > > > com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) > > > ... 62 more > > > Caused by: java.lang.reflect.InvocationTargetException > > > at > > sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown > > Source) > > > at > > > > > > > > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > > > at > > java.lang.reflect.Constructor.newInstance(Constructor.java:423) > > > at > > > > > > > > > com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35) > > > ... 78 more > > > Caused by: java.lang.NoClassDefFoundError: > > > Lorg/apache/commons/csv/CSVFormat; > > > at java.lang.Class.getDeclaredFields0(Native Method) > > > at java.lang.Class.privateGetDeclaredFields(Class.java:2583) > > > at java.lang.Class.getDeclaredFields(Class.java:1916) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156) > > > at > > > > > > > > > com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133) > > > ... 82 more > > > Caused by: java.lang.ClassNotFoundException: > > > org.apache.commons.csv.CSVFormat > > > at > java.net.URLClassLoader.findClass(URLClassLoader.java:382) > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:418) > > > at > > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:351) > > > ... 88 more > > > > > > Any help would be great. I tried manually including > > CSVFormat from > > > apache commons but didn't change anything. > > > > > > Many thanks, > > > Chris Stevens > > > Head of Research & Development > > > +44 7565 034 595 > > > >