Hi Andy,

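There is also a Scala version of the `RichAsyncFunction`. The Scala
`AsyncDataStream.unorderedWait` expects the Scala `AsyncFunction`, which is why
passing a class that extends the Java `RichAsyncFunction` gives you the type
mismatch.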

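In Scala, a class member declared without a value, such as
`@transient implicit var materializer: ActorMaterializer`, is abstract, which is
what the compiler error is telling you. You have to give it an initial value,
e.g. `= _` (the default value, `null` for references), and then assign it in
`open`. This is different from Java, where an uninitialized field simply
defaults to `null`.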
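A minimal sketch of the difference in plain Scala, without Flink or Akka (the class and field names are hypothetical):

```scala
// No value given: the member is abstract, so the class must be abstract too.
// This is what the "must either be declared abstract" error is about.
abstract class Declared {
  var materializerName: String
}

// `= _` gives each member its default value (null for references,
// 0 for numbers, false for Boolean), so the class is concrete.
class Initialized {
  var materializerName: String = _
  var retries: Int = _
  var enabled: Boolean = _
}
```

Applied to the class in the question, this would mean declaring e.g. `@transient implicit var system: ActorSystem = _` and assigning it in `open`.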

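User code is first instantiated on the client where you create the job
topology (basically where you call `new RichAsyncHttpClient()`). The object is
then serialized and shipped to the cluster, where it is actually executed. So
the constructor (the class body in Scala) runs on the client, and everything it
stores in non-transient fields has to be serializable. `open` is only called on
the `TaskManager` after deserialization, so whatever you create there never has
to be serialized.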
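To make the client/cluster split concrete, here is a minimal, Flink-free sketch of that life cycle. It assumes plain Java serialization as the "shipping" step, which is roughly what Flink does with user functions. `HeavyResource`, `TransientFieldFunction`, `EagerFunction` and `Shipping.roundTrip` are hypothetical names: `HeavyResource` stands in for the non-serializable `ActorSystem`/`ActorMaterializer`, and `open()` mimics `RichAsyncFunction#open`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a resource that cannot be serialized,
// like the ActorSystem / ActorMaterializer in this thread.
final class HeavyResource(val createdIn: String)

// Built on the "client", then serialized. The resource is @transient, so it is
// skipped during serialization and only created in open() on the "worker".
final class TransientFieldFunction(val url: String) extends java.io.Serializable {
  @transient var resource: HeavyResource = _

  def open(): Unit = {
    resource = new HeavyResource("worker")
  }
}

// Same idea, but the resource is created in the constructor on the "client",
// so it is part of the serialized state and shipping fails.
final class EagerFunction(val url: String) extends java.io.Serializable {
  val resource: HeavyResource = new HeavyResource("client")
}

object Shipping {
  // Java serialization round trip, standing in for "ship it to the cluster".
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Serializing `EagerFunction` fails with `java.io.NotSerializableException`, while `TransientFieldFunction` arrives with `resource == null` and only gets its resource once `open()` runs on the receiving side.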

Cheers,
Till

On Fri, Apr 12, 2019 at 5:44 AM Andy Hoang <a...@parcelperform.com> wrote:

> Hi Till,
> Thanks for your reply. I managed to do some experiments; some worked and
> some did not. I hope you can give me a bit more insight:
>
> Following your suggestion, I implemented a `RichAsyncFunction` with
> transient fields, like this, and got an error:
>
> ```
> Class 'RichAsyncHttpClient' must either be declared abstract or implement
> abstract member 'executionContext: ExecutionContextExecutor' in
> ‘com.parcelperform.util.RichAsyncHttpClient’
> ```
>
> ```
> class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>
>   PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>   @transient implicit var materializer: ActorMaterializer
>   @transient implicit var system: ActorSystem
>   @transient implicit var executionContext: ExecutionContextExecutor
>
>   override def asyncInvoke(input: Shipment,
>                            resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>     val resultFutureRequested: Future[HttpResponse] =
>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>
>     resultFutureRequested.onComplete {
>       case Success(res) =>
>         resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Right(res.entity)).asJavaCollection)
>       case Failure(x) =>
>         resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Left(x)).asJavaCollection)
>     }
>   }
>
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters)
>     system = ActorSystem("my-system")
>     executionContext = system.dispatcher
>     materializer = ActorMaterializer()
>   }
> }
> ```
>
> Also, using that class gives me an error. I guess it is because of a
> Java/Scala issue: in the Flink docs, the Java example uses `RichAsyncFunction`
> and the Scala example uses `AsyncFunction`:
> ```
> // AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
> // <= ###### error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
> ```
>
> ###
>
> So I tried to fix my current code again, using transient fields and moving
> them into the constructor:
> ```
> class AsyncHttpClient(args: Array[String] = Array())
>   extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>
>   @transient implicit lazy val system = {
>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>     ActorSystem("my-system")
>   }
>   @transient implicit lazy val executionContext = {
>     system.dispatcher
>   }
>   @transient implicit lazy val materializer: ActorMaterializer = {
>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>     ActorMaterializer()
>   }
>
>   override def asyncInvoke(input: Shipment,
>                            resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
> //    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
> //    implicit val system = ActorSystem("my-system")
> //    implicit val executionContext = system.dispatcher
> //    implicit val materializer: ActorMaterializer = ActorMaterializer()
> //    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>
>     val resultFutureRequested: Future[HttpResponse] =
>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>     resultFutureRequested.onComplete {
>       case Success(res) =>
>         resultFuture.complete(Iterable(Right(res.entity)))
>       case Failure(x) =>
>         resultFuture.complete(Iterable(Left(x)))
>     }
>   }
>
>   override def timeout(input: Shipment,
>                        resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>   }
> }
> ```
> And it runs OK. The log message was printed only once.
>
> I am still asking about this because I haven’t understood this part: "That
> way you only create the `ActorMaterializer` on the `TaskManager` where the
> operator is executed and solve the problem of serializability". I thought
> all the code is executed inside the TaskManager?
>
> Thanks for being patient with me so far.
>
> Andy,
>
>
>
>
> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Andy,
>
> without being an expert on Akka's HTTP client, I think you should not
> create a new `ActorSystem` for every call to `AsyncFunction#asyncInvoke`.
> What I would recommend instead is to implement a `RichAsyncFunction`
> with a transient field for the `ActorMaterializer`, which you initialize in
> the `RichAsyncFunction#open` method. That way you only create the
> `ActorMaterializer` on the `TaskManager` where the operator is executed,
> which solves the serializability problem, and it is much more efficient
> because you don't create a new `ActorSystem` for every request.
>
> Cheers,
> Till
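The efficiency point (create the expensive resource once, not per request) can be sketched without Flink or Akka. In this hypothetical `PooledLookup`, a small thread pool stands in for the `ActorSystem`: it is created once per function instance in `open()` and reused by every `asyncInvoke` call, which completes a callback (standing in for `ResultFuture`) with `Right`/`Left` from `Future#onComplete`. `ResourceCounter` exists only to show how many times the resource is built; in a real `RichAsyncFunction` the cleanup would go in `close`.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.{Failure, Success}

// Counts how often the expensive resource is built (illustration only).
object ResourceCounter {
  val created = new AtomicInteger(0)
}

// Hypothetical stand-in for a RichAsyncFunction: the expensive resource
// (an ActorSystem in the thread, a small thread pool here) is created once
// per function instance in open() and shared by all asyncInvoke calls.
final class PooledLookup extends java.io.Serializable {
  @transient private var ec: ExecutionContextExecutorService = _

  def open(): Unit = {
    ResourceCounter.created.incrementAndGet()
    ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  }

  // Mirrors asyncInvoke: start the work, complete the callback when it finishes.
  def asyncInvoke(input: String, complete: Either[Throwable, Int] => Unit): Unit = {
    implicit val context: ExecutionContext = ec
    Future(input.trim.toInt).onComplete {
      case Success(n)  => complete(Right(n))
      case Failure(ex) => complete(Left(ex))
    }
  }

  // Mirrors close(): release the resource when the operator shuts down.
  def close(): Unit = if (ec != null) ec.shutdown()
}
```

However many records arrive, `open()` builds the pool once; a per-call design like the first version in this thread would instead build one per record.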
>
> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com> wrote:
>
>> Hi guys,
>>
>> I’m trying to decide which HTTP client to use with Flink. So far I have
>> tested scalaj and the Akka HTTP client, and both work OK in our current dev
>> environment.
>> For scalaj it is pretty straightforward, since it just makes an HTTP
>> request with its timeout.
>>
>> For the Akka HTTP client it is a bit more complicated (I’m new to Scala and
>> all), so I’m asking whether I am doing it right by creating an AsyncFunction
>> like this:
>> ```
>> class AsyncHttpClient(args: Array[String] = Array())
>>   extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
>>
>>   override def asyncInvoke(input: Shipment,
>>                            resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>>     implicit val system = ActorSystem("my-system")
>>     implicit val executionContext = system.dispatcher
>>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>>     val resultFutureRequested: Future[HttpResponse] =
>>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
>>
>>     resultFutureRequested.onComplete {
>>       case Success(res) =>
>>         resultFuture.complete(Iterable(Right(res.entity)))
>>       case Failure(x) =>
>>         resultFuture.complete(Iterable(Left(x)))
>>     }
>>   }
>>
>>   override def timeout(input: Shipment,
>>                        resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>>   }
>> }
>> ```
>> I noticed that I have to create a bunch of implicit values inside the
>> asyncInvoke method. I’m not sure if I’m doing it right or just adding
>> overhead. I did try to initialize them in the constructor of the class, but
>> I just got a bunch of serializer errors.
>>
>> My libs:
>>   "com.typesafe.akka" %% "akka-http" % "10.1.8",
>>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
>>
>> My Flink setup:
>> Scala 2.12
>> Flink 1.7.0
>>
>>
>>
>> Any replies are appreciated!
>>
>> Thanks a bunch
>>
>> Andy,
>>
>>
>>
>>
>
