Glad that you sorted it out, and sorry for the late reply. Yes, I think the problem is how your `TypeInformation` for `Data` was being passed to the `DataStreamSource` construct.
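To see why the extractor fails in the first place: Java erases generic type parameters during compilation, so at runtime there is nothing left for Flink to deduce `T` from. A minimal plain-JDK sketch of erasure (no Flink involved; the `ErasureDemo` class name is just for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Both lists report the same runtime class: the element type parameter
    // is erased during compilation, which is exactly why a framework cannot
    // deduce 'T' from a SourceFunction<T> instance by reflection alone.
    static String runtimeClassName(List<?> list) {
        return list.getClass().getName();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        // Prints the same class name twice; the <String>/<Integer>
        // distinction is gone at runtime.
        System.out.println(runtimeClassName(strings));
        System.out.println(runtimeClassName(ints));
    }
}
```

This is why the type either has to be handed over explicitly (the `TypeInformation` overload of `addSource`) or recovered through a hint such as `TypeHint`, which captures the type via an anonymous subclass.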
Regarding why the Scala side works but the Java side does not: it likely has to do with the implicit parameter passing for your `readStream`, which is very tricky to mix with Java code. So I would avoid mixing them if possible.

--
Rong

On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Looks like using the following overload of
> StreamExecutionEnvironment.addSource, which takes a TypeInformation as well,
> does the trick ..
>
> env.<Data>addSource(
>   FlinkSource.<Data>collectionSourceFunction(data),
>   TypeInformation.<Data>of(Data.class)
> )
>
> regards.
>
> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> oh .. and I am using Flink 1.8 ..
>>
>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Thanks for the feedback .. here are the details ..
>>>
>>> Just to give you some background, the original API is a Scala API as follows ..
>>>
>>> final def readStream[In: TypeInformation: DeserializationSchema](inlet: CodecInlet[In]): DataStream[In] =
>>>   context.readStream(inlet)
>>>
>>> and the *Scala version of the code runs fine* .. Here's the Java API
>>> (also written in Scala, but passing the type information and
>>> deserialization schema explicitly, and using the DataStream class from Flink Java) ..
>>>
>>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In], deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>>     .javaStream
>>>
>>> Here's the Java code for the transformation where I get the error ..
>>>
>>> DataStream<Data> ins =
>>>   this.<Data>readStream(in, Data.class, serdeData)
>>>     .map((Data d) -> d)
>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>
>>> DataStream<Simple> simples =
>>>   ins.map((Data d) -> new Simple(d.getName())); // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>> DataStreamSink<Simple> sink =
>>>   writeStream(out, simples, Simple.class, serdeSimple);
>>>
>>> Here's the corresponding Scala code that runs fine ..
>>>
>>> val ins: DataStream[Data] = readStream(in)
>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>> writeStream(out, simples)
>>>
>>> Here's the custom source that's also referred to in the exception .. the
>>> case class is used directly from Scala, while from Java I go through the
>>> Java API that wraps it ..
>>>
>>> object FlinkSource {
>>>   case class CollectionSourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
>>>     def cancel(): Unit = {}
>>>     def run(ctx: SourceContext[T]): Unit = {
>>>       data.foreach(d ⇒ ctx.collect(d))
>>>     }
>>>   }
>>>
>>>   /**
>>>    * Java API
>>>    */
>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>     CollectionSourceFunction(data.asScala.toSeq)
>>> }
>>>
>>> Here's how I use the custom source from Java (which gives the exception) ..
>>> here data is a java.util.List<Data> ..
>>>
>>> env.<Data>addSource(
>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>> )
>>>
>>> and here's the Scala version, which runs fine .. here data is a scala.Seq[Data] ..
>>>
>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>
>>> Here's the complete exception ..
>>>
>>> [info] org.apache.flink.api.common.functions.InvalidTypesException:
>>> The return type of function 'Custom Source' could not be determined
>>> automatically, due to type erasure. You can give type information hints by
>>> using the returns(...) method on the result of the transformation call, or
>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>> [info]   at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>>> [info]   at pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>> [info]   at pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>> [info]   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>> [info]   at pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>> [info]   at pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>> [info] ...
>>> [info] Cause: org.apache.flink.api.common.functions.InvalidTypesException:
>>> Type of TypeVariable 'T' in 'class
>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>>> determined. This is most likely a type erasure problem. The type extraction
>>> currently supports types with generic variables only in cases where all
>>> variables in the return type can be deduced from the input type(s).
>>> Otherwise the type has to be specified explicitly using type information.
>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> [info]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> [info] ...
>>>
>>> regards.
>>>
>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> I am not sure how the function `readStream` is implemented (also, which
>>>> version of Flink are you using?).
>>>> Can you share more information on your code blocks and exception logs?
>>>>
>>>> Also, to answer your question: a DataStream's return type is determined by
>>>> its underlying transformation, so you cannot set it directly.
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Thanks .. I tried this ..
>>>>>
>>>>> DataStream<Data> ins =
>>>>>   readStream(in, Data.class, serdeData)
>>>>>     .map((Data d) -> d)
>>>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>
>>>>> But I still get the same error on this line ..
>>>>>
>>>>> (BTW, I am not sure how to invoke returns on a DataStream and hence
>>>>> had to do a fake map - any suggestions here?)
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>
>>>>>> Hi Debasish,
>>>>>>
>>>>>> I think the error refers to the output of your source instead of the
>>>>>> result of your map function. E.g.
>>>>>>
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)*.returns(new TypeInformation<Data>);*
>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello -
>>>>>>>
>>>>>>> I have the following call to addSource, where I pass a custom
>>>>>>> SourceFunction ..
>>>>>>>
>>>>>>> env.<Data>addSource(
>>>>>>>   new CollectionSourceFunctionJ<Data>(data,
>>>>>>>     TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>> )
>>>>>>>
>>>>>>> where data is a List<Data> and CollectionSourceFunctionJ is a Scala
>>>>>>> case class ..
>>>>>>>
>>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>   def cancel(): Unit = {}
>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> When the following transformation runs ..
>>>>>>>
>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>
>>>>>>> I get the following exception on the second line ..
>>>>>>>
>>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>>> automatically, due to type erasure.
>>>>>>>> You can give type information hints by
>>>>>>>> using the returns(...) method on the result of the transformation call, or
>>>>>>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>>>>>>
>>>>>>> Initially the returns call was not there and I was getting the same
>>>>>>> exception. Now, after adding the returns call, nothing changes.
>>>>>>>
>>>>>>> Any help will be appreciated ..
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
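The exception message quoted above also suggests implementing `ResultTypeQueryable` as an alternative to passing `TypeInformation` at the call site. A self-contained sketch of that pattern, using tiny stand-in interfaces rather than the real Flink ones (so it runs without Flink on the classpath; all names here are illustrative, not Flink's API):

```java
import java.util.List;

// Sketch of the ResultTypeQueryable idea: the source carries its own type
// information, so the framework never has to guess through erasure.
// TypeInfo and ResultTypeQueryable below are simplified stand-ins for
// Flink's TypeInformation and ResultTypeQueryable.
public class QueryableSourceSketch {

    interface TypeInfo<T> {
        Class<T> typeClass();
    }

    interface ResultTypeQueryable<T> {
        TypeInfo<T> getProducedType();
    }

    // A collection-backed source that remembers its element type explicitly
    // by taking a Class<T> token alongside the data.
    static class CollectionSource<T> implements ResultTypeQueryable<T> {
        final List<T> data;
        private final TypeInfo<T> typeInfo;

        CollectionSource(List<T> data, Class<T> clazz) {
            this.data = data;
            this.typeInfo = () -> clazz;
        }

        public TypeInfo<T> getProducedType() {
            return typeInfo;
        }
    }

    public static void main(String[] args) {
        CollectionSource<String> src =
            new CollectionSource<>(List.of("a", "b"), String.class);
        // The "framework" can now ask the source for its output type directly,
        // instead of trying to reflect on the erased type variable T.
        System.out.println(src.getProducedType().typeClass().getSimpleName());
    }
}
```

In real Flink code the equivalent would be having `CollectionSourceFunction` implement `ResultTypeQueryable<T>` and return the `TypeInformation[T]` it was constructed with, which removes the need for both the explicit `addSource` overload and the `returns(...)` hint.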