Hi Fabian,

Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0,
declares javaSet as private[flink], so I cannot access it directly.
Nevertheless, I managed to get around it by using the Java environment:

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.DataSet
import org.apache.flink.types.Row

val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment

val inputFormat = getInputFormat(query, dbUrl, properties)
val outputFormat = getOutputFormat(dbUrl, properties)

// createInput on the Java environment returns a DataSource, which exposes
// getSplitDataProperties
val source = env.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))

// transform java DataSet to scala DataSet...
new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
  .groupBy(0, 1)
  .combineGroup(groupCombiner)
  .withForwardedFields("f0->_1")
  .groupBy(0, 1)
  .reduceGroup(groupReducer)
  .withForwardedFields("_1")
  .output(outputFormat)

It seems to work well, and the semantic annotation does remove a hash
partition from the execution plan.

Regards,
Alexis.


On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Alexis,
>
> The Scala API does not expose a DataSource object but only a Scala DataSet
> which wraps the Java object.
> You can get the SplitDataProperties from the Scala DataSet as follows:
>
> val dbData: DataSet[...] = ???
> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>
> So you first have to get the wrapped Java DataSet, cast it to DataSource
> and then get the properties.
> It's not very nice, but should work.
>
> In order to use SDPs, you should be a bit familiar with how physical data
> properties are propagated and discarded in the optimizer.
> For example, applying a simple MapFunction removes all properties, because
> the function might have changed the fields on which a DataSet is
> partitioned or sorted.
> You can expose the behavior of a function to the optimizer by using
> Semantic Annotations [1].
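>
> For example, a map that leaves the grouping field untouched could be
> annotated like this (a rough sketch; the transform helper is hypothetical):
>
> val mapped = input
>   .map(t => (t._1, transform(t._2))) // field _1 passes through unchanged
>   .withForwardedFields("_1")         // so partitioning/order on _1 survive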
>
> Some comments on the code and plan you shared:
> - You might want to add hostname to ORDER BY to have the output grouped by
> (ts, hostname).
> - Check the Global and Local data properties in the plan to validate that
> the SDPs were correctly interpreted.
> - If the data is already correctly partitioned and sorted, you might not
> need the Combiners. In either case, you probably want to annotate them with
> forwarded-fields annotations.
>
> The number of source tasks is unrelated to the number of splits. If you
> have more tasks than splits, some tasks won't process any data.
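>
> If you want the number of source tasks to match the number of splits, you
> can set the parallelism on the source explicitly (a sketch):
>
> source.setParallelism(2) // one task per split; extra tasks would sit idle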
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>
>
> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the clarification. I have a few remarks, but let me provide
>> more concrete information. You can find the query I'm using, the
>> JDBCInputFormat creation, and the execution plan in this github gist:
>>
>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>
>> I cannot call getSplitDataProperties because env.createInput(inputFormat)
>> returns a DataSet, not a DataSource. In the code, I do this instead:
>>
>> val javaEnv =
>>   org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>> val dataSource =
>>   new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
>>
>> which feels wrong (the constructor doesn't accept a Scala environment).
>> Is there a better alternative?
>>
>> I see absolutely no difference in the execution plan whether I use SDP or
>> not, so the results are indeed the same. Is this expected?
>>
>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>> that the constructor for GenericInputSplit takes two parameters:
>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>> 2 splits divided into 24 partitions?
>>
>> Regards,
>> Alexis.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> First of all, I think you can leverage the partitioning and sorting
>>> properties of the data returned by the database using SplitDataProperties.
>>> However, please be aware that SplitDataProperties are a rather
>>> experimental feature.
>>>
>>> If used without query parameters, the JDBCInputFormat generates a single
>>> split and queries the database just once. If you want to leverage
>>> parallelism, you have to specify a query with parameters in the WHERE
>>> clause to read different parts of the table.
>>> Note that, depending on the configuration of the database, multiple
>>> queries can result in multiple full scans. Hence, it might make sense to
>>> have an index on the partitioning columns.
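>>>
>>> A sketch of a parameterized setup (the driver, table, and column names
>>> are placeholders, as is the id range):
>>>
>>> val provider = new NumericBetweenParametersProvider(1000, minId, maxId)
>>> val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
>>>   .setDrivername("org.postgresql.Driver") // placeholder driver
>>>   .setDBUrl(dbUrl)
>>>   .setQuery("SELECT host, ts, id FROM t WHERE id BETWEEN ? AND ?")
>>>   .setParametersProvider(provider) // one split per 1000-id range
>>>   .setRowTypeInfo(rowTypeInfo)
>>>   .finish()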
>>>
>>> If properly configured, the JDBCInputFormat generates multiple splits
>>> which are partitioned. Since the partitioning is encoded in the query, it
>>> is opaque to Flink and must be explicitly declared.
>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>>> Flink that all records with the same value in the partitioning field are
>>> read from the same split, i.e., the full data is partitioned on the
>>> attribute across splits.
>>> The same can be done for ordering if the query of the JDBCInputFormat is
>>> specified with an ORDER BY clause.
>>> Partitioning and grouping are two different things. You can define a
>>> query that partitions on hostname and orders by hostname and timestamp and
>>> declare these properties in the SDP.
>>>
>>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>>> In your example this would be source.getSplitDataProperties().
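>>>
>>> A sketch, assuming hostname is field 0 and the timestamp is field 1:
>>>
>>> val sdp = source.getSplitDataProperties
>>> sdp.splitsPartitionedBy(0) // same hostname always comes from one split
>>> sdp.splitsOrderedBy(Array(0, 1), Array(Order.ASCENDING, Order.ASCENDING))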
>>>
>>> Whatever you do, you should carefully check the execution plan
>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
>>> validate that the results are identical whether you use SDP or not.
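>>>
>>> The plan is a JSON string, so you can print it and paste it into the
>>> visualizer:
>>>
>>> println(env.getExecutionPlan())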
>>>
>>> Best, Fabian
>>>
>>> [1] https://flink.apache.org/visualizer/
>>>
>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>
>>>> Hi everyone,
>>>>
>>>> I have the following scenario: I have a database table with 3 columns:
>>>> a host (string), a timestamp, and an integer ID. Conceptually, what I'd
>>>> like to do is:
>>>>
>>>> group by host and timestamp -> based on all the IDs in each group,
>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>> times it appeared across the resulting data
>>>>
>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>
>>>> What I'm currently doing is roughly:
>>>>
>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>> val source = environment.createInput(input)
>>>> source.partitionByHash("host", "timestamp")
>>>>   .mapPartition(...)
>>>>   .groupBy(0, 1)
>>>>   .aggregate(SUM, 2)
>>>>
>>>> The query given to JDBCInputFormat provides results ordered by host and
>>>> timestamp, and I was wondering if performance can be improved by specifying
>>>> this in the code. I've looked at
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>> and
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>> but I still have some questions:
>>>>
>>>> - If a split is a subset of a partition, what is the meaning of
>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me think that a
>>>> split is divided into partitions, meaning that a partition would be a
>>>> subset of a split.
>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>> instance, if possible at all?
>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>> data for the same host, would I have to manually create the sub-groups
>>>> based on timestamp?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>
>
