It turned out to be a misconfiguration of the cluster: there was only 1 task manager with 1 slot.
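For anyone who runs into the same thing: the relevant settings are in conf/flink-conf.yaml, and the number of available slots is the number of task managers times the slots each one offers. The values below are only an illustration, not our actual configuration:

    taskmanager.numberOfTaskSlots: 8   # slots offered by each task manager
    parallelism.default: 8             # parallelism used when no -p is passed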
If I submit a job with "flink run -p 24 ...", will the job hang until at least 24 slots are available?

Regards,
Alexis.

On Fri, 10 Aug 2018, 14:01 Fabian Hueske <fhue...@gmail.com> wrote:

> Can you share the plan for the program?
>
> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>
> 2018-08-10 12:04 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>
>> It seems I may have spoken too soon. After executing the job with more data, I can see the following things in the Flink dashboard:
>>
>> - The first subtask is a chained DataSource -> GroupCombine. Even with parallelism set to 24 and a ParameterValuesProvider returning Array(Array("first"), Array("second")), only 1 thread processed all records.
>> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>>   + The first subtask sent 5,923,802 records, yet the second subtask only received 5,575,154 records?
>>   + Again, everything was done in a single thread, even though a groupBy was used.
>> - The third and final subtask is a sink that saves back to the database.
>>
>> Does anyone know why parallelism is not being used?
>>
>> Regards,
>> Alexis.
>>
>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0, declares javaSet as private[flink], so I cannot access it directly. Nevertheless, I managed to get around it by using the Java environment:
>>>
>>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>
>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>
>>> val source = env.createInput(inputFormat)
>>> val sdp = source.getSplitDataProperties
>>> sdp.splitsPartitionedBy(0)
>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>
>>> // transform java DataSet to scala DataSet...
>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>   .groupBy(0, 1)
>>>   .combineGroup(groupCombiner)
>>>   .withForwardedFields("f0->_1")
>>>   .groupBy(0, 1)
>>>   .reduceGroup(groupReducer)
>>>   .withForwardedFields("_1")
>>>   .output(outputFormat)
>>>
>>> It seems to work well, and the semantic annotation does remove a hash partition from the execution plan.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> The Scala API does not expose a DataSource object but only a Scala DataSet which wraps the Java object.
>>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>>
>>>> val dbData: DataSet[...] = ???
>>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>>
>>>> So you first have to get the wrapped Java DataSet, cast it to DataSource, and then get the properties.
>>>> It's not very nice, but it should work.
>>>>
>>>> In order to use SDPs, you should be a bit familiar with how physical data properties are propagated and discarded in the optimizer.
>>>> For example, applying a simple MapFunction removes all properties, because the function might have changed the fields on which a DataSet is partitioned or sorted.
>>>> You can expose the behavior of a function to the optimizer by using Semantic Annotations [1].
>>>>
>>>> Some comments on the code and plan you shared:
>>>> - You might want to add hostname to ORDER BY to have the output grouped by (ts, hostname).
>>>> - Check the Global and Local data properties in the plan to validate that the SDPs were correctly interpreted.
>>>> - If the data is already correctly partitioned and sorted, you might not need the Combiners. In either case, you probably want to annotate them with Forward Field annotations.
>>>>
>>>> The number of source tasks is unrelated to the number of splits. If you have more tasks than splits, some tasks won't process any data.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>>>
>>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for the clarification. I have a few remarks, but let me provide more concrete information. You can find the query I'm using, the JDBCInputFormat creation, and the execution plan in this github gist:
>>>>>
>>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>>
>>>>> I cannot call getSplitDataProperties because env.createInput(inputFormat) returns a DataSet, not a DataSource. In the code, I do this instead:
>>>>>
>>>>> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>>>>
>>>>> which feels wrong (the constructor doesn't accept a Scala environment). Is there a better alternative?
>>>>>
>>>>> I see absolutely no difference in the execution plan whether I use SDP or not, so the results are indeed the same. Is this expected?
>>>>>
>>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan shows Parallelism=24. Even the source code is a bit ambiguous, considering that the constructor for GenericInputSplit takes two parameters: partitionNumber and totalNumberOfPartitions. Should I assume that there are 2 splits divided into 24 partitions?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Alexis,
>>>>>>
>>>>>> First of all, I think you can leverage the partitioning and sorting properties of the data returned by the database using SplitDataProperties. However, please be aware that SplitDataProperties are a rather experimental feature.
>>>>>>
>>>>>> If used without query parameters, the JDBCInputFormat generates a single split and queries the database just once. If you want to leverage parallelism, you have to specify a query with parameters in the WHERE clause to read different parts of the table.
>>>>>> Note that, depending on the configuration of the database, multiple queries can result in multiple full scans. Hence, it might make sense to have an index on the partitioning columns.
>>>>>>
>>>>>> If properly configured, the JDBCInputFormat generates multiple splits which are partitioned. Since the partitioning is encoded in the query, it is opaque to Flink and must be explicitly declared.
>>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells Flink that all records with the same value in the partitioning field are read from the same split, i.e., the full data is partitioned on that attribute across splits.
>>>>>> The same can be done for ordering if the queries of the JDBCInputFormat are specified with an ORDER BY clause.
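>>>>>>
>>>>>> For illustration, a parameterized query along these lines would give one split per parameter row, with each split sorted by the ORDER BY (table, column, and driver names here are made up, not taken from your gist):
>>>>>>
>>>>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
>>>>>> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
>>>>>> import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
>>>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo
>>>>>>
>>>>>> // One query execution per parameter row, i.e., two splits in this sketch.
>>>>>> val query = "SELECT host, ts, id FROM events WHERE host = ? ORDER BY host, ts"
>>>>>> val rowTypeInfo = new RowTypeInfo(
>>>>>>   BasicTypeInfo.STRING_TYPE_INFO,  // host
>>>>>>   BasicTypeInfo.LONG_TYPE_INFO,    // ts (adjust to the actual column type)
>>>>>>   BasicTypeInfo.INT_TYPE_INFO)     // id
>>>>>> val parameters: Array[Array[java.io.Serializable]] =
>>>>>>   Array(Array[java.io.Serializable]("host-a"), Array[java.io.Serializable]("host-b"))
>>>>>> val dbUrl = "jdbc:postgresql://localhost:5432/mydb"  // placeholder URL
>>>>>>
>>>>>> val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
>>>>>>   .setDrivername("org.postgresql.Driver")  // whichever driver you actually use
>>>>>>   .setDBUrl(dbUrl)
>>>>>>   .setQuery(query)
>>>>>>   .setRowTypeInfo(rowTypeInfo)
>>>>>>   .setParametersProvider(new GenericParameterValuesProvider(parameters))
>>>>>>   .finish()
>>>>>>
>>>>>> The splitsPartitionedBy()/splitsOrderedBy() calls then only declare what the query already guarantees; they do not repartition or re-sort anything themselves.
>>>>>>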
>>>>>> Partitioning and grouping are two different things. You can define a query that partitions on hostname and orders by hostname and timestamp, and declare these properties in the SDP.
>>>>>>
>>>>>> You can get an SDP object by calling DataSource.getSplitDataProperties(). In your example this would be source.getSplitDataProperties().
>>>>>>
>>>>>> Whatever you do, you should carefully check the execution plan (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and validate that the results are identical whether you use SDPs or not.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://flink.apache.org/visualizer/
>>>>>>
>>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I have the following scenario: I have a database table with 3 columns: a host (string), a timestamp, and an integer ID. Conceptually, what I'd like to do is:
>>>>>>>
>>>>>>> group by host and timestamp -> based on all the IDs in each group, create a mapping to n new tuples -> for each unique tuple, count how many times it appeared across the resulting data
>>>>>>>
>>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>>
>>>>>>> What I'm currently doing is roughly:
>>>>>>>
>>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>>> val source = environment.createInput(input)
>>>>>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>>>>
>>>>>>> The query given to JDBCInputFormat provides results ordered by host and timestamp, and I was wondering if performance can be improved by specifying this in the code. I've looked at
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html and
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>>>> but I still have some questions:
>>>>>>>
>>>>>>> - If a split is a subset of a partition, what is the meaning of SplitDataProperties#splitsPartitionedBy? The wording makes me think that a split is divided into partitions, meaning that a partition would be a subset of a split.
>>>>>>> - At which point can I retrieve and adjust a SplitDataProperties instance, if possible at all?
>>>>>>> - If I wanted a coarser parallelization where each slot gets all the data for the same host, would I have to manually create the sub-groups based on timestamp?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
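
A rough sketch of the processing described in the original question, for reference. The case class, field names, and the mapIds placeholder are made up; only the groupBy/reduceGroup/sum structure is the point, and the real IDs-to-tuples mapping would replace mapIds:

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

case class Event(host: String, ts: Long, id: Int)

// Placeholder for the real mapping from all IDs of one (host, ts) group to n new IDs.
def mapIds(ids: Seq[Int]): Seq[Int] = ids  // identity, for illustration only

val events: DataSet[Event] = ???  // e.g. the rows read by the JDBCInputFormat

val counts = events
  .groupBy("host", "ts")  // one group per (host, timestamp)
  .reduceGroup { (group: Iterator[Event], out: Collector[(String, Int, Int)]) =>
    val buffered = group.toSeq
    val host = buffered.head.host
    mapIds(buffered.map(_.id)).foreach(newId => out.collect((host, newId, 1)))
  }
  .groupBy(0, 1)  // group by (host, new ID)
  .sum(2)         // count how often each (host, new ID) tuple appeared

Whether the hash partitioning before the reduce can be avoided then depends on declaring the database's ordering via SplitDataProperties and forwarded-field annotations, as discussed earlier in the thread.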