I am using the code below to read from a side output that holds the filtered records:

    val filterRecords: DataStream[String] = src.process(new ProcessFunction()).getSideOutput(filteredOutputTag)

This stream contains the filtered records. Now I want to write these records to the database asynchronously, so I wrote the following code based on the documentation:

    val asyncFunction: AsyncFunction[String, String] = new DBAsyncSink() // SO reference
    AsyncDataStream.unorderedWait(goodRecords, new DBAsyncSink(), 1000, TimeUnit.SECONDS, 100) // Documentation reference

The `DBAsyncSink` class is as follows:

    class DBAsyncSink extends RichAsyncFunction[String, String] {

      override def open(parameters: Configuration): Unit = {
      }

      override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
      }

      override def close(): Unit = {
        session.close()
      }
    }

I am getting the following error:

    type mismatch;
     found   : org.apache.flink.streaming.api.scala.DataStream[String]
     required: org.apache.flink.streaming.api.datastream.DataStream[?]

What am I missing here? I tried a couple of examples, but none of them worked.

Thanks,
Sid