Hi Chesnay,

Thanks for your time. Much appreciated.

I am getting error on the below line:

val res:DataStream[String]=AsyncDataStream.unorderedWait(goodRecords,new
CassandraAsyncSink(),1000,TimeUnit.SECONDS,100)

Yes, earlier it was a wrong import but it is still giving me the below
error:

type mismatch;
 found   : KafkaAsSource.CassandraAsyncSink
 required:
org.apache.flink.streaming.api.scala.async.AsyncFunction[?,String]


P.S: I am not sure whether this would make any difference but in the code
of asyncInvoke, I am not using any ResultFuture or the template given on
the official doc for the AsyncIO example. Anyways the method's return type
is Unit so I am using session.execute("query") under the method which is
used for some RichSinkFunction.

What else am I missing?



On Wed, Jan 12, 2022 at 4:47 PM Chesnay Schepler <ches...@apache.org> wrote:

> It would have good to clarify which line causes the error; as is I can
> only guess.
>
> Please make sure you use the scala variant of the AsyncDataStream
> (org.apache.flink.streaming.api.scala.AsyncDataStream).
>
>
> On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote:
> > I am using below code to get the data from the side output which has
> > filtered records.
> > So, it goes like this:
> >
> > val filterRecords: DataStream[String] = src.process(new
> > ProcessFunction()).getSideOutput(filteredOutputTag)
> >
> > It has filtered records in it.
> >
> > Now, I want to add these records to the db asynchronously. Therefore,
> > I wrote below code using documentation reference:
> >
> > val asyncFunction:AsyncFunction[String,String]=new DBAsyncSink() //SO
> > reference
> > AsyncDataStream.unorderedWait(goodRecords,new DBAsyncSink(), 1000,
> > TimeUnit.SECONDS, 100) //Documentation Reference
> >
> > and the class for the "DBAsyncSink" is as follows:
> >
> > class DBAsyncSink extends RichAsyncFunction[String,String] {
> >
> >   override def open(parameters: Configuration): Unit = {
> >
> >   }
> >
> >   override def asyncInvoke(input:String, resultFuture:
> > ResultFuture[String]): Unit = {
> >
> >   }
> >
> >   override def close(): Unit = {
> >     session.close()
> >   }
> >
> > }
> >
> > I am getting below error:
> >
> > type mismatch;
> >  found   : org.apache.flink.streaming.api.scala.DataStream[String]
> >  required: org.apache.flink.streaming.api.datastream.DataStream[?]
> >
> > What am I missing over here? I tried a couple of examples but it
> > didn't work.
> >
> > Thanks,
> > Sid
>
>
>

Reply via email to