Actually the Scala and Java code are completely separate; in fact they are part of separate test suites. We have both Scala and Java APIs in our application, but they are completely separate. And yes, in Scala the implicits did the trick, while in Java I had to pass the TypeInformation explicitly to addSource ..
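For anyone who lands on this thread later, here is a rough plain-Java sketch (no Flink involved) of why the explicit TypeInformation seems to be needed. The names ErasureSketch, CollectionSource and Hint are hypothetical stand-ins that I made up for illustration; this is not how Flink implements TypeHint or its type extraction, only the same general idea. A plain generic instance exposes just the type variable T to reflection, while an anonymous subclass keeps the concrete type argument in its generic superclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Arrays;
import java.util.List;

public class ErasureSketch {

    // Hypothetical stand-in for a generic source such as CollectionSourceFunction[T].
    static class CollectionSource<T> {
        final List<T> data;

        CollectionSource(List<T> data) {
            this.data = data;
        }
    }

    // Hypothetical stand-in for a type hint. An anonymous subclass records T
    // in its generic superclass, where reflection can still read it.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType sup = (ParameterizedType) getClass().getGenericSuperclass();
            return sup.getActualTypeArguments()[0];
        }
    }

    // What reflection can see about the element type of a plain generic instance.
    static Type elementTypeOf(CollectionSource<?> source) {
        return source.getClass().getTypeParameters()[0];
    }

    public static void main(String[] args) {
        CollectionSource<String> source = new CollectionSource<>(Arrays.asList("a", "b"));

        // Only the variable T is visible here; String has been erased.
        Type erased = elementTypeOf(source);
        System.out.println(erased instanceof TypeVariable); // prints true

        // The anonymous subclass keeps String as its type argument.
        Type hinted = new Hint<String>() {}.captured();
        System.out.println(hinted == String.class); // prints true
    }
}
```

This matches what the exception text says about TypeVariable 'T': nothing on the source instance carries Data, so the type has to be handed over separately, as in the addSource overload that takes a TypeInformation.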
regards.

On Mon, Aug 26, 2019 at 10:00 PM Rong Rong <walter...@gmail.com> wrote:

> Glad that you sorted it out and sorry for the late reply.
> Yes, I think the problem is how your `TypeInformation` for `Data` is being
> passed to the DataStreamSource construct.
>
> Regarding why the Scala side works but not Java, there might've been something
> to do with the implicit variable passing for your `readStream`, which is
> very tricky mixing with Java code. So I would avoid mixing them if possible.
>
> --
> Rong
>
> On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Looks like using the following overload of
>> StreamExecutionEnvironment.addSource which takes a TypeInformation as well,
>> does the trick ..
>>
>> env.<Data>addSource(
>>   FlinkSource.<Data>collectionSourceFunction(data),
>>   TypeInformation.<Data>of(Data.class)
>> )
>>
>> regards.
>>
>> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> oh .. and I am using Flink 1.8 ..
>>>
>>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <
>>> ghosh.debas...@gmail.com> wrote:
>>>
>>>> Thanks for the feedback .. here are the details ..
>>>>
>>>> Just to give you some background, the original API is a Scala API as
>>>> follows ..
>>>>
>>>> final def readStream[In: TypeInformation: DeserializationSchema](
>>>>     inlet: CodecInlet[In]): DataStream[In] =
>>>>   context.readStream(inlet)
>>>>
>>>> and the *Scala version of the code runs fine* .. Here's the Java API
>>>> (also written in Scala though, but passing type information and
>>>> deserialization schema explicitly and using the DataStream class from Flink
>>>> Java) ..
>>>>
>>>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In],
>>>>     deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>>>     .javaStream
>>>>
>>>> Here's the Java code for transformation where I get the error ..
>>>>
>>>> DataStream<Data> ins =
>>>>   this.<Data>readStream(in, Data.class, serdeData)
>>>>     .map((Data d) -> d)
>>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>
>>>> DataStream<Simple> simples =
>>>>   ins.map((Data d) -> new Simple(d.getName())); // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>> DataStreamSink<Simple> sink =
>>>>   writeStream(out, simples, Simple.class, serdeSimple);
>>>>
>>>> Here's the corresponding Scala code that runs fine ..
>>>>
>>>> val ins: DataStream[Data] = readStream(in)
>>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>>> writeStream(out, simples)
>>>>
>>>> Here's the custom source that's also referred in the exception .. the
>>>> case class is directly used in Scala while I use the Java API that uses
>>>> that case class from Java ..
>>>>
>>>> object FlinkSource {
>>>>   case class CollectionSourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
>>>>     def cancel(): Unit = {}
>>>>     def run(ctx: SourceContext[T]): Unit = {
>>>>       data.foreach(d ⇒ ctx.collect(d))
>>>>     }
>>>>   }
>>>>
>>>>   /**
>>>>    * Java API
>>>>    */
>>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>>     CollectionSourceFunction(data.asScala.toSeq)
>>>> }
>>>>
>>>> Here's how I use the custom source from Java .. (which gives exception)
>>>> .. here data is a java.util.List<Data>
>>>>
>>>> env.<Data>addSource(
>>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>>> )
>>>>
>>>> and here's the Scala version, which runs fine .. here data is a
>>>> scala.Seq[Data]
>>>>
>>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>>
>>>> Here's the complete exception ..
>>>>
>>>> [info] org.apache.flink.api.common.functions.InvalidTypesException:
>>>> The return type of function 'Custom Source' could not be determined
>>>> automatically, due to type erasure. You can give type information hints by
>>>> using the returns(...) method on the result of the transformation call, or
>>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>>> [info]   at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>>>> [info]   at pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>>> [info]   at pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>>> [info]   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>>> [info]   at pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>>> [info]   at pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>>> [info]   ...
>>>> [info] Cause:
>>>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>>>> TypeVariable 'T' in 'class
>>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>>>> determined. This is most likely a type erasure problem. The type extraction
>>>> currently supports types with generic variables only in cases where all
>>>> variables in the return type can be deduced from the input type(s).
>>>> Otherwise the type has to be specified explicitly using type information.
>>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> [info]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> [info]   ...
>>>>
>>>> regards.
>>>>
>>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> I am not sure how the function `readStream` is implemented (also which
>>>>> version of Flink are you using?).
>>>>> Can you share more information on your code blocks and exception logs?
>>>>>
>>>>> Also to answer your question, DataStream return type is determined by
>>>>> its underlying transformation, so you cannot set it directly.
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks .. I tried this ..
>>>>>>
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>   .map((Data d) -> d)
>>>>>>   .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>>
>>>>>> But still get the same error on this line ..
>>>>>>
>>>>>> (BTW I am not sure how to invoke returns on a DataStream and hence
>>>>>> had to do a fake map - any suggestions here ?)
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> I think the error refers to the output of your source instead of
>>>>>>> your result of the map function. E.g.
>>>>>>> DataStream<Data> ins = readStream(in, Data.class,
>>>>>>>   serdeData)*.returns(new TypeInformation<Data>);*
>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>>   .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello -
>>>>>>>>
>>>>>>>> I have the following call to addSource where I pass a Custom
>>>>>>>> SourceFunction ..
>>>>>>>>
>>>>>>>> env.<Data>addSource(
>>>>>>>>   new CollectionSourceFunctionJ<Data>(data,
>>>>>>>>     TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>>> )
>>>>>>>>
>>>>>>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala
>>>>>>>> case class ..
>>>>>>>>
>>>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T],
>>>>>>>>     ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>>   def cancel(): Unit = {}
>>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> When the following transformation runs ..
>>>>>>>>
>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>>>   .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>>
>>>>>>>> I get the following exception in the second line ..
>>>>>>>>
>>>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>>>> automatically, due to type erasure. You can give type information
>>>>>>>>> hints by using the returns(...) method on the result of the
>>>>>>>>> transformation call, or by letting your function implement the
>>>>>>>>> 'ResultTypeQueryable' interface.
>>>>>>>>
>>>>>>>> Initially the returns call was not there and I was getting the
>>>>>>>> same exception. Now after adding the returns call, nothing changes.
>>>>>>>>
>>>>>>>> Any help will be appreciated ..
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg