oh .. and I am using Flink 1.8 ..

On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Thanks for the feedback .. here are the details ..
>
> Just to give you some background, the original API is a Scala API, as
> follows ..
>
> final def readStream[In: TypeInformation: DeserializationSchema](
>     inlet: CodecInlet[In]): DataStream[In] =
>   context.readStream(inlet)
>
> and the *Scala version of the code runs fine* .. Here's the Java API
> (also written in Scala, but it takes the type information and
> deserialization schema explicitly and returns the DataStream class from
> Flink's Java API) ..
>
> final def readStream[In](
>     inlet: CodecInlet[In],
>     clazz: Class[In],
>     deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>     .javaStream
>
> Here's the Java code for the transformation where I get the error ..
>
> DataStream<Data> ins =
>   this.<Data>readStream(in, Data.class, serdeData)
>       .map((Data d) -> d)
>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>
> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()));
>   // .returns(new TypeHint<Simple>(){}.getTypeInfo());
> DataStreamSink<Simple> sink =
>   writeStream(out, simples, Simple.class, serdeSimple);
>
> Here's the corresponding Scala code that runs fine ..
>
> val ins: DataStream[Data] = readStream(in)
> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
> writeStream(out, simples)
>
> Here's the custom source that's also referred to in the exception .. the
> case class is used directly from Scala, while from Java I go through the
> Java API wrapper around the same case class ..
>
> object FlinkSource {
>   import scala.collection.JavaConverters._ // needed for data.asScala below
>
>   case class CollectionSourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
>     def cancel(): Unit = {}
>     def run(ctx: SourceContext[T]): Unit = {
>       data.foreach(d ⇒ ctx.collect(d))
>     }
>   }
>
>   /**
>    * Java API
>    */
>   def collectionSourceFunction[T](data: java.util.List[T]) =
>     CollectionSourceFunction(data.asScala.toSeq)
> }
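>
> For reference, here is roughly what the 'ResultTypeQueryable' route suggested
> by the exception might look like on the Java side. This is only a sketch --
> the class name TypedCollectionSource is made up and not part of our code --
> but the idea is that the source carries its own TypeInformation and hands it
> back to Flink, so the extractor never has to infer T ..
>
> import java.util.List;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> // Sketch only: a collection-backed source that also implements
> // ResultTypeQueryable, so Flink asks the source for its produced type
> // instead of trying to deduce the type variable T by reflection.
> public class TypedCollectionSource<T> implements SourceFunction<T>, ResultTypeQueryable<T> {
>
>   private final List<T> data;
>   private final TypeInformation<T> typeInfo;
>
>   public TypedCollectionSource(List<T> data, TypeInformation<T> typeInfo) {
>     this.data = data;
>     this.typeInfo = typeInfo;
>   }
>
>   @Override
>   public void run(SourceContext<T> ctx) {
>     for (T d : data) {
>       ctx.collect(d);   // emit every element of the backing collection
>     }
>   }
>
>   @Override
>   public void cancel() {}
>
>   @Override
>   public TypeInformation<T> getProducedType() {
>     return typeInfo;    // this is what Flink uses instead of type extraction
>   }
> }
>
> (With something like that, I'd expect env.addSource(new
> TypedCollectionSource<>(data, TypeInformation.of(Data.class))) to need no
> further hints -- though I haven't tried it.)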
>
> Here's how I use the custom source from Java (this is what throws the
> exception) .. here data is a java.util.List<Data> ..
>
> env.<Data>addSource(
>   FlinkSource.<Data>collectionSourceFunction(data)
> )
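>
> (As an aside: if I read the Flink 1.8 Javadoc correctly, StreamExecutionEnvironment
> also has an addSource overload that takes the produced TypeInformation
> explicitly, which would sidestep the extractor for the source -- a sketch,
> untested:)
>
> // assumes the two-argument addSource(SourceFunction<OUT>, TypeInformation<OUT>) overload
> env.addSource(
>   FlinkSource.<Data>collectionSourceFunction(data),
>   TypeInformation.of(Data.class)
> );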
>
> and here's the Scala version, which runs fine .. here data is a
> scala.Seq[Data]
>
> env.addSource(FlinkSource.CollectionSourceFunction(data))
>
> Here's the complete exception ..
>
> [info]   org.apache.flink.api.common.functions.InvalidTypesException: The
> return type of function 'Custom Source' could not be determined
> automatically, due to type erasure. You can give type information hints by
> using the returns(...) method on the result of the transformation call, or
> by letting your function implement the 'ResultTypeQueryable' interface.
> [info]   at
> org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
> [info]   at
> org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
> [info]   at
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
> [info]   at
> pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
> [info]   at
> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
> [info]   at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
> [info]   at
> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
> [info]   at
> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
> [info]   at
> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
> [info]   ...
> [info]   Cause:
> org.apache.flink.api.common.functions.InvalidTypesException: Type of
> TypeVariable 'T' in 'class
> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
> determined. This is most likely a type erasure problem. The type extraction
> currently supports types with generic variables only in cases where all
> variables in the return type can be deduced from the input type(s).
> Otherwise the type has to be specified explicitly using type information.
> [info]   at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
> [info]   at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
> [info]   at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
> [info]   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
> [info]   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
> [info]   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
> [info]   at
> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [info]   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> [info]   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> [info]   ...
>
> regards.
>
> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>
>> I am not sure how the function `readStream` is implemented (also which
>> version of Flink are you using?).
>> Can you share more information on your code blocks and exception logs?
>>
>> Also, to answer your question: a DataStream's return type is determined by
>> its underlying transformation, so you cannot set it directly on the stream.
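>>
>> (If I remember the API right, returns(...) hangs off the
>> SingleOutputStreamOperator that a transformation such as map gives back, so
>> the hint attaches to that transformation rather than to the stream itself --
>> a rough sketch:)
>>
>> // sketch: returns() is defined on the SingleOutputStreamOperator produced by map()
>> // (org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator),
>> // so the type hint is attached to that operator, not to the DataStream
>> SingleOutputStreamOperator<Simple> simples =
>>     ins.map((Data d) -> new Simple(d.getName()))
>>        .returns(TypeInformation.of(Simple.class));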
>>
>> Thanks,
>> Rong
>>
>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Thanks .. I tried this ..
>>>
>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>     .map((Data d) -> d)
>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>
>>> But I still get the same error on this line ..
>>>
>>> (BTW I am not sure how to invoke returns() on a DataStream and hence had
>>> to add a no-op map - any suggestions here?)
>>>
>>> regards.
>>>
>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> I think the error refers to the output of your source rather than the
>>>> result of the map function. E.g.
>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>     *.returns(TypeInformation.of(Data.class));*
>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>
>>>> --
>>>> Rong
>>>>
>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Hello -
>>>>>
>>>>> I have the following call to addSource where I pass a custom
>>>>> SourceFunction ..
>>>>>
>>>>> env.<Data>addSource(
>>>>>   new CollectionSourceFunctionJ<Data>(
>>>>>     data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>> )
>>>>>
>>>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala
>>>>> case class ..
>>>>>
>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T],
>>>>>     ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>   def cancel(): Unit = {}
>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>   }
>>>>> }
>>>>>
>>>>> When the following transformation runs ..
>>>>>
>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>
>>>>> I get the following exception in the second line ..
>>>>>
>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>> return type of function 'Custom Source' could not be determined
>>>>>> automatically, due to type erasure. You can give type information hints
>>>>>> by using the returns(...) method on the result of the transformation
>>>>>> call, or by letting your function implement the 'ResultTypeQueryable'
>>>>>> interface.
>>>>>
>>>>>
>>>>> Initially the returns call was not there and I was getting this
>>>>> exception. After adding the returns call, nothing changes.
>>>>>
>>>>> Any help will be appreciated ..
>>>>>
>>>>> regards.
>>>>>
>>>>
>>>
>>>
>>
>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
