We've finally created a running example (for Flink 0.10.2) of our improved JDBC input format that you can run from an IDE (it creates an in-memory Derby database with 1000 rows and a batch size of 10) at https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351. The first time you run the program you have to comment out the following line (the table does not exist yet):

stmt.executeUpdate("Drop Table users ");
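For reference, this is more or less what the setup part of the example does (a simplified sketch reconstructed from the description above, not the exact gist code; the table layout, the row contents and the interpretation of "batch of 10" are assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class DerbySetupSketch {
    public static void main(String[] args) throws Exception {
        // ';create=true' creates the in-memory database on first connect
        Connection conn =
            DriverManager.getConnection("jdbc:derby:memory:testdb;create=true");

        Statement stmt = conn.createStatement();
        // Comment this out on the very first run: the table does not exist yet
        stmt.executeUpdate("Drop Table users ");
        stmt.executeUpdate(
            "CREATE TABLE users (id INT PRIMARY KEY, name VARCHAR(255))");

        // Insert the 1000 test rows, flushing every 10 inserts
        PreparedStatement ps =
            conn.prepareStatement("INSERT INTO users VALUES (?, ?)");
        for (int i = 0; i < 1000; i++) {
            ps.setInt(1, i);
            ps.setString(2, "name-" + i);
            ps.addBatch();
            if (i % 10 == 9) {
                ps.executeBatch();
            }
        }
        ps.close();
        stmt.close();
        conn.close();
    }
}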
stmt.executeUpdate("Drop Table users "); In your pom declare the following dependencies: <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> <version>10.10.1.1</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.4.2</version> </dependency> In my laptop I have 8 cores and if I put parallelism to 16 I expect to see 16 calls to the connection pool (i.e. '==================== CREATING NEW CONNECTION!') while I see only 8 (up to my maximum number of cores). The number of created task instead is correct (16). I hope this could help in understanding where the problem is! Best and thank in advance, Flavio On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s.bort...@gmail.com> wrote: > Hi Ufuk, > > here is our preliminary input formar implementation: > https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119 > > if you need a running project, I will have to create a test one cause I > cannot share the current configuration. > > thanks a lot in advance! > > > > 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <u...@apache.org>: > >> Do you have the code somewhere online? Maybe someone can have a quick >> look over it later. I'm pretty sure that is indeed a problem with the >> custom input format. >> >> – Ufuk >> >> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s.bort...@gmail.com> >> wrote: >> > Perhaps there is a misunderstanding on my side over the parallelism and >> > split management given a data source. >> > >> > We started from the current JDBCInputFormat to make it multi-thread. >> Then, >> > given a space of keys, we create the splits based on a fetchsize set as >> a >> > parameter. In the open, we get a connection from the pool, and execute a >> > query using the split interval. This sets the 'resultSet', and then the >> > DatasourceTask iterates between reachedEnd, next and close. On close, >> the >> > connection is returned to the pool. We set parallelism to 32, and we >> would >> > expect 32 connection opened but the connections opened are just 8. >> > >> > We tried to make an example with the textinputformat, but being a >> > delimitedinpurformat, the open is called sequentially when statistics >> are >> > built, and then the processing is executed in parallel just after all >> the >> > open are executed. This is not feasible in our case, because there >> would be >> > millions of queries before the statistics are collected. >> > >> > Perhaps we are doing something wrong, still to figure out what. :-/ >> > >> > thanks a lot for your help. >> > >> > saluti, >> > Stefano >> > >> > >> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>: >> >> >> >> That is exactly my point. I should have 32 threads running, but I have >> >> only 8. 32 Task are created, but only only 8 are run concurrently. >> Flavio >> >> and I will try to make a simple program to produce the problem. If we >> solve >> >> our issues on the way, we'll let you know. >> >> >> >> thanks a lot anyway. >> >> >> >> saluti, >> >> Stefano >> >> >> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <trohrm...@apache.org>: >> >>> >> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run >> >>> futures and their callbacks. But as Ufuk said, each task will spawn >> it’s own >> >>> thread and if you set the parallelism to 32 then you should have 32 >> threads >> >>> running. 
>> >>>
>> >>>
>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <s.bort...@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> In fact, I don't use it. I just had to crawl back through the runtime
>> >>>> implementation to get to the point where the parallelism was
>> >>>> switching from 32 to 8.
>> >>>>
>> >>>> saluti,
>> >>>> Stefano
>> >>>>
>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> what do you use the ExecutionContext for? That should actually be
>> >>>>> something you shouldn't be concerned with, since it is only used
>> >>>>> internally by the runtime.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Till
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <s.bort...@gmail.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Well, in theory yes. Each task has a thread, but only a certain
>> >>>>>> number of them run in parallel (the job of the scheduler).
>> >>>>>> Parallelism is set in the environment. However, whereas the
>> >>>>>> parallelism parameter is set and read correctly, when it comes to
>> >>>>>> the actual starting of the threads, the number is fixed at 8. We
>> >>>>>> ran a debugger to get to the point where the threads are started.
>> >>>>>> As Flavio mentioned, the ExecutionContext has the parallelism set
>> >>>>>> to 8. We have a pool of connections to an RDBMS, and it logs the
>> >>>>>> creation of just 8 connections although the parallelism is much
>> >>>>>> higher.
>> >>>>>>
>> >>>>>> My question is whether this is a bug (or a feature) of the
>> >>>>>> LocalMiniCluster. :-) I am not a Scala expert, but I see some
>> >>>>>> variable assignments in the setup of the MiniCluster involving
>> >>>>>> parallelism and 'default values'. Default values in terms of
>> >>>>>> parallelism are based on the number of cores.
>> >>>>>>
>> >>>>>> thanks a lot for the support!
>> >>>>>>
>> >>>>>> saluti,
>> >>>>>> Stefano
>> >>>>>>
>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>> >>>>>>>
>> >>>>>>> Hey Stefano,
>> >>>>>>>
>> >>>>>>> this should work by setting the parallelism on the environment, e.g.
>> >>>>>>>
>> >>>>>>> env.setParallelism(32)
>> >>>>>>>
>> >>>>>>> Is this what you are doing?
>> >>>>>>>
>> >>>>>>> The task threads are not part of a pool; each submitted task
>> >>>>>>> creates its own Thread.
>> >>>>>>>
>> >>>>>>> – Ufuk
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>> >>>>>>> <pomperma...@okkam.it> wrote:
>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>> >>>>>>> > creates the executionContext of the scheduler with
>> >>>>>>> >
>> >>>>>>> > val executionContext = ExecutionContext.fromExecutor(new
>> >>>>>>> > ForkJoinPool())
>> >>>>>>> >
>> >>>>>>> > and thus the number of concurrently running threads is limited
>> >>>>>>> > to the number of cores (using the default constructor of the
>> >>>>>>> > ForkJoinPool). What do you think?
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>> >>>>>>> > <s.bort...@gmail.com> wrote:
>> >>>>>>> >>
>> >>>>>>> >> Hi guys,
>> >>>>>>> >>
>> >>>>>>> >> I am trying to test a job that should run a number of tasks to
>> >>>>>>> >> read from an RDBMS using an improved JDBC connector. The
>> >>>>>>> >> connection and the reading run smoothly, but I cannot seem to
>> >>>>>>> >> be able to move above the limit of 8 concurrent threads
>> >>>>>>> >> running. 8 is, of course, the number of cores of my machine.
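Flavio's ForkJoinPool hypothesis quoted above is easy to check in isolation: the no-arg ForkJoinPool constructor sizes the pool to the number of available processors. A standalone snippet (plain JDK, not Flink code):

import java.util.concurrent.ForkJoinPool;

public class ForkJoinPoolDefaults {
    public static void main(String[] args) {
        // The default constructor targets the number of available cores.
        System.out.println(Runtime.getRuntime().availableProcessors());
        System.out.println(new ForkJoinPool().getParallelism());

        // An explicit parallelism lifts the cap.
        System.out.println(new ForkJoinPool(32).getParallelism());
    }
}

On an 8-core machine the first two lines print 8, matching the observed cap; the explicit parallelism on the third line prints 32.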
>> >>>>>>> >>
>> >>>>>>> >> I have tried working around configurations and settings, but
>> >>>>>>> >> the Executor within the ExecutionContext keeps having a
>> >>>>>>> >> parallelism of 8, although, of course, the parallelism of the
>> >>>>>>> >> execution environment is much higher (in fact, I have many more
>> >>>>>>> >> tasks to be allocated).
>> >>>>>>> >>
>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration,
>> >>>>>>> >> which may just override or neglect my wish for a higher degree
>> >>>>>>> >> of parallelism. Is there a way for me to work around this issue?
>> >>>>>>> >>
>> >>>>>>> >> please let me know. Thanks a lot for your help! :-)
>> >>>>>>> >>
>> >>>>>>> >> saluti,
>> >>>>>>> >> Stefano
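For completeness, these are the two knobs for requesting parallelism in local mode (a minimal sketch against the Flink 0.10 Java API; whether the local scheduler actually honours them is exactly the open question of this thread):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalParallelismSketch {
    public static void main(String[] args) throws Exception {
        // Size the local cluster explicitly instead of relying on the
        // default, which is derived from the number of cores.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(32);
        env.setParallelism(32);

        System.out.println(env.getParallelism()); // 32, as far as the API is concerned

        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .print();
    }
}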