Hi Colin,

There are two things that come to mind:

1) You mentioned "suspect jobs are grouping by a field of constant values". Does that mean that the grouping key is always constant? Flink parallelizes the window computation per key, i.e., all records with the same key are processed by the same parallel task (a single thread). If the key is constant, only one task does any work, no matter how high you set the parallelism. Although it would be possible to perform pre-aggregations, this is not done yet. There is an effort to add support for this to the DataStream API [1]. The Table API will hopefully leverage this once it has been added to the DataStream API.

2) Another reason for backpressure can be non-aligned watermarks, i.e., the watermarks of different partitions diverge too much from each other. In this case, windows cannot be finalized because an operator's event-time clock only advances with the lowest watermark across all of its input partitions; records from the faster partitions have to be kept in window state until the slowest partition catches up.

Hope this helps to clarify things.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7561

2017-12-30 0:11 GMT+01:00 Colin Williams <colin.williams.seat...@gmail.com>:

> Hi Timo and flink-user,
>
> It's been a few weeks and we've made some changes to the application
> mentioned in this email. We've also updated to Flink 1.4. We are using
> the SQL / Table API with a tumbling window and a user-defined aggregate to
> generate a SQL query string like:
>
> SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>
> I've experimented with the parallelism of the operators and with setting
> the environment's parallelism as suggested. I've been setting parallelism
> values of 2 or 4 on all operators except the consumer and sink.
>
> For some jobs with large Kafka source topics, we experience backpressure
> under load and see some lag. But when trying to address this via
> parallelism, so far I've only seen very degraded performance from the
> increased parallelism settings.
>
> Furthermore, the suspect jobs are grouping by a field of constant values.
> Then these jobs usually have 40,000 or so grouped records enter the
> aggregator for each minute window.
>
> I would think that the tumbling windows would allow the job to process
> each window in another task slot, parallelizing each window. But maybe
> that's not happening?
>
> Can you help us understand why increasing the parallelism only degrades
> performance, and what I can do to change this?
>
> Happy New Year!
>
> Colin Williams
>
> On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Colin,
>>
>> unfortunately, selecting the parallelism for parts of a SQL query is not
>> supported yet. By default, tumbling window operators use the default
>> parallelism of the environment. Simple project and select operations have
>> the same parallelism as the inputs they are applied on.
>>
>> I think the easiest solution so far is to explicitly set the parallelism
>> of operators that are not part of the Table API and use the environment's
>> parallelism to scale the SQL query.
>>
>> I hope that helps.
>>
>> Regards,
>> Timo
>>
>> On 12/9/17 at 3:06 AM, Colin Williams wrote:
>>
>> Hello,
>>
>> I've inherited some Flink application code.
>>
>> We're currently creating a table using a tumbling-window SQL query similar
>> to the first example in
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>>
>> where each generated SQL query looks something like
>>
>> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>>
>> We are also using a UDFAGG function in some of the queries, which I think
>> might be cleaned up and optimized a bit (it uses Scala types and is
>> possibly not well implemented).
>>
>> We then turn the result table back into a DataStream using
>> toAppendStream, and eventually add a derivative stream to a sink. We've
>> configured the TimeCharacteristic for event-time processing.
>>
>> In some streaming scenarios everything works fine with a parallelism
>> of 1, but in others it appears that we can't keep up with the event source.
>>
>> So we are investigating how to enable parallelism specifically for the
>> SQL table query or aggregator.
>>
>> Can anyone suggest a good way to go about this? It wasn't clear from the
>> documentation.
>>
>> Best,
>>
>> Colin Williams
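A toy Java sketch of point 1 in Fabian's reply (keyed partitioning), not Flink code: every record is routed to a parallel instance chosen from a hash of its grouping key, so a constant key sends every record to the same instance no matter how high the parallelism is. `KeyRouting`, `instanceFor`, and `recordsPerInstance` are hypothetical names. Flink's real assignment hashes keys into key groups (using a murmur hash) and maps key groups to instances, so only the behaviour carries over, not the exact instance numbers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of keyed partitioning: equal keys always map to the same
// parallel instance. Not Flink's actual key-group assignment.
class KeyRouting {

    // Non-negative hash of the key modulo the number of parallel instances.
    static int instanceFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many records each parallel instance would receive.
    // Instances that receive nothing do not appear in the map at all.
    static Map<Integer, Integer> recordsPerInstance(List<?> keys, int parallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (Object key : keys) {
            counts.merge(instanceFor(key, parallelism), 1, Integer::sum);
        }
        return counts;
    }
}
```

With 40,000 records that all carry the key `app-1` and a parallelism of 4, `recordsPerInstance` returns a map with a single entry holding all 40,000 records; the other three instances stay idle. With the ten distinct keys `app-0` to `app-9`, all four instances receive records.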
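Point 1 in Fabian's reply also mentions pre-aggregation, which, per that reply, Flink does not yet perform for these windows (see FLINK-7561). A minimal sketch of the general two-phase idea, assuming a sum aggregate and round-robin distribution of the input: each hypothetical instance first computes partial sums per key, and a final step merges the partials, so a single hot key is spread over all instances. This only works for aggregates with an associative merge step (sum, count, min, max); a percentile such as P99 cannot be combined from partial percentiles and would need a mergeable sketch instead. `TwoPhaseSum`, `Rec`, `localPartials`, and `globalMerge` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-phase (local/global) sum over the records of one window.
class TwoPhaseSum {

    // One input record: grouping key plus the value to aggregate.
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Phase 1: distribute records round-robin (ignoring the key) and
    // pre-aggregate a partial sum per key inside each instance.
    static List<Map<String, Long>> localPartials(List<Rec> records, int parallelism) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partials.add(new HashMap<>());
        }
        for (int i = 0; i < records.size(); i++) {
            Rec r = records.get(i);
            partials.get(i % parallelism).merge(r.key, r.value, Long::sum);
        }
        return partials;
    }

    // Phase 2: merge at most one partial per key and instance into final sums.
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }
}
```

For 40,000 records with the constant key `app-1` and value 1, each of four instances pre-aggregates 10,000 records, and the global step only has to merge four partial sums into 40,000.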
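Point 2 in Fabian's reply (non-aligned watermarks) can be illustrated with a small model, assuming one operator with several input partitions: its watermark is the minimum of the latest watermark received from each input, and, like Flink's default event-time trigger, a window [start, end) fires once that watermark reaches end - 1. A single lagging partition therefore holds back every window, while records from the faster partitions accumulate in window state. `WatermarkAlignment` and its methods are hypothetical names, not Flink API.

```java
import java.util.Arrays;

// Hypothetical model of an operator's event-time clock with several inputs.
class WatermarkAlignment {

    // Latest watermark received from each input partition (epoch millis).
    private final long[] inputWatermarks;

    WatermarkAlignment(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input; a watermark never moves backwards.
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
    }

    // The operator's clock is the minimum over all of its inputs.
    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    // A window covering [start, end) can fire once the clock reaches end - 1.
    boolean canFire(long windowEndMillis) {
        return currentWatermark() >= windowEndMillis - 1;
    }
}
```

If one partition has advanced to one hour (3,600,000 ms) but another is still at two minutes (120,000 ms), the first 10-minute window (ending at 600,000 ms) cannot fire; it fires only after the slow partition's watermark reaches 600,000 ms.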