oh .. and I am using Flink 1.8 ..

On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> Thanks for the feedback .. here are the details ..
>
> Just to give you some background, the original API is a Scala API, as
> follows ..
>
>   final def readStream[In: TypeInformation: DeserializationSchema](
>       inlet: CodecInlet[In]): DataStream[In] =
>     context.readStream(inlet)
>
> and the *Scala version of the code runs fine* .. Here's the Java API
> (also written in Scala, but passing the type information and the
> deserialization schema explicitly, and using the DataStream class from
> Flink's Java API) ..
>
>   final def readStream[In](inlet: CodecInlet[In], clazz: Class[In],
>       deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>     context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>       .javaStream
>
> Here's the Java code for the transformation where I get the error ..
>
>   DataStream<Data> ins =
>     this.<Data>readStream(in, Data.class, serdeData)
>       .map((Data d) -> d)
>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>
>   DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()));
>     // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>   DataStreamSink<Simple> sink = writeStream(out, simples, Simple.class, serdeSimple);
>
> Here's the corresponding Scala code that runs fine ..
>
>   val ins: DataStream[Data] = readStream(in)
>   val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>   writeStream(out, simples)
>
> Here's the custom source that's also referred to in the exception .. the
> case class is used directly from Scala, while from Java I go through the
> Java API that wraps that case class ..
>
>   object FlinkSource {
>     case class CollectionSourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
>       def cancel(): Unit = {}
>       def run(ctx: SourceContext[T]): Unit = {
>         data.foreach(d ⇒ ctx.collect(d))
>       }
>     }
>
>     /**
>      * Java API
>      */
>     def collectionSourceFunction[T](data: java.util.List[T]) =
>       CollectionSourceFunction(data.asScala.toSeq)
>   }
>
> Here's how I use the custom source from Java
> (which gives the exception) .. here data is a java.util.List<Data>
>
>   env.<Data>addSource(
>     FlinkSource.<Data>collectionSourceFunction(data)
>   )
>
> and here's the Scala version, which runs fine .. here data is a
> scala.Seq[Data]
>
>   env.addSource(FlinkSource.CollectionSourceFunction(data))
>
> Here's the complete exception ..
>
> [info] org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
> [info] at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
> [info] at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
> [info] at pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
> [info] at pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
> [info] at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
> [info] at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
> [info] at pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
> [info] at pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
> [info] at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
> [info] ...
> [info] Cause: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be determined. This is most likely a type erasure problem.
> The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
> [info] at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
> [info] at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
> [info] at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
> [info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
> [info] at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
> [info] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [info] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> [info] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> [info] ...
>
> regards.
>
> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>
>> I am not sure how the function `readStream` is implemented (also, which
>> version of Flink are you using?).
>> Can you share more information on your code blocks and exception logs?
>>
>> Also, to answer your question: a DataStream's return type is determined by
>> its underlying transformation, so you cannot set it directly.
>>
>> Thanks,
>> Rong
>>
>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Thanks .. I tried this ..
>>>
>>>   DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>     .map((Data d) -> d)
>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>
>>> But I still get the same error on this line ..
>>>
>>> (BTW I am not sure how to invoke returns on a DataStream and hence had
>>> to do a fake map - any suggestions here?)
>>>
>>> regards.
>>>
>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> I think the error refers to the output of your source instead of the
>>>> result of your map function. E.g.
>>>>   DataStream<Data> ins = readStream(in, Data.class, serdeData)*.returns(new TypeInformation<Data>);*
>>>>   DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>
>>>> --
>>>> Rong
>>>>
>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Hello -
>>>>>
>>>>> I have the following call to addSource where I pass a custom
>>>>> SourceFunction ..
>>>>>
>>>>>   env.<Data>addSource(
>>>>>     new CollectionSourceFunctionJ<Data>(data,
>>>>>       TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>   )
>>>>>
>>>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala
>>>>> case class ..
>>>>>
>>>>>   case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T])
>>>>>       extends SourceFunction[T] {
>>>>>     def cancel(): Unit = {}
>>>>>     def run(ctx: SourceContext[T]): Unit = {
>>>>>       data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>     }
>>>>>   }
>>>>>
>>>>> When the following transformation runs ..
>>>>>
>>>>>   DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>   DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>
>>>>> I get the following exception in the second line ..
>>>>>
>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>> return type of function 'Custom Source' could not be determined
>>>>>> automatically, due to type erasure. You can give type information hints
>>>>>> by using the returns(...) method on the result of the transformation
>>>>>> call, or by letting your function implement the 'ResultTypeQueryable'
>>>>>> interface.
>>>>>
>>>>> Initially the returns call was not there and I was getting the same
>>>>> exception. Now, after adding the returns call, nothing changes.
>>>>>
>>>>> Any help will be appreciated ..
>>>>>
>>>>> regards.
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
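The "Cause" in the trace in this thread says Flink's TypeExtractor could not bind TypeVariable 'T' of CollectionSourceFunction when env.addSource was called, and, as far as the trace shows, the error only surfaces on the first map applied to the source stream (DataStream.map asks the input stream for its type), which would explain why a .returns(...) on a later map changes nothing. A minimal JDK-only sketch of the underlying erasure effect, not using Flink at all: GenericSource is a hypothetical stand-in for CollectionSourceFunction[T], and TypeToken is a hypothetical super-type token built on the same principle as `new TypeHint<Data>(){}` (an anonymous subclass records its type argument in its own class signature).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Arrays;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for CollectionSourceFunction[T]: T is only a type
    // parameter of the class, so an instance's runtime class does not record it.
    public static class GenericSource<T> {
        private final List<T> data;

        public GenericSource(List<T> data) {
            this.data = data;
        }

        public List<T> data() {
            return data;
        }
    }

    // Hypothetical super-type token (the idea behind TypeHint): an anonymous
    // subclass stores its type argument in its generic superclass signature.
    public abstract static class TypeToken<T> {
        public Type captured() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // All reflection can report about an ordinary instance is the variable's name.
    public static String typeParameterOf(GenericSource<?> source) {
        TypeVariable<?>[] params = source.getClass().getTypeParameters();
        return params[0].getName();
    }

    public static void main(String[] args) {
        GenericSource<String> source = new GenericSource<>(Arrays.asList("a", "b"));
        // prints "T": the String argument is not recoverable from the instance
        System.out.println("type parameter seen at runtime: " + typeParameterOf(source));

        // prints "java.lang.String": the anonymous subclass keeps the argument
        Type captured = new TypeToken<String>() {}.captured();
        System.out.println("type captured by anonymous subclass: " + captured.getTypeName());
    }
}
```

The same asymmetry is most likely why the Scala call works: the Scala API's addSource takes an implicit TypeInformation[T] that is resolved at the call site, where T is already known to be Data.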
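The exception message itself names one way out: let the source implement ResultTypeQueryable (its method is getProducedType()). Flink's Java StreamExecutionEnvironment also has an addSource(SourceFunction<OUT>, TypeInformation<OUT>) overload that skips extraction, e.g. env.addSource(FlinkSource.collectionSourceFunction(data), TypeInformation.of(Data.class)). Note that CollectionSourceFunctionJ in the first message already receives a TypeInformation[T] as `ti` but, as quoted, never exposes it, so Flink has no way to use it. The sketch below is JDK-only and simplified: TypeQueryable, Source, CollectionSource and resolveType are hypothetical stand-ins that mimic a source reporting its own element type, with the lookup checking the source first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class TypedSourceSketch {

    // Hypothetical stand-in for Flink's ResultTypeQueryable<T>: the function
    // reports its own output type instead of having it inferred.
    public interface TypeQueryable<T> {
        Class<T> producedType();
    }

    // Hypothetical stand-in for SourceFunction<T>: pushes elements to a collector.
    public interface Source<T> {
        void run(Consumer<T> collector);
    }

    // A collection source that carries its element type, like a source that
    // exposes the TypeInformation it was constructed with.
    public static final class CollectionSource<T> implements Source<T>, TypeQueryable<T> {
        private final List<T> data;
        private final Class<T> type;

        public CollectionSource(List<T> data, Class<T> type) {
            this.data = data;
            this.type = type;
        }

        @Override
        public void run(Consumer<T> collector) {
            data.forEach(collector);
        }

        @Override
        public Class<T> producedType() {
            return type;
        }
    }

    // Simplified lookup: ask the source first. This sketch skips any reflection
    // attempt and fails straight away, standing in for the unbound-T case.
    public static String resolveType(Source<?> source) {
        if (source instanceof TypeQueryable) {
            return ((TypeQueryable<?>) source).producedType().getName();
        }
        throw new IllegalStateException(
            "element type of " + source.getClass().getName() + " could not be determined");
    }

    public static void main(String[] args) {
        CollectionSource<String> source =
            new CollectionSource<>(Arrays.asList("x", "y"), String.class);
        System.out.println(resolveType(source)); // java.lang.String

        List<String> collected = new ArrayList<>();
        source.run(collected::add);
        System.out.println(collected); // [x, y]
    }
}
```

In the Scala case class, the equivalent would presumably be mixing in ResultTypeQueryable[T] and returning `ti` from getProducedType.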