Actually the Scala and Java code are completely separate - in fact they are
part of separate test suites. We have both a Scala and a Java API in our
application, but they are completely separate .. and yes, in Scala the
implicits did the trick, while in Java I had to pass the TypeInformation
explicitly to addSource ..
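
To make that concrete, here's a minimal sketch of the Scala side (assuming
Flink's Scala DataStream API; Data and data stand for the same fixtures that
appear in the code further down the thread):

import org.apache.flink.streaming.api.scala._  // brings implicit TypeInformation derivation into scope

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The Scala addSource has a [T: TypeInformation] context bound, so the
// TypeInformation[Data] is derived implicitly at compile time ..
val ins: DataStream[Data] = env.addSource(FlinkSource.CollectionSourceFunction(data))

// .. while the Java addSource cannot see through erasure, which is why I had
// to use the overload that also takes the TypeInformation explicitly.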

regards.

On Mon, Aug 26, 2019 at 10:00 PM Rong Rong <walter...@gmail.com> wrote:

> Glad that you sorted it out, and sorry for the late reply.
> Yes, I think the problem is how your `TypeInformation` for `Data` is being
> passed to the DataStreamSource construct.
>
> Regarding why the Scala side works but not the Java side: it likely has
> something to do with the implicit parameter passing for your `readStream`,
> which is very tricky to mix with Java code. So I would avoid mixing them if
> possible.
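>
> To make the implicit part concrete: a context bound like the one on your
> `readStream` desugars to an extra implicit parameter list, roughly as in the
> sketch below (just an illustration of the desugaring, not your actual code).
> Java has no way to fill in those implicit parameters, which is why a
> Java-facing variant has to take them as ordinary arguments:
>
> final def readStream[In](inlet: CodecInlet[In])(
>     implicit ti: TypeInformation[In],
>     schema: DeserializationSchema[In]): DataStream[In] =
>   context.readStream(inlet)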
>
> --
> Rong
>
> On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Looks like using the following overload of
>> StreamExecutionEnvironment.addSource, which takes a TypeInformation as
>> well, does the trick ..
>>
>> env.<Data>addSource(
>>   FlinkSource.<Data>collectionSourceFunction(data),
>>   TypeInformation.<Data>of(Data.class)
>> )
>>
>> regards.
>>
>> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> oh .. and I am using Flink 1.8 ..
>>>
>>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <
>>> ghosh.debas...@gmail.com> wrote:
>>>
>>>> Thanks for the feedback .. here are the details ..
>>>>
>>>> Just to give you some background, the original API is a Scala API, as
>>>> follows ..
>>>>
>>>> final def readStream[In: TypeInformation: DeserializationSchema](
>>>>     inlet: CodecInlet[In]): DataStream[In] =
>>>>   context.readStream(inlet)
>>>>
>>>> and the *Scala version of the code runs fine* .. Here's the Java API
>>>> (also written in Scala, but passing the type information and
>>>> deserialization schema explicitly and using the DataStream class from
>>>> Flink's Java API) ..
>>>>
>>>> final def readStream[In](
>>>>     inlet: CodecInlet[In],
>>>>     clazz: Class[In],
>>>>     deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>>   context
>>>>     .readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>>>     .javaStream
>>>>
>>>> Here's the Java code for the transformation where I get the error ..
>>>>
>>>> DataStream<Data> ins =
>>>>   this.<Data>readStream(in, Data.class, serdeData)
>>>>       .map((Data d) -> d)
>>>>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>
>>>> DataStream<Simple> simples = ins
>>>>     .map((Data d) -> new Simple(d.getName()));
>>>>     // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>> DataStreamSink<Simple> sink =
>>>>     writeStream(out, simples, Simple.class, serdeSimple);
>>>>
>>>> Here's the corresponding Scala code that runs fine ..
>>>>
>>>> val ins: DataStream[Data] = readStream(in)
>>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>>> writeStream(out, simples)
>>>>
>>>> Here's the custom source that's also referred to in the exception .. the
>>>> case class is used directly from Scala, while from Java I go through the
>>>> Java API helper that wraps the case class ..
>>>>
>>>> object FlinkSource {
>>>>   case class CollectionSourceFunction[T](data: Seq[T])
>>>>       extends SourceFunction[T] {
>>>>     def cancel(): Unit = {}
>>>>     def run(ctx: SourceContext[T]): Unit = {
>>>>       data.foreach(d ⇒ ctx.collect(d))
>>>>     }
>>>>   }
>>>>
>>>>   /**
>>>>    * Java API
>>>>    */
>>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>>     CollectionSourceFunction(data.asScala.toSeq)
>>>> }
>>>>
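>>>> As an aside, the exception below also suggests implementing the
>>>> ResultTypeQueryable interface; a rough sketch of that alternative (which I
>>>> have not tried - the TypedCollectionSourceFunction name is just for
>>>> illustration) could look like this:
>>>>
>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction
>>>>
>>>> case class TypedCollectionSourceFunction[T](data: Seq[T], ti: TypeInformation[T])
>>>>     extends SourceFunction[T] with ResultTypeQueryable[T] {
>>>>   def cancel(): Unit = {}
>>>>   def run(ctx: SourceFunction.SourceContext[T]): Unit =
>>>>     data.foreach(d ⇒ ctx.collect(d))
>>>>   // the type extractor asks the source itself for its output type,
>>>>   // so nothing needs to be passed explicitly at the addSource call site
>>>>   override def getProducedType: TypeInformation[T] = ti
>>>> }
>>>>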
>>>> Here's how I use the custom source from Java (which gives the exception)
>>>> .. here data is a java.util.List<Data>
>>>>
>>>> env.<Data>addSource(
>>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>>> )
>>>>
>>>> and here's the Scala version, which runs fine .. here data is a
>>>> scala.Seq[Data]
>>>>
>>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>>
>>>> Here's the complete exception ..
>>>>
>>>> [info]   org.apache.flink.api.common.functions.InvalidTypesException:
>>>> The return type of function 'Custom Source' could not be determined
>>>> automatically, due to type erasure. You can give type information hints by
>>>> using the returns(...) method on the result of the transformation call, or
>>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>>> [info]   at
>>>> org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>>>> [info]   at
>>>> org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>>>> [info]   at
>>>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>>>> [info]   at
>>>> pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>>> [info]   at
>>>> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>>> [info]   at
>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>>> [info]   at
>>>> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>>> [info]   at
>>>> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>>> [info]   at
>>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>>> [info]   ...
>>>> [info]   Cause:
>>>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>>>> TypeVariable 'T' in 'class
>>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>>>> determined. This is most likely a type erasure problem. The type extraction
>>>> currently supports types with generic variables only in cases where all
>>>> variables in the return type can be deduced from the input type(s).
>>>> Otherwise the type has to be specified explicitly using type information.
>>>> [info]   at
>>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>>> [info]   at
>>>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>>> [info]   at
>>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>>> [info]   at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>>> [info]   at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>>> [info]   at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>>> [info]   at
>>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> [info]   at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> [info]   at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> [info]   ...
>>>>
>>>> regards.
>>>>
>>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> I am not sure how the function `readStream` is implemented (also which
>>>>> version of Flink are you using?).
>>>>> Can you share more information on your code blocks and exception logs?
>>>>>
>>>>> Also, to answer your question: a DataStream's return type is determined
>>>>> by its underlying transformation, so you cannot set it directly.
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks .. I tried this ..
>>>>>>
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>     .map((Data d) -> d)
>>>>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>>
>>>>>> But still get the same error on this line ..
>>>>>>
>>>>>> (BTW I am not sure how to invoke returns on a DataStream and hence had
>>>>>> to add a no-op map - any suggestions here?)
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> I think the error refers to the output of your source rather than
>>>>>>> the result of your map function. E.g.
>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>>     *.returns(TypeInformation.of(Data.class));*
>>>>>>> DataStream<Simple> simples = ins
>>>>>>>     .map((Data d) -> new Simple(d.getName()))
>>>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello -
>>>>>>>>
>>>>>>>> I have the following call to addSource where I pass a Custom
>>>>>>>> SourceFunction ..
>>>>>>>>
>>>>>>>> env.<Data>addSource(
>>>>>>>>   new CollectionSourceFunctionJ<Data>(
>>>>>>>>       data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>>> )
>>>>>>>>
>>>>>>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala
>>>>>>>> case class ..
>>>>>>>>
>>>>>>>> case class CollectionSourceFunctionJ[T](
>>>>>>>>     data: java.util.List[T],
>>>>>>>>     ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>>   def cancel(): Unit = {}
>>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> When the following transformation runs ..
>>>>>>>>
>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>>> DataStream<Simple> simples = ins
>>>>>>>>     .map((Data d) -> new Simple(d.getName()))
>>>>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>>
>>>>>>>> I get the following exception in the second line ..
>>>>>>>>
>>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>>>> automatically, due to type erasure. You can give type information 
>>>>>>>>> hints by
>>>>>>>>> using the returns(...) method on the result of the transformation 
>>>>>>>>> call, or
>>>>>>>>> by letting your function implement the 'ResultTypeQueryable' 
>>>>>>>>> interface.
>>>>>>>>
>>>>>>>>
>>>>>>>> Initially the returns call was not there and I was getting the
>>>>>>>> same exception. Now after adding the returns call, nothing changes.
>>>>>>>>
>>>>>>>> Any help will be appreciated ..
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
