It would have good to clarify which line causes the error; as is I can
only guess.
Please make sure you use the scala variant of the AsyncDataStream
(org.apache.flink.streaming.api.scala.AsyncDataStream).
On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote:
I am using below code to get the data from the side output which has
filtered records.
So, it goes like this:
val filterRecords: DataStream[String] = src.process(new
ProcessFunction()).getSideOutput(filteredOutputTag)
It has filtered records in it.
Now, I want to add these records to the db asynchronously. Therefore,
I wrote below code using documentation reference:
val asyncFunction:AsyncFunction[String,String]=new DBAsyncSink() //SO
reference
AsyncDataStream.unorderedWait(goodRecords,new DBAsyncSink(), 1000,
TimeUnit.SECONDS, 100) //Documentation Reference
and the class for the "DBAsyncSink" is as follows:
class DBAsyncSink extends RichAsyncFunction[String,String] {
override def open(parameters: Configuration): Unit = {
}
override def asyncInvoke(input:String, resultFuture:
ResultFuture[String]): Unit = {
}
override def close(): Unit = {
session.close()
}
}
I am getting below error:
type mismatch;
found : org.apache.flink.streaming.api.scala.DataStream[String]
required: org.apache.flink.streaming.api.datastream.DataStream[?]
What am I missing over here? I tried a couple of examples but it
didn't work.
Thanks,
Sid