Thanks for the clear explanation ..

On Mon, Aug 26, 2019 at 10:34 PM Seth Wiesman <s...@ververica.com> wrote:
> Hi Debasish,
>
> As you seem to be aware, TypeInformation is Flink's internal type system, used for serialization between tasks and in/out of state backends.
>
> The issue you are seeing occurs because you are using a Scala case class with flink-streaming-java. Flink's Java API derives type information using Java reflection and only knows about Java types and patterns such as POJOs. To use your case class from Java without falling back to Kryo, you must either rewrite it as a POJO or register a custom serializer.
>
> Scala TypeInformation is defined in flink-scala and flink-streaming-scala and is generated using macros. There is no way to create a CaseClassTypeInfo from Java (technically you could, but I'm not sure how easy it would be).
>
> This often comes up as a point of confusion because flink-streaming-java pulls in Scala. This is due to a transitive dependency on flink-runtime, which uses Scala for some parts of the Job Manager and Task Managers. flink-streaming-java itself is Scala-oblivious, and a short- to mid-term goal of the project is to either remove or shade away these components so Java users have a pure Java dependency.
>
> Seth
>
> On Mon, Aug 26, 2019 at 11:59 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> Actually the Scala and Java code are completely separate - in fact they are part of separate test suites. We have both a Scala and a Java API in our application, but they are completely separate .. and yeah, in Scala the implicits did the trick, while in Java I had to pass the TypeInformation explicitly with addSource ..
>>
>> regards.
>>
>> On Mon, Aug 26, 2019 at 10:00 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Glad that you sorted it out, and sorry for the late reply.
>>> Yes. I think the problem is how your `TypeInformation` for `Data` is being passed to the DataStreamSource construct.
>>>
>>> Regarding why the Scala side works but not Java, it might have something to do with the implicit parameter passing for your `readStream`, which is very tricky to mix with Java code. So I would avoid mixing them if possible.
>>>
>>> --
>>> Rong
>>>
>>> On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> Looks like using the following overload of StreamExecutionEnvironment.addSource, which takes a TypeInformation as well, does the trick ..
>>>>
>>>> env.<Data>addSource(
>>>>   FlinkSource.<Data>collectionSourceFunction(data),
>>>>   TypeInformation.<Data>of(Data.class)
>>>> );
>>>>
>>>> regards.
>>>>
>>>> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> oh .. and I am using Flink 1.8 ..
>>>>>
>>>>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the feedback .. here are the details ..
>>>>>>
>>>>>> Just to give you some background, the original API is a Scala API as follows ..
>>>>>>
>>>>>> final def readStream[In: TypeInformation: DeserializationSchema](inlet: CodecInlet[In]): DataStream[In] =
>>>>>>   context.readStream(inlet)
>>>>>>
>>>>>> and the *Scala version of the code runs fine* .. Here's the Java API (also written in Scala, but passing the type information and deserialization schema explicitly, and using the DataStream class from the Flink Java API) ..
>>>>>>
>>>>>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In], deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>>>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>>>>>     .javaStream
>>>>>>
>>>>>> Here's the Java code for the transformation where I get the error ..
>>>>>>
>>>>>> DataStream<Data> ins =
>>>>>>     this.<Data>readStream(in, Data.class, serdeData)
>>>>>>         .map((Data d) -> d)
>>>>>>         .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>>
>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())); // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>> DataStreamSink<Simple> sink = writeStream(out, simples, Simple.class, serdeSimple);
>>>>>>
>>>>>> Here's the corresponding Scala code that runs fine ..
>>>>>>
>>>>>> val ins: DataStream[Data] = readStream(in)
>>>>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>>>>> writeStream(out, simples)
>>>>>>
>>>>>> Here's the custom source that's also referred to in the exception .. the case class is used directly from Scala, while from Java I go through the Java API, which wraps that same case class ..
>>>>>>
>>>>>> import scala.collection.JavaConverters._
>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction
>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>>>>>>
>>>>>> object FlinkSource {
>>>>>>   case class CollectionSourceFunction[T](data: Seq[T]) extends SourceFunction[T] {
>>>>>>     def cancel(): Unit = {}
>>>>>>     def run(ctx: SourceContext[T]): Unit = {
>>>>>>       data.foreach(d ⇒ ctx.collect(d))
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   /**
>>>>>>    * Java API
>>>>>>    */
>>>>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>>>>     CollectionSourceFunction(data.asScala.toSeq)
>>>>>> }
>>>>>>
>>>>>> Here's how I use the custom source from Java .. (which gives the exception) .. here data is a java.util.List<Data>
>>>>>>
>>>>>> env.<Data>addSource(
>>>>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>>>>> );
>>>>>>
>>>>>> and here's the Scala version, which runs fine .. here data is a scala.Seq[Data]
>>>>>>
>>>>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>>>>
>>>>>> Here's the complete exception ..
>>>>>>
>>>>>> [info] org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>>>>>> [info]   at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>>>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>>>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>>>>>> [info]   at pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>>>>> [info]   at pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>>>>> [info]   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>>>>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>>>>> [info]   at pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>>>>> [info]   at pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>>>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>>>>> [info]   ...
>>>>>> [info] Cause: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
>>>>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>>>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>>>>> [info]   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>>>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> [info]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> [info]   ...
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>>
>>>>>>> I am not sure how the function `readStream` is implemented (also, which version of Flink are you using?). Can you share more information on your code blocks and exception logs?
>>>>>>>
>>>>>>> Also, to answer your question: a DataStream's return type is determined by its underlying transformation, so you cannot set it directly.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Rong
>>>>>>>
>>>>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks .. I tried this ..
>>>>>>>>
>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData).map((Data d) -> d).returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>>>>
>>>>>>>> But I still get the same error on this line ..
>>>>>>>>
>>>>>>>> (BTW I am not sure how to invoke returns on a DataStream and hence had to do a fake map - any suggestions here?)
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> I think the error refers to the output of your source rather than the result of your map function. E.g.
>>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData).returns(TypeInformation.of(Data.class));
>>>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Rong
>>>>>>>>>
>>>>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello -
>>>>>>>>>>
>>>>>>>>>> I have the following call to addSource where I pass a custom SourceFunction ..
>>>>>>>>>>
>>>>>>>>>> env.<Data>addSource(
>>>>>>>>>>   new CollectionSourceFunctionJ<Data>(data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>>>>> );
>>>>>>>>>>
>>>>>>>>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala case class ..
>>>>>>>>>>
>>>>>>>>>> import scala.collection.JavaConverters._
>>>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction
>>>>>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>>>>>>>>>>
>>>>>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>>>>   def cancel(): Unit = {}
>>>>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> When the following transformation runs ..
>>>>>>>>>>
>>>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>>>>
>>>>>>>>>> I get the following exception on the second line ..
>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
>>>>>>>>>>
>>>>>>>>>> Initially the returns call was not there and I was getting the same exception. Now, after adding the returns call, nothing changes.
>>>>>>>>>>
>>>>>>>>>> Any help will be appreciated ..
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Debasish Ghosh
>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>
>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>> Code: http://github.com/debasishg
>
> --
>
> Seth Wiesman | Solutions Architect
>
> +1 314 387 1463
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
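A note for anyone who finds this thread later. The root cause Seth describes, Flink's Java API deriving types through Java reflection, runs into plain JVM type erasure, and you can reproduce that without Flink at all. Below is a minimal, stdlib-only sketch. `CollectionSource` and `Hint` are hypothetical stand-ins, not classes from Flink or from the pipelines code above.

The sketch shows two things:

- A plain generic instance only exposes its declared type variable `T`, which matches the "Type of TypeVariable 'T' ... could not be determined" cause in the trace.
- An anonymous subclass records the concrete type argument in its generic superclass. As far as I understand it, `new TypeHint<Data>(){}` relies on that same trick.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for a generic source such as CollectionSourceFunction[T].
    static class CollectionSource<T> {
        final List<T> data;

        CollectionSource(List<T> data) {
            this.data = data;
        }
    }

    // Hypothetical "type hint" holder, meant to be subclassed anonymously,
    // e.g. new Hint<String>() {}.
    static abstract class Hint<T> {
        Type captured() {
            // The anonymous subclass declares Hint<String> as its generic
            // superclass, so the type argument is kept in the class metadata.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // All reflection can tell about a plain generic instance: the declared
    // type variable, not the concrete argument it was created with.
    static String visibleTypeOf(CollectionSource<?> source) {
        TypeVariable<?>[] vars = source.getClass().getTypeParameters();
        return vars[0].getName();
    }

    public static void main(String[] args) {
        CollectionSource<String> source = new CollectionSource<>(List.of("a", "b"));
        System.out.println(visibleTypeOf(source));            // T
        System.out.println(new Hint<String>() {}.captured()); // class java.lang.String
    }
}
```

This is why an explicit TypeInformation (or a hint built from an anonymous subclass) is needed whenever the element type only exists as a type parameter of the function's class.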
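The exception message also suggests letting the function implement 'ResultTypeQueryable', so that the function reports its own output type and reflection never has to guess. Below is a stdlib-only sketch of that idea under stated assumptions:

- `ProducesType`, `PlainSource`, `TypedSource` and `outputTypeOf` are hypothetical names.
- `outputTypeOf` is a much-simplified stand-in for type extraction, not Flink's `TypeExtractor`.

```java
import java.lang.reflect.TypeVariable;
import java.util.List;

public class QueryableDemo {

    // Hypothetical analogue of the ResultTypeQueryable idea: the function
    // reports the type it produces instead of leaving it to reflection.
    interface ProducesType<T> {
        Class<T> producedType();
    }

    // Hypothetical generic source that does not report its element type.
    static class PlainSource<T> {
        final List<T> data;

        PlainSource(List<T> data) {
            this.data = data;
        }
    }

    // Hypothetical generic source that carries its element class and reports it.
    static class TypedSource<T> extends PlainSource<T> implements ProducesType<T> {
        private final Class<T> elementClass;

        TypedSource(List<T> data, Class<T> elementClass) {
            super(data);
            this.elementClass = elementClass;
        }

        @Override
        public Class<T> producedType() {
            return elementClass;
        }
    }

    // Much-simplified extractor that only models the generic case: ask the
    // function first; otherwise reflection finds just a type variable, so give up.
    static Class<?> outputTypeOf(Object function) {
        if (function instanceof ProducesType) {
            return ((ProducesType<?>) function).producedType();
        }
        TypeVariable<?>[] vars = function.getClass().getTypeParameters();
        String name = vars.length > 0 ? vars[0].getName() : "?";
        throw new IllegalStateException(
                "Type of TypeVariable '" + name + "' could not be determined");
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b");
        System.out.println(outputTypeOf(new TypedSource<>(data, String.class))); // class java.lang.String
        try {
            outputTypeOf(new PlainSource<>(data));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Type of TypeVariable 'T' could not be determined
        }
    }
}
```

One hedged observation: in the first message, CollectionSourceFunctionJ receives a `ti: TypeInformation[T]` but, as quoted, does not implement ResultTypeQueryable, so nothing ever asks it for that value. That would be consistent with passing it having no visible effect. The explicit `addSource(function, typeInfo)` overload that eventually worked sidesteps the question.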
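The two stack traces also suggest why adding `.returns(...)` after `map` (and later the identity `map`) changed nothing:

- The cause was raised inside `StreamExecutionEnvironment.addSource` (FlinkStreamletTest.java:34).
- The exception only surfaced later, from a `DataStream.map` call, via `DataStream.getType` and `StreamTransformation.getOutputType`.

In other words, the failed extraction appears to be recorded on the source and rethrown only when the next operator reads the source's output type, before any hint on the map's result is consulted. The sketch below models that behavior with hypothetical classes (`Node`, `known`, `missing`). It illustrates the idea and is not Flink's implementation.

```java
public class DeferredTypeErrorDemo {

    // Hypothetical stream node. Its output type is either a known class or a
    // recorded extraction failure that has not been thrown yet.
    static final class Node {
        private final Class<?> outputType;              // null when extraction failed
        private final RuntimeException extractionError; // set only when extraction failed

        private Node(Class<?> outputType, RuntimeException extractionError) {
            this.outputType = outputType;
            this.extractionError = extractionError;
        }

        static Node known(Class<?> type) {
            return new Node(type, null);
        }

        // Building the node does not fail: the error is only stored.
        static Node missing(RuntimeException cause) {
            return new Node(null, cause);
        }

        // The stored error is thrown only when someone asks for the type.
        Class<?> getType() {
            if (outputType == null) {
                throw new IllegalStateException(
                        "The return type of function 'Custom Source' could not be determined",
                        extractionError);
            }
            return outputType;
        }

        // Attaching a downstream operator reads this node's type first, so a
        // missing type fails here and the hint for the result is never used.
        Node map(Class<?> resultTypeHint) {
            getType();
            return known(resultTypeHint);
        }
    }

    public static void main(String[] args) {
        Node untypedSource = Node.missing(
                new IllegalArgumentException("Type of TypeVariable 'T' could not be determined"));
        try {
            untypedSource.map(String.class); // throws although a result type is given
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }

        Node typedSource = Node.known(Integer.class);
        System.out.println(typedSource.map(String.class).getType()); // class java.lang.String
    }
}
```

Under this model, the only fixes that help are the ones that give the source itself a type: passing TypeInformation to `addSource`, or having the source function report its type.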