Hi Till,
Sorry to bother you again. I managed to build and run the Akka HTTP client
locally, but after deploying to a YARN node, the ActorSystem can't be created:
```
PPLogger.getActivityLogger.info("########### 1")
implicit val system = ActorSystem("my-system")
PPLogger.getActivityLogger.info("########### 2")
```
The "########### 2" line is never printed, and the method ends up timing out.
Honestly, I don't know how to debug this case.
I'm just curious: how do people end up using an async HTTP client without this
hassle?
Thanks,
Andy,
> On Apr 12, 2019, at 10:23 PM, Till Rohrmann <[email protected]> wrote:
>
> Hi Andy,
>
> you can do some micro benchmarks where you instantiate your AsyncHttpClient
> and call the invoke method. But it would be better to benchmark it end-to-end
> by running it on a cluster with a realistic workload that you also expect to
> occur in production.
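A minimal sketch of such a micro benchmark in Scala. It assumes the client call can be wrapped as a function that returns a `Future`. `MicroBench.timeAsync` is a hypothetical helper, and `Future(i * 2)` only stands in for the real HTTP request. The sketch measures wall-clock time for a batch of concurrent calls and does nothing about JIT warm-up or statistics, so it is only a rough indication and no substitute for the end-to-end test on a cluster:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MicroBench {
  // Hypothetical helper: starts `n` calls concurrently, waits for all of them
  // to finish and returns the elapsed wall-clock time in milliseconds.
  // No warm-up runs or repetitions are done, so treat the number as rough.
  def timeAsync[A](n: Int, timeout: FiniteDuration = 30.seconds)(call: Int => Future[A])(
      implicit ec: ExecutionContext): Long = {
    val start = System.nanoTime()
    val all = Future.sequence((1 to n).map(call))
    Await.result(all, timeout)
    (System.nanoTime() - start) / 1000000L
  }
}

// Usage with a stand-in call; in practice `call` would invoke the HTTP client.
import ExecutionContext.Implicits.global
val elapsedMs = MicroBench.timeAsync(100)(i => Future(i * 2))
println(s"100 calls took $elapsedMs ms")
```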
>
> Cheers,
> Till
>
> On Fri, Apr 12, 2019 at 11:29 AM Andy Hoang <[email protected]> wrote:
> Hi Till,
> Unfortunately, I have to wait for the cluster to be upgraded to 1.8 to use that
> feature: https://issues.apache.org/jira/browse/FLINK-6756
> Meanwhile, I can reimplement it by copy-pasting, but I'm still curious whether
> my AsyncHttpClient works well or not, and what the downsides would be when you
> look at it.
> I understand the open/close methods help with initializing/cleaning up
> resources, but how can we benchmark the solutions to make sure one is better
> than the other? What is the key criterion here, or do we have to try it in
> production first?
> Thanks a lot, again
>
> Andy,
>
>
>> On Apr 12, 2019, at 2:44 PM, Till Rohrmann <[email protected]> wrote:
>>
>> Hi Andy,
>>
>> there is also a Scala version of the `RichAsyncFunction`.
>>
>> In Scala you have to specify a value for class members. This is different
>> from Java.
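A minimal sketch of the difference, using hypothetical `Resource` and `Holder` classes rather than Flink or Akka types. It shows how a Scala field can be left unset until an `open`-style method runs. It assumes Scala 2 syntax (`= _`), matching the Scala 2.12 setup used in this thread:

```scala
// Hypothetical stand-in for a non-serializable resource such as an ActorSystem.
class Resource {
  def ping(): String = "pong"
}

class Holder extends Serializable {
  // Writing `@transient var resource: Resource` without a right-hand side
  // declares an *abstract* member, which is what triggers the
  // "must either be declared abstract or implement abstract member" error.
  // `= _` instead initializes the field to its default value (null here).
  @transient var resource: Resource = _

  // Plays the role of RichAsyncFunction#open: the real value is assigned later.
  def open(): Unit = {
    resource = new Resource
  }
}

val holder = new Holder
println(holder.resource)        // null until open() runs
holder.open()
println(holder.resource.ping()) // pong
```

For the `RichAsyncHttpClient` in the quoted message, this would mean writing, for example, `@transient implicit var system: ActorSystem = _` for each of its three fields.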
>>
>> User code is first instantiated on the client where you create the job
>> topology (basically where you call new RichAsyncHttpClient). The code is
>> then serialized and shipped to the cluster where it is actually executed.
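A rough sketch of that round trip, using plain Java serialization to model how the function object travels from the client to the cluster. `Conn`, `LazyHolder` and `Shipping` are hypothetical names. The sketch shows why a `@transient lazy val` works in the lazy-val version of `AsyncHttpClient` in this thread. The value is not shipped, and it is created again on first access in the process that deserialized the object. If the client never touches the field, it is only ever created on the TaskManager side, which is consistent with the single INIT log line reported there:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable resource.
class Conn(val id: Int)

object LazyHolder {
  var inits = 0 // counts how often the lazy initializer runs
}

// Hypothetical function object; the transient lazy field is not serialized.
class LazyHolder extends Serializable {
  @transient lazy val conn: Conn = {
    LazyHolder.inits += 1
    new Conn(LazyHolder.inits)
  }
}

object Shipping {
  // Serialize and deserialize `obj`, a simplified model of shipping the
  // operator from the client to a TaskManager.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

val onClient = new LazyHolder
println(onClient.conn.id)      // 1: initialized on the "client" because it is accessed there
val onTaskManager = Shipping.roundTrip(onClient)
println(onTaskManager.conn.id) // 2: the value was not shipped, so it is created again
```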
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <[email protected]> wrote:
>> Hi Till,
>> Thanks for your reply. I managed to do some experiments; some worked and some
>> did not. I hope you can give me a bit more insight:
>>
>> Following your suggestion, I implemented a `RichAsyncFunction` with transient
>> fields, like this, and got an error:
>>
>> ```
>> Class 'RichAsyncHttpClient' must either be declared abstract or implement
>> abstract member 'executionContext: ExecutionContextExecutor' in
>> ‘com.parcelperform.util.RichAsyncHttpClient’
>> ```
>>
>> ```
>> import akka.actor.ActorSystem
>> import akka.http.scaladsl.Http
>> import akka.http.scaladsl.model.{HttpRequest, HttpResponse, ResponseEntity}
>> import akka.stream.ActorMaterializer
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.functions.async
>> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction
>>
>> import scala.collection.JavaConverters._
>> import scala.concurrent.{ExecutionContextExecutor, Future}
>> import scala.util.{Failure, Success}
>>
>> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>
>>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>   @transient implicit var materializer: ActorMaterializer
>>   @transient implicit var system: ActorSystem
>>   @transient implicit var executionContext: ExecutionContextExecutor
>>
>>   override def asyncInvoke(input: Shipment, resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>
>>     resultFutureRequested.onComplete {
>>       case Success(res) =>
>>         resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Right(res.entity)).asJavaCollection)
>>       case Failure(x) =>
>>         resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Left(x)).asJavaCollection)
>>     }
>>   }
>>
>>   override def open(parameters: Configuration): Unit = {
>>     super.open(parameters)
>>     system = ActorSystem("my-system")
>>     executionContext = system.dispatcher
>>     materializer = ActorMaterializer()
>>   }
>> }
>> ```
>>
>> Also, when using that class I get an error; I guess it's a Java/Scala issue.
>> In the Flink docs, the Java code uses RichAsyncFunction and the Scala code uses
>> AsyncFunction:
>> ```
>> // AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
>> // <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
>> ```
>>
>> ###
>>
>> So I tried again to fix my current code, moving the fields into the
>> constructor as transient fields:
>> ```
>> import java.util.concurrent.TimeoutException
>>
>> import akka.actor.ActorSystem
>> import akka.http.scaladsl.Http
>> import akka.http.scaladsl.model.{HttpRequest, HttpResponse, ResponseEntity}
>> import akka.stream.ActorMaterializer
>> import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
>>
>> import scala.concurrent.Future
>> import scala.util.{Failure, Success}
>>
>> class AsyncHttpClient(args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>
>>   // @transient: not serialized together with the function object.
>>   // lazy: created on first access, i.e. in the first asyncInvoke call.
>>   @transient implicit lazy val system = {
>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     ActorSystem("my-system")
>>   }
>>   @transient implicit lazy val executionContext = {
>>     system.dispatcher
>>   }
>>   @transient implicit lazy val materializer: ActorMaterializer = {
>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>     ActorMaterializer()
>>   }
>>
>>   override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     // PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     // implicit val system = ActorSystem("my-system")
>>     // implicit val executionContext = system.dispatcher
>>     // implicit val materializer: ActorMaterializer = ActorMaterializer()
>>     // PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>     resultFutureRequested.onComplete {
>>       case Success(res) => resultFuture.complete(Iterable(Right(res.entity)))
>>       case Failure(x) => resultFuture.complete(Iterable(Left(x)))
>>     }
>>   }
>>
>>   override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>   }
>> }
>> ```
>> And it runs OK. The log was printed only once.
>>
>> I'm still asking about this because I haven't understood the sentence "That
>> way you only create the `ActorMaterializer` on the `TaskManager` where the
>> operator is executed and solve the problem of serializability". I thought
>> all the code is executed inside the TaskManager?
>>
>> Thanks for being patient with me so far.
>>
>> Andy,
>>
>>
>>
>>
>>> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <[email protected]> wrote:
>>>
>>> Hi Andy,
>>>
>>> without being an expert on Akka's HTTP client, I think you should not
>>> create a new ActorSystem for every call to `AsyncFunction#asyncInvoke`.
>>> What I would recommend instead is to implement a `RichAsyncFunction`
>>> with a transient field for `ActorMaterializer` which you initialize in the
>>> `RichAsyncFunction#open` method. That way you only create the
>>> `ActorMaterializer` on the `TaskManager` where the operator is executed,
>>> which solves the problem of serializability, and you make it much more
>>> efficient because you don't create a new `ActorSystem` for every request.
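The efficiency argument can be sketched without Flink or Akka. `ExpensiveResource` is a hypothetical stand-in for an `ActorSystem`. `PerCallFunction` and `OpenOnceFunction` are hypothetical classes that only mimic the shape of `asyncInvoke` and `open`/`close`. The counter shows how many resources each pattern creates for 100 requests:

```scala
import java.util.concurrent.atomic.AtomicInteger

object ExpensiveResource {
  val created = new AtomicInteger(0) // number of instances created so far
}

// Hypothetical stand-in for an expensive resource such as an ActorSystem.
class ExpensiveResource {
  ExpensiveResource.created.incrementAndGet()
  def handle(request: String): String = s"handled $request"
  def shutdown(): Unit = () // a real ActorSystem would be terminated here
}

// Pattern from the original mail: a fresh resource for every invocation.
class PerCallFunction {
  def invoke(request: String): String = {
    val resource = new ExpensiveResource
    try resource.handle(request) finally resource.shutdown()
  }
}

// Pattern suggested above: create once in open(), reuse, release in close().
class OpenOnceFunction {
  @transient private var resource: ExpensiveResource = _
  def open(): Unit = { resource = new ExpensiveResource }
  def invoke(request: String): String = resource.handle(request)
  def close(): Unit = if (resource != null) resource.shutdown()
}

val perCall = new PerCallFunction
(1 to 100).foreach(i => perCall.invoke(s"req-$i"))
println(ExpensiveResource.created.get()) // 100

ExpensiveResource.created.set(0)
val openOnce = new OpenOnceFunction
openOnce.open()
(1 to 100).foreach(i => openOnce.invoke(s"req-$i"))
openOnce.close()
println(ExpensiveResource.created.get()) // 1
```

In a real `RichAsyncFunction`, `close` would be a natural place to terminate the `ActorSystem` created in `open`.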
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <[email protected]> wrote:
>>> Hi guys,
>>>
>>> I'm trying to decide which HTTP client to use with Flink. So far I have
>>> tested the scalaj and Akka HTTP clients, and both work OK in our current dev
>>> environment.
>>> For scalaj it is pretty straightforward, since it just sends an HTTP request
>>> with a timeout.
>>>
>>> For the Akka HTTP client it is a bit more complicated (I'm new to Scala and
>>> all), so I'm asking whether I am doing it right by creating an AsyncFunction
>>> like this:
>>> ```
>>> import java.util.concurrent.TimeoutException
>>>
>>> import akka.actor.ActorSystem
>>> import akka.http.scaladsl.Http
>>> import akka.http.scaladsl.model.{HttpRequest, HttpResponse, ResponseEntity}
>>> import akka.stream.ActorMaterializer
>>> import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
>>>
>>> import scala.concurrent.Future
>>> import scala.util.{Failure, Success}
>>>
>>> class AsyncHttpClient(args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>>
>>>   override def asyncInvoke(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>>     // A new ActorSystem, dispatcher and materializer are created on every call.
>>>     implicit val system = ActorSystem("my-system")
>>>     implicit val executionContext = system.dispatcher
>>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>>     val resultFutureRequested: Future[HttpResponse] =
>>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>>
>>>     resultFutureRequested.onComplete {
>>>       case Success(res) => resultFuture.complete(Iterable(Right(res.entity)))
>>>       case Failure(x) => resultFuture.complete(Iterable(Left(x)))
>>>     }
>>>   }
>>>
>>>   override def timeout(input: Shipment, resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>>   }
>>> }
>>> ```
>>> I noticed that I have to create a bunch of implicit variables inside the
>>> asyncInvoke method. I'm not sure if I'm doing it right, or just adding
>>> overhead. I did try to initialize them in the constructor of the class, but
>>> the compiler just threw a bunch of "Not implemented Serializer" errors.
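Errors like these can be reproduced with plain Java serialization, which is what Flink uses to ship user functions to the cluster. This is a minimal sketch with hypothetical classes (`Connection`, `EagerFunction`, `TransientFunction`, `SerializationCheck`). An eagerly initialized, non-serializable field makes the whole function unserializable, while a `@transient lazy val` is skipped:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable member (an ActorSystem is not serializable either).
class Connection {
  def send(message: String): String = message
}

// Eager, non-transient field: Java serialization walks into it and fails.
class EagerFunction extends Serializable {
  val connection = new Connection
}

// Transient lazy field: skipped by serialization, created on first use.
class TransientFunction extends Serializable {
  @transient lazy val connection = new Connection
}

object SerializationCheck {
  // Returns true if `obj` can be written with Java serialization.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}

println(SerializationCheck.isSerializable(new EagerFunction))     // false
println(SerializationCheck.isSerializable(new TransientFunction)) // true
```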
>>>
>>> My lib:
>>> "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>> "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>>
>>> My flink:
>>> scala 2.12
>>> flink 1.7.0
>>>
>>>
>>>
>>> Any replies are appreciated!
>>>
>>> Thanks a bunch
>>>
>>> Andy,
>>>
>>>
>>>
>>
>