Looks like using the following overload of StreamExecutionEnvironment.addSource, which takes a TypeInformation as well, does the trick ..

env.<Data>addSource(
  FlinkSource.<Data>collectionSourceFunction(data),
  TypeInformation.<Data>of(Data.class)
)

regards.

On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> oh .. and I am using Flink 1.8 ..
>
> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> Thanks for the feedback .. here are the details ..
>>
>> Just to give you some background, the original API is a Scala API, as follows ..
>>
>> final def readStream[In: TypeInformation: DeserializationSchema](inlet: CodecInlet[In]): DataStream[In] =
>>   context.readStream(inlet)
>>
>> and the *Scala version of the code runs fine*. Here's the Java API (also written in Scala, but passing the type information and the deserialization schema explicitly, and using the DataStream class from Flink Java) ..
>>
>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In], deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>     .javaStream
>>
>> Here's the Java code for the transformation where I get the error ..
>>
>> DataStream<Data> ins =
>>   this.<Data>readStream(in, Data.class, serdeData)
>>       .map((Data d) -> d)
>>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>>
>> DataStream<Simple> simples =
>>   ins.map((Data d) -> new Simple(d.getName())); // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>
>> DataStreamSink<Simple> sink =
>>   writeStream(out, simples, Simple.class, serdeSimple);
>>
>> Here's the corresponding Scala code that runs fine ..
>>
>> val ins: DataStream[Data] = readStream(in)
>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>> writeStream(out, simples)
>>
>> Here's the custom source that's also referred to in the exception .. the case class is used directly from Scala, while the Java API below wraps it for use from Java ..
>>
>> object FlinkSource {
>>   case class CollectionSourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
>>     def cancel(): Unit = {}
>>     def run(ctx: SourceContext[T]): Unit = {
>>       data.foreach(d ⇒ ctx.collect(d))
>>     }
>>   }
>>
>>   /**
>>    * Java API
>>    */
>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>     CollectionSourceFunction(data.asScala.toSeq)
>> }
>>
>> Here's how I use the custom source from Java (which gives the exception) .. here data is a java.util.List<Data> ..
>>
>> env.<Data>addSource(
>>   FlinkSource.<Data>collectionSourceFunction(data)
>> )
>>
>> and here's the Scala version, which runs fine .. here data is a scala.Seq[Data] ..
>>
>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>
>> Here's the complete exception ..
>>
>> [info] org.apache.flink.api.common.functions.InvalidTypesException: The
>> return type of function 'Custom Source' could not be determined
>> automatically, due to type erasure. You can give type information hints by
>> using the returns(...) method on the result of the transformation call, or
>> by letting your function implement the 'ResultTypeQueryable' interface.
>> [info]   at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>> [info]   at pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>> [info]   at pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>> [info]   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>> [info]   at pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>> [info]   at pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>> [info]   ...
>> [info] Cause: org.apache.flink.api.common.functions.InvalidTypesException: Type of
>> TypeVariable 'T' in 'class pipelines.flink.testkit.FlinkSource$CollectionSourceFunction'
>> could not be determined. This is most likely a type erasure problem. The type extraction
>> currently supports types with generic variables only in cases where all variables in the
>> return type can be deduced from the input type(s). Otherwise the type has to be specified
>> explicitly using type information.
>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> [info]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> [info]   ...
>>
>> regards.
>>
>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> I am not sure how the function `readStream` is implemented (also, which
>>> version of Flink are you using?).
>>> Can you share more information on your code blocks and exception logs?
>>>
>>> Also, to answer your question: a DataStream's return type is determined by
>>> its underlying transformation, so you cannot set it directly.
>>>
>>> Thanks,
>>> Rong
>>>
>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> Thanks .. I tried this ..
>>>>
>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>   .map((Data d) -> d)
>>>>   .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>
>>>> But I still get the same error on this line ..
>>>>
>>>> (BTW I am not sure how to invoke returns on a DataStream and hence had
>>>> to do a fake map .. any suggestions here?)
>>>>
>>>> regards.
>>>>
>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi Debasish,
>>>>>
>>>>> I think the error refers to the output of your source instead of the
>>>>> result of your map function. E.g.
>>>>>
>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData).returns(new TypeInformation<Data>);
>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>   .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Hello -
>>>>>>
>>>>>> I have the following call to addSource where I pass a custom SourceFunction ..
>>>>>>
>>>>>> env.<Data>addSource(
>>>>>>   new CollectionSourceFunctionJ<Data>(data,
>>>>>>     TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>> )
>>>>>>
>>>>>> where data is a List<Data> and CollectionSourceFunctionJ is a Scala case class ..
>>>>>>
>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>   def cancel(): Unit = {}
>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> When the following transformation runs ..
>>>>>>
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>   .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>
>>>>>> I get the following exception on the second line ..
>>>>>>
>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>> automatically, due to type erasure. You can give type information hints by
>>>>>>> using the returns(...) method on the result of the transformation call, or
>>>>>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>>>>>
>>>>>> Initially the returns call was not there and I was getting the same
>>>>>> exception. Now, after adding the returns call, nothing changes.
>>>>>>
>>>>>> Any help will be appreciated ..
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> --
>>>>>> Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg
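The behaviour discussed in this thread comes down to Java type erasure: a plain generic object created through a factory method (like collectionSourceFunction above) retains no runtime record of its type argument, whereas an anonymous subclass of a type-token class bakes the argument into its class metadata, which is the mechanism Flink's TypeHint relies on. A minimal, Flink-free sketch of the difference (the class names here are illustrative, not from the thread):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {

    // A type token in the style of Flink's TypeHint: the type argument of an
    // *anonymous subclass* survives erasure in the subclass's class metadata.
    public static abstract class TypeToken<T> {
        public Type captured() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // A plain generic class, analogous to CollectionSourceFunction[T]: once
    // an instance exists, nothing at runtime records what T was.
    public static class Holder<T> { }

    public static void main(String[] args) {
        // Anonymous subclass: T = String is recorded in the class file,
        // so it can be recovered reflectively.
        TypeToken<String> token = new TypeToken<String>() { };
        System.out.println(token.captured().getTypeName()); // java.lang.String

        // Plain instantiation: the runtime class is just Holder; the String
        // argument has been erased. This is the situation Flink's type
        // extractor hits inside addSource, hence the InvalidTypesException.
        Holder<String> holder = new Holder<>();
        System.out.println(holder.getClass().getSimpleName()); // Holder
    }
}
```

The same constraint explains why the addSource overload that takes an explicit TypeInformation resolves the problem: the extractor never has to recover T reflectively because the caller supplies it directly.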