Hi Chris,
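your observation is right. By writing `new Sensor() {}` instead of just
`new Sensor()` you are creating an anonymous, non-static inner class. Such a
class can hold references to the enclosing instance and to local variables of
the enclosing method; the `val$rs` entry in your serialization trace is such
a captured local.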
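A small way to see the difference using only the JDK is sketched below. The `Sensor` and `Resource` classes are stand-ins invented for illustration (not the real `sensingfeeling` classes or a real `ResultSet`), and the names of compiler-generated fields are a javac detail that may differ between compilers:

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class AnonymousCaptureDemo {

    /** Stand-in for the Sensor POJO from this thread (one field kept). */
    public static class Sensor {
        public String sensorId;
    }

    /** Stand-in for a heavyweight resource such as a JDBC ResultSet. */
    public static class Resource {
        public String read() {
            return "sensor-1";
        }
    }

    /** Plain instance: the runtime class is Sensor itself. */
    public static Sensor plain(Resource rs) {
        Sensor sensor = new Sensor();
        sensor.sensorId = rs.read();
        return sensor;
    }

    /** Anonymous subclass whose initializer block reads the local variable rs. */
    public static Sensor anonymous(Resource rs) {
        return new Sensor() {
            {
                sensorId = rs.read();
            }
        };
    }

    /** Names of compiler-generated (synthetic) fields on the object's runtime class. */
    public static List<String> syntheticFields(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Resource rs = new Resource();
        Sensor p = plain(rs);
        Sensor a = anonymous(rs);
        System.out.println(p.getClass().getName() + " synthetic fields: " + syntheticFields(p));
        System.out.println(a.getClass().getName() + " synthetic fields: " + syntheticFields(a));
    }
}
```

The anonymous instance carries a hidden field pointing at the captured resource, so a reflective serializer such as Kryo will try to follow it. Inside an instance method, older compilers typically add a reference to the enclosing object as well.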
If you check your logs, there might also be a reason given for why your POJO
is treated as a generic type. I assume it is because you declared `implements
Serializable`, which leads Flink to think "the user wants to deal with
serialization of the POJO themselves".
Regards,
Timo
On 19.02.20 14:24, Chris Stevens wrote:
Thanks again Timo, I hope I replied correctly this time.
As per my previous message the Sensor class is a very simple POJO type
(I think).
When the serialization trace talks about PGSql stuff it makes me think
that something from my operator is being included in serialization, not
just the Sensor object itself, which I am explicitly including in state.
package sensingfeeling.functions.mapping;

public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {
    private static final long serialVersionUID = 8582433437601788991L;
    private transient ValueState<Sensor> sensorState;

    @Override
    public TypeA join(TypeB frame, TypeC activeMotionPaths)
            throws JsonProcessingException {
        Sensor sensor = sensorState.value();
        if (sensor == null) {
            LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
            sensor = getSensor(frame);
            sensorState.update(sensor);
        }
        return new TypeA();
    }

    @Override
    public void open(Configuration config) {
        LOG.debug("Sensor open method called", config);
        StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
                .cleanupInBackground()
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();
        ValueStateDescriptor<Sensor> sensorStateDescriptor =
                new ValueStateDescriptor<>("sensor",
                        TypeInformation.of(new TypeHint<Sensor>() {}));
        // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
        sensorState = getRuntimeContext().getState(sensorStateDescriptor);
    }

    private Sensor getSensor(TypeB frame) throws Exception {
        Class.forName("org.postgresql.Driver");
        try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT * from sensor where sensorid = '" +
                     frame.sensorId + "'")) {
            if (rs.next()) {
                Sensor sensor = new Sensor() {};
                LOG.debug("Got sensor" + sensor);
                return sensor;
            }
        } catch (SQLException ex) {
            LOG.error("Error when connection postgres", ex);
            throw ex;
        }
        return null;
    }
}
Above is a cut-down version of my operator. I'm guessing it is the
ResultSet rs that is getting serialized. How do I prevent this
undesirable behaviour? I'm quite happy for my solution to serialize only
what I explicitly tell it to; I don't need exactly-once or anything.
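One way to keep the `ResultSet` out of state is to copy the column values into a plain `new Sensor()` (no trailing `{}`) before returning it. The sketch below assumes a trimmed-down `Sensor` and hypothetical column names (`sensorid`, `label`, `status`); the real table layout is not shown in this thread, and `SensorRowMapper` is a name made up for illustration:

```java
import java.sql.ResultSet;
import java.sql.SQLException;

public class SensorRowMapper {

    /** Trimmed-down stand-in for sensingfeeling.models.Sensor. */
    public static class Sensor {
        public String sensorId;
        public String label;
        public int status;
    }

    /**
     * Copies the current row into a plain Sensor instance.
     * The column names are assumptions, not taken from the real schema.
     */
    public static Sensor fromRow(ResultSet rs) throws SQLException {
        // No anonymous subclass here: the runtime class is Sensor itself,
        // so nothing from the enclosing method or operator is captured.
        Sensor sensor = new Sensor();
        sensor.sensorId = rs.getString("sensorid");
        sensor.label = rs.getString("label");
        sensor.status = rs.getInt("status");
        return sensor;
    }
}
```

Inside `getSensor`, the body of `if (rs.next())` would then become something like `return SensorRowMapper.fromRow(rs);`, so only the copied values reach `sensorState.update(...)`.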
Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595
On Wed, 19 Feb 2020 at 12:19, Timo Walther <twal...@apache.org> wrote:
Hi Chris,
[forwarding the private discussion to the mailing list again]
first of all, are you sure that your Sensor class is either a top-level
class or a static inner class? It seems there is much more in it than
expected (maybe included transitively by accident?), such as:
org.apache.logging.log4j.core.layout.AbstractCsvLayout
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (sun.security.ssl.SSLSocketImpl)
> connection (org.postgresql.core.PGStream)
> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> commitQuery (org.postgresql.jdbc.PgConnection)
> connection (org.postgresql.jdbc.PgResultSet)
> val$rs
When declaring state you can use
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
check if your state is a POJO type.
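Note that a check on `Sensor.class` describes the declared class only. The object that ends up in state may have a different runtime class (for example an anonymous subclass), so it can help to inspect `sensor.getClass()` as well. The sketch below is a rough, JDK-only approximation of the structural POJO rules described in the Flink documentation (public standalone class, public no-argument constructor, fields public or reachable via getter and setter). It is not Flink's own type analysis, and `PojoShapeCheck` and `looksLikePojo` are hypothetical names:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoShapeCheck {

    /** Trimmed-down stand-in for the Sensor class in this thread. */
    public static class Sensor {
        public String sensorId;
        public String label;
    }

    /** Rough structural check only; field types and serializers are not examined. */
    public static boolean looksLikePojo(Class<?> c) {
        // Anonymous, local, and non-static inner classes are not standalone.
        if (c.isAnonymousClass() || c.isLocalClass()) {
            return false;
        }
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) {
            return false;
        }
        if (!Modifier.isPublic(c.getModifiers())) {
            return false;
        }
        try {
            c.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every non-static, non-transient field must be public or have accessors.
        for (Class<?> k = c; k != null && k != Object.class; k = k.getSuperclass()) {
            for (Field f : k.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m)) {
                    continue;
                }
                if (!Modifier.isPublic(m) && !hasAccessors(k, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    /** True if public getX() and setX(type) methods exist for the field. */
    private static boolean hasAccessors(Class<?> c, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            c.getMethod("get" + suffix);
            c.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

With this sketch, `looksLikePojo(Sensor.class)` passes while `looksLikePojo(new Sensor() {}.getClass())` does not, which matches the situation discussed in this thread.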
Regards,
Timo
-------- Forwarded Message --------
Subject: Re: Updating ValueState not working in hosted Kinesis
Date: Wed, 19 Feb 2020 12:02:16 +0000
From: Chris Stevens <ch...@sensingfeeling.com>
To: Timo Walther <twal...@apache.org>
Hi Timo,
Thanks for your reply. This makes sense to me, but how do I treat something
as a POJO instead of a generic serialized black-box (BB) type? Sorry, I'm
relatively new to Java and Flink.
This is my full class def:
package sensingfeeling.models;

import java.io.Serializable;

public class Sensor implements Serializable {
    private static final long serialVersionUID = 8582433437601788991L;
    public String sensorId;
    public String companyId;
    public String label;
    // public Date createdAt;
    // public Date updatedAt;
    public Integer uncomfortableFaceLimit;
    public Boolean online;
    public String capabilityId;
    // public Date lastOnlineAt;
    // public Date lastOfflineAt;
    public Integer onlineVersionNumber;
    public int status;

    @Override
    public String toString() {
        return this.sensorId + " - " + this.label;
    }
}
Super simple really.
I'm not trying to upgrade anything as far as I know. Just making an
operator state aware.
Many thanks,
Chris Stevens
On Wed, 19 Feb 2020 at 11:55, Timo Walther <twal...@apache.org> wrote:
Hi Chris,
it seems there are fields serialized into state that actually don't
belong there. You should aim to treat Sensor as a POJO instead of a
generic, Kryo-serialized black-box type.
Furthermore, it seems that a field such as
"org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
part of the state. Is there a "transient" keyword missing?
Are you trying to upgrade your job or the Flink version?
Regards,
Timo
On 18.02.20 18:59, Chris Stevens wrote:
> Hi there,
>
> I'm trying to update state in one of my applications hosted in Kinesis
> Data Analytics.
>
> private transient ValueState<Sensor> sensorState;
> using sensorState.update(sensor);
>
> Get error:
>
> An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
> at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
> at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
> at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
> at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
> at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (sun.security.ssl.SSLSocketImpl)
> connection (org.postgresql.core.PGStream)
> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> commitQuery (org.postgresql.jdbc.PgConnection)
> connection (org.postgresql.jdbc.PgResultSet)
> val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
> at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
> at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
> at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
> at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
> at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
> ... 20 more
> Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
> at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
> at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
> at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
> at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
> at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
> at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
> at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ... 62 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
> ... 78 more
> Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
> at java.lang.Class.getDeclaredFields0(Native Method)
> at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> at java.lang.Class.getDeclaredFields(Class.java:1916)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
> ... 82 more
> Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 88 more
>
> Any help would be great. I tried manually including CSVFormat from
> Apache Commons, but it didn't change anything.
>
> Many thanks,
> Chris Stevens