It looks like using the following overload of
StreamExecutionEnvironment.addSource, which takes a TypeInformation as
well, does the trick ..

env.<Data>addSource(
  FlinkSource.<Data>collectionSourceFunction(data),
  TypeInformation.<Data>of(Data.class)
);
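
For completeness, the exception message also points at a second route that
should work: let the source itself implement ResultTypeQueryable, so the
element type travels with the function and the single-argument addSource
overload can discover it. A minimal Java sketch of that idea (the class
name TypedCollectionSource is hypothetical, not part of FlinkSource):

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A collection-backed source that hands its element type to Flink's type
// extractor via ResultTypeQueryable, so no hints are needed at the call
// site. The list should be serializable (e.g. an ArrayList), since the
// source function is shipped with the job graph.
public class TypedCollectionSource<T>
    implements SourceFunction<T>, ResultTypeQueryable<T> {

  private final List<T> data;
  private final TypeInformation<T> typeInfo;

  public TypedCollectionSource(List<T> data, TypeInformation<T> typeInfo) {
    this.data = data;
    this.typeInfo = typeInfo;
  }

  @Override
  public void run(SourceContext<T> ctx) {
    for (T d : data) {
      ctx.collect(d);
    }
  }

  @Override
  public void cancel() {}

  @Override
  public TypeInformation<T> getProducedType() {
    return typeInfo; // consulted by the extractor instead of reflecting on T
  }
}

With that in place, a plain env.addSource(new TypedCollectionSource<>(data,
TypeInformation.of(Data.class))) should need no further type hints.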

regards.

On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Oh, and I am using Flink 1.8 ..
>
> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Thanks for the feedback .. here are the details ..
>>
>> Just to give you some background, the original API is a Scala API, as
>> follows ..
>>
>> final def readStream[In: TypeInformation: DeserializationSchema](
>>     inlet: CodecInlet[In]): DataStream[In] =
>>   context.readStream(inlet)
>>
>> and the *Scala version of the code runs fine* (the In: TypeInformation
>> context bound lets the Scala compiler supply the TypeInformation
>> implicitly) .. Here's the Java API (also written in Scala, but passing the
>> type information and deserialization schema explicitly and using the
>> DataStream class from Flink Java) ..
>>
>> final def readStream[In](
>>     inlet: CodecInlet[In],
>>     clazz: Class[In],
>>     deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>     .javaStream
>>
>> Here's the Java code for the transformation where I get the error ..
>>
>> DataStream<Data> ins =
>>   this.<Data>readStream(in, Data.class, serdeData)
>>       .map((Data d) -> d)
>>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>>
>> DataStream<Simple> simples =
>>   ins.map((Data d) -> new Simple(d.getName()));
>>   // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>> DataStreamSink<Simple> sink =
>>   writeStream(out, simples, Simple.class, serdeSimple);
>>
>> Here's the corresponding Scala code that runs fine ..
>>
>> val ins: DataStream[Data] = readStream(in)
>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>> writeStream(out, simples)
>>
>> Here's the custom source that's also referred to in the exception .. the
>> case class is used directly from Scala, while from Java I go through the
>> Java API helper that wraps the same case class ..
>>
>> object FlinkSource {
>>   import scala.collection.JavaConverters._
>>
>>   case class CollectionSourceFunction[T](data: Seq[T])
>>       extends SourceFunction[T] {
>>     def cancel(): Unit = {}
>>     def run(ctx: SourceContext[T]): Unit = {
>>       data.foreach(d ⇒ ctx.collect(d))
>>     }
>>   }
>>
>>   /**
>>    * Java API
>>    */
>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>     CollectionSourceFunction(data.asScala.toSeq)
>> }
>>
>> Here's how I use the custom source from Java (this is what gives the
>> exception) .. here data is a java.util.List<Data>
>>
>> env.<Data>addSource(
>>   FlinkSource.<Data>collectionSourceFunction(data)
>> )
>>
>> and here's the Scala version, which runs fine .. here data is a
>> scala.Seq[Data]
>>
>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>
>> Here's the complete exception ..
>>
>> [info]   org.apache.flink.api.common.functions.InvalidTypesException: The
>> return type of function 'Custom Source' could not be determined
>> automatically, due to type erasure. You can give type information hints by
>> using the returns(...) method on the result of the transformation call, or
>> by letting your function implement the 'ResultTypeQueryable' interface.
>> [info]   at
>> org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>> [info]   at
>> org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>> [info]   at
>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>> [info]   at
>> pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>> [info]   at
>> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>> [info]   at
>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>> [info]   at
>> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>> [info]   at
>> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>> [info]   at
>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>> [info]   ...
>> [info]   Cause:
>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>> TypeVariable 'T' in 'class
>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>> determined. This is most likely a type erasure problem. The type extraction
>> currently supports types with generic variables only in cases where all
>> variables in the return type can be deduced from the input type(s).
>> Otherwise the type has to be specified explicitly using type information.
>> [info]   at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>> [info]   at
>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>> [info]   at
>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>> [info]   at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>> [info]   at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>> [info]   at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>> [info]   at
>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> [info]   at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [info]   at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [info]   ...
>>
>> regards.
>>
>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> I am not sure how the function `readStream` is implemented (also, which
>>> version of Flink are you using?).
>>> Can you share more of your code and the full exception log?
>>>
>>> Also, to answer your question: a DataStream's return type is determined
>>> by its underlying transformation, so you cannot set it on the DataStream
>>> directly.
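>>>
>>> For instance (just a sketch, reusing the CollectionSourceFunctionJ and
>>> `data` from your first mail, and assuming the usual Flink imports):
>>> returns(...) is defined on SingleOutputStreamOperator, which is what
>>> addSource hands back, so the hint goes on the source transformation
>>> itself, e.g.
>>>
>>> // addSource returns a DataStreamSource, a subtype of
>>> // SingleOutputStreamOperator, which is where returns(...) lives.
>>> TypeInformation<Data> dataTypeInfo = TypeInformation.of(Data.class);
>>> SingleOutputStreamOperator<Data> ins = env
>>>     .addSource(new CollectionSourceFunctionJ<>(data, dataTypeInfo))
>>>     .returns(dataTypeInfo); // hint applied where the stream is produced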
>>>
>>> Thanks,
>>> Rong
>>>
>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> Thanks .. I tried this ..
>>>>
>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>     .map((Data d) -> d)
>>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>
>>>> But I still get the same error on this line ..
>>>>
>>>> (BTW I am not sure how to invoke returns on a DataStream, hence the
>>>> fake map - any suggestions here?)
>>>>
>>>> regards.
>>>>
>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi Debasish,
>>>>>
>>>>> I think the error refers to the output of your source, not to the
>>>>> result of your map function. E.g.
>>>>>
>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>     .returns(TypeInformation.of(Data.class));
>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Hello -
>>>>>>
>>>>>> I have the following call to addSource where I pass a custom
>>>>>> SourceFunction ..
>>>>>>
>>>>>> env.<Data>addSource(
>>>>>>   new CollectionSourceFunctionJ<Data>(
>>>>>>     data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>> );
>>>>>>
>>>>>> where data is a List<Data> and CollectionSourceFunctionJ is a Scala
>>>>>> case class ..
>>>>>>
>>>>>> case class CollectionSourceFunctionJ[T](
>>>>>>     data: java.util.List[T], ti: TypeInformation[T])
>>>>>>     extends SourceFunction[T] {
>>>>>>   def cancel(): Unit = {}
>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> When the following transformation runs ..
>>>>>>
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>
>>>>>> I get the following exception on the second line ..
>>>>>>
>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>> automatically, due to type erasure. You can give type information
>>>>>>> hints by using the returns(...) method on the result of the
>>>>>>> transformation call, or by letting your function implement the
>>>>>>> 'ResultTypeQueryable' interface.
>>>>>>
>>>>>>
>>>>>> Initially the returns call was not there and I was getting the same
>>>>>> exception. After adding the returns call, nothing changed.
>>>>>>
>>>>>> Any help will be appreciated ..
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>
>>>>
>>>
>>
>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
