A quick question. When running a streaming job that applies DataStream.map(MapFunction) to data read from Kafka, is a MapFunction instance created per item, or is the number of instances based on the parallelism?
For instance, given the following code snippet:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(new FlinkKafkaConsumer09(...))
    stream.map(new RichMapFunction[String, Unit] {
      // my AsyncHttpClient instance
      override def open(params: Configuration) { /* create my AsyncHttpClient instance, etc. */ }
      override def close() { /* close my AsyncHttpClient instance */ }
      override def map(record: String) {
        // my code
      }
    })

Is a RichMapFunction created for each record (a String in the example above)? Or, if the program sets the parallelism to 4, are 4 RichMapFunction instances created first, the data read from the Kafka consumer divided into 4 partitions (or something similar), and map(record: String) then called inside something like a while loop? Or what is the actual flow? Is there a place in the source code I can start from? (I traced through StreamExecutionEnvironment, addSource, DataStream, transform, addOperator, etc., but then got lost.)

Basically, my problem is that I have an AsyncHttpClient instance that is opened in open() and closed in close(), as the RichMapFunction documentation describes. However, in some cases my AsyncHttpClient instance is apparently not closed, and a warning like the following is displayed:

    AsyncHttpClient.close() hasn't been invoked, which may produce file descriptor leaks

Therefore I would like to understand the life cycle so that I can close the resource appropriately. Thanks
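
To make the second hypothesis above concrete, here is a minimal plain-Scala sketch (no Flink dependency) of the life cycle I have in mind: one function instance per parallel subtask, open() called once before any record, map() called once per record, and close() called once at the end. All names here (LifecycleFn, CountingFn, LifecycleSketch, runSubtask) are hypothetical and only model the idea; this is not Flink's actual code, and the round-robin split of records is an assumption made purely for illustration.

```scala
// Hypothetical stand-in for a rich function: open once, map per record, close once.
trait LifecycleFn[I, O] {
  def open(): Unit
  def map(record: I): O
  def close(): Unit
}

// Counts lifecycle calls so the behaviour of the sketch can be checked.
class CountingFn extends LifecycleFn[String, Int] {
  var opened = 0
  var mapped = 0
  var closed = 0
  def open(): Unit = opened += 1   // e.g. create the HTTP client here
  def map(record: String): Int = { mapped += 1; record.length }
  def close(): Unit = closed += 1  // e.g. close the HTTP client here
}

object LifecycleSketch {
  // Models one parallel subtask: it owns one function instance and processes its share of records.
  def runSubtask[I, O](fn: LifecycleFn[I, O], records: Iterator[I]): List[O] = {
    fn.open()
    try records.map(fn.map).toList // map() is called once per record
    finally fn.close()             // close() runs even if map() throws
  }

  def run(parallelism: Int, records: Seq[String]): Seq[CountingFn] = {
    // One instance per subtask, not one per record.
    val fns = Seq.fill(parallelism)(new CountingFn)
    fns.zipWithIndex.foreach { case (fn, i) =>
      // Assumption: round-robin split, for illustration only.
      val share = records.zipWithIndex.collect { case (r, j) if j % parallelism == i => r }
      runSubtask(fn, share.iterator)
    }
    fns
  }
}
```

With a parallelism of 4 and 10 records, this model creates 4 instances, each opened and closed exactly once, with the 10 map() calls spread across them. If the real flow matches this model, a client created in open() would only leak when close() is never reached for some instance.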