Hi Chesnay,

Thanks for your time. Much appreciated.

I am getting an error on the line below:

    val res: DataStream[String] = AsyncDataStream.unorderedWait(goodRecords, new CassandraAsyncSink(), 1000, TimeUnit.SECONDS, 100)

Yes, earlier it was a wrong import, but it is still giving me the error below:

    type mismatch;
     found   : KafkaAsSource.CassandraAsyncSink
     required: org.apache.flink.streaming.api.scala.async.AsyncFunction[?,String]

P.S.: I am not sure whether this makes any difference, but in asyncInvoke I am not using the ResultFuture or the template given in the official docs for the Async I/O example. Since the method's return type is Unit, I am just calling session.execute("query") inside it, the same call that is used in a RichSinkFunction.

What else am I missing?

On Wed, Jan 12, 2022 at 4:47 PM Chesnay Schepler <ches...@apache.org> wrote:

> It would have been good to clarify which line causes the error; as is I can
> only guess.
>
> Please make sure you use the scala variant of the AsyncDataStream
> (org.apache.flink.streaming.api.scala.AsyncDataStream).
>
>
> On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote:
> > I am using the code below to get the data from the side output, which has
> > the filtered records.
> > So, it goes like this:
> >
> >     val filterRecords: DataStream[String] =
> >       src.process(new ProcessFunction()).getSideOutput(filteredOutputTag)
> >
> > It has the filtered records in it.
> >
> > Now, I want to add these records to the db asynchronously. Therefore,
> > I wrote the code below using the documentation as a reference:
> >
> >     val asyncFunction: AsyncFunction[String, String] = new DBAsyncSink() // SO reference
> >     AsyncDataStream.unorderedWait(goodRecords, new DBAsyncSink(), 1000, TimeUnit.SECONDS, 100) // Documentation reference
> >
> > and the class for the "DBAsyncSink" is as follows:
> >
> >     class DBAsyncSink extends RichAsyncFunction[String, String] {
> >
> >       override def open(parameters: Configuration): Unit = {
> >
> >       }
> >
> >       override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
> >
> >       }
> >
> >       override def close(): Unit = {
> >         session.close()
> >       }
> >
> >     }
> >
> > I am getting the error below:
> >
> >     type mismatch;
> >      found   : org.apache.flink.streaming.api.scala.DataStream[String]
> >      required: org.apache.flink.streaming.api.datastream.DataStream[?]
> >
> > What am I missing over here? I tried a couple of examples but it
> > didn't work.
> >
> > Thanks,
> > Sid