This actually doesn't sound like a Flink issue. I would look into the commons-pool docs. Maybe they size their pools by default based on the number of cores, so the pool holds only 8 connections and other requests are queued?
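As a rough illustration of this hypothesis, here is a minimal, JDK-only sketch. It is a hypothetical stand-in, not the commons-pool2 API: `BoundedPool` caps the number of live "connections" with a `Semaphore` and blocks borrowers once the cap is reached, so 16 concurrent tasks against a cap of 8 never create more than 8 connections while the remaining tasks wait. For reference, commons-pool2's `GenericObjectPoolConfig` uses a fixed default `maxTotal` of 8 (a constant, not derived from the core count) and blocks when the pool is exhausted by default, which would be consistent with the 8-connection ceiling reported below regardless of the machine's core count.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a pool with "maxTotal = N, block when exhausted".
// This is NOT commons-pool2; names are illustrative only.
public class BoundedPoolDemo {

    static final class BoundedPool {
        private final Semaphore permits;
        private final ConcurrentLinkedQueue<Object> idle = new ConcurrentLinkedQueue<>();
        final AtomicInteger created = new AtomicInteger();
        final AtomicInteger inUse = new AtomicInteger();
        final AtomicInteger peakInUse = new AtomicInteger();

        BoundedPool(int maxTotal) {
            permits = new Semaphore(maxTotal, true);
        }

        Object borrow() throws InterruptedException {
            permits.acquire();              // blocks while maxTotal objects are checked out
            Object conn = idle.poll();      // reuse an idle "connection" if one exists
            if (conn == null) {
                created.incrementAndGet();  // analogous to "CREATING NEW CONNECTION!"
                conn = new Object();
            }
            int now = inUse.incrementAndGet();
            peakInUse.accumulateAndGet(now, Math::max);
            return conn;
        }

        void giveBack(Object conn) {
            inUse.decrementAndGet();
            idle.offer(conn);               // make it reusable before freeing the slot
            permits.release();
        }
    }

    public static final class Result {
        public final int created;
        public final int peakInUse;
        public final int completed;

        Result(int created, int peakInUse, int completed) {
            this.created = created;
            this.peakInUse = peakInUse;
            this.completed = completed;
        }
    }

    /** Runs `borrowers` tasks on their own threads; each holds a connection from "open" to "close". */
    public static Result run(int maxTotal, int borrowers) {
        BoundedPool pool = new BoundedPool(maxTotal);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService tasks = Executors.newFixedThreadPool(borrowers); // one thread per task
        for (int i = 0; i < borrowers; i++) {
            tasks.execute(() -> {
                try {
                    Object conn = pool.borrow();    // like open(): get a connection
                    try {
                        Thread.sleep(50);           // simulate reading one split
                    } finally {
                        pool.giveBack(conn);        // like close(): return it
                    }
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        tasks.shutdown();
        try {
            tasks.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(pool.created.get(), pool.peakInUse.get(), completed.get());
    }

    public static void main(String[] args) {
        Result r = run(8, 16);
        System.out.println("created=" + r.created + " peakInUse=" + r.peakInUse
                + " completed=" + r.completed);
    }
}
```

If this is indeed the cause, raising the cap to at least the job's parallelism (in commons-pool2, via `GenericObjectPoolConfig.setMaxTotal(...)`) would be the first thing to try.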
On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Any feedback about our JDBC InputFormat issue?
>
> On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> We've finally created a running example (for Flink 0.10.2) of our improved JDBC InputFormat that you can run from an IDE (it creates an in-memory Derby database with 1000 rows and batches of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
>> The first time you run the program you have to comment out the following line:
>>
>> stmt.executeUpdate("Drop Table users ");
>>
>> In your pom, declare the following dependencies:
>>
>> <dependency>
>>   <groupId>org.apache.derby</groupId>
>>   <artifactId>derby</artifactId>
>>   <version>10.10.1.1</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.commons</groupId>
>>   <artifactId>commons-pool2</artifactId>
>>   <version>2.4.2</version>
>> </dependency>
>>
>> On my laptop I have 8 cores, and if I set the parallelism to 16 I expect to see 16 calls to the connection pool (i.e. '==================== CREATING NEW CONNECTION!'), but I see only 8 (matching my number of cores). The number of created tasks, instead, is correct (16).
>>
>> I hope this helps in understanding where the problem is!
>>
>> Best, and thanks in advance,
>> Flavio
>>
>> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>
>>> Hi Ufuk,
>>>
>>> here is our preliminary input format implementation:
>>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>>
>>> If you need a running project, I will have to create a test one, because I cannot share the current configuration.
>>>
>>> Thanks a lot in advance!
>>>
>>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>>>
>>>> Do you have the code somewhere online? Maybe someone can have a quick look over it later.
>>>> I'm pretty sure that is indeed a problem with the custom input format.
>>>>
>>>> – Ufuk
>>>>
>>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>> > Perhaps there is a misunderstanding on my side about parallelism and split management for a given data source.
>>>> >
>>>> > We started from the current JDBCInputFormat to make it multi-threaded. Then, given a space of keys, we create the splits based on a fetch size set as a parameter. In open, we get a connection from the pool and execute a query using the split interval. This sets the 'resultSet', and then the DataSourceTask iterates over reachedEnd, next and close. On close, the connection is returned to the pool. We set the parallelism to 32 and would expect 32 connections to be opened, but only 8 are.
>>>> >
>>>> > We tried to make an example with the TextInputFormat, but since it is a DelimitedInputFormat, open is called sequentially when the statistics are built, and the processing is then executed in parallel only after all the opens have run. This is not feasible in our case, because there would be millions of queries before the statistics are collected.
>>>> >
>>>> > Perhaps we are doing something wrong; we still have to figure out what. :-/
>>>> >
>>>> > Thanks a lot for your help.
>>>> >
>>>> > Regards,
>>>> > Stefano
>>>> >
>>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>> >>
>>>> >> That is exactly my point. I should have 32 threads running, but I have only 8. 32 tasks are created, but only 8 run concurrently. Flavio and I will try to make a simple program to reproduce the problem. If we solve our issues along the way, we'll let you know.
>>>> >>
>>>> >> Thanks a lot anyway.
>>>> >>
>>>> >> Regards,
>>>> >> Stefano
>>>> >>
>>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>> >>>
>>>> >>> Then it shouldn't be a problem. The ExecutionContext is used to run futures and their callbacks. But as Ufuk said, each task will spawn its own thread, and if you set the parallelism to 32 you should have 32 threads running.
>>>> >>>
>>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>> >>>>
>>>> >>>> In fact, I don't use it. I just had to crawl back through the runtime implementation to get to the point where the parallelism was switching from 32 to 8.
>>>> >>>>
>>>> >>>> Regards,
>>>> >>>> Stefano
>>>> >>>>
>>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>> >>>>>
>>>> >>>>> Hi,
>>>> >>>>>
>>>> >>>>> what do you use the ExecutionContext for? That is actually something you shouldn't need to be concerned with, since it is only used internally by the runtime.
>>>> >>>>>
>>>> >>>>> Cheers,
>>>> >>>>> Till
>>>> >>>>>
>>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>> >>>>>>
>>>> >>>>>> Well, in theory yes. Each task has a thread, but only a certain number of them run in parallel (that is the scheduler's job). Parallelism is set in the environment. However, while the parallelism parameter is set and read correctly, when it comes to actually starting the threads, the number is fixed at 8. We ran a debugger to get to the point where the thread was started. As Flavio mentioned, the ExecutionContext has its parallelism set to 8. We have a pool of connections to an RDBMS, and it logs the creation of just 8 connections although the parallelism is much higher.
>>>> >>>>>>
>>>> >>>>>> My question is whether this is a bug (or a feature) of the LocalMiniCluster. :-) I am not a Scala expert, but I see some variable assignments in the setup of the MiniCluster involving parallelism and 'default values'. The default values for parallelism are based on the number of cores.
>>>> >>>>>>
>>>> >>>>>> Thanks a lot for the support!
>>>> >>>>>>
>>>> >>>>>> Regards,
>>>> >>>>>> Stefano
>>>> >>>>>>
>>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>>>> >>>>>>>
>>>> >>>>>>> Hey Stefano,
>>>> >>>>>>>
>>>> >>>>>>> this should work by setting the parallelism on the environment, e.g.
>>>> >>>>>>>
>>>> >>>>>>> env.setParallelism(32)
>>>> >>>>>>>
>>>> >>>>>>> Is this what you are doing?
>>>> >>>>>>>
>>>> >>>>>>> The task threads are not part of a pool; each submitted task creates its own Thread.
>>>> >>>>>>>
>>>> >>>>>>> – Ufuk
>>>> >>>>>>>
>>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> >>>>>>> > Any help here? I think the problem is that the JobManager creates the executionContext of the scheduler with
>>>> >>>>>>> >
>>>> >>>>>>> > val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
>>>> >>>>>>> >
>>>> >>>>>>> > and thus the number of concurrently running threads is limited to the number of cores (because the default ForkJoinPool constructor is used).
>>>> >>>>>>> > What do you think?
>>>> >>>>>>> >
>>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>> >>>>>>> >>
>>>> >>>>>>> >> Hi guys,
>>>> >>>>>>> >>
>>>> >>>>>>> >> I am trying to test a job that should run a number of tasks to read from an RDBMS using an improved JDBC connector.
>>>> >>>>>>> >> The connection and the reading run smoothly, but I cannot seem to get above the limit of 8 concurrently running threads. 8 is, of course, the number of cores on my machine.
>>>> >>>>>>> >>
>>>> >>>>>>> >> I have tried tweaking configurations and settings, but the Executor within the ExecutionContext keeps having a parallelism of 8, even though the parallelism of the execution environment is, of course, much higher (in fact, I have many more tasks to be allocated).
>>>> >>>>>>> >>
>>>> >>>>>>> >> I suspect it may be an issue with the LocalMiniCluster configuration, which may simply override or ignore my request for a higher degree of parallelism. Is there a way for me to work around this issue?
>>>> >>>>>>> >>
>>>> >>>>>>> >> Please let me know. Thanks a lot for your help! :-)
>>>> >>>>>>> >>
>>>> >>>>>>> >> Regards,
>>>> >>>>>>> >> Stefano
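The ExecutionContext observation in this thread can be checked in isolation with a JDK-only sketch (the class and method names are hypothetical). The no-argument `ForkJoinPool()` constructor sets its parallelism to `Runtime.getRuntime().availableProcessors()`, which would account for the value 8 seen in the debugger on an 8-core machine. As Till points out above, however, the runtime uses that context for futures and callbacks, while each task gets its own thread, as Ufuk describes. The second method models that by starting one plain `Thread` per task and checking that all of them reach a `CyclicBarrier` at the same time, independent of the core count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelismCheck {

    /** Parallelism of a pool built like `new ForkJoinPool()` (no-arg constructor). */
    public static int defaultForkJoinParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    /** Starts one plain Thread per task; returns how many were alive together at the barrier. */
    public static int concurrentThreads(int tasks) {
        CyclicBarrier allRunning = new CyclicBarrier(tasks);
        AtomicInteger metAtBarrier = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            Thread t = new Thread(() -> {
                try {
                    // Returns only once all `tasks` threads are waiting here simultaneously.
                    allRunning.await(10, TimeUnit.SECONDS);
                    metAtBarrier.incrementAndGet();
                } catch (Exception e) {
                    // Timeout or broken barrier: fewer threads ran concurrently than requested.
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return metAtBarrier.get();
    }

    public static void main(String[] args) {
        ForkJoinPool explicit = new ForkJoinPool(32);
        System.out.println("availableProcessors    = " + Runtime.getRuntime().availableProcessors());
        System.out.println("new ForkJoinPool()     = " + defaultForkJoinParallelism());
        System.out.println("new ForkJoinPool(32)   = " + explicit.getParallelism());
        System.out.println("threads alive together = " + concurrentThreads(32));
        explicit.shutdown();
    }
}
```

On an 8-core laptop one would expect 8 for the default pool and 32 for the other two lines, which would suggest that the ForkJoinPool value alone does not explain why only 8 connections are opened.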