Hi,

I guess the problematic line where the Kryo fallback is happening is here:

  lazy val myState: MapState[String, TestCaseClass] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, TestCaseClass]("test-state",
        classOf[String], ttestclass.getTypeClass)
    )

MapStateDescriptor has multiple constructors, and some of them have a strong
Java smell :) The one you've used here, with classOf[String], passes a Class
instance to the Java constructor. That constructor implicitly uses Java
TypeInformation derivation under the hood, which has no idea about Scala.

MapStateDescriptor also has another constructor, which takes explicit
TypeInformation for the key and the value, like this:

  val keyTypeInfo = createTypeInformation[String]
  val valueTypeInfo = createTypeInformation[TestCaseClass]
  new MapStateDescriptor[String, TestCaseClass]("test", keyTypeInfo, valueTypeInfo)

Then it won't try to be too smart: it won't derive type info from a Class[_]
and will use the one you provided.

With best regards,
Roman Grebennikov | g...@dfdx.me

On Tue, Dec 7, 2021, at 19:05, Lars Skjærven wrote:
> Thanks for the quick response. Please find attached a minimal example
> illustrating the issue. I've added an implicit TypeInformation, and checked
> that I'm importing the Scala variant only.
>
> Matthias: Just my superficial impression from [1]. Will look into
> TypeInfoFactory.
>
> Thanks again!
>
> package com.mystuff
> import org.apache.flink.api.common.functions.RichFlatMapFunction
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.api.common.typeinfo.{TypeInformation}
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.util.Collector
>
> case class TestCaseClass(id: String, pos: Int)
>
> class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
>   implicit val ttestclass: TypeInformation[TestCaseClass] =
>     createTypeInformation[TestCaseClass]
>
>   lazy val myState: MapState[String, TestCaseClass] =
>     getRuntimeContext.getMapState(
>       new MapStateDescriptor[String, TestCaseClass]("test-state",
>         classOf[String], ttestclass.getTypeClass)
>     )
>
>   override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
>     myState.put(value.id, value)
>     myState.get(value.id)
>     out.collect(value.id)
>   }
> }
>
> object TestJob {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     env.getConfig.disableGenericTypes()
>
>     val s = Seq[TestCaseClass](
>       TestCaseClass(id = "1", pos = 1),
>       TestCaseClass(id = "2", pos = 2),
>       TestCaseClass(id = "3", pos = 3),
>     )
>
>     env
>       .fromCollection[TestCaseClass](s)
>       .keyBy(s => s.id)
>       .flatMap(new MyRichFlatMap)
>       .print()
>
>     env.execute("Test Job")
>   }
> }
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov <g...@dfdx.me> wrote:
>> Hi Lars,
>>
>> can you please show a small reproducer of the way you construct the
>> DataStream, and which imports do you use?
>>
>> We also often experience similar performance issues with scala, but usually
>> they are related to accidental usage of Flink Java API.
>> A couple of hints from my experience:
>> 1. Make sure that you always use the Scala DataStream, and not the Java one.
>> 2. All operations on the Scala DataStream require an implicit
>> TypeInformation[T] parameter, which is usually generated automatically for
>> you by the createTypeInformation[T] macro if you do an
>> "import org.apache.flink.api.scala._". So make sure you have this import
>> present.
>> 3. You can do a "env.getConfig.disableGenericTypes" and Flink will throw an
>> exception each time it has to fall back to generic Kryo serialization.
>> The backtrace will point you to the exact place in your code where it has
>> to do a Kryo fallback.
>>
>> Also, Flink will always revert to Kryo if you use sum types (or ADTs,
>> or "sealed traits"). Shameless plug: we made a library to support that:
>> https://github.com/findify/flink-adt
>>
>> Roman Grebennikov | g...@dfdx.me
>>
>>
>> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>>> Hi Lars,
>>> not sure about the out-of-the-box support for case classes with primitive
>>> member types (could you refer to the section which made you conclude
>>> this?). I haven't used Scala with Flink yet, so maybe others can give
>>> more context.
>>> But have you looked into using the TypeInfoFactory to define the schema [1]?
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>>>
>>> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <lar...@gmail.com> wrote:
>>>> Hello,
>>>> We're running Flink 1.14 with Scala, and we suspect that performance
>>>> is suffering due to serialization of some Scala case classes. Specifically,
>>>> we're seeing that our case class "cannot be used as a POJO type because
>>>> not all fields are valid POJO fields, and must be processed as
>>>> GenericType", and that the case class "does not contain a setter for field
>>>> X".
>>>> I'm interpreting these log messages as performance warnings.
>>>>
>>>> A simple case class example we're writing to state that triggers the
>>>> mentioned 'warnings':
>>>>
>>>> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>>>>
>>>> My understanding of the docs is that case classes with primitive types
>>>> should be supported "out of the box".
>>>>
>>>> Any tips on how to proceed?
>>>>
>>>> Kind regards,
>>>> Lars
>>>
>>
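
For reference, the suggestion from the top of this thread can be put together as one small, self-contained sketch. It assumes Flink 1.14 with the Scala API on the classpath. The object name DescriptorSketch and the javaDerived value are made up for illustration, and the TypeExtractor call is only my guess at a convenient way to look at what the Class-based constructor would derive. The sketch prints both type informations rather than asserting what they are.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala._

// Same case class as in the reproducer quoted above.
case class TestCaseClass(id: String, pos: Int)

// Hypothetical object name, used only for this illustration.
object DescriptorSketch {
  // Scala-aware type information, derived at compile time by the macro
  // that "import org.apache.flink.api.scala._" brings into scope.
  val keyTypeInfo: TypeInformation[String] = createTypeInformation[String]
  val valueTypeInfo: TypeInformation[TestCaseClass] =
    createTypeInformation[TestCaseClass]

  // Java-side derivation from a bare Class object, shown for comparison.
  // Assumption: this resembles what the Class-based constructor ends up using.
  val javaDerived: TypeInformation[TestCaseClass] =
    TypeExtractor.createTypeInfo(classOf[TestCaseClass])

  // Descriptor built with explicit TypeInformation for key and value,
  // so nothing has to be derived from a Class[_].
  val descriptor: MapStateDescriptor[String, TestCaseClass] =
    new MapStateDescriptor[String, TestCaseClass](
      "test-state", keyTypeInfo, valueTypeInfo)

  def main(args: Array[String]): Unit = {
    // Print both so the difference can be inspected by eye.
    println(s"Scala macro:     $valueTypeInfo")
    println(s"Java extraction: $javaDerived")
  }
}
```

In the reproducer, the descriptor would then be passed to getRuntimeContext.getMapState(...) in place of the classOf-based one. With env.getConfig.disableGenericTypes() still enabled, any remaining Kryo fallback should show up as an exception.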