Any feedback about our JDBC InputFormat issue?

On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier wrote:
We've finally created a running example (for Flink 0.10.2) of our improved JDBC InputFormat that you can run from an IDE. It creates an in-memory Derby database with 1000 rows and a batch size of 10: https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351
The first time you run the program you have to comment out the following line:

    stmt.executeUpdate("Drop Table users ");

In your pom, declare the following dependencies:

    <dependency>
      <groupId>org.apache.derby</groupId>
      <artifactId>derby</artifactId>
      <version>10.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.4.2</version>
    </dependency>

On my laptop I have 8 cores. If I set the parallelism to 16, I expect to see 16 calls to the connection pool (i.e. 16 '==================== CREATING NEW CONNECTION!' lines), but I see only 8 (my number of cores). The number of created tasks, instead, is correct (16).

I hope this helps in understanding where the problem is!

Best, and thanks in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli wrote:

Hi Ufuk,

here is our preliminary input format implementation:
https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119

If you need a running project, I will have to create a test one, because I cannot share the current configuration.

Thanks a lot in advance!

2016-03-30 10:13 GMT+02:00 Ufuk Celebi:

Do you have the code somewhere online? Maybe someone can have a quick look over it later. I'm pretty sure that this is indeed a problem with the custom input format.

– Ufuk

On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli wrote:

Perhaps there is a misunderstanding on my side about parallelism and split management for a data source.

We started from the current JDBCInputFormat to make it multi-threaded. Given a space of keys, we create the splits based on a fetch size passed as a parameter. In open(), we get a connection from the pool and execute a query over the split's key interval. This sets the 'resultSet', and then the DataSourceTask iterates over reachedEnd(), nextRecord() and close(). On close(), the connection is returned to the pool. We set the parallelism to 32 and would expect 32 connections to be opened, but only 8 are.

We tried to build an example with the TextInputFormat, but since it is a DelimitedInputFormat, open() is called sequentially while the statistics are built, and the parallel processing starts only after all of those open() calls have completed. This is not feasible in our case, because there would be millions of queries before the statistics are collected.

Perhaps we are doing something wrong; we still have to figure out what. :-/

Thanks a lot for your help.

saluti,
Stefano

2016-03-29 13:30 GMT+02:00 Stefano Bortoli:

That is exactly my point. I should have 32 threads running, but I have only 8. 32 tasks are created, but only 8 run concurrently. Flavio and I will try to write a simple program that reproduces the problem. If we solve our issues along the way, we'll let you know.

Thanks a lot anyway.

saluti,
Stefano

2016-03-29 12:44 GMT+02:00 Till Rohrmann:

Then it shouldn't be a problem. The ExecutionContext is used to run futures and their callbacks. But as Ufuk said, each task spawns its own thread, and if you set the parallelism to 32 you should have 32 threads running.
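Stefano describes cutting a key space into splits by a fetch size and querying each split's interval in open(). The following is a minimal, stdlib-only Java sketch of that split computation. It is not the code from either gist: `KeyRangeSplitter`, `Range` and `SPLIT_QUERY` are hypothetical names, and it assumes integer keys from 1 to 1000 (matching the 1000-row, batch-of-10 Derby example) and an `id` column, which is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from either gist): cuts an inclusive key interval
// [minKey, maxKey] into consecutive splits of at most fetchSize keys each.
final class KeyRangeSplitter {

    // One split: the inclusive key interval a single open() call would query.
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Assumed per-split query; the "id" column is a placeholder.
    static final String SPLIT_QUERY = "SELECT * FROM users WHERE id BETWEEN ? AND ?";

    // Assumes maxKey - minKey does not overflow a long.
    static List<Range> split(long minKey, long maxKey, long fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        if (minKey > maxKey) {
            throw new IllegalArgumentException("minKey must not exceed maxKey");
        }
        List<Range> ranges = new ArrayList<>();
        long start = minKey;
        while (true) {
            // The last split may hold fewer than fetchSize keys.
            long end = (maxKey - start < fetchSize) ? maxKey : start + fetchSize - 1;
            ranges.add(new Range(start, end));
            if (end == maxKey) {
                break;
            }
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<Range> splits = split(1, 1000, 10);
        // prints "100 splits, first [1, 10], last [991, 1000]"
        System.out.println(splits.size() + " splits, first " + splits.get(0)
                + ", last " + splits.get(splits.size() - 1));
    }
}
```

The number of splits (100 here) is independent of the parallelism. It only determines how much work the parallel source instances share. How many of those instances actually run at the same time is the question the rest of the thread is about.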

On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli wrote:

In fact, I don't use it. I just had to crawl back through the runtime implementation to find the point where the parallelism was switching from 32 to 8.

saluti,
Stefano

2016-03-29 12:24 GMT+02:00 Till Rohrmann:

Hi,

what do you use the ExecutionContext for? That should actually be something you don't need to be concerned with, since it is only used internally by the runtime.

Cheers,
Till

On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli wrote:

Well, in theory yes. Each task has a thread, but only a certain number of them run in parallel (that is the scheduler's job). Parallelism is set in the environment. However, while the parallelism parameter is set and read correctly, when it comes to actually starting the threads, the number is fixed at 8. We ran a debugger to get to the point where the thread was started. As Flavio mentioned, the ExecutionContext has its parallelism set to 8. We have a pool of connections to an RDBMS, and it logs the creation of just 8 connections, although the parallelism is much higher.

My question is whether this is a bug (or a feature) of the LocalMiniCluster. :-) I am not a Scala expert, but I see some variable assignments in the setup of the MiniCluster involving parallelism and 'default values'. The default values for parallelism are based on the number of cores.

Thanks a lot for the support!

saluti,
Stefano

2016-03-29 11:51 GMT+02:00 Ufuk Celebi:

Hey Stefano,

this should work by setting the parallelism on the environment, e.g.

    env.setParallelism(32)

Is this what you are doing?

The task threads are not part of a pool; each submitted task creates its own Thread.

– Ufuk

On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier wrote:

Any help here? I think the problem is that the JobManager creates the executionContext of the scheduler with

    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

and thus the number of concurrently running threads is limited to the number of cores (because of the default constructor of the ForkJoinPool). What do you think?

On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli wrote:

Hi guys,

I am trying to test a job that should run a number of tasks reading from an RDBMS using an improved JDBC connector. The connection and the reading run smoothly, but I cannot seem to get above a limit of 8 concurrently running threads. 8 is, of course, the number of cores of my machine.

I have tried working around configurations and settings, but the Executor within the ExecutionContext keeps having a parallelism of 8, even though the parallelism of the execution environment is much higher (in fact, I have many more tasks to be allocated).

I feel it may be an issue with the LocalMiniCluster configuration, which may just override or ignore my wish for a higher degree of parallelism. Is there a way for me to work around this issue?

Please let me know. Thanks a lot for your help! :-)

saluti,
Stefano
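Flavio's hypothesis rests on a JDK fact: a ForkJoinPool built with the no-argument constructor has a parallelism equal to `Runtime.getRuntime().availableProcessors()`, i.e. 8 on an 8-core laptop. The stdlib-only Java sketch below checks that fact. It also mimics the diagnostic used throughout the thread, counting how many connections the pool has to create. This is not Flink code: `ConcurrencyProbe` and `CountingPool` are hypothetical stand-ins for the commons-pool2 pool in the gist. If every task truly runs on its own thread at the same time (the model Ufuk describes), 16 tasks force 16 connections to be created. If the same tasks are multiplexed onto 8 worker threads, no more than 8 can ever be created.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

final class ConcurrencyProbe {
    // Hypothetical stand-in for the commons-pool2 pool: counts how many
    // connections it had to create because no idle one was available.
    static final class CountingPool {
        private final Deque<Object> idle = new ArrayDeque<>();
        private int created;

        synchronized Object borrow() {
            Object conn = idle.poll();
            if (conn == null) {
                created++; // the analogue of "CREATING NEW CONNECTION!"
                conn = new Object();
            }
            return conn;
        }

        synchronized void release(Object conn) { idle.push(conn); }

        synchronized int createdCount() { return created; }
    }

    // Parallelism of a ForkJoinPool built with the no-argument constructor.
    static int defaultForkJoinParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        int parallelism = pool.getParallelism();
        pool.shutdown();
        return parallelism;
    }

    // One dedicated thread per task. No task releases its connection until
    // every task has borrowed one, so the pool must create one per task.
    static int connectionsWithThreadPerTask(int tasks) {
        CountingPool pool = new CountingPool();
        CountDownLatch allBorrowed = new CountDownLatch(tasks);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            Thread t = new Thread(() -> {
                Object conn = pool.borrow();
                allBorrowed.countDown();
                try {
                    allBorrowed.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    pool.release(conn);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return pool.createdCount();
    }

    // Tasks multiplexed onto `workers` threads: at most `workers` connections
    // are borrowed at once, so at most `workers` are ever created.
    static int connectionsWithFixedPool(int tasks, int workers) {
        CountingPool pool = new CountingPool();
        ExecutorService exec = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < tasks; i++) {
            exec.execute(() -> {
                Object conn = pool.borrow();
                try {
                    Thread.sleep(5); // stands in for running the split query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    pool.release(conn);
                }
            });
        }
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.createdCount();
    }
}
```

`connectionsWithThreadPerTask(16)` returns exactly 16, while `connectionsWithFixedPool(16, 8)` never returns more than 8. The sketch only shows the observable difference between the two execution models. As Till points out, the scheduler's ExecutionContext runs futures and callbacks rather than the tasks themselves, so the ForkJoinPool default alone does not establish which component is capping concurrency here.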
