Hi guys, I'm trying to decide which HTTP client to use with Flink. So far I have tested scalaj and the Akka HTTP client, and both work fine in our current dev environment. scalaj is pretty straightforward, since it just makes an HTTP request with a timeout.
The Akka HTTP client is a bit more complicated (I'm new to Scala and all), so I'm asking whether I'm doing it right by creating an AsyncFunction like this:

```
import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, ResponseEntity}
import akka.stream.ActorMaterializer
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

import scala.concurrent.Future
import scala.util.{Failure, Success}

// Shipment and PPLogger are our own classes.
class AsyncHttpClient(args: Array[String] = Array())
    extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {

  override def asyncInvoke(
      input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    PPLogger.getActivityLogger.info("###########INIT ------------------- ")

    // These implicits are created again on every call to asyncInvoke.
    implicit val system = ActorSystem("my-system")
    implicit val executionContext = system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val resultFutureRequested: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
    PPLogger.getActivityLogger.info("###########DONE ------------------- ")

    // Hand the outcome back to Flink as Right(entity) or Left(error).
    resultFutureRequested.onComplete {
      case Success(res) => resultFuture.complete(Iterable(Right(res.entity)))
      case Failure(x)   => resultFuture.complete(Iterable(Left(x)))
    }
  }

  override def timeout(
      input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    resultFuture.complete(
      Iterable(Left(new TimeoutException("Async function call has timed out."))))
  }
}
```

I notice that I have to create a bunch of implicit values inside the asyncInvoke method. I'm not sure whether I'm doing it right or just adding overhead, since they get created for every element. I did try to initialize them in the constructor of the class, but that just threw a bunch of "Not implemented Serializer" errors.

My libs:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test

My Flink setup: Scala 2.12, Flink 1.7.0

Any replies are appreciated!

Thanks a bunch,
Andy
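
For reference, one pattern often used for the constructor problem described above is to keep the client in a `@transient lazy val`. Flink ships function instances to the workers with Java serialization, so a field holding something like an `ActorSystem` most likely fails unless it is excluded from serialization and rebuilt on first use. The sketch below shows only that mechanism, using nothing but the standard library. `FakeClient`, `LazyClientFunction` and `LazyClientDemo` are made-up names standing in for the Akka objects and the `AsyncFunction`, and none of this has been run against Flink 1.7 itself.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a heavy, non-serializable client such as an ActorSystem.
object FakeClient {
  val created = new AtomicInteger(0) // counts how many clients have been built
}

class FakeClient { // deliberately NOT Serializable
  FakeClient.created.incrementAndGet()
  def call(uri: String): String = s"response for $uri"
}

// Hypothetical stand-in for the async function: serializable itself, client built lazily.
class LazyClientFunction extends Serializable {
  // @transient: skipped by Java serialization.
  // lazy: built on first use, once per (deserialized) instance.
  @transient private lazy val client: FakeClient = new FakeClient

  def invoke(uri: String): String = client.call(uri)
}

object LazyClientDemo {
  // Round-trips a value through Java serialization, similar to shipping a function to a worker.
  def roundTrip[T <: Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Serializing works because the client field is transient and not yet initialized.
    val shipped = roundTrip(new LazyClientFunction)
    shipped.invoke("https://httpbin.org/json")
    shipped.invoke("https://httpbin.org/json")
    // Two calls, but the client was only built once, on first use.
    println(s"clients created: ${FakeClient.created.get()}")
  }
}
```

In the real function the lazy fields would presumably be the `ActorSystem`, the materializer and the execution context, so that they are created once per parallel instance rather than once per record. Shutting the actor system down cleanly is not covered by this sketch.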