Hi

I’ve been trying to get state migration with Avro working on Flink 1.7.2 using 
Scala case classes but I’m not getting anywhere closer to solving it.

We’re using the most basic streaming WordCount example as a reference to test 
the schema evolution:

val wordCountStream: DataStream[WordWithCount] = text
 .flatMap { w => w.split("\\s") }
 .map { w => WordWithCount(w, 1) }
 .keyBy(_.word)
 .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

In this example, WordWithCount is our data object that we’d like to have 
serialized and deserialized with schema evolution support since keyBy maintains 
state.

I understood from the documentation that it would only work for classes 
generated from Avro schema’s so I’ve tried using sbt-avrohugger to generate our 
case classes. However, for normal case classes generated by Avro we quickly ran 
into the problem that we needed a no-arg constructor.

We looked at the flink-avro module and noticed that the classes generated by 
the avro-maven-plugin were implementing SpecificRecord and seemed to comply 
with the POJO rules as described in the Flink documentation. After switching 
from normal to specific avro generation with sbt-avrohugger, we ended up with 
Scala case classes that should comply with all rules.

An example of such a generated case class is as follows:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch

case class WordWithCount(var word: String, var count: Long) extends 
org.apache.avro.specific.SpecificRecordBase {
def this() = this("", 0L)
def get(field$: Int): AnyRef = {
   (field$: @switch) match {
     case 0 => {
       word
     }.asInstanceOf[AnyRef]
     case 1 => {
       count
     }.asInstanceOf[AnyRef]
     case _ => new org.apache.avro.AvroRuntimeException("Bad index")
   }
 }
def put(field$: Int, value: Any): Unit = {
   (field$: @switch) match {
     case 0 => this.word = {
       value.toString
     }.asInstanceOf[String]
     case 1 => this.count = {
       value
     }.asInstanceOf[Long]
     case _ => new org.apache.avro.AvroRuntimeException("Bad index")
   }
   ()
 }
def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}

object WordWithCount {
val SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[{\"name\":\"word\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}")
}

This, however, also didn’t work out of the box. We then tried to define our own 
type information using flink-avro’s AvroTypeInfo but this fails because Avro 
looks for a SCHEMA$ property (SpecificData:285) in the class and is unable to 
use Java reflection to identify the SCHEMA$ in the Scala companion object.
implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] = new 
AvroTypeInfo(classOf[WordWithCount])
We then read in the 1.7 documentation that Flink doesn’t natively support POJO 
types, but only state defined by descriptors, like f.e. the 
ListStateDescriptor, and only if you allow Flink to infer the type information. 
This is definitely what we need for our processors that have map and list 
state. However, for the simple word count example, we should only need native 
POJO (de)serialization with state migration.

We then noticed Github PR #7759 that adds support for POJO state schema 
evolution/migration. We wanted to give this a try and built flink from source 
from the release-1.8 branch. We then included the 1.8-SNAPSHOT jars in our job 
and got a local 1.8 cluster and job running fine.

However, if we do not specify our own type information, and perform the 
following steps:


1. Run the job
2. Create a savepoint and stop the job
3. Update the WordWithCount avro schema to include a third field
4. Update the job according to the generated case class
5. Run the new job from the savepoint


We are then faced with the following error:

Caused by: java.lang.IllegalArgumentException: array is not of length 3 thrown 
from ScalaCaseClassSerializer.scala:50

However, if we again try to define our own type information using the 
AvroTypeInfo class, we are faced with the same issue as in 1.7.

What are we missing? The documentation on how to use this is very limited, and 
we’re getting the idea that it may work with Java types, but maybe not with 
Scala case classes. I’d love to hear some pointers on how to approach this? 
Compared to our solution in 1.4 
(https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399),
 we hoped to get rid of all the custom serializers by moving to 1.7

Thanks in advance!

Marc

Reply via email to