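Hi Debasish,

I think the error refers to the output of your source rather than the result of your map function: the exception names 'Custom Source', not the map. Try giving the source stream a type hint as well, e.g.

    DataStream<Data> ins = readStream(in, Data.class, serdeData)
        .returns(TypeInformation.of(Data.class));
    DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
        .returns(new TypeHint<Simple>(){}.getTypeInfo());

This assumes readStream hands back the SingleOutputStreamOperator produced by addSource, since returns(...) is not available on a plain DataStream. If it does not, call returns(...) directly on the result of addSource inside readStream.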
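In case it helps to see why the hint is needed at all, here is a minimal plain-Java sketch of type erasure and of the anonymous-subclass trick that a type hint can use to get around it. Hint and ErasureDemo are hypothetical names made up for this illustration, not Flink classes, and I am assuming Flink's TypeHint captures its type argument in a similar way.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a type hint; this is NOT Flink's TypeHint.
abstract class Hint<T> {
    private final Type captured;

    protected Hint() {
        // An anonymous subclass such as `new Hint<String>(){}` records its
        // generic superclass in class metadata, so the type argument can
        // still be read back at runtime.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Create the hint as an anonymous subclass: new Hint<X>(){}");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getType() {
        return captured;
    }
}

class ErasureDemo {
    // The element type is erased: both lists share one runtime class, so a
    // framework looking only at the object cannot tell what T was.
    static boolean sameRuntimeClass() {
        List<String> names = new ArrayList<>();
        List<Integer> counts = new ArrayList<>();
        return names.getClass() == counts.getClass();
    }

    // The trailing {} creates the anonymous subclass that preserves T.
    static Type hintedType() {
        return new Hint<String>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println("Lists share a runtime class: " + sameRuntimeClass());
        System.out.println("Type recovered from the hint: " + hintedType());
    }
}
```

A generic source such as CollectionSourceFunctionJ[T] is in the same position as the lists above: the instance alone does not carry T, so the type has to be supplied explicitly on the source stream.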
--
Rong

On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hello -
>
> I have the following call to addSource where I pass a Custom
> SourceFunction ..
>
> env.<Data>addSource(
>   new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
> )
>
> where data is List<Data> and CollectionSourceFunctionJ is a Scala case
> class ..
>
> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
>   def cancel(): Unit = {}
>   def run(ctx: SourceContext[T]): Unit = {
>     data.asScala.foreach(d ⇒ ctx.collect(d))
>   }
> }
>
> When the following transformation runs ..
>
> DataStream<Data> ins = readStream(in, Data.class, serdeData);
> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>
> I get the following exception in the second line ..
>
>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>> type of function 'Custom Source' could not be determined automatically, due
>> to type erasure. You can give type information hints by using the
>> returns(...) method on the result of the transformation call, or by letting
>> your function implement the 'ResultTypeQueryable' interface.
>
> Initially the returns call was not there and I was getting the same
> exception. Now after adding the returns call, nothing changes.
>
> Any help will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg