Hi Fabian,

Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0,
declares javaSet as private[flink], so I cannot access it directly.
Nevertheless, I managed to get around it by using the Java environment:

val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val inputFormat = getInputFormat(query, dbUrl, properties)
val outputFormat = getOutputFormat(dbUrl, properties)

val source = env.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))

// transform the Java DataSet into a Scala DataSet...
new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
  .groupBy(0, 1)
  .combineGroup(groupCombiner)
  .withForwardedFields("f0->_1")
  .groupBy(0, 1)
  .reduceGroup(groupReducer)
  .withForwardedFields("_1")
  .output(outputFormat)

It seems to work well, and the semantic annotation does remove a hash
partition from the execution plan.
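A minimal way to verify this, following Fabian's getExecutionPlan() pointer
further down (the sketch assumes the env defined above):

// dump the JSON plan before calling execute() and paste it into
// https://flink.apache.org/visualizer/ to compare the plans with and
// without the SplitDataProperties / annotations
println(env.getExecutionPlan())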
Regards,
Alexis.

On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Alexis,
>
> The Scala API does not expose a DataSource object but only a Scala DataSet
> which wraps the Java object.
> You can get the SplitDataProperties from the Scala DataSet as follows:
>
> val dbData: DataSet[...] = ???
> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>
> So you first have to get the wrapped Java DataSet, cast it to DataSource,
> and then get the properties. It's not very nice, but it should work.
>
> In order to use SDPs, you should be a bit familiar with how physical data
> properties are propagated and discarded in the optimizer. For example,
> applying a simple MapFunction removes all properties, because the function
> might have changed the fields on which a DataSet is partitioned or sorted.
> You can expose the behavior of a function to the optimizer by using
> semantic annotations [1].
>
> Some comments on the code and plan you shared:
> - You might want to add hostname to ORDER BY to have the output grouped
>   by (ts, hostname).
> - Check the Global and Local data properties in the plan to validate that
>   the SDP were correctly interpreted.
> - If the data is already correctly partitioned and sorted, you might not
>   need the Combiners. In either case, you probably want to annotate them
>   with forwarded-fields annotations.
>
> The number of source tasks is unrelated to the number of splits. If you
> have more tasks than splits, some tasks won't process any data.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
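For concreteness, a minimal sketch of such a forwarded-fields annotation
(the mapper and tuple type are made up for illustration; declaring that
field _1 passes through unchanged lets the optimizer keep partitioning and
ordering properties on it):

import org.apache.flink.api.scala._

// hypothetical map that leaves the key field untouched; the annotation
// tells the optimizer so, preserving physical properties on _1
val ds: DataSet[(String, Int)] = ???
val mapped = ds
  .map(t => (t._1, t._2 + 1))
  .withForwardedFields("_1") // field _1 is emitted unchanged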
> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the clarification. I have a few remarks, but let me provide
>> more concrete information. You can find the query I'm using, the
>> JDBCInputFormat creation, and the execution plan in this GitHub gist:
>>
>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>
>> I cannot call getSplitDataProperties because env.createInput(inputFormat)
>> returns a DataSet, not a DataSource. In the code, I do this instead:
>>
>> val javaEnv =
>>   org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>
>> which feels wrong (the constructor doesn't accept a Scala environment).
>> Is there a better alternative?
>>
>> I see absolutely no difference in the execution plan whether I use SDP
>> or not, so the results are indeed the same. Is this expected?
>>
>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>> shows Parallelism=24. Even the source code is a bit ambiguous,
>> considering that the constructor for GenericInputSplit takes two
>> parameters: partitionNumber and totalNumberOfPartitions. Should I assume
>> that there are 2 splits divided into 24 partitions?
>>
>> Regards,
>> Alexis.
>>
>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> First of all, I think you can leverage the partitioning and sorting
>>> properties of the data returned by the database using
>>> SplitDataProperties. However, please be aware that SplitDataProperties
>>> are a rather experimental feature.
>>>
>>> If used without query parameters, the JDBCInputFormat generates a
>>> single split and queries the database just once. If you want to leverage
>>> parallelism, you have to specify a query with parameters in the WHERE
>>> clause to read different parts of the table.
>>> Note that, depending on the configuration of the database, multiple
>>> queries result in multiple full scans. Hence, it might make sense to
>>> have an index on the partitioning columns.
>>>
>>> If properly configured, the JDBCInputFormat generates multiple splits
>>> which are partitioned. Since the partitioning is encoded in the query,
>>> it is opaque to Flink and must be explicitly declared. This can be done
>>> with SDPs. The SDP.splitsPartitionedBy() method tells Flink that all
>>> records with the same value in the partitioning field are read from the
>>> same split, i.e., the full data is partitioned on the attribute across
>>> splits.
>>> The same can be done for ordering if the query of the JDBCInputFormat
>>> is specified with an ORDER BY clause.
>>> Partitioning and grouping are two different things. You can define a
>>> query that partitions on hostname and orders by hostname and timestamp,
>>> and declare these properties in the SDP.
>>>
>>> You can get an SDP object by calling
>>> DataSource.getSplitDataProperties(). In your example this would be
>>> source.getSplitDataProperties().
>>>
>>> Whatever you do, you should carefully check the execution plan
>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1]
>>> and validate that the results are identical whether you use SDP or not.
>>>
>>> Best, Fabian
>>>
>>> [1] https://flink.apache.org/visualizer/
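Pulling these points together, a rough sketch of a parameterized
JDBCInputFormat whose partitioning and ordering are then declared through
the SplitDataProperties (the driver name, URL, table, WHERE clause, and the
two-way split are illustrative assumptions, not taken from the gist):

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
import org.apache.flink.api.java.typeutils.RowTypeInfo

val javaEnv = ExecutionEnvironment.getExecutionEnvironment
val rowTypeInfo = new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)

// two parameter rows -> two splits, i.e., two queries against the table
val params: Array[Array[java.io.Serializable]] =
  Array(Array(Integer.valueOf(0)), Array(Integer.valueOf(1)))

val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.postgresql.Driver")          // assumption
  .setDBUrl("jdbc:postgresql://localhost/db")      // placeholder
  .setQuery("SELECT host, ts, id FROM metrics" +
    " WHERE MOD(bucket, 2) = ? ORDER BY host, ts") // hypothetical schema
  .setRowTypeInfo(rowTypeInfo)
  .setParametersProvider(new GenericParameterValuesProvider(params))
  .finish()

// createInput on the Java environment returns a DataSource, which exposes
// the SplitDataProperties
val source = javaEnv.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0) // all rows of a host come from the same split
sdp.splitsOrderedBy(Array(0, 1), Array(Order.ASCENDING, Order.ASCENDING))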
>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>
>>>> Hi everyone,
>>>>
>>>> I have the following scenario: I have a database table with 3 columns:
>>>> a host (string), a timestamp, and an integer ID. Conceptually, what
>>>> I'd like to do is:
>>>>
>>>> group by host and timestamp -> based on all the IDs in each group,
>>>> create a mapping to n new tuples -> for each unique tuple, count how
>>>> many times it appeared across the resulting data
>>>>
>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>
>>>> What I'm currently doing is roughly:
>>>>
>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>> val source = environment.createInput(input)
>>>> source.partitionByHash("host", "timestamp")
>>>>   .mapPartition(...)
>>>>   .groupBy(0, 1)
>>>>   .aggregate(SUM, 2)
>>>>
>>>> The query given to JDBCInputFormat provides results ordered by host
>>>> and timestamp, and I was wondering if performance can be improved by
>>>> specifying this in the code. I've looked at
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>> and
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>> but I still have some questions:
>>>>
>>>> - If a split is a subset of a partition, what is the meaning of
>>>>   SplitDataProperties#splitsPartitionedBy? The wording makes me think
>>>>   that a split is divided into partitions, meaning that a partition
>>>>   would be a subset of a split.
>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>   instance, if possible at all?
>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>   data for the same host, would I have to manually create the
>>>>   sub-groups based on timestamp?
>>>>
>>>> Regards,
>>>> Alexis.
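On the last question, one possibility (a hedged sketch, not something
discussed further in the thread) is to group by host only and let Flink
sort each group by timestamp, instead of building the sub-groups manually:

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

// hypothetical record layout: (host, timestamp, id)
val input: DataSet[(String, Long, Int)] = ???
val perHost = input
  .groupBy(0)                    // one group per host
  .sortGroup(1, Order.ASCENDING) // records within a group sorted by timestamp
  .reduceGroup(_.toList)         // placeholder group function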