Thanks for the clear explanation ..

On Mon, Aug 26, 2019 at 10:34 PM Seth Wiesman <s...@ververica.com> wrote:

> Hi Debasish,
>
> As you seem to be aware, TypeInformation is Flink’s internal type system,
> used for serialization between tasks and in and out of state backends.
>
> The issue you are seeing arises because you are using a Scala case class with
> flink-streaming-java. Flink’s Java API derives type information using Java
> reflection and only knows about Java types and patterns such as POJOs. To
> use your case class from Java without falling back to Kryo, you must either
> rewrite it as a POJO or register a custom serializer.
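For reference, a minimal sketch of what such a POJO rewrite could look like. The class name `Data` and its single `name` field are taken from the code shown later in this thread; the rest is an illustrative sketch of Flink's POJO conventions, not a definitive version:

```java
// Hypothetical POJO version of a Scala case class like Data(name: String).
// Flink's type extractor treats a class as a POJO when it is public,
// has a public no-argument constructor, and every field is either public
// or exposed through conventional getters/setters.
public class Data {
    private String name;

    // Required: public no-argument constructor.
    public Data() {}

    public Data(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```

With a type like this, the Java type extractor can analyze it directly and no Kryo fallback (and no explicit TypeInformation) is needed.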
>
> Scala TypeInformation is defined in flink-scala and flink-streaming-scala
> and is generated using macros. There is no way to create a
> CaseClassTypeInfo from Java (technically you could, but I’m not sure how
> easy it would be).
>
> This often comes up as a point of confusion because flink-streaming-java
> pulls in Scala. This is due to a transitive dependency on flink-runtime,
> which uses Scala for some parts of the Job Manager and Task Managers.
> flink-streaming-java itself is Scala-oblivious, and a short- to mid-term goal
> of the project is to either remove or shade away these components so Java
> users have a pure Java dependency.
>
> Seth
>
> On Mon, Aug 26, 2019 at 11:59 AM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Actually, the Scala and Java code are completely separate - in fact, they
>> are part of separate test suites. We have both Scala and Java APIs in our
>> application, but they are completely separate .. and yes, in Scala the
>> implicits did the trick, while I had to pass the TypeInformation explicitly
>> with addSource ..
>>
>> regards.
>>
>> On Mon, Aug 26, 2019 at 10:00 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Glad that you sorted it out, and sorry for the late reply.
>>> Yes, I think the problem is how your `TypeInformation` for `Data` is
>>> being passed to the DataStreamSource construct.
>>>
>>> Regarding why the Scala side works but not Java: it might have something
>>> to do with the implicit parameter passing for your `readStream`, which is
>>> very tricky when mixing with Java code. So I would avoid mixing them if
>>> possible.
>>>
>>> --
>>> Rong
>>>
>>> On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh <
>>> ghosh.debas...@gmail.com> wrote:
>>>
>>>> Looks like using the following overload of
>>>> StreamExecutionEnvironment.addSource, which takes a TypeInformation as
>>>> well, does the trick ..
>>>>
>>>> env.<Data>addSource(
>>>>   FlinkSource.<Data>collectionSourceFunction(data),
>>>>   TypeInformation.<Data>of(Data.class)
>>>> )
>>>>
>>>> regards.
>>>>
>>>> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> oh .. and I am using Flink 1.8 ..
>>>>>
>>>>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the feedback .. here are the details ..
>>>>>>
>>>>>> Just to give you some background, the original API is a Scala API as
>>>>>> follows ..
>>>>>>
>>>>>> final def readStream[In: TypeInformation: DeserializationSchema](inlet: CodecInlet[In]): DataStream[In] =
>>>>>>   context.readStream(inlet)
>>>>>>
>>>>>> and the *Scala version of the code runs fine* .. Here's the Java API
>>>>>> (also written in Scala, but passing the type information and
>>>>>> deserialization schema explicitly and using the DataStream class from
>>>>>> Flink Java) ..
>>>>>>
>>>>>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In], deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>>>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>>>>>     .javaStream
>>>>>>
>>>>>> Here's the Java code for transformation where I get the error ..
>>>>>>
>>>>>> DataStream<Data> ins =
>>>>>>   this.<Data>readStream(in, Data.class, serdeData)
>>>>>>       .map((Data d) -> d)
>>>>>>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>>
>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()));
>>>>>> // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>> DataStreamSink<Simple> sink = writeStream(out, simples, Simple.class, serdeSimple);
>>>>>>
>>>>>> Here's the corresponding Scala code that runs fine ..
>>>>>>
>>>>>> val ins: DataStream[Data] = readStream(in)
>>>>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>>>>> writeStream(out, simples)
>>>>>>
>>>>>> Here's the custom source that's also referred to in the exception .. the
>>>>>> case class is used directly from Scala, while from Java I go through the
>>>>>> Java API that wraps it ..
>>>>>>
>>>>>> object FlinkSource {
>>>>>>   case class CollectionSourceFunction[T](data: Seq[T]) extends
>>>>>> SourceFunction[T] {
>>>>>>     def cancel(): Unit = {}
>>>>>>     def run(ctx: SourceContext[T]): Unit = {
>>>>>>       data.foreach(d ⇒ ctx.collect(d))
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   /**
>>>>>>    * Java API
>>>>>>    */
>>>>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>>>>     CollectionSourceFunction(data.asScala.toSeq)
>>>>>> }
>>>>>>
>>>>>> Here's how I use the custom source from Java .. (which gives the
>>>>>> exception) .. here data is a java.util.List<Data>
>>>>>>
>>>>>> env.<Data>addSource(
>>>>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>>>>> )
>>>>>>
>>>>>> and here's the Scala version, which runs fine .. here data is a
>>>>>> scala.Seq[Data]
>>>>>>
>>>>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>>>>
>>>>>> Here's the complete exception ..
>>>>>>
>>>>>> [info]   org.apache.flink.api.common.functions.InvalidTypesException:
>>>>>> The return type of function 'Custom Source' could not be determined
>>>>>> automatically, due to type erasure. You can give type information hints
>>>>>> by using the returns(...) method on the result of the transformation
>>>>>> call, or by letting your function implement the 'ResultTypeQueryable'
>>>>>> interface.
>>>>>> [info]   at
>>>>>> org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
>>>>>> [info]   at
>>>>>> org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
>>>>>> [info]   at
>>>>>> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:587)
>>>>>> [info]   at
>>>>>> pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>>>>> [info]   at
>>>>>> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>>>>> [info]   at
>>>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>>>>> [info]   at
>>>>>> pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>>>>> [info]   at
>>>>>> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>>>>> [info]   at
>>>>>> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>>>>> [info]   at
>>>>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>>>>> [info]   ...
>>>>>> [info]   Cause:
>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>>>>>> TypeVariable 'T' in 'class
>>>>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not
>>>>>> be determined. This is most likely a type erasure problem. The type
>>>>>> extraction currently supports types with generic variables only in
>>>>>> cases where all variables in the return type can be deduced from the
>>>>>> input type(s). Otherwise the type has to be specified explicitly using
>>>>>> type information.
>>>>>> [info]   at
>>>>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>>>>> [info]   at
>>>>>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>>>>> [info]   at
>>>>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>>>>> [info]   at
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>>>>> [info]   at
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>>>>> [info]   at
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>>>>> [info]   at
>>>>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>>> Method)
>>>>>> [info]   at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> [info]   at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> [info]   ...
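The root cause in that stack trace can be reproduced without Flink at all. A small stand-alone sketch (class names made up for illustration) showing that a plain generic instance carries no record of its type argument, while an anonymous subclass - the trick `TypeHint` relies on - does:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Analogous to CollectionSourceFunction[T]: nothing in the instance
// records what T was instantiated to.
class GenericHolder<T> {}

class ErasureDemo {
    public static void main(String[] args) {
        // From the raw class, only the type variable 'T' is visible.
        Type tv = GenericHolder.class.getTypeParameters()[0];
        System.out.println(tv.getTypeName()); // prints "T"

        // An anonymous subclass bakes the type argument into its generic
        // superclass signature, which reflection can read back.
        GenericHolder<String> captured = new GenericHolder<String>() {};
        ParameterizedType sup =
            (ParameterizedType) captured.getClass().getGenericSuperclass();
        System.out.println(sup.getActualTypeArguments()[0].getTypeName());
        // prints "java.lang.String"
    }
}
```

This is why `TypeInformation.of(new TypeHint<Data>(){})` can recover the type where plain reflection on the source function cannot.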
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong <walter...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am not sure how the function `readStream` is implemented (also
>>>>>>> which version of Flink are you using?).
>>>>>>> Can you share more information on your code blocks and exception
>>>>>>> logs?
>>>>>>>
>>>>>>> Also to answer your question, DataStream return type is determined
>>>>>>> by its underlying transformation, so you cannot set it directly.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Rong
>>>>>>>
>>>>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks .. I tried this ..
>>>>>>>>
>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>>>     .map((Data d) -> d)
>>>>>>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>>>>
>>>>>>>> But still get the same error on this line ..
>>>>>>>>
>>>>>>>> (BTW I am not sure how to invoke returns on a DataStream and hence
>>>>>>>> had to add a no-op map - any suggestions here?)
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walter...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> I think the error refers to the output of your source instead of
>>>>>>>>> the result of the map function. E.g.
>>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>>>>     *.returns(TypeInformation.of(Data.class));*
>>>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Rong
>>>>>>>>>
>>>>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <
>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello -
>>>>>>>>>>
>>>>>>>>>> I have the following call to addSource where I pass a Custom
>>>>>>>>>> SourceFunction ..
>>>>>>>>>>
>>>>>>>>>> env.<Data>addSource(
>>>>>>>>>>   new CollectionSourceFunctionJ<Data>(data,
>>>>>>>>>> TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> where data is List<Data> and CollectionSourceFunctionJ is a
>>>>>>>>>> Scala case class ..
>>>>>>>>>>
>>>>>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T],
>>>>>>>>>> ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>>>>   def cancel(): Unit = {}
>>>>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> When the following transformation runs ..
>>>>>>>>>>
>>>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new
>>>>>>>>>> Simple(d.getName())).returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>>>>
>>>>>>>>>> I get the following exception in the second line ..
>>>>>>>>>>
>>>>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>>>>>> automatically, due to type erasure. You can give type information
>>>>>>>>>>> hints by using the returns(...) method on the result of the
>>>>>>>>>>> transformation call, or by letting your function implement the
>>>>>>>>>>> 'ResultTypeQueryable' interface.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Initially the returns call was not there and I was getting the
>>>>>>>>>> same exception. Now after adding the returns call, nothing
>>>>>>>>>> changes.
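The second escape hatch the exception mentions is having the source itself implement ResultTypeQueryable. A self-contained sketch of that pattern - the two interfaces below are local stand-ins with roughly the shape of their Flink counterparts (no Flink on the classpath here), and `CollectionSource` is illustrative, not the actual fix from this thread:

```java
import java.util.List;
import java.util.function.Consumer;

// Local stand-ins approximating the Flink interfaces.
interface SourceFunction<T> {
    void run(Consumer<T> ctx); // Flink's version takes a SourceContext<T>
    void cancel();
}

interface ResultTypeQueryable<T> {
    Class<T> getProducedType(); // Flink's version returns TypeInformation<T>
}

// A source that carries its own type description, so the framework never
// needs to recover T by reflection - erasure stops being a problem.
class CollectionSource<T> implements SourceFunction<T>, ResultTypeQueryable<T> {
    private final List<T> data;
    private final Class<T> producedType;

    CollectionSource(List<T> data, Class<T> producedType) {
        this.data = data;
        this.producedType = producedType;
    }

    @Override public void run(Consumer<T> ctx) { data.forEach(ctx); }
    @Override public void cancel() {}
    @Override public Class<T> getProducedType() { return producedType; }
}
```

In real Flink code, getProducedType() would return something like TypeInformation.of(Data.class); the shape of the fix is the same either way.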
>>>>>>>>>>
>>>>>>>>>> Any help will be appreciated ..
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Debasish Ghosh
>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>
>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
> --
>
> Seth Wiesman | Solutions Architect
>
> +1 314 387 1463
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>

