Hmm, 113k splits is quite a lot.
However, the IF uses the DefaultInputSplitAssigner which is very
lightweight and should handle a large number of splits well.



2016-04-28 13:50 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> We generate 113k splits because we can't query more than 100k or records
> per split (and we have to manage 11 billions of records). We tried to run
> the job only once, before running it the 2nd time we would like to
> understand which parameter to tune in order to (try to at least to) avoid
> such an error.
>
> Of course I pasted the wrong TM heap size...that is indeed 3Gb (
> taskmanager.heap.mb:512)
>
> Best,
> Flavio
>
> On Thu, Apr 28, 2016 at 1:29 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Is the problem reproducible?
>> Maybe the SplitAssigner gets stuck somehow, but I've never observed
>> something like that.
>>
>> How many splits do you generate?
>>
>> I guess it is not related, but 512MB for a TM is not a lot on machines
>> with 16GB RAM.
>>
>> 2016-04-28 12:12 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> When does this usually happens? Is it because the JobManager has too few
>>> resources (of some type)?
>>>
>>> Our current configuration of the cluster has 4 machines (with 4 CPUs and
>>> 16 GB of RAM) and one machine has both a JobManager and a TaskManger (the
>>> other 3 just a TM).
>>>
>>> Our flink-conf.yml on every machine has the following params:
>>>
>>>    - jobmanager.heap.mb:512
>>>    - taskmanager.heap.mb:512
>>>    - taskmanager.numberOfTaskSlots:6
>>>    - prallelism.default:24
>>>    - env.java.home=/usr/lib/jvm/java-8-oracle/
>>>    - taskmanager.network.numberOfBuffers:16384
>>>
>>> The job just read a window of max 100k elements and then writes a Tuple5
>>> into a CSV on the jobmanger fs with parallelism 1 (in order to produce a
>>> single file). The job dies after 40 minutes and hundreds of millions of
>>> records read.
>>>
>>> Do you see anything sospicious?
>>>
>>> Thanks for the support,
>>> Flavio
>>>
>>> On Thu, Apr 28, 2016 at 11:54 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> I checked the input format from your PR, but didn't see anything
>>>> suspicious.
>>>>
>>>> It is definitely OK if the processing of an input split tasks more than
>>>> 10 seconds. That should not be the cause.
>>>> It rather looks like the DataSourceTask fails to request a new split
>>>> from the JobManager.
>>>>
>>>> 2016-04-28 9:37 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>>
>>>>> Digging the logs, we found this:
>>>>>
>>>>> WARN  Remoting - Tried to associate with unreachable remote address
>>>>> [akka.tcp://flink@127.0.0.1:34984]. Address is now gated for 5000 ms,
>>>>> all messages to this address will be delivered to dead letters. Reason:
>>>>> Connessione rifiutata: /127.0.0.1:34984
>>>>>
>>>>> however, it is not clear why it should refuse a connection to itself
>>>>> after 40min of run. we'll try to figure out possible environment issues.
>>>>> Its a fresh installation, therefore we may have left out some
>>>>> configurations.
>>>>>
>>>>> saluti,
>>>>> Stefano
>>>>>
>>>>> 2016-04-28 9:22 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>>>
>>>>>> I had this type of exception when trying to build and test Flink on a
>>>>>> "small machine". I worked around the test increasing the timeout for 
>>>>>> Akka.
>>>>>>
>>>>>>
>>>>>> https://github.com/stefanobortoli/flink/blob/FLINK-1827/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
>>>>>>
>>>>>> it happened only on my machine (a VirtualBox I use for development),
>>>>>> but not on Flavio's. Is it possible that on load situations the 
>>>>>> JobManager
>>>>>> slows down a bit too much?
>>>>>>
>>>>>> saluti,
>>>>>> Stefano
>>>>>>
>>>>>> 2016-04-27 17:50 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> A precursor of the modified connector (since we started a long time
>>>>>>> ago). However the idea is the same, I compute the inputSplits and then I
>>>>>>> get the data split by split (similarly to what it happens in FLINK-3750 
>>>>>>> -
>>>>>>> https://github.com/apache/flink/pull/1941 )
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Wed, Apr 27, 2016 at 5:38 PM, Chesnay Schepler <
>>>>>>> ches...@apache.org> wrote:
>>>>>>>
>>>>>>>> Are you using your modified connector or the currently available
>>>>>>>> one?
>>>>>>>>
>>>>>>>>
>>>>>>>> On 27.04.2016 17:35, Flavio Pompermaier wrote:
>>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>> I'm running a Flink Job on a JDBC datasource and I obtain the
>>>>>>>> following exception:
>>>>>>>>
>>>>>>>> java.lang.RuntimeException: Requesting the next InputSplit failed.
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:91)
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:342)
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:137)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>>>>> after [10000 milliseconds]
>>>>>>>> at
>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>> at
>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>> at
>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>> at scala.concurrent.Await.result(package.scala)
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskmanager.TaskInputSplitProvider.getNextInputSplit(TaskInputSplitProvider.java:71)
>>>>>>>> ... 4 more
>>>>>>>>
>>>>>>>> What can be the cause? Is it because the whole DataSource reading
>>>>>>>> has cannot take more than 10000 milliseconds?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>

Reply via email to