It turned out that the cluster was misconfigured: there was only 1 task
manager with 1 slot.

If I submit a job with "flink run -p 24 ...", will the job hang until at
least 24 slots are available?

Regards,
Alexis.

On Fri, 10 Aug 2018, 14:01 Fabian Hueske <fhue...@gmail.com> wrote:

> Can you share the plan for the program?
>
> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>
> 2018-08-10 12:04 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>
>> It seems I may have spoken too soon. After executing the job with more
>> data, I can see the following things in the Flink dashboard:
>>
>> - The first subtask is a chained DataSource -> GroupCombine. Even with
>> parallelism set to 24 and a ParameterValuesProvider returning
>> Array(Array("first"), Array("second")), only 1 thread processed all records.
>> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>>   + The first subtask sent 5,923,802 records, yet the second subtask only
>> received 5,575,154 records?
>>   + Again, everything was done in a single thread, even though a groupBy
>> was used.
>> - The third and final subtask is a sink that saves back to the database.
>>
>> Does anyone know why parallelism is not being used?
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <alexis.sa...@gmail.com>
>> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks a lot for the help. The Scala DataSet, at least in version 1.5.0,
>>> declares javaSet as private[flink], so I cannot access it directly.
>>> Nevertheless, I managed to get around it by using the Java environment:
>>>
>>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>
>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>
>>> val source = env.createInput(inputFormat)
>>> val sdp = source.getSplitDataProperties
>>> sdp.splitsPartitionedBy(0)
>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>
>>> // transform java DataSet to scala DataSet...
>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>   .groupBy(0, 1)
>>>   .combineGroup(groupCombiner)
>>>   .withForwardedFields("f0->_1")
>>>   .groupBy(0, 1)
>>>   .reduceGroup(groupReducer)
>>>   .withForwardedFields("_1")
>>>   .output(outputFormat)
>>>
>>> It seems to work well, and the semantic annotation does remove a hash
>>> partition from the execution plan.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> The Scala API does not expose a DataSource object but only a Scala
>>>> DataSet which wraps the Java object.
>>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>>
>>>> val dbData: DataSet[...] = ???
>>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>>
>>>> So you first have to get the wrapped Java DataSet, cast it to
>>>> DataSource and then get the properties.
>>>> It's not very nice, but should work.
>>>>
>>>> In order to use SDPs, you should be somewhat familiar with how physical
>>>> data properties are propagated and discarded in the optimizer.
>>>> For example, applying a simple MapFunction removes all properties
>>>> because the function might have changed the fields on which a DataSet is
>>>> partitioned or sorted.
>>>> You can expose the behavior of a function to the optimizer by using
>>>> Semantic Annotations [1]
>>>>
>>>> Some comments on the code and plan you shared:
>>>> - You might want to add hostname to ORDER BY to have the output grouped
>>>> by (ts, hostname).
>>>> - Check the Global and Local data properties in the plan to validate
>>>> that the SDPs were correctly interpreted.
>>>> - If the data is already correctly partitioned and sorted, you might
>>>> not need the Combiners. In either case, you probably want to annotate them
>>>> with Forwarded Fields annotations.
>>>>
>>>> The number of source tasks is unrelated to the number of splits. If you
>>>> have more tasks than splits, some tasks won't process any data.
>>>>
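To illustrate the point about tasks and splits, here is a minimal sketch in plain Scala (no Flink dependency) that hands 2 splits to 24 source tasks. The object name, `assign`, and the round-robin rule are assumptions made for illustration only; Flink's own split assignment is not modeled here.

```scala
object SplitAssignmentSketch {
  // Hypothetical model: splits are handed to tasks one at a time.
  // Round-robin is a simplification, not Flink's actual assigner.
  def assign(numSplits: Int, numTasks: Int): Map[Int, Seq[Int]] = {
    // Start with every task holding no splits.
    val empty = (0 until numTasks).map(t => t -> Seq.empty[Int]).toMap
    (0 until numSplits).foldLeft(empty) { (acc, split) =>
      val task = split % numTasks
      acc.updated(task, acc(task) :+ split)
    }
  }

  def main(args: Array[String]): Unit = {
    val assignment = assign(numSplits = 2, numTasks = 24)
    val busy = assignment.count { case (_, splits) => splits.nonEmpty }
    // With 2 splits and 24 tasks, only 2 tasks ever receive data.
    println(s"tasks with data: $busy, idle tasks: ${assignment.size - busy}")
  }
}
```

Under this model a parallelism of 24 does not make the source read faster unless the input format also produces at least 24 splits.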
>>>> Best, Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>>>
>>>>
>>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for the clarification. I have a few remarks, but let me provide
>>>>> more concrete information. You can find the query I'm using, the
>>>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>>>
>>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>>
>>>>> I cannot call getSplitDataProperties because
>>>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>>>> code, I do this instead:
>>>>>
>>>>> val javaEnv =
>>>>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>>>> "example")
>>>>>
>>>>> which feels wrong (the constructor doesn't accept a Scala
>>>>> environment). Is there a better alternative?
>>>>>
>>>>> I see absolutely no difference in the execution plan whether I use SDP
>>>>> or not, so the results are indeed the same. Is this expected?
>>>>>
>>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>>>> that the constructor for GenericInputSplit takes two parameters:
>>>>> partitionNumber and totalNumberOfPartitions. Should I assume that there
>>>>> are 2 splits divided into 24 partitions?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Alexis,
>>>>>>
>>>>>> First of all, I think you can leverage the partitioning and sorting
>>>>>> properties of the data returned by the database using
>>>>>> SplitDataProperties.
>>>>>> However, please be aware that SplitDataProperties are a rather
>>>>>> experimental feature.
>>>>>>
>>>>>> If used without query parameters, the JDBCInputFormat generates a
>>>>>> single split and queries the database just once. If you want to leverage
>>>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>>>> clause to read different parts of the table.
>>>>>> Note, depending on the configuration of the database, multiple
>>>>>> queries result in multiple full scans. Hence, it might make sense to have
>>>>>> an index on the partitioning columns.
>>>>>>
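As an illustration of a parameterized query, the sketch below builds one parameter row per split in plain Scala. The table and column names in `query`, the object name, and `parametersFor` are hypothetical. The nested-array shape mirrors the Array(Array("first"), Array("second")) value mentioned elsewhere in this thread, and it is only assumed here that a ParameterValuesProvider would return such a matrix.

```scala
object QueryParametersSketch {
  // Hypothetical query: one '?' placeholder, filled once per split.
  val query: String =
    "SELECT host, ts, id FROM events WHERE host = ? ORDER BY host, ts"

  // One inner array per split; each inner array holds the values
  // for that split's placeholders (here: a single host name).
  def parametersFor(hosts: Seq[String]): Array[Array[java.io.Serializable]] =
    hosts.map(h => Array[java.io.Serializable](h)).toArray

  def main(args: Array[String]): Unit = {
    val params = parametersFor(Seq("host-a", "host-b", "host-c"))
    // Each row would correspond to one split, i.e. one execution of the query.
    params.foreach(row => println(row.mkString("[", ", ", "]")))
  }
}
```

Filtering on the host column in this way is also what makes the resulting splits partitioned by host, since no host can appear in two different rows.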
>>>>>> If properly configured, the JDBCInputFormat generates multiple splits
>>>>>> which are partitioned. Since the partitioning is encoded in the query, it
>>>>>> is opaque to Flink and must be explicitly declared.
>>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method
>>>>>> tells Flink that all records with the same value in the partitioning
>>>>>> field are read from the same split, i.e., the full data is partitioned
>>>>>> on the attribute across splits.
>>>>>> The same can be done for ordering if the queries of the
>>>>>> JDBCInputFormat are specified with an ORDER BY clause.
>>>>>> Partitioning and grouping are two different things. You can define a
>>>>>> query that partitions on hostname and orders by hostname and timestamp
>>>>>> and declare these properties in the SDP.
>>>>>>
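The partitioning property described above can be stated as a small check in plain Scala: every key value must occur in exactly one split. This is a sketch for intuition only; the object name, `partitionedBy`, and the sample records are made up, and nothing here calls Flink.

```scala
object SplitsPartitionedSketch {
  // A record shaped like the table in this thread: (hostname, timestamp, id).
  type Record = (String, Long, Int)

  // True if every key value occurs in exactly one split.
  def partitionedBy[K](splits: Seq[Seq[Record]])(key: Record => K): Boolean = {
    val splitsPerKey = splits.zipWithIndex
      .flatMap { case (records, idx) => records.map(r => key(r) -> idx) }
      .distinct
      .groupBy(_._1)
    splitsPerKey.values.forall(_.size == 1)
  }

  def main(args: Array[String]): Unit = {
    // host-a and host-b each live in a single split.
    val good = Seq(
      Seq(("host-a", 1L, 10), ("host-a", 2L, 11)),
      Seq(("host-b", 1L, 12)))
    // host-a is spread over two splits.
    val bad = Seq(
      Seq(("host-a", 1L, 10)),
      Seq(("host-a", 2L, 11), ("host-b", 1L, 12)))
    println(partitionedBy(good)(_._1)) // true
    println(partitionedBy(bad)(_._1))  // false
  }
}
```

Declaring the property for data shaped like `bad` would be incorrect, which is one reason to compare results with and without SDP.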
>>>>>> You can get an SDP object by calling
>>>>>> DataSource.getSplitDataProperties(). In your example this would be
>>>>>> source.getSplitDataProperties().
>>>>>>
>>>>>> Whatever you do, you should carefully check the execution plan
>>>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1]
>>>>>> and validate that the results are identical whether you use SDP or not.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://flink.apache.org/visualizer/
>>>>>>
>>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <alexis.sa...@gmail.com>:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I have the following scenario: I have a database table with 3
>>>>>>> columns: a host (string), a timestamp, and an integer ID. Conceptually,
>>>>>>> what I'd like to do is:
>>>>>>>
>>>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>>>> create a mapping to n new tuples -> for each unique tuple, count how
>>>>>>> many times it appeared across the resulting data
>>>>>>>
>>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>>
>>>>>>> What I'm currently doing is roughly:
>>>>>>>
>>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>>> val source = environment.createInput(input)
>>>>>>> source.partitionByHash("host",
>>>>>>> "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>>>>
>>>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>>>> specifying this in the code. I've looked at
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>>>> and
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>>>> but I still have some questions:
>>>>>>>
>>>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me think
>>>>>>> that a split is divided into partitions, meaning that a partition would
>>>>>>> be a subset of a split.
>>>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>>> instance, if possible at all?
>>>>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>>>> data for the same host, would I have to manually create the sub-groups
>>>>>>> based on timestamp?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>
