Do you have the code somewhere online? Maybe someone can have a quick
look over it later. I'm pretty sure that is indeed a problem with the
custom input format.

– Ufuk
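
A minimal sketch of the split scheme described in the quoted message: the key space is cut into intervals of at most one fetch size, and each interval would drive one query in open(). The names used here (KeyRangeSplits, Range, split) are hypothetical, the keys are assumed to be contiguous long values, and this is plain Java rather than the actual custom input format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or of the JDBCInputFormat.
final class KeyRangeSplits {

    /** Inclusive key interval [start, end] that one query would cover. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /**
     * Cuts [minKey, maxKey] into consecutive intervals of at most fetchSize keys.
     * Assumes the keys are far enough below Long.MAX_VALUE that
     * start + fetchSize cannot overflow.
     */
    static List<Range> split(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += fetchSize) {
            // The last interval may be shorter than fetchSize.
            long end = Math.min(start + fetchSize - 1, maxKey);
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each printed interval corresponds to one split, i.e. one query.
        for (Range r : split(1, 10, 4)) {
            System.out.println(r);
        }
    }
}
```

With a parallelism of 32, one would expect up to 32 such intervals to be queried at the same time, provided at least 32 intervals exist.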

On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli wrote:
> Perhaps there is a misunderstanding on my side over the parallelism and
> split management given a data source.
> We started from the current JDBCInputFormat to make it multi-threaded. Then,
> given a space of keys, we create the splits based on a fetch size set as a
> parameter. In open, we get a connection from the pool and execute a
> query using the split interval. This sets the 'resultSet', and then the
> DataSourceTask iterates between reachedEnd, next and close. On close, the
> connection is returned to the pool. We set the parallelism to 32, and we would
> expect 32 connections to be opened, but only 8 are opened.
> We tried to build an example with the TextInputFormat, but since it is a
> DelimitedInputFormat, open is called sequentially while the statistics are
> built, and the processing is executed in parallel only after all the
> open calls have run. This is not feasible in our case, because there would be
> millions of queries before the statistics are collected.
> Perhaps we are doing something wrong; we still have to figure out what. :-/
> thanks a lot for your help.
> saluti,
> Stefano
> 2016-03-29 13:30 GMT+02:00 Stefano Bortoli:
>> That is exactly my point. I should have 32 threads running, but I have
>> only 8. 32 tasks are created, but only 8 run concurrently. Flavio
>> and I will try to write a simple program to reproduce the problem. If we solve
>> our issues on the way, we'll let you know.
>> thanks a lot anyway.
>> saluti,
>> Stefano
>> 2016-03-29 12:44 GMT+02:00 Till Rohrmann:
>>> Then it shouldn’t be a problem. The ExecutionContext is used to run
>>> futures and their callbacks. But as Ufuk said, each task will spawn its own
>>> thread, and if you set the parallelism to 32 then you should have 32 threads
>>> running.
>>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli wrote:
>>>> In fact, I don't use it. I just had to trace back through the runtime
>>>> implementation to get to the point where the parallelism was switching from 32
>>>> to 8.
>>>> saluti,
>>>> Stefano
>>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann:
>>>>> Hi,
>>>>> what do you use the ExecutionContext for? That should actually be
>>>>> something which you shouldn’t be concerned with since it is only used
>>>>> internally by the runtime.
>>>>> Cheers,
>>>>> Till
>>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli wrote:
>>>>>> Well, in theory yes. Each task has a thread, but only a certain number run
>>>>>> in parallel (the job of the scheduler). Parallelism is set in the
>>>>>> environment. However, although the parallelism parameter is set and read
>>>>>> correctly, when it comes to actually starting the threads, the number is
>>>>>> fixed at 8. We ran a debugger to get to the point where the thread was
>>>>>> started. As Flavio mentioned, the ExecutionContext has the parallelism set
>>>>>> to 8. We have a pool of connections to an RDBMS and it logs the creation of
>>>>>> just 8 connections although the parallelism is much higher.
>>>>>> My question is whether this is a bug (or a feature) of the
>>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some variable
>>>>>> assignments in the setup of the MiniCluster, involving parallelism and
>>>>>> 'default values'. Default values in terms of parallelism are based on the
>>>>>> number of cores.
>>>>>> thanks a lot for the support!
>>>>>> saluti,
>>>>>> Stefano
>>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi:
>>>>>>> Hey Stefano,
>>>>>>> this should work by setting the parallelism on the environment, e.g.
>>>>>>> env.setParallelism(32)
>>>>>>> Is this what you are doing?
>>>>>>> The task threads are not part of a pool, but each submitted task
>>>>>>> creates its own Thread.
>>>>>>> – Ufuk
>>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier wrote:
>>>>>>> > Any help here? I think that the problem is that the JobManager creates the
>>>>>>> > executionContext of the scheduler with
>>>>>>> >
>>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>>>>>>> >
>>>>>>> > and thus the number of concurrently running threads is limited to the number
>>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>>>>>> > What do you think?
>>>>>>> >
>>>>>>> >
>>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli wrote:
>>>>>>> >>
>>>>>>> >> Hi guys,
>>>>>>> >>
>>>>>>> >> I am trying to test a job that should run a number of tasks to read from an
>>>>>>> >> RDBMS using an improved JDBC connector. The connection and the reading run
>>>>>>> >> smoothly, but I cannot seem to move above the limit of 8
>>>>>>> >> concurrent threads running. 8 is of course the number of cores of my
>>>>>>> >> machine.
>>>>>>> >>
>>>>>>> >> I have tried working around configurations and settings, but the Executor
>>>>>>> >> within the ExecutionContext keeps on having a parallelism of 8, although, of
>>>>>>> >> course, the parallelism of the execution environment is much higher (in fact
>>>>>>> >> I have many more tasks to be allocated).
>>>>>>> >>
>>>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration that may
>>>>>>> >> just override/neglect my wish for a higher degree of parallelism. Is there a
>>>>>>> >> way for me to work around this issue?
>>>>>>> >>
>>>>>>> >> Please let me know. Thanks a lot for your help! :-)
>>>>>>> >>
>>>>>>> >> saluti,
>>>>>>> >> Stefano
>>>>>>> >
>>>>>>> >
>>>>>>> >
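
The ForkJoinPool behaviour referred to in the thread can be checked in isolation. The sketch below is plain Java with a hypothetical class name (ForkJoinDefaults); it only shows that the no-argument constructor sizes the pool from the number of available processors, while an explicit argument overrides that. As noted in the replies, each Flink task runs in its own thread, so this pool alone would not be expected to cap the number of running tasks.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical class name; uses only the standard java.util.concurrent API.
final class ForkJoinDefaults {

    /** Target parallelism of a pool created with the no-argument constructor. */
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    /** Target parallelism of a pool created with an explicit value. */
    static int explicitParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("available processors: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("new ForkJoinPool():   " + defaultParallelism());
        System.out.println("new ForkJoinPool(32): " + explicitParallelism(32));
    }
}
```

On an 8-core machine the first two values would both be 8, which matches the number seen in the debugger.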
