Sorry for the previous incomplete email. Didn't realize I hit send!

I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in API signature between Scala and
Java API. I was refering to javadoc. Is there a scaladoc?

Java API has
public <R> SingleOutputStreamOperator<R> transform(
                        String functionName,
TypeInformation<R> outTypeInfo,
TwoInputStreamOperator<IN1, IN2, R> operator)

Scala API has
def transform[R: TypeInformation](
      functionName: String,
      operator: TwoInputStreamOperator[IN1, IN2, R])

Srikanth

On Mon, May 2, 2016 at 7:18 PM, Srikanth <srikanth...@gmail.com> wrote:

> Hello,
>
> I'm fac
>
> val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
> new SimpleStringSchema(), properties))
> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
> BidderRawLogs(b)).keyBy(b => b.strategyId)
>
> val metaStrategy: KeyedStream[(Int, String), Int] =
> env.readTextFile("path").name("Strategy")
>  .map((1, _) ).keyBy(_._1)
>
> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
> (Int, BidderRawLogs, (Int, String))] =
>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
> staticTypeInfo)
> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
> {}.getTypeInfo()
>
> val funName = "test"
> val joinedStream = bidderStream.connect(metaStrategy)
> .transform(funName, joinOperator, outTypeInfo)
>
>

Reply via email to