Hi Till,

Thanks for your reply. I managed to run some experiments; some of them worked and some did not. I hope you can give me a bit more insight:
Following your suggestion, I implemented a `RichAsyncFunction` with transient fields, like the code below, and got this error:

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement abstract member 'executionContext: ExecutionContextExecutor' in 'com.parcelperform.util.RichAsyncHttpClient'
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

  PPLogger.getActivityLogger.info("###########INIT ------------------- ")
  @transient implicit var materializer: ActorMaterializer
  @transient implicit var system: ActorSystem
  @transient implicit var executionContext: ExecutionContextExecutor

  override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

    resultFutureRequested.onComplete {
      case Success(res) => {
        resultFuture.complete(Iterable(Either(res.entity)).asJavaCollection)
      }
      case Failure(x) => {
        resultFuture.complete(Iterable(Either(x)).asJavaCollection)
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    system = ActorSystem("my-system")
    executionContext = system.dispatcher
    materializer = ActorMaterializer()
  }
}
```

The usage of that class also gives me an error. I guess it is because of a Java/Scala issue.
In the Flink docs, the Java examples use `RichAsyncFunction` and the Scala examples use `AsyncFunction`:

```
// AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
// <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
```

### So I tried to fix my current code again, using transient fields and moving the initialization into the constructor:

```
class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{

  @transient implicit lazy val system = {
    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
    ActorSystem("my-system")
  }
  @transient implicit lazy val executionContext = {
    system.dispatcher
  }
  @transient implicit lazy val materializer: ActorMaterializer = {
    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
    ActorMaterializer()
  }

  override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    // PPLogger.getActivityLogger.info("###########INIT ------------------- ")
    // implicit val system = ActorSystem("my-system")
    // implicit val executionContext = system.dispatcher
    // implicit val materializer: ActorMaterializer = ActorMaterializer()
    // PPLogger.getActivityLogger.info("###########DONE ------------------- ")

    val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

    resultFutureRequested.onComplete {
      case Success(res) => {
        resultFuture.complete(Iterable(Right(res.entity)))
      }
      case Failure(x) => {
        resultFuture.complete(Iterable(Left(x)))
      }
    }
  }

  override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
  }
}
```

And it runs OK. The log line was printed only once.
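A plausible reason the log line appears only once with the `@transient lazy val` version: a transient lazy field is left out of the serialized function object and is initialized on first access in whichever JVM actually calls it. The sketch below shows that behaviour in plain Scala, with no Flink or Akka involved. `HeavyClient`, `MyFunction`, and `roundTrip` are hypothetical names made up for this illustration; `HeavyClient` stands in for a non-serializable resource such as an `ActorSystem`, and `roundTrip` only imitates shipping a function object to another process via Java serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a non-serializable resource (e.g. an ActorSystem).
object HeavyClient { val created = new AtomicInteger(0) }
class HeavyClient { val id: Int = HeavyClient.created.incrementAndGet() }

// Hypothetical stand-in for a user function that gets serialized and shipped.
class MyFunction extends Serializable {
  // Excluded from the serialized form; built on first access in the JVM that uses it.
  @transient lazy val client: HeavyClient = new HeavyClient
  def call(): String = s"using client ${client.id}"
}

object TransientLazyDemo {
  // Serializes and deserializes an object, imitating a transfer to another process.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new MyFunction        // constructing it does not create a HeavyClient
    val shipped = roundTrip(original)    // succeeds although HeavyClient is not Serializable
    println(shipped.call())              // the HeavyClient is created here, on first use
    println(shipped.call())              // the same HeavyClient is reused
  }
}
```

If the same mechanism applies in the job above, each deserialized copy of `AsyncHttpClient` would create its `ActorSystem` once, on the first `asyncInvoke` call, and reuse it afterwards.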
I am still asking about this because I haven't understood the statement "That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability". I thought all of the code is executed inside the TaskManager?

Thanks for being patient with me so far.

Andy,

> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Andy,
>
> without being an expert of Akka's http client, I think you should not create
> a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would
> recommend you instead is to implement a `RichAsyncFunction` with a transient
> field for `ActorMaterializer` which you initialize in the
> `RichAsyncFunction#open` method. That way you only create the
> `ActorMaterialier` on the `TaskManager` where the operator is executed and
> solve the problem of serializability and you make it much more efficient
> because you don't create a new `ActorSystem` for every request.
>
> Cheers,
> Till
>
> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote:
> Hi guys,
>
> I’m try to decide which http client to go with Flink, currently I tested with
> scalaj and akka http client and both work ok with our current dev environment.
> For scalaj its is pretty straight forward since its is just calling an http
> request with its timeout.
>
> For akka http client its a bit more complicated (I’m new to scala and all),
> so I’m asking if am I doing it right by create a AsyncFunction like this
> ```
> class AsyncHttpClient( args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
>
>   override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>
>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>     implicit val system = ActorSystem("my-system")
>     implicit val executionContext = system.dispatcher
>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>     val resultFutureRequested: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>
>     resultFutureRequested.onComplete {
>       case Success(res) => {
>         resultFuture.complete(Iterable(Right(res.entity)))
>       }
>       case Failure(x) => {
>         resultFuture.complete(Iterable(Left(x)))
>       }
>     }
>
>   }
>
>   override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>   }
> }
> ```
> I notice that I have to implicit create a bunch of variable inside the
> asyncInvoke method. I’m not sure if I’m doing it right, or just adding the
> overhead. I did try to init them in the constructor of the class but the
> compiler just throw a bunch of Not implemented Serializer error.
>
> My lib:
> "com.typesafe.akka" %% "akka-http" % "10.1.8",
> "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>
> My flink:
> scala 2.12
> flink 1.70
>
> Any reply are appreciated!
>
> Thanks a bunch
>
> Andy,