Hi James,

the TypeInformation must be available at the call site, not in the case class definition. Your WindowFunction emits a TestGen[String], so it should suffice to add this line at some point before the call to apply():
implicit val testGenType = createTypeInformation[TestGen[String]]

Hope that helps.

Best,
Aljoscha

On Wed, 1 Jun 2016 at 20:11 James Bucher <jbuc...@expedia.com> wrote:

> Hi,
>
> I have been trying to get a case class with a generic parameter working
> with Flink 1.0.3 and have been having some trouble. However, when I compile
> I get the following error:
>
> debug-type-bug/src/main/scala/com/example/flink/jobs/CaseClassWithGeneric.scala:40:
> error: could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[com.example.flink.jobs.CaseClassWithGeneric.TestGen[String]]
> [ERROR] .apply(new AggregateOrigins)
>
> I am importing org.apache.flink.api.scala._ and the generic type is
> defined as [T: TypeInformation] as suggested here:
> https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html
>
> The full code for the program is as follows:
>
> package com.example.flink.jobs
>
> import java.util.{Properties}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala.function.WindowFunction
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08}
> import org.apache.flink.api.scala._
>
> object CaseClassWithGeneric {
>   case class TestGen[T: TypeInformation](item: T) {}
>
>   class AggregateOrigins extends WindowFunction[String, TestGen[String], String, TimeWindow] {
>     def apply(key: String, win: TimeWindow, values: Iterable[String], col: Collector[TestGen[String]]): Unit = {
>       values.foreach(x => { })
>       col.collect(new TestGen[String]("Foo"))
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val properties = new Properties();
>     val messageStream = env.addSource(
>       new FlinkKafkaConsumer08("topic", new SimpleStringSchema, properties))
>       .keyBy(s => s)
>       .timeWindow(Time.days(1))
>       .apply(new AggregateOrigins)
>     messageStream.print()
>     env.execute("Simple Job")
>   }
> }
>
> When I dug into the apply() function definition I found the following:
>
> def apply[R: TypeInformation](
>     function: WindowFunction[T, R, K, W]): DataStream[R] = {
>
>   val cleanFunction = clean(function)
>   val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
>   asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
> }
>
> As far as I can tell TestGen[String] should correspond to [R:
> TypeInformation] in apply. Am I missing something or is it not possible to
> define case class with a generic parameter?
>
> Thanks,
>
> James Bucher
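
For reference, the call-site point from the reply can be sketched in plain Scala without Flink. This is only an illustration under stated assumptions: `TypeInfo` is a hypothetical stand-in for Flink's TypeInformation, and `applyLike` is a made-up method that merely mirrors the shape of `apply[R: TypeInformation]` from the quoted source. The context bound on the case class asks for evidence about T, while the method asks for evidence about the whole result type where it is called.

```scala
object CallSiteEvidence {
  // Hypothetical stand-in for Flink's TypeInformation; not the real class.
  trait TypeInfo[T] { def name: String }

  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { def name: String = "String" }

  // The context bound only demands evidence for T when a TestGen is built.
  // It does not create a TypeInfo[TestGen[T]].
  case class TestGen[T: TypeInfo](item: T)

  // Same shape as apply[R: TypeInformation] in the quoted source (assumed
  // name): the compiler looks for TypeInfo[R] where this method is called.
  def applyLike[R: TypeInfo](produce: () => R): String =
    implicitly[TypeInfo[R]].name

  def run(): String = {
    // Evidence for the result type, in scope at the call site.
    // Removing this line makes the call below fail to compile.
    implicit val testGenType: TypeInfo[TestGen[String]] =
      new TypeInfo[TestGen[String]] { def name: String = "TestGen[String]" }

    applyLike(() => TestGen("Foo"))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In the real job, the equivalent of `testGenType` would be the `createTypeInformation[TestGen[String]]` line from the reply, placed in main() before the `.apply(new AggregateOrigins)` call.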