It would have been good to clarify which line causes the error; as it is, I can only guess.

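Please make sure you use the Scala variant of AsyncDataStream (org.apache.flink.streaming.api.scala.AsyncDataStream). The error message suggests that a Scala DataStream is being passed to the Java variant, which expects org.apache.flink.streaming.api.datastream.DataStream.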
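
As a rough sketch of what that could look like, assuming the Flink Scala DataStream API is on the classpath: the wildcard import below brings in the Scala AsyncDataStream, and the async function extends the Scala AsyncFunction trait from org.apache.flink.streaming.api.scala.async. EchoAsyncFunction, AsyncSketch and the fromElements source are hypothetical stand-ins for your DBAsyncSink and side-output stream, not your actual code.

```scala
import java.util.concurrent.TimeUnit

// Scala API: DataStream, StreamExecutionEnvironment, AsyncDataStream and
// the implicit TypeInformation support all come from this package.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical stand-in for DBAsyncSink. It completes the future right away;
// a real implementation would complete it from the DB client's callback.
class EchoAsyncFunction extends AsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    resultFuture.complete(Iterable(input))
  }
}

object AsyncSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source standing in for the side-output stream.
    val filterRecords: DataStream[String] = env.fromElements("a", "b", "c")

    // Resolves to org.apache.flink.streaming.api.scala.AsyncDataStream,
    // which accepts the Scala DataStream directly.
    val written: DataStream[String] = AsyncDataStream.unorderedWait(
      filterRecords, new EchoAsyncFunction, 1000, TimeUnit.SECONDS, 100)

    written.print()
    env.execute("async-sketch")
  }
}
```

If an explicit import of org.apache.flink.streaming.api.datastream.AsyncDataStream is present in your file, it is likely shadowing the Scala object and would need to be removed.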


On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote:
I am using the code below to read the filtered records from a side output:

val filterRecords: DataStream[String] = src.process(new ProcessFunction()).getSideOutput(filteredOutputTag)

It has filtered records in it.

Now I want to write these records to the database asynchronously, so I wrote the code below based on the documentation:

val asyncFunction: AsyncFunction[String, String] = new DBAsyncSink() // SO reference
AsyncDataStream.unorderedWait(goodRecords, new DBAsyncSink(), 1000, TimeUnit.SECONDS, 100) // Documentation Reference

The "DBAsyncSink" class is as follows:

class DBAsyncSink extends RichAsyncFunction[String,String] {

  override def open(parameters: Configuration): Unit = {

  }

  override def asyncInvoke(input:String, resultFuture: ResultFuture[String]): Unit = {

  }

  override def close(): Unit = {
    session.close()
  }

}

I am getting the error below:

type mismatch;
 found   : org.apache.flink.streaming.api.scala.DataStream[String]
 required: org.apache.flink.streaming.api.datastream.DataStream[?]

What am I missing here? I tried a couple of examples, but they didn't work.

Thanks,
Sid

