Hi,

I'm having trouble integrating existing Scala code with Flink, due to the
POJO-only requirement.

We're using AnyVal heavily for type safety, and immutable classes as a default. 
For example, the following does not work:

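import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Test {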
  class Id(val underlying: Int) extends AnyVal

  class X(var id: Id) {
    def this() { this(new Id(0)) } 
  }

  class MySource extends SourceFunction[X] {
    def run(ctx: SourceFunction.SourceContext[X]) {
      ctx.collect(new X(new Id(1)))
    }
    def cancel() {}
  }

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new MySource).print
    env.execute("Test")
  }
}

Currently I'm thinking that I would need to have duplicate classes and code for
Flink and for non-Flink code, or somehow use immutable interfaces for non-Flink
code. Both ways are expensive in terms of development time.
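
To make the first option concrete, here is a rough sketch of what I mean by
duplicate classes. It uses plain Scala only, with no Flink dependency. The names
Event, EventPojo, toPojo and fromPojo are made up for illustration, and I have
not verified that Flink would actually treat EventPojo as a POJO. The
assumption is only that a public no-argument constructor, a mutable field and a
primitive Int in place of the AnyVal wrapper are what it needs.

```scala
object DuplicateClassesSketch {
  // Immutable domain types, as used in the existing (non-Flink) code.
  final class Id(val underlying: Int) extends AnyVal
  final case class Event(id: Id)

  // Hypothetical Flink-facing mirror of Event: no-arg constructor, mutable
  // field, and a plain Int instead of the Id value class.
  class EventPojo(var id: Int) {
    def this() = this(0)
  }

  // Conversions at the boundary between domain code and Flink operators.
  def toPojo(e: Event): EventPojo = new EventPojo(e.id.underlying)
  def fromPojo(p: EventPojo): Event = Event(new Id(p.id))

  def main(args: Array[String]): Unit = {
    val original = Event(new Id(1))
    val roundTripped = fromPojo(toPojo(original))
    // Checks that the conversion pair preserves the domain value.
    println(roundTripped == original)
  }
}
```

Every domain class would need a mirror and a conversion pair like this, which
is where the development cost comes from.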

Would you have any guidance on how to integrate Flink with a code base that has 
immutability as a norm?

Thanks
