Hi Andy, without being an expert of Akka's http client, I think you should not create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would recommend you instead is to implement a `RichAsyncFunction` with a transient field for `ActorMaterializer` which you initialize in the `RichAsyncFunction#open` method. That way you only create the `ActorMaterialier` on the `TaskManager` where the operator is executed and solve the problem of serializability and you make it much more efficient because you don't create a new `ActorSystem` for every request.
Cheers, Till On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote: > Hi guys, > > I’m try to decide which http client to go with Flink, currently I tested > with scalaj and akka http client and both work ok with our current dev > environment. > For scalaj its is pretty straight forward since its is just calling an > http request with its timeout. > > For akka http client its a bit more complicated (I’m new to scala and > all), so I’m asking if am I doing it right by create a AsyncFunction like > this > ``` > > class AsyncHttpClient( args: Array[String] = Array()) extends > AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{ > > override def asyncInvoke(input: Shipment, resultFuture: > ResultFuture[Either[Throwable, ResponseEntity]]): Unit = { > > PPLogger.getActivityLogger.info("###########INIT ------------------- ") > implicit val system = ActorSystem("my-system") > implicit val executionContext = system.dispatcher > implicit val materializer: ActorMaterializer = ActorMaterializer() > val resultFutureRequested: Future[HttpResponse] = > Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json")) > PPLogger.getActivityLogger.info("###########DONE ------------------- ") > > > resultFutureRequested.onComplete { > case Success(res) => { > resultFuture.complete(Iterable(Right(res.entity))) > } > case Failure(x) => { > resultFuture.complete(Iterable(Left(x))) > } > } > > } > > override def timeout(input: Shipment, resultFuture: > ResultFuture[Either[Throwable, ResponseEntity]]): Unit = { > resultFuture.complete(Iterable(Left(new TimeoutException("Async function > call has timed out.")))) > } > } > > ``` > I notice that I have to implicit create a bunch of variable inside the > asyncInvoke method. I’m not sure if I’m doing it right, or just adding the > overhead. I did try to init them in the constructor of the class but the > compiler just throw a bunch of Not implemented Serializer error. > > My lib: > "com.typesafe.akka" %% "akka-http" % "10.1.8", > "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test, > > My flink: > scala 2.12 > flink 1.70 > > > > Any reply are appreciated! > > Thanks a bunch > > Andy, > > > >