Hi Timo and flink-user,
It's been a few weeks, and we've made some changes to the application mentioned in this thread. We've also updated to Flink 1.4. We are using the SQL / Table API with a tumbling window and a user-defined aggregate to generate SQL query strings like:

    SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
      TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
    FROM LINES
    GROUP BY measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)

I've experimented with the parallelism of the operators and with setting the environment's parallelism, as suggested. I've been setting parallelism values of 2 or 4 on all operators except the consumer and sink.

For some jobs with large Kafka source topics, we experience back pressure under load and see some lag. But when I try to address this via parallelism, so far I've only seen very degraded performance from the increased parallelism settings.

Furthermore, the suspect jobs are grouping by a field of constant values. These jobs usually have 40,000 or so grouped records enter the aggregator for each minute window.

I would think that the tumbling windows would allow the job to process each window in another task slot, parallelizing each window. But maybe that's not happening?

Can you help us understand why parallelizing the job only degrades performance, and what I can do to change this?

Happy New Year!

Colin Williams

On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Colin,
>
> unfortunately, selecting the parallelism for parts of a SQL query is not
> supported yet. By default, tumbling window operators use the default
> parallelism of the environment. Simple project and select operations have
> the same parallelism as the inputs they are applied on.
>
> I think the easiest solution so far is to explicitly set the parallelism
> of operators that are not part of the Table API and use the environment's
> parallelism to scale the SQL query.
>
> I hope that helps.
>
> Regards,
> Timo
>
>
> On 12/9/17 at 3:06 AM, Colin Williams wrote:
>
> Hello,
>
> I've inherited some Flink application code.
>
> We're currently creating a table using a tumbling-window SQL query similar
> to the first example in
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>
> where each generated SQL query looks something like
>
> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>
> We are also using a UDFAGG function in some of the queries, which I think
> might be cleaned up and optimized a bit (it uses Scala types and is possibly
> not well implemented).
>
> We then turn the result table back into a DataStream using toAppendStream,
> and eventually add a derivative stream to a sink. We've configured
> TimeCharacteristic for event-time processing.
>
> In some streaming scenarios everything works fine with a parallelism
> of 1, but in others it appears that we can't keep up with the event source.
>
> So we are investigating how to enable parallelism specifically on the
> SQL table query or aggregator.
>
> Can anyone suggest a good way to go about this? It wasn't clear from the
> documentation.
>
> Best,
>
> Colin Williams
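
A footnote on the constant-valued grouping field mentioned at the top of this thread. This is only a sketch under assumptions, not a diagnosis of the job: as far as I understand, a grouped window aggregation is spread across parallel subtasks by hashing the grouping key, so records that share one key all land on the same subtask, whatever the parallelism and whichever window they fall into. The Python below simulates that with a CRC32 hash standing in for Flink's own key-group hashing. The key strings and helper names are hypothetical; the 40,000 records and the parallelism of 4 are borrowed from the numbers in the message.

```python
import zlib
from collections import Counter


def subtask_for(key: str, parallelism: int) -> int:
    """Map a grouping key to a subtask index via a stable hash.

    Assumption: CRC32 modulo parallelism is a simplified stand-in.
    Flink's real assignment goes through key groups and a different hash.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def distribute(keys, parallelism: int) -> Counter:
    """Count how many records each subtask would receive."""
    return Counter(subtask_for(key, parallelism) for key in keys)


PARALLELISM = 4      # one of the values tried in the message
RECORDS = 40_000     # roughly the per-window volume from the message

# Hypothetical keys: every record carries the same (measurement, AppId) pair.
constant_keys = ["cpu|app-1"] * RECORDS
# Hypothetical contrast case: 1,000 distinct AppId values.
varied_keys = [f"cpu|app-{i % 1000}" for i in range(RECORDS)]

constant_load = distribute(constant_keys, PARALLELISM)
varied_load = distribute(varied_keys, PARALLELISM)

print("constant key, records per subtask:", dict(constant_load))
print("varied keys, records per subtask:", dict(varied_load))
```

With a single key, only one subtask receives records no matter how high PARALLELISM is set. If the real grouping keys are close to constant, the extra subtasks would mostly sit idle while the job still pays for the additional network shuffle, which would be consistent with the slowdown described, though that remains a guess.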