Thanks. I tried this:

DataStream<Data> ins = readStream(in, Data.class, serdeData)
    .map((Data d) -> d)
    .returns(new TypeHint<Data>(){}.getTypeInfo());

But I still get the same error on this line.

(BTW, I am not sure how to invoke returns on a DataStream, hence the
identity map. Any suggestions here?)
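
For context on the "type erasure" part of the exception quoted below, here
is a minimal plain-Java sketch of why an instance of a generic class cannot
report its type argument at runtime, while an anonymous subclass such as
new TypeHint<Data>(){} can. The names Hint, Holder, visibleTypeParameter and
hintedTypeName are hypothetical stand-ins for illustration only; this is not
Flink's implementation, and it assumes nothing beyond the JDK.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeErasureDemo {

    // Hypothetical stand-in for a type-hint class (NOT Flink's TypeHint).
    public abstract static class Hint<T> {
        // An anonymous subclass records its generic superclass in its class
        // file, so the type argument can be read back through reflection.
        public Type captured() {
            Type superType = getClass().getGenericSuperclass();
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
    }

    // Hypothetical generic holder, analogous to a generic source function.
    public static class Holder<T> { }

    // Reflection on an instance only sees the declared type variable;
    // the actual type argument was erased at compile time.
    public static String visibleTypeParameter(Holder<?> holder) {
        return holder.getClass().getTypeParameters()[0].getName();
    }

    // The anonymous subclass keeps the full generic type.
    public static String hintedTypeName() {
        return new Hint<List<String>>() { }.captured().getTypeName();
    }

    public static void main(String[] args) {
        // prints "T"
        System.out.println(visibleTypeParameter(new Holder<String>()));
        // prints "java.util.List<java.lang.String>"
        System.out.println(hintedTypeName());
    }
}
```

By analogy, a generic source function instance is in the position of Holder
here, which is presumably why the exception names 'Custom Source' rather than
the map. The remedies the exception message itself lists are returns(...) on
the result of the transformation or implementing ResultTypeQueryable; Flink's
StreamExecutionEnvironment also has an addSource overload that takes a
TypeInformation as a second argument. Whether any of these fits readStream
depends on code not shown in this thread.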

regards.

On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:

> Hi Debasish,
>
> I think the error refers to the output of your source rather than the
> result of your map function. E.g.
> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>     .returns(TypeInformation.of(Data.class));
> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>
> --
> Rong
>
> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hello -
>>
>> I have the following call to addSource where I pass a Custom
>> SourceFunction ..
>>
>> env.<Data>addSource(
>>   new CollectionSourceFunctionJ<Data>(
>>     data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>> )
>>
>> where data is List<Data> and CollectionSourceFunctionJ is a Scala case
>> class ..
>>
>> case class CollectionSourceFunctionJ[T](data: java.util.List[T],
>>     ti: TypeInformation[T]) extends SourceFunction[T] {
>>   def cancel(): Unit = {}
>>   def run(ctx: SourceContext[T]): Unit = {
>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>   }
>> }
>>
>> When the following transformation runs ..
>>
>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>
>> I get the following exception in the second line ..
>>
>>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>>> type of function 'Custom Source' could not be determined automatically, due
>>> to type erasure. You can give type information hints by using the
>>> returns(...) method on the result of the transformation call, or by letting
>>> your function implement the 'ResultTypeQueryable' interface.
>>
>>
>> Initially the returns call was not there and I was getting the same
>> exception. Now after adding the returns call, nothing changes.
>>
>> Any help will be appreciated ..
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
