Thanks for the quick response. Please find below a minimal example illustrating the issue. I've added an implicit TypeInformation and checked that I'm importing the Scala variant only.
Matthias: Just my superficial impression from [1]. Will look into TypeInfoFactory. Thanks again!

package com.mystuff

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class TestCaseClass(id: String, pos: Int)

class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
  implicit val ttestclass: TypeInformation[TestCaseClass] = createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass)
  )

  override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
    myState.put(value.id, value)
    myState.get(value.id)
    out.collect(value.id)
  }
}

object TestJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.getConfig.disableGenericTypes()

    val s = Seq[TestCaseClass](
      TestCaseClass(id = "1", pos = 1),
      TestCaseClass(id = "2", pos = 2),
      TestCaseClass(id = "3", pos = 3)
    )

    env
      .fromCollection[TestCaseClass](s)
      .keyBy(s => s.id)
      .flatMap(new MyRichFlatMap)
      .print()

    env.execute("Test Job")
  }
}

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
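Side note: independent of the TypeInfoFactory route, I'm also wondering whether the classOf-based MapStateDescriptor constructor is what pushes Flink onto the generic/Kryo path here. An untested sketch of a drop-in variant of the myState declaration in MyRichFlatMap above, handing the macro-derived Scala TypeInformation to the descriptor directly instead of the raw Class:

  // Untested variant: give the descriptor the Scala TypeInformation directly,
  // so the Java TypeExtractor never has to reflect on TestCaseClass.
  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass](
      "test-state",
      createTypeInformation[String], // from org.apache.flink.api.scala._
      ttestclass                     // the implicit val defined above
    )
  )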
On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov <g...@dfdx.me> wrote:

> Hi Lars,
>
> can you please show a small reproducer of the way you construct the
> DataStream, and which imports you use?
>
> We also often experience similar performance issues with Scala, but
> usually they are related to accidental usage of the Flink Java API.
> A couple of hints from my experience:
> 1. Make sure that you always use the Scala DataStream, and not the
> Java one.
> 2. All operations on the Scala DataStream require an implicit
> TypeInformation[T] parameter, which is usually generated automatically
> for you by the createTypeInformation[T] macro if you do an
> "import org.apache.flink.api.scala._". So make sure you have this
> import present.
> 3. You can do an "env.getConfig.disableGenericTypes" and Flink will
> throw an exception each time it has to fall back to generic Kryo
> serialization. The backtrace will point you to the exact place in your
> code where the Kryo fallback happens.
>
> Also, Flink will always revert to Kryo if you use sum types (or ADTs,
> or "sealed traits"). Shameless plug: we made a library to support that:
> https://github.com/findify/flink-adt
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>
> Hi Lars,
> Not sure about the out-of-the-box support for case classes with
> primitive member types (could you refer to the section which made you
> conclude this?). I haven't used Scala with Flink yet, so maybe others
> can give more context.
> But have you looked into using the TypeInfoFactory to define the
> schema [1]?
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <lar...@gmail.com> wrote:
>
> Hello,
> We're running Flink 1.14 with Scala, and we suspect that performance
> is suffering due to serialization of some Scala case classes.
> Specifically, we're seeing that our case class "cannot be used as a
> POJO type because not all fields are valid POJO fields, and must be
> processed as GenericType", and that the case class "does not contain a
> setter for field X". I'm interpreting these log messages as performance
> warnings.
>
> A simple case class example we're writing to state that triggers the
> mentioned 'warnings':
>
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> My understanding of the docs is that case classes with primitive types
> should be supported "out of the box".
>
> Any tips on how to proceed?
>
> Kind regards,
> Lars