Hi James,
the TypeInformation must be available at the call site, not in the case
class definition. In your WindowFunction you are using a TestGen[String] so
it should suffice to add this line at some point before the call to apply():

implicit val testGenType = createTypeInformation[TestGen[String]]

Hope that helps.

Best,
Aljoscha

On Wed, 1 Jun 2016 at 20:11 James Bucher <jbuc...@expedia.com> wrote:

> Hi,
>
> I have been trying to get a case class with a generic parameter working
> with Filnk 1.0.3 and have been having some trouble. However when I compile
> I get the following error:
> debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40:
> error: could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
> [ERROR]           .apply(new AggregateOrigins)
>
> I am importing org.apache.flink.api.scala._ and the generic type is
> defined as [T: TypeInformation] as suggested here:
> https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html
>
>
> The full code for the program is as follows:
>
> package com.example.flink.jobs
>
> import java.util.{Properties}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala.function.WindowFunction
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08}
> import org.apache.flink.api.scala._
>
> object CaseClassWithGeneric {
>   case class TestGen[T: TypeInformation](item: T) {}
>
>   class AggregateOrigins extends WindowFunction[String, TestGen[String], 
> String, TimeWindow] {
>     def apply(key: String, win: TimeWindow, values: Iterable[String], col: 
> Collector[TestGen[String]]): Unit = {
>       values.foreach(x => { })
>       col.collect(new TestGen[String]("Foo"))
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val properties = new Properties();
>     val messageStream = env.addSource(
>       new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
>           .keyBy(s => s)
>           .timeWindow(Time.days(1))
>           .apply(new AggregateOrigins)
>     messageStream.print()
>     env.execute("Simple Job")
>   }
> }
>
> When I dug into the apply() function definition I found the following:
>
> def apply[R: TypeInformation](
>     function: WindowFunction[T, R, K, W]): DataStream[R] = {
>
>   val cleanFunction = clean(function)
>   val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, 
> W](cleanFunction)
>   asScalaStream(javaStream.apply(applyFunction, 
> implicitly[TypeInformation[R]]))
> }
>
> As Far as I can tell TestGen[String] should correspond to [R: 
> TypeInformation] in apply. Am I missing something or is it not possible to 
> define case class with a generic parameter?
>
> Thanks,
>
> James Bucher
>
>

Reply via email to