Hi Lars,

Can you please share a small reproducer showing how you construct the 
DataStream, and which imports you use?

We also often run into similar performance issues with Scala, but they are 
usually caused by accidentally using the Flink Java API. A couple of hints 
from my experience:
1. Make sure you always use the Scala DataStream 
(org.apache.flink.streaming.api.scala.DataStream), not the Java one.
2. All operations on a Scala DataStream require an implicit TypeInformation[T] 
parameter. It is usually generated for you automatically by the 
createTypeInformation[T] macro once you "import org.apache.flink.api.scala._", 
so make sure this import is present.
3. You can call "env.getConfig.disableGenericTypes()" and Flink will throw an 
exception every time it has to fall back to generic Kryo serialization. The 
stack trace will point you to the exact place in your code where the Kryo 
fallback happens.
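Here is a minimal sketch of hints 1-3 put together, using your Progress case class. It assumes Flink 1.14 with the flink-streaming-scala dependency on the classpath. The object name ProgressJob, the helper isKryoFallback and the sample values are made up for illustration:

```scala
// Scala DataStream API. The wildcard import also brings the implicit
// createTypeInformation[T] macro into scope.
// Avoid org.apache.flink.streaming.api.datastream.DataStream (the Java API).
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo

case class Progress(position: Int, eventTime: Int, alive: Boolean)

object ProgressJob {
  // Hypothetical helper: true if Flink would serialize this type with Kryo.
  def isKryoFallback(info: TypeInformation[_]): Boolean =
    info.isInstanceOf[GenericTypeInfo[_]]

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Throw an exception wherever a type would fall back to Kryo.
    env.getConfig.disableGenericTypes()

    // Derived at compile time by the createTypeInformation macro.
    val info = implicitly[TypeInformation[Progress]]
    println(s"$info, Kryo fallback: ${isKryoFallback(info)}")

    val progress: DataStream[Progress] = env.fromElements(
      Progress(position = 1, eventTime = 100, alive = true),
      Progress(position = 2, eventTime = 200, alive = false)
    )
    progress
      .map(p => p.copy(position = p.position + 1))
      .print()

    env.execute("progress-sketch")
  }
}
```

If the imports are right, Progress should get a case class serializer. It should not go through the POJO analysis that produces the "cannot be used as a POJO type" log lines.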

Also, Flink will always fall back to Kryo if you use sum types (ADTs, or 
"sealed traits"). Shameless plug: we made a library to support them: 
https://github.com/findify/flink-adt
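To check whether one of your own ADTs is affected, you can inspect the TypeInformation that the macro derives. This is a rough sketch that assumes Flink 1.14 APIs, where TypeInformation.createSerializer takes an ExecutionConfig. The Event/Click/Purchase hierarchy and the AdtCheck object are hypothetical, and this snippet does not use flink-adt itself:

```scala
import scala.util.Try

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._

// Hypothetical ADT: a sealed trait with case class members.
sealed trait Event
case class Click(userId: Long, x: Int, y: Int) extends Event
case class Purchase(userId: Long, amountCents: Long) extends Event

object AdtCheck {
  // Each concrete case class gets its own case class type information...
  val clickInfo: TypeInformation[Click] = implicitly[TypeInformation[Click]]
  // ...while the sealed trait itself is the part that falls back to Kryo.
  val eventInfo: TypeInformation[Event] = implicitly[TypeInformation[Event]]

  def main(args: Array[String]): Unit = {
    println(s"Click -> $clickInfo")
    println(s"Event -> $eventInfo (generic: ${eventInfo.isInstanceOf[GenericTypeInfo[_]]})")

    // Same switch as env.getConfig.disableGenericTypes(): creating a
    // serializer for a generic type is expected to fail with this config.
    val config = new ExecutionConfig()
    config.disableGenericTypes()
    println(Try(eventInfo.createSerializer(config)))
  }
}
```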

Roman Grebennikov | g...@dfdx.me


On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
> Hi Lars,
> I'm not sure about out-of-the-box support for case classes with primitive 
> member types (could you point to the section that led you to this 
> conclusion?). I haven't used Scala with Flink yet, so maybe others can give 
> more context.
> But have you looked into using the TypeInfoFactory to define the schema [1]?
> 
> Best,
> Matthias
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven <lar...@gmail.com> wrote:
>> Hello,
>> We're running Flink 1.14 with Scala, and we suspect that performance is 
>> suffering due to the serialization of some Scala case classes. Specifically, 
>> we're seeing that our case class "cannot be used as a POJO type because not 
>> all fields are valid POJO fields, and must be processed as GenericType", and 
>> that the case class "does not contain a setter for field X". I'm 
>> interpreting these log messages as performance warnings.
>> 
>> Here is a simple example of a case class we write to state that triggers 
>> the 'warnings' mentioned above:
>> 
>> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>> 
>> My understanding of the docs is that case classes with primitive types 
>> should be supported "out of the box".
>> 
>> Any tips on how to proceed?
>> 
>> Kind regards, 
>> Lars
> 
