Hi Till, 
Thanks for your reply. I managed to run some experiments; some worked and 
some did not. I hope you can give me a bit more insight:

Following your suggestion, I implemented a `RichAsyncFunction` with transient 
fields like the one below, but it fails to compile with this error:

```
Class 'RichAsyncHttpClient' must either be declared abstract or implement
abstract member 'executionContext: ExecutionContextExecutor' in
'com.parcelperform.util.RichAsyncHttpClient'
```

```
class RichAsyncHttpClient() extends RichAsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {

  PPLogger.getActivityLogger.info("###########INIT ------------------- ")
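  // These vars have no initializer, so Scala treats them as abstract members,
  // which is what the compiler error above complains about.
  @transient implicit var materializer: ActorMaterializer
  @transient implicit var system: ActorSystem
  @transient implicit var executionContext: ExecutionContextExecutor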


  override def asyncInvoke(input: Shipment,
      resultFuture: async.ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {

    val resultFutureRequested: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))

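    resultFutureRequested.onComplete {
      // Either has no apply method; wrap the outcome in Right or Left instead.
      case Success(res) => {
        resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Right(res.entity)).asJavaCollection)
      }
      case Failure(x)   => {
        resultFuture.complete(Iterable[Either[Throwable, ResponseEntity]](Left(x)).asJavaCollection)
      }
    }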
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    system = ActorSystem("my-system")
    executionContext = system.dispatcher
    materializer = ActorMaterializer()
  }
}
```
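
The error above is consistent with how Scala treats a `var` declared without an initializer: it is an abstract member. A minimal sketch of the usual workaround, assuming Scala 2.12, is to give each field a default with `= _` and assign the real value in `open`. `RichFunctionLike` and `Resource` below are hypothetical stand-ins, not Flink or Akka types:

```scala
// Hypothetical stand-in for a function with an open() lifecycle hook
// (in the real job this role is played by RichAsyncFunction).
abstract class RichFunctionLike extends Serializable {
  def open(): Unit = ()
}

// Hypothetical stand-in for a resource such as ActorSystem.
class Resource(val name: String)

class InitializedInOpen extends RichFunctionLike {
  // `= _` assigns the default value (null for reference types), so the
  // member is concrete. Writing only `var resource: Resource` would
  // declare an abstract member and force the class to be abstract.
  @transient var resource: Resource = _

  override def open(): Unit = {
    resource = new Resource("my-system")
  }
}

object InitializedInOpenDemo {
  def main(args: Array[String]): Unit = {
    val f = new InitializedInOpen
    println(f.resource == null) // true: nothing is created before open()
    f.open()
    println(f.resource.name)    // my-system
  }
}
```

The same pattern should carry over to the three `@transient implicit var` fields in the class above, although I have not verified that against the real Flink and Akka types.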

Also, using that class gives me an error, I guess because of a Java/Scala 
mismatch. In the Flink docs, the Java code uses RichAsyncFunction and the 
Scala code uses AsyncFunction:
```
// AsyncDataStream.unorderedWait(streamShipment, new RichAsyncHttpClient(), 5, TimeUnit.SECONDS, 2).print()
// ^ error: Type mismatch, expected: AsyncFunction[NotInferedIN, NotInferedOUT], actual: RichAsyncHttpClient
```

### 

So I tried to fix my current code again, using transient fields and moving 
them into the constructor:
```


class AsyncHttpClient(args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {

  @transient implicit lazy val system = {
    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
    ActorSystem("my-system")
  }
  @transient implicit lazy val executionContext = {
    system.dispatcher
  }
  @transient implicit lazy val materializer: ActorMaterializer = {
    PPLogger.getActivityLogger.info("###########DONE ------------------- ")
    ActorMaterializer()
  }

  override def asyncInvoke(input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
//    PPLogger.getActivityLogger.info("###########INIT ------------------- ")
//    implicit val system = ActorSystem("my-system")
//    implicit val executionContext = system.dispatcher
//    implicit val materializer: ActorMaterializer = ActorMaterializer()
//    PPLogger.getActivityLogger.info("###########DONE ------------------- ")

    val resultFutureRequested: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
    resultFutureRequested.onComplete {
      case Success(res) => {
        resultFuture.complete(Iterable(Right(res.entity)))
      }
      case Failure(x)   => {
        resultFuture.complete(Iterable(Left(x)))
      }
    }

  }

  override def timeout(input: Shipment,
      resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
    resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
  }
}
```
And it runs OK. The log was printed only once.
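
My understanding of why this works, as a sketch rather than a statement about Flink internals: a `@transient lazy val` is skipped when the function object is serialized, and is initialized on first use in whichever JVM received the deserialized copy. The example below imitates that with plain Java serialization. `HeavyClient`, `ClientFunction` and `roundTrip` are hypothetical names for illustration only:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a non-serializable resource such as ActorSystem.
class HeavyClient(val name: String) // deliberately NOT Serializable

object HeavyClient {
  // Counts how many clients have been constructed in this JVM.
  val created = new AtomicInteger(0)
}

// Hypothetical stand-in for the user function that gets serialized and shipped.
class ClientFunction extends Serializable {
  // Not written during serialization; built on first access after deserialization.
  @transient lazy val client: HeavyClient = {
    HeavyClient.created.incrementAndGet()
    new HeavyClient("my-system")
  }

  def invoke(input: String): String = s"${client.name}:$input"
}

object TransientLazyDemo {
  // Serializes and deserializes a value, imitating shipping it to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val shipped = roundTrip(new ClientFunction)
    println(HeavyClient.created.get()) // 0: shipping the function created no client
    shipped.invoke("a")
    shipped.invoke("b")
    println(HeavyClient.created.get()) // 1: created once, on first use
  }
}
```

If the field were a plain `val` instead, serializing `ClientFunction` would fail with a `NotSerializableException`, because `HeavyClient` is not serializable.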

I am still asking about this because I have not understood the statement `That 
way you only create the `ActorMaterializer` on the `TaskManager` where the 
operator is executed and solve the problem of serializability`. I thought all 
of the code is executed inside the TaskManager?

Thanks for being patient with me so far.

Andy,




> On Apr 11, 2019, at 7:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi Andy,
> 
> without being an expert of Akka's http client, I think you should not create 
> a new ActorSystem for every call to `AsyncFunction#asyncInvoke`. What I would 
> recommend you instead is to implement a `RichAsyncFunction` with a transient 
> field for `ActorMaterializer` which you initialize in the 
> `RichAsyncFunction#open` method. That way you only create the 
> `ActorMaterializer` on the `TaskManager` where the operator is executed and 
> solve the problem of serializability and you make it much more efficient 
> because you don't create a new `ActorSystem` for every request.
> 
> Cheers,
> Till
> 
> On Thu, Apr 11, 2019 at 1:17 PM Andy Hoang <a...@parcelperform.com 
> <mailto:a...@parcelperform.com>> wrote:
> Hi guys,
> 
> I’m trying to decide which HTTP client to use with Flink. So far I have tested 
> scalaj and the Akka HTTP client, and both work fine in our current dev environment.
> For scalaj it is pretty straightforward, since it just issues an HTTP 
> request with a timeout.
> 
> For the Akka HTTP client it is a bit more complicated (I’m new to Scala and all), 
> so I’m asking whether I am doing it right by creating an AsyncFunction like this:
> ```
> class AsyncHttpClient(args: Array[String] = Array()) extends AsyncFunction[Shipment, Either[Throwable, ResponseEntity]] {
> 
>   override def asyncInvoke(input: Shipment,
>       resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
> 
>     PPLogger.getActivityLogger.info("###########INIT ------------------- ")
>     implicit val system = ActorSystem("my-system")
>     implicit val executionContext = system.dispatcher
>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>     val resultFutureRequested: Future[HttpResponse] =
>       Http().singleRequest(HttpRequest(uri = "https://httpbin.org/json"))
>     PPLogger.getActivityLogger.info("###########DONE ------------------- ")
> 
> 
>     resultFutureRequested.onComplete {
>       case Success(res) => {
>         resultFuture.complete(Iterable(Right(res.entity)))
>       }
>       case Failure(x)   => {
>         resultFuture.complete(Iterable(Left(x)))
>       }
>     }
> 
>   }
> 
>   override def timeout(input: Shipment,
>       resultFuture: ResultFuture[Either[Throwable, ResponseEntity]]): Unit = {
>     resultFuture.complete(Iterable(Left(new TimeoutException("Async function call has timed out."))))
>   }
> }
> ```
> I noticed that I have to create a bunch of implicit values inside the 
> asyncInvoke method. I’m not sure whether I’m doing it right or just adding 
> overhead. I did try to initialize them in the constructor of the class, but 
> the compiler just threw a bunch of "Not implemented Serializer" errors.
> 
> My lib:
>    "com.typesafe.akka" %% "akka-http" % "10.1.8",
>   "com.typesafe.akka" %% "akka-http-testkit" % "10.1.8" % Test,
> 
> My flink:
> scala 2.12
> flink 1.70
> 
> 
> 
> Any reply is appreciated!
> 
> Thanks a bunch
> 
> Andy,
> 
> 
> 
