Yes, I did notice the usage of implicit in ConnectedStreams.scala.
Better Scaladoc will be helpful, especially when compiler errors are not
clear.

Thanks

On Tue, May 3, 2016 at 5:02 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> There is a Scaladoc but it is not covering all packages, unfortunately. In
> the Scala API you can call transform without specifying a TypeInformation,
> it works using implicits/context bounds.
>
> On Tue, 3 May 2016 at 01:48 Srikanth <srikanth...@gmail.com> wrote:
>
>> Sorry for the previous incomplete email. Didn't realize I hit send!
>>
>> I was facing a weird compilation error in Scala when I did
>> val joinedStream = stream1.connect(stream2)
>> .transform("funName", outTypeInfo, joinOperator)
>>
>> It turned out to be due to a difference in API signature between Scala
>> and Java API. I was refering to javadoc. Is there a scaladoc?
>>
>> Java API has
>> public <R> SingleOutputStreamOperator<R> transform(
>>                         String functionName,
>> TypeInformation<R> outTypeInfo,
>> TwoInputStreamOperator<IN1, IN2, R> operator)
>>
>> Scala API has
>> def transform[R: TypeInformation](
>>       functionName: String,
>>       operator: TwoInputStreamOperator[IN1, IN2, R])
>>
>> Srikanth
>>
>> On Mon, May 2, 2016 at 7:18 PM, Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I'm fac
>>>
>>> val stream = env.addSource(new
>>> FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(),
>>> properties))
>>> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
>>> BidderRawLogs(b)).keyBy(b => b.strategyId)
>>>
>>> val metaStrategy: KeyedStream[(Int, String), Int] =
>>> env.readTextFile("path").name("Strategy")
>>>  .map((1, _) ).keyBy(_._1)
>>>
>>> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
>>> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
>>> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
>>> (Int, BidderRawLogs, (Int, String))] =
>>>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
>>> staticTypeInfo)
>>> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
>>> {}.getTypeInfo()
>>>
>>> val funName = "test"
>>> val joinedStream = bidderStream.connect(metaStrategy)
>>> .transform(funName, joinOperator, outTypeInfo)
>>>
>>>
>>

Reply via email to