Thanks. I tried this:

DataStream<Data> ins = readStream(in, Data.class, serdeData)
    .map((Data d) -> d)
    .returns(new TypeHint<Data>(){}.getTypeInfo());

But I still get the same error on this line. (BTW, I am not sure how to invoke returns on a DataStream, hence the fake map. Any suggestions here?)

regards.

On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Debasish,
>
> I think the error refers to the output of your source instead of your
> result of the map function. E.g.
>
> DataStream<Data> ins = readStream(in, Data.class, serdeData)*.returns(new
> TypeInformation<Data>);*
> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>
> --
> Rong
>
> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hello -
>>
>> I have the following call to addSource where I pass a Custom
>> SourceFunction ..
>>
>> env.<Data>addSource(
>>   new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new
>>   TypeHint<Data>(){}))
>> )
>>
>> where data is List<Data> and CollectionSourceFunctionJ is a Scala case
>> class ..
>>
>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti:
>> TypeInformation[T]) extends SourceFunction[T] {
>>   def cancel(): Unit = {}
>>   def run(ctx: SourceContext[T]): Unit = {
>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>   }
>> }
>>
>> When the following transformation runs ..
>>
>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>> DataStream<Simple> simples = ins.map((Data d) -> new
>> Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>
>> I get the following exception in the second line ..
>>
>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>>> type of function 'Custom Source' could not be determined automatically, due
>>> to type erasure. You can give type information hints by using the
>>> returns(...) method on the result of the transformation call, or by letting
>>> your function implement the 'ResultTypeQueryable' interface.
>>
>>
>> Initially the returns call was not there and I was getting the same
>> exception. Now after adding the returns call, nothing changes.
>>
>> Any help will be appreciated ..
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
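
For readers following the thread, here is a minimal, Flink-free sketch of the type erasure that the exception message refers to. It uses only the JDK reflection API. The names ErasureDemo, Hint and ListSource are hypothetical stand-ins invented for this illustration, not Flink classes. The assumption is that a hint class like TypeHint works along similar lines, by having the caller create an anonymous subclass (the trailing {}), which records the type argument in class metadata. An instance of a plain generic class such as the custom source keeps no such record.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for a type-hint class. It is abstract, so callers
    // must write `new Hint<Foo>() {}`, which creates an anonymous subclass
    // whose generic superclass is recorded as Hint<Foo>.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Hypothetical generic source, analogous to a generic source function.
    static class ListSource<T> {
        final List<T> data;

        ListSource(List<T> data) {
            this.data = data;
        }
    }

    // At runtime, all the class of a ListSource instance knows about its
    // element type is the name of the type variable, not the actual argument.
    static String describeSource(ListSource<?> source) {
        return source.getClass().getTypeParameters()[0].getName();
    }

    public static void main(String[] args) {
        ListSource<String> source = new ListSource<>(new ArrayList<>());
        System.out.println(describeSource(source)); // prints "T"

        Type hinted = new Hint<String>() {}.captured();
        System.out.println(hinted.getTypeName()); // prints "java.lang.String"
    }
}
```

The sketch only illustrates the erasure itself. How the type information is best handed to Flink for a custom source (for example via the 'ResultTypeQueryable' interface named in the exception) is not shown here.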