Thanks for the quick response. Please find attached a minimal example
illustrating the issue. I've added an implicit TypeInformation and checked
that I'm importing only the Scala variant.

Matthias: that was just my superficial impression from [1]. I'll look into
TypeInfoFactory.

Thanks again!

package com.mystuff

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class TestCaseClass(id: String, pos: Int)

class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
  implicit val ttestclass: TypeInformation[TestCaseClass] =
    createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, TestCaseClass](
        "test-state", classOf[String], ttestclass.getTypeClass)
    )

  override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
    myState.put(value.id, value)
    myState.get(value.id)
    out.collect(value.id)
  }
}

object TestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    env.getConfig.disableGenericTypes()

    val s = Seq[TestCaseClass](
      TestCaseClass(id = "1", pos = 1),
      TestCaseClass(id = "2", pos = 2),
      TestCaseClass(id = "3", pos = 3),
    )

    env
      .fromCollection[TestCaseClass](s)
      .keyBy(_.id)
      .flatMap(new MyRichFlatMap)
      .print()

    env.execute("Test Job")
  }
}
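
As an aside: since the descriptor above is built from Class objects
(ttestclass.getTypeClass), I wonder whether the MapStateDescriptor
constructor that takes TypeInformation directly is the recommended route
instead. An untested sketch, replacing the myState definition in
MyRichFlatMap above:

  // Hypothetical variant: pass TypeInformation directly so the descriptor
  // does not have to go through Class-based type extraction.
  lazy val myState: MapState[String, TestCaseClass] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, TestCaseClass](
        "test-state",
        createTypeInformation[String],
        ttestclass)
    )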

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory

On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov <g...@dfdx.me> wrote:

> Hi Lars,
>
> can you please show a small reproducer of the way you construct the
> DataStream, and which imports do you use?
>
> We also often experience similar performance issues with Scala, but
> usually they come down to accidental use of the Flink Java API. A couple
> of hints from my experience:
> 1. Make sure that you always use the Scala DataStream, and not the Java
> one.
> 2. All operations on the Scala DataStream require an implicit
> TypeInformation[T] parameter, which the createTypeInformation[T] macro
> generates automatically for you when "import org.apache.flink.api.scala._"
> is in scope. So make sure this import is present.
> 3. You can call "env.getConfig.disableGenericTypes", and Flink will then
> throw an exception each time it has to fall back to generic Kryo
> serialization. The backtrace will point you to the exact place in your
> code where the Kryo fallback happens (see the sketch below).
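>
> A minimal sketch of hints 2 and 3 together (the Click type and the job
> are made up for illustration):
>
> import org.apache.flink.api.scala._ // brings createTypeInformation into scope
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> case class Click(user: String, ts: Long)
>
> object Sketch {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.disableGenericTypes() // fail fast on any Kryo fallback
>
>     // The implicit TypeInformation[Click] required by fromElements and
>     // keyBy is derived by the createTypeInformation macro via the import.
>     env
>       .fromElements(Click("a", 1L), Click("b", 2L))
>       .keyBy(_.user)
>       .print()
>
>     env.execute("sketch")
>   }
> }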
>
> Also, Flink will always revert to Kryo if you use sum types (ADTs, i.e.
> "sealed traits"). Shameless plug: we made a library to support those:
> https://github.com/findify/flink-adt
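>
> For example, with plain flink-scala a toy hierarchy like this one (names
> made up) is analyzed as a GenericType and hence serialized with Kryo:
>
> sealed trait PlayerEvent
> case class Play(sessionId: String) extends PlayerEvent
> case class Stop(sessionId: String) extends PlayerEvent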
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>
> Hi Lars,
> not sure about the out-of-the-box support for case classes with primitive
> member types (could you point me to the section that made you conclude
> this?). I haven't used Scala with Flink yet, so maybe others can give more
> context.
> But have you looked into using a TypeInfoFactory to define the schema
> [1]?
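>
> A rough sketch of what that might look like for your Progress class
> (untested; I'm going by the docs plus a quick look at the flink-scala
> sources, so treat the CaseClassTypeInfo / ScalaCaseClassSerializer wiring
> as an assumption, not a recipe):
>
> import java.lang.reflect.Type
> import java.util.{Map => JMap}
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, TypeInformation, Types}
> import org.apache.flink.api.common.typeutils.TypeSerializer
> import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, ScalaCaseClassSerializer}
>
> // The annotation points Flink's type extraction at the factory.
> @TypeInfo(classOf[ProgressTypeInfoFactory])
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> class ProgressTypeInfoFactory extends TypeInfoFactory[Progress] {
>   override def createTypeInfo(
>       t: Type,
>       genericParameters: JMap[String, TypeInformation[_]]): TypeInformation[Progress] = {
>     // Build the case-class TypeInformation by hand rather than calling
>     // createTypeInformation[Progress] here, since that could route back
>     // into this factory via the annotation and recurse.
>     val fieldTypes = Seq[TypeInformation[_]](Types.INT, Types.INT, Types.BOOLEAN)
>     val fieldNames = Seq("position", "eventTime", "alive")
>     new CaseClassTypeInfo[Progress](
>       classOf[Progress], Array.empty[TypeInformation[_]], fieldTypes, fieldNames) {
>       override def createSerializer(config: ExecutionConfig): TypeSerializer[Progress] =
>         new ScalaCaseClassSerializer[Progress](
>           classOf[Progress],
>           fieldTypes.map(_.createSerializer(config)).toArray)
>     }
>   }
> }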
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <lar...@gmail.com> wrote:
>
> Hello,
> We're running Flink 1.14 with Scala, and we suspect that performance is
> suffering due to serialization of some Scala case classes. Specifically,
> we're seeing that our case class "cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType",
> and that the case class "does not contain a setter for field X". I'm
> interpreting these log messages as performance warnings.
>
> A simple case class example we're writing to state that triggers the
> mentioned 'warnings':
>
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> My understanding of the docs is that case classes with primitive fields
> should be supported "out of the box".
>
> Any tips on how to proceed?
>
> Kind regards,
> Lars
>
