Sorry for the previous incomplete email. Didn't realize I hit send! I was facing a weird compilation error in Scala when I did val joinedStream = stream1.connect(stream2) .transform("funName", outTypeInfo, joinOperator)
It turned out to be due to a difference in API signature between Scala and Java API. I was refering to javadoc. Is there a scaladoc? Java API has public <R> SingleOutputStreamOperator<R> transform( String functionName, TypeInformation<R> outTypeInfo, TwoInputStreamOperator<IN1, IN2, R> operator) Scala API has def transform[R: TypeInformation]( functionName: String, operator: TwoInputStreamOperator[IN1, IN2, R]) Srikanth On Mon, May 2, 2016 at 7:18 PM, Srikanth <srikanth...@gmail.com> wrote: > Hello, > > I'm fac > > val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic", > new SimpleStringSchema(), properties)) > val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b => > BidderRawLogs(b)).keyBy(b => b.strategyId) > > val metaStrategy: KeyedStream[(Int, String), Int] = > env.readTextFile("path").name("Strategy") > .map((1, _) ).keyBy(_._1) > > val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo() > val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo() > val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String), > (Int, BidderRawLogs, (Int, String))] = > new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo, > staticTypeInfo) > val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() > {}.getTypeInfo() > > val funName = "test" > val joinedStream = bidderStream.connect(metaStrategy) > .transform(funName, joinOperator, outTypeInfo) > >