Ufuk,

I configured a super high number:


bin/flink run \
    -m yarn-cluster \
    -ynm "UMS ETL Flow - No Negative Links" \
    -yjm 10240 \
    -yn 10 \
    -ytm 40960 \
    -ys 32 \
    -yD taskmanager.network.numberOfBuffers=284800

and there are still issues. :(
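
(For what it's worth, applying the formula from [1] to these settings gives
#slots-per-TM^2 * #TMs * 4 = 32 * 32 * 10 * 4 = 40,960 buffers, so 284,800
should already be well above the recommended value, assuming I'm reading the
formula correctly.)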

I did, however, narrow down the issue to:

   - Using a large number of large Avro files on S3
   - For high parallelism, there is a lot of pressure on S3, so I have to
   tune down the parallelism for that task
   - Also, I am processing a few inputs using code similar to:

def readRequestData(env: ExecutionEnvironment, parameters: Configuration,
                    requestDataPath: String): DataSet[(String, Long)] = {
  val allPaths = requestDataPath.split(',').filter(_.nonEmpty).toSet.toSeq
  Logger.info(f"Using parallelism of ${parallelism}%d for loading request data.")
  val allRequestsDS = allPaths.map { aPath =>
    val trimmedPath = aPath.trim
    Logger.info(s"Loading request data from '${trimmedPath}'")
    val avroInputFormat = new AvroGenericRecordInputFormat(new Path(trimmedPath))
    avroInputFormat.setUnsplittable(shouldSplit)
    val requests = env.readFile(avroInputFormat, trimmedPath)
      .withParameters(parameters)
      .setParallelism(30)
    requests.map { r =>
      (r.get("identifiers").asInstanceOf[Record].get("utid").toString,
       r.get("request_properties").asInstanceOf[Record].get("ip_address").asInstanceOf[Long])
    }
  }
  env.union(allRequestsDS)
}


This causes the read tasks for every path to run concurrently, so if the
parallelism is not tuned down, resource usage gets very high; the sketch
after the last bullet shows what I mean by tuning it down.


   - Using input splits when there are lots of big files also puts a lot of
   load on S3
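
To make the concurrent-read point concrete, the work-around I applied looks
roughly like this (a sketch only; the "read.parallelism" key and its default
value are made up for this example):

// Cap the per-source read parallelism with a dedicated (hypothetical)
// setting instead of hard-coding it, so the S3 reads never run at the
// full job parallelism of 320.
val readParallelism = parameters.getInteger("read.parallelism", 30)
val requests = env.readFile(avroInputFormat, trimmedPath)
  .withParameters(parameters)
  .setParallelism(readParallelism)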


For now, I've gotten things to work.

Is there any way to map over allPaths and have the readFile tasks execute
sequentially rather than concurrently, or to specify multiple paths for a
single FileInputFormat in Flink?
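
One alternative I have been wondering about (just a sketch, assuming all the
paths share a common parent prefix and that recursive enumeration is
supported in this version) is to point a single input format at the parent
directory instead of building one readFile source per path:

// Hypothetical: "parentPath" is a common S3 prefix containing all of the
// Avro files. A single source enumerates the nested files, instead of one
// source per comma-separated path.
val recursiveParams = new Configuration()
recursiveParams.setBoolean("recursive.file.enumeration", true)
val allRequests = env
  .readFile(new AvroGenericRecordInputFormat(new Path(parentPath)), parentPath)
  .withParameters(recursiveParams)
  .setParallelism(30)

Is something along those lines the recommended approach?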

Thank you for your help.

- Gna

On Sat, Mar 19, 2016 at 10:09 AM, Ufuk Celebi <u...@apache.org> wrote:

> Can you please follow the formula from [1]. For high parallelism you have
> to configure the number of buffers accordingly, using number of slots for
> #cores and number of task managers for #machines.
>
> Does this help?
>
> – Ufuk
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
>
> On Fri, Mar 18, 2016 at 9:38 PM, Sourigna Phetsarath <
> gna.phetsar...@teamaol.com> wrote:
>
>> It's batch.
>>
>> Reading a few thousand Avro files from S3 with a parallelism of 320.
>> It's just failing on the read right now and not making to the next
>> operations.
>>
>> Here's a screenshot of the flink dashboard.  It usually fails in the 1st
>> task.
>>
>> Cheers,
>> Gna
>>
>> On Fri, Mar 18, 2016 at 4:03 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>>> Hey Souringa,
>>>
>>> could you provide some context about the program you are running?
>>>
>>> Is it batch or streaming? What is the parallelism? How many operators
>>> are you running?
>>>
>>> Thanks for reporting the issue. I think we will figure it out once you
>>> provide further information. :-)
>>>
>>> – Ufuk
>>>
>>> On Fri, Mar 18, 2016 at 7:15 PM, Sourigna Phetsarath <
>>> gna.phetsar...@teamaol.com> wrote:
>>>
>>>> All:
>>>>
>>>> Flink Version 0.10.2
>>>>
>>>> The number that I set for *taskmanager.network.numberOfBuffers* doesn't
>>>> seem to have any effect, even if I set it to a very high number. There
>>>> might be a race condition here where the upper bound is not enforced or
>>>> computed correctly.
>>>>
>>>> java.io.IOException: Insufficient number of network buffers: required
>>>> *320*, but only *32* available. The total number of network buffers is
>>>> currently set to *36864*. You can increase this number by setting the
>>>> configuration key 'taskmanager.network.numberOfBuffers'.
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>>>> at
>>>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:296)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Has anyone else encountered this issue?
>>>>
>>>> Thanks in advance for any help that anyone can provide.
>>>> --
>>>>
>>>>
>>>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
>>>> Applied Research Chapter
>>>> 770 Broadway, 5th Floor, New York, NY 10003
>>>> o: 212.402.4871 // m: 917.373.7363
>>>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>>>
>>>> * <http://www.aolplatforms.com>*
>>>>
>>>
>>>
>>
>>
>> --
>>
>>
>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
>> Applied Research Chapter
>> 770 Broadway, 5th Floor, New York, NY 10003
>> o: 212.402.4871 // m: 917.373.7363
>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>
>> * <http://www.aolplatforms.com>*
>>
>
>


-- 


*Gna Phetsarath*System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna

* <http://www.aolplatforms.com>*
