Actually, the Scala and Java code are completely separate; in fact, they are
part of separate test suites. We have both a Scala and a Java API in our
application, but they are completely separate. And yes, in Scala the
implicits did the trick, while in Java I had to pass the TypeInformation
explicitly to addSource.
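Why the Java side needs the TypeInformation spelled out: because of type erasure, an instance such as CollectionSourceFunction<Data> keeps no runtime record of Data, so Flink cannot recover T from the source object alone (which is what the "Type of TypeVariable 'T' ... could not be determined" cause below says). A Scala context bound like `[In: TypeInformation]`, by contrast, makes the compiler pass that value as a hidden implicit argument. The standalone sketch below is plain Java with no Flink dependency; `ErasureDemo`, `Holder` and `TypeToken` are hypothetical names. It shows erasure itself, plus the anonymous-subclass ("super type token") trick, which is, as far as I know, the same idea `new TypeHint<Data>(){}` relies on to capture a type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class ErasureDemo {

    // Hypothetical generic holder, analogous to CollectionSourceFunction<T>:
    // at runtime an instance does not remember what T was.
    static class Holder<T> {
        T value;
    }

    // Hypothetical "super type token": an anonymous subclass records its type
    // argument in its generic superclass, where reflection can read it back.
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        Holder<String> strings = new Holder<>();
        Holder<Integer> ints = new Holder<>();
        // Erasure: both objects share one runtime class with no trace of String/Integer.
        System.out.println(strings.getClass() == ints.getClass()); // true

        // The trailing {} creates an anonymous subclass of TypeToken<List<String>>.
        TypeToken<List<String>> token = new TypeToken<List<String>>() {};
        System.out.println(token.type.getTypeName()); // java.util.List<java.lang.String>
    }
}
```

The trailing `{}` is essential: it creates a subclass whose declared superclass, `TypeToken<List<String>>`, is stored in the class file and can be read back by reflection, whereas a plain generic instance has nothing comparable to offer.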


On Mon, Aug 26, 2019 at 10:00 PM Rong Rong wrote:

> Glad that you sorted it out, and sorry for the late reply.
> Yes, I think the problem is how the `TypeInformation` for `Data` is
> passed to the DataStreamSource constructor.
> Regarding why the Scala side works but the Java side doesn't, it might
> have something to do with the implicit parameter passing in your
> `readStream`, which is very tricky to mix with Java code. So I would
> avoid mixing them if possible.
> --
> Rong
> On Sun, Aug 25, 2019 at 11:10 PM Debasish Ghosh wrote:
>> It looks like using the following overload of
>> StreamExecutionEnvironment.addSource, which also takes a TypeInformation,
>> does the trick:
>> env.<Data>addSource(
>>   FlinkSource.<Data>collectionSourceFunction(data),
>>   TypeInformation.<Data>of(Data.class)
>> )
>> regards.
>> On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh wrote:
>>> Oh, and I am using Flink 1.8.
>>> On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh wrote:
>>>> Thanks for the feedback. Here are the details.
>>>> Just to give you some background, the original API is a Scala API, as
>>>> follows:
>>>> final def readStream[In: TypeInformation: DeserializationSchema](
>>>>     inlet: CodecInlet[In]): DataStream[In] =
>>>>   context.readStream(inlet)
>>>> and the *Scala version of the code runs fine*. Here's the Java API
>>>> (also written in Scala, but passing the type information and the
>>>> deserialization schema explicitly and using the DataStream class from the
>>>> Flink Java API):
>>>> final def readStream[In](inlet: CodecInlet[In], clazz: Class[In],
>>>>     deserializationSchema: DeserializationSchema[In]): JDataStream[In] =
>>>>   context.readStream(inlet)(TypeInformation.of[In](clazz), deserializationSchema)
>>>>     .javaStream
>>>> Here's the Java code for the transformation where I get the error:
>>>> DataStream<Data> ins =
>>>>   this.<Data>readStream(in, Data.class, serdeData)
>>>>       .map((Data d) -> d)
>>>>       .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()));
>>>> // .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>> DataStreamSink<Simple> sink = writeStream(out, simples, Simple.class, serdeSimple);
>>>> Here's the corresponding Scala code that runs fine:
>>>> val ins: DataStream[Data] = readStream(in)
>>>> val simples: DataStream[Simple] = ins.map(r ⇒ new Simple(r.getName()))
>>>> writeStream(out, simples)
>>>> Here's the custom source that's also referred to in the exception. The
>>>> case class is used directly from Scala, while from Java I go through the
>>>> Java API, which wraps that same case class:
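>>>> import scala.collection.JavaConverters._
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>>>>
>>>> object FlinkSource {
>>>>   case class CollectionSourceFunction[T](data: Seq[T])
>>>>       extends SourceFunction[T] {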
>>>>     def cancel(): Unit = {}
>>>>     def run(ctx: SourceContext[T]): Unit = {
>>>>       data.foreach(d ⇒ ctx.collect(d))
>>>>     }
>>>>   }
>>>>   /**
>>>>    * Java API
>>>>    */
>>>>   def collectionSourceFunction[T](data: java.util.List[T]) =
>>>>     CollectionSourceFunction(data.asScala.toSeq)
>>>> }
>>>> Here's how I use the custom source from Java (this is what throws the
>>>> exception); here data is a java.util.List<Data>:
>>>> env.<Data>addSource(
>>>>   FlinkSource.<Data>collectionSourceFunction(data)
>>>> )
>>>> And here's the Scala version, which runs fine; here data is a
>>>> scala.Seq[Data]:
>>>> env.addSource(FlinkSource.CollectionSourceFunction(data))
>>>> Here's the complete exception:
>>>> [info]   org.apache.flink.api.common.functions.InvalidTypesException:
>>>> The return type of function 'Custom Source' could not be determined
>>>> automatically, due to type erasure. You can give type information hints by
>>>> using the returns(...) method on the result of the transformation call, or
>>>> by letting your function implement the 'ResultTypeQueryable' interface.
>>>> [info]   at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(
>>>> [info]   at org.apache.flink.streaming.api.datastream.DataStream.getType(
>>>> [info]   at
>>>> [info]   at pipelines.flink.FlinkStreamletLogic.readStream(FlinkStreamlet.scala:237)
>>>> [info]   at pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(
>>>> [info]   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>>> [info]   at
>>>> [info]   at pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>>> [info]   at
>>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(
>>>> [info]   ...
>>>> [info]   Cause:
>>>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>>>> TypeVariable 'T' in 'class
>>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>>>> determined. This is most likely a type erasure problem. The type extraction
>>>> currently supports types with generic variables only in cases where all
>>>> variables in the return type can be deduced from the input type(s).
>>>> Otherwise the type has to be specified explicitly using type information.
>>>> [info]   at
>>>> [info]   at
>>>> [info]   at
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(
>>>> [info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(
>>>> [info]   at pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(
>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>> [info]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>> [info]   ...
>>>> regards.
>>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong wrote:
>>>>> I am not sure how the function `readStream` is implemented (also, which
>>>>> version of Flink are you using?).
>>>>> Can you share more of your code and the exception logs?
>>>>> Also, to answer your question: a DataStream's return type is determined
>>>>> by its underlying transformation, so you cannot set it directly.
>>>>> Thanks,
>>>>> Rong
>>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh wrote:
>>>>>> Thanks, I tried this:
>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>     .map((Data d) -> d)
>>>>>>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>>>>>> But I still get the same error on this line.
>>>>>> (BTW, I am not sure how to invoke returns on a DataStream, so I had to
>>>>>> do a fake map. Any suggestions here?)
>>>>>> regards.
>>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong wrote:
>>>>>>> Hi Debasish,
>>>>>>> I think the error refers to the output of your source rather than the
>>>>>>> result of your map function, e.g.:
>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>>>>>>     .returns(new TypeInformation<Data>);
>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>> --
>>>>>>> Rong
>>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh wrote:
>>>>>>>> Hello,
>>>>>>>> I have the following call to addSource, where I pass a custom
>>>>>>>> SourceFunction:
>>>>>>>> env.<Data>addSource(
>>>>>>>>   new CollectionSourceFunctionJ<Data>(
>>>>>>>>     data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>>>>>>> )
>>>>>>>> where data is a List<Data> and CollectionSourceFunctionJ is a Scala
>>>>>>>> case class:
>>>>>>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T],
>>>>>>>> ti: TypeInformation[T]) extends SourceFunction[T] {
>>>>>>>>   def cancel(): Unit = {}
>>>>>>>>   def run(ctx: SourceContext[T]): Unit = {
>>>>>>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>> When the following transformation runs:
>>>>>>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>>>>>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>>>>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>>>>>> I get the following exception on the second line:
>>>>>>>>> org.apache.flink.api.common.functions.InvalidTypesException: The
>>>>>>>>> return type of function 'Custom Source' could not be determined
>>>>>>>>> automatically, due to type erasure. You can give type information
>>>>>>>>> hints by using the returns(...) method on the result of the
>>>>>>>>> transformation call, or by letting your function implement the
>>>>>>>>> 'ResultTypeQueryable' interface.
>>>>>>>> Initially the returns call was not there, and I was getting the
>>>>>>>> same exception. Adding the returns call changed nothing.
>>>>>>>> Any help would be appreciated.
>>>>>>>> regards.
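The original CollectionSourceFunctionJ already received a TypeInformation in its constructor, but nothing hands that value back to Flink, so the same exception is thrown. Two routes avoid the error: passing the type to the addSource overload that takes a TypeInformation (the fix found in this thread), or, as the exception message suggests, letting the function implement Flink's ResultTypeQueryable interface, whose getProducedType() returns a TypeInformation. The plain-Java sketch below imitates that lookup without Flink, under stated assumptions: `TypeQueryable`, `GenericSource`, `ListSource` and `resolveOutputType` are hypothetical names, `Class<T>` stands in for `TypeInformation<T>`, and the resolution order (explicit type first, then the source's own answer) is a simplification, not Flink's actual code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class QueryableSourceDemo {

    // Hypothetical stand-in for Flink's ResultTypeQueryable<T>;
    // Class<T> plays the role of TypeInformation<T>.
    interface TypeQueryable<T> {
        Class<T> producedType();
    }

    // Hypothetical stand-in for a generic SourceFunction<T>.
    interface GenericSource<T> {
        List<T> emit();
    }

    // Keeps its element type AND exposes it, unlike a source that only
    // stores the type in a constructor field nobody reads.
    static final class ListSource<T> implements GenericSource<T>, TypeQueryable<T> {
        private final List<T> data;
        private final Class<T> type;

        ListSource(List<T> data, Class<T> type) {
            this.data = data;
            this.type = type;
        }

        @Override
        public List<T> emit() {
            return data;
        }

        @Override
        public Class<T> producedType() {
            return type;
        }
    }

    // Hypothetical, simplified resolution order: an explicitly passed type wins,
    // then the source's own answer; otherwise T is unknowable because it was erased.
    static Optional<Class<?>> resolveOutputType(GenericSource<?> source, Class<?> explicitType) {
        if (explicitType != null) {
            return Optional.of(explicitType);
        }
        if (source instanceof TypeQueryable) {
            return Optional.of(((TypeQueryable<?>) source).producedType());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        GenericSource<String> plain = () -> Arrays.asList("a", "b");
        GenericSource<String> queryable = new ListSource<>(Arrays.asList("a", "b"), String.class);

        System.out.println(resolveOutputType(plain, null));         // Optional.empty
        System.out.println(resolveOutputType(plain, String.class)); // Optional[class java.lang.String]
        System.out.println(resolveOutputType(queryable, null));     // Optional[class java.lang.String]
    }
}
```

In Flink terms, the analogous change would presumably be for CollectionSourceFunctionJ to also mix in ResultTypeQueryable[T] and return ti from getProducedType, so that the single-argument addSource could find the type on its own.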
