Thanks for the clear explanation ..
On Mon, Aug 26, 2019 at 10:34 PM Seth Wiesman wrote:
> Hi Debasish,
>
> As it seems your aware TypeInformation is Flinkās internal type system
> used for serialization between tasks and in/out of state backends.
>
> The issue you are seeing is because you are
actually the scala and java code are completely separate - in fact they are
part of separate test suites. We have both scala and Java API in our
application but they r completely separate .. and yeah in Scala the
implicits did the trick while I had to pass the TypeInformation explicitly
with addSou
Glad that you sort it out and sorry for the late reply.
yes. I think the problem is how your `TypeInformation` for `Data` is being
passed to the DataStreamSource construct.
Regarding why scala side works but not java, there might've been something
to do with the implicit variable passing for your
Looks like using the following overload of
StreamExecutionEnvironment.addSource which takes a TypeInformation as well,
does the trick ..
env.addSource(
FlinkSource.collectionSourceFunction(data),
TypeInformation.of(Data.class)
)
regards.
On Mon, Aug 26, 2019 at 11:24 AM Debasish Ghosh
wrote
oh .. and I am using Flink 1.8 ..
On Mon, Aug 26, 2019 at 12:09 AM Debasish Ghosh
wrote:
> Thanks for the feedback .. here are the details ..
>
> Just to give u some background the original API is a Scala API as follows
> ..
>
> final def readStream[In: TypeInformation: DeserializationSchema](in
Thanks for the feedback .. here are the details ..
Just to give u some background the original API is a Scala API as follows ..
final def readStream[In: TypeInformation: DeserializationSchema](inlet:
CodecInlet[In]): DataStream[In] =
context.readStream(inlet)
and the *Scala version of the co
I am not sure how the function `readStream` is implemented (also which
version of Flink are you using?).
Can you share more information on your code blocks and exception logs?
Also to answer your question, DataStream return type is determined by its
underlying transformation, so you cannot set it
Thanks .. I tried this ..
DataStream ins = readStream(in, Data.class, serdeData).map((Data d)
-> d).returns(new TypeHint(){}.getTypeInfo());
But still get the same error on this line ..
(BTW I am not sure how to invoke returns on a DataStream and hence had to
do a fake map - any suggestions here
Hi Debasish,
I think the error refers to the output of your source instead of your
result of the map function. E.g.
DataStream ins = readStream(in, Data.class, serdeData)*.returns(new
TypeInformation);*
DataStream simples = ins.map((Data d) -> new Simple(d.getName()))
.returns(new TypeHint(){}.get
Hello -
I have the following call to addSource where I pass a Custom SourceFunction
..
env.addSource(
new CollectionSourceFunctionJ(data, TypeInformation.of(new
TypeHint(){}))
)
where data is List and CollectionSourceFunctionJ is a Scala case
class ..
case class CollectionSourceFunctionJ[T](d
10 matches
Mail list logo