Yes, I did notice the use of implicits in ConnectedStreams.scala. Better Scaladoc would be helpful, especially when the compiler errors are not clear.
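
For anyone else who hits the same error, here is a minimal sketch of how a context bound such as `[R: TypeInformation]` moves the type information out of the regular argument list. It does not use Flink at all: `TypeInfo`, `transformExplicit` and `ContextBoundSketch` are hypothetical stand-ins that exist only for this illustration, and the real `transform` takes an operator argument that this sketch leaves out.

```scala
// Hypothetical stand-in for Flink's TypeInformation, for illustration only.
final case class TypeInfo[T](name: String)

object TypeInfo {
  // Implicit instances the compiler can look up on its own.
  implicit val intInfo: TypeInfo[Int] = TypeInfo("Int")
  implicit val stringInfo: TypeInfo[String] = TypeInfo("String")

  // Derives the instance for a pair from the instances of its components.
  implicit def tuple2Info[A, B](implicit a: TypeInfo[A], b: TypeInfo[B]): TypeInfo[(A, B)] =
    TypeInfo(s"(${a.name}, ${b.name})")
}

object ContextBoundSketch {
  // Java-style shape: the type information is an ordinary parameter.
  def transformExplicit[R](functionName: String, outTypeInfo: TypeInfo[R]): String =
    s"$functionName -> ${outTypeInfo.name}"

  // Scala-style shape: the context bound [R: TypeInfo] asks the compiler
  // to supply an implicit TypeInfo[R], so callers do not pass it.
  def transform[R: TypeInfo](functionName: String): String =
    transformExplicit(functionName, implicitly[TypeInfo[R]])

  def main(args: Array[String]): Unit = {
    // No TypeInfo argument here; it is resolved from the implicit scope.
    println(transform[(Int, String)]("test")) // prints "test -> (Int, String)"
  }
}
```

In this sketch `R` has to be written out at the call site because no argument mentions it. In the Scala signature quoted below, `R` appears in the operator's type, so it can normally be inferred from the operator that is passed in. A context bound desugars to a second, implicit parameter list, which is why a type information value passed in the first argument list does not match the Scala signature.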
Thanks

On Tue, May 3, 2016 at 5:02 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> There is a Scaladoc but it is not covering all packages, unfortunately. In
> the Scala API you can call transform without specifying a TypeInformation,
> it works using implicits/context bounds.
>
> On Tue, 3 May 2016 at 01:48 Srikanth <srikanth...@gmail.com> wrote:
>
>> Sorry for the previous incomplete email. Didn't realize I hit send!
>>
>> I was facing a weird compilation error in Scala when I did
>>   val joinedStream = stream1.connect(stream2)
>>     .transform("funName", outTypeInfo, joinOperator)
>>
>> It turned out to be due to a difference in API signature between Scala
>> and Java API. I was referring to javadoc. Is there a scaladoc?
>>
>> Java API has
>>   public <R> SingleOutputStreamOperator<R> transform(
>>     String functionName,
>>     TypeInformation<R> outTypeInfo,
>>     TwoInputStreamOperator<IN1, IN2, R> operator)
>>
>> Scala API has
>>   def transform[R: TypeInformation](
>>     functionName: String,
>>     operator: TwoInputStreamOperator[IN1, IN2, R])
>>
>> Srikanth
>>
>> On Mon, May 2, 2016 at 7:18 PM, Srikanth <srikanth...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I'm fac
>>>
>>> val stream = env.addSource(new
>>>   FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(),
>>>   properties))
>>> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
>>>   BidderRawLogs(b)).keyBy(b => b.strategyId)
>>>
>>> val metaStrategy: KeyedStream[(Int, String), Int] =
>>>   env.readTextFile("path").name("Strategy")
>>>     .map((1, _) ).keyBy(_._1)
>>>
>>> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
>>> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
>>> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
>>>   (Int, BidderRawLogs, (Int, String))] =
>>>   new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
>>>     staticTypeInfo)
>>> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
>>>   {}.getTypeInfo()
>>>
>>> val funName = "test"
>>> val joinedStream = bidderStream.connect(metaStrategy)
>>>   .transform(funName, joinOperator, outTypeInfo)