Hi guys,
I’m try to decide which http client to go with Flink, currently I tested with
scalaj and akka http client and both work ok with our current dev environment.
For scalaj its is pretty straight forward since its is just calling an http
request with its timeout.
For akka http client its a bit more complicated (I’m new to scala and all), so
I’m asking if am I doing it right by create a AsyncFunction like this
```
class AsyncHttpClient( args: Array[String] = Array()) extends
AsyncFunction[Shipment, Either[Throwable, ResponseEntity]]{
override def asyncInvoke(input: Shipment, resultFuture:
ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
PPLogger.getActivityLogger.info("###########INIT ------------------- ")
implicit val system = ActorSystem("my-system")
implicit val executionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val resultFutureRequested: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
PPLogger.getActivityLogger.info("###########DONE ------------------- ")
resultFutureRequested.onComplete {
case Success(res) => {
resultFuture.complete(Iterable(Right(res.entity)))
}
case Failure(x) => {
resultFuture.complete(Iterable(Left(x)))
}
}
}
override def timeout(input: Shipment, resultFuture:
ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
resultFuture.complete(Iterable(Left(new TimeoutException("Async function
call has timed out."))))
}
}
```
I notice that I have to implicit create a bunch of variable inside the
asyncInvoke method. I’m not sure if I’m doing it right, or just adding the
overhead. I did try to init them in the constructor of the class but the
compiler just throw a bunch of Not implemented Serializer error.
My lib:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
My flink:
scala 2.12
flink 1.70
Any reply are appreciated!
Thanks a bunch
Andy,