Hi Debasish,

I think the error refers to the output type of your source rather than the
result of the map function: the message names 'Custom Source', which is
the source operator, not the map. Try adding the type hint where the
source stream is created, e.g.

DataStream<Data> ins = readStream(in, Data.class, serdeData)
    .returns(TypeInformation.of(Data.class));
DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
    .returns(new TypeHint<Simple>(){}.getTypeInfo());
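
In case it helps to see why a generic source loses its element type while
an anonymous TypeHint subclass keeps it, here is a minimal plain-Java
sketch. It does not use Flink at all: Hint and Holder are hypothetical
stand-ins I made up for illustration, and I am only assuming that
TypeHint relies on a similar reflection trick.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureSketch {

    // Hypothetical stand-in for a type hint (not Flink's TypeHint).
    // Creating an anonymous subclass records T in that subclass's metadata.
    static abstract class Hint<T> {
        Type captured() {
            // The anonymous subclass has Hint<...> as its generic superclass.
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Hypothetical generic holder, analogous to a generic source function.
    static class Holder<T> {
        final List<T> items = new ArrayList<>();
    }

    // A plain generic instance: T is erased, so reflection only sees that
    // Holder extends Object and finds no trace of String.
    static String describeHolder() {
        Holder<String> holder = new Holder<>();
        return holder.getClass().getGenericSuperclass().getTypeName();
    }

    // An anonymous subclass: the type argument is recoverable at runtime.
    static String describeHint() {
        return new Hint<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(describeHolder()); // prints java.lang.Object
        System.out.println(describeHint());   // prints java.util.List<java.lang.String>
    }
}
```

The same reasoning would explain why a hint on the map result does not
help here: the element type of the source has to be supplied at the
source itself.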

--
Rong

On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hello -
>
> I have the following call to addSource where I pass a Custom
> SourceFunction ..
>
> env.<Data>addSource(
>   new CollectionSourceFunctionJ<Data>(data,
>     TypeInformation.<Data>of(new TypeHint<Data>(){}))
> )
>
> where data is List<Data> and CollectionSourceFunctionJ is a Scala case
> class ..
>
> case class CollectionSourceFunctionJ[T](data: java.util.List[T],
>     ti: TypeInformation[T]) extends SourceFunction[T] {
>   def cancel(): Unit = {}
>   def run(ctx: SourceContext[T]): Unit = {
>     data.asScala.foreach(d ⇒ ctx.collect(d))
>   }
> }
>
> When the following transformation runs ..
>
> DataStream<Data> ins = readStream(in, Data.class, serdeData);
> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>   .returns(new TypeHint<Simple>(){}.getTypeInfo());
>
> I get the following exception in the second line ..
>
>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>> type of function 'Custom Source' could not be determined automatically, due
>> to type erasure. You can give type information hints by using the
>> returns(...) method on the result of the transformation call, or by letting
>> your function implement the 'ResultTypeQueryable' interface.
>
>
> Initially the returns call was not there and I was getting the same
> exception. Now after adding the returns call, nothing changes.
>
> Any help will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
