Thanks for the reply. Unfortunately that project was unexpectedly cancelled, though for other reasons. I was happy to work on it and hopefully gained some insight. I have another question today regarding Elasticsearch sinks, unrelated to this thread, and will ask it separately.
On Fri, Jan 5, 2018 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Colin,
>
> There are two things that come to my mind:
>
> 1) You mentioned "suspect jobs are grouping by a field of constant values".
> Does that mean that the grouping key is always constant? Flink parallelizes
> the window computation per key, i.e., there is one thread per key. Although
> it would be possible to perform pre-aggregations, this is not done yet.
> There is an effort to add support for this to the DataStream API [1]. The
> Table API will hopefully leverage this once it has been added to the
> DataStream API.
>
> 2) Another reason for backpressure can be non-aligned watermarks, i.e., the
> watermarks of different partitions diverge too much from each other. In
> this case, windows cannot be finalized because everything is aligned to the
> lowest watermark.
>
> Hope this helps to clarify things.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-7561
>
> 2017-12-30 0:11 GMT+01:00 Colin Williams <colin.williams.seat...@gmail.com>:
>
>> Hi Timo and flink-user,
>>
>> It's been a few weeks, and we've made some changes to the application
>> mentioned in this email. We've also updated to Flink 1.4. We are using the
>> SQL / Table API with a tumbling window and a user-defined aggregate to
>> generate a SQL query string like:
>>
>> SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>>
>> I've experimented with the parallelism of the operators and with setting
>> the environment's parallelism, as suggested. I've been setting parallelism
>> values of 2 or 4 on all operators except the consumer and sink.
>>
>> For some jobs with large Kafka source topics, we experience backpressure
>> under load and see some lag. But when trying to address this via
>> parallelism, so far I've only seen severely degraded performance from the
>> increased parallelism settings.
>>
>> Furthermore, the suspect jobs are grouping by a field of constant values.
>> These jobs usually have 40,000 or so grouped records entering the
>> aggregator for each minute window.
>>
>> I would have thought that the tumbling windows would allow the job to
>> process each window in another task slot, parallelizing across windows.
>> But maybe that's not happening?
>>
>> Can you help us understand why parallelizing the job only degrades
>> performance, and what I can do to change this?
>>
>> Happy New Year!
>>
>> Colin Williams
>>
>> On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Colin,
>>>
>>> Unfortunately, selecting the parallelism for parts of a SQL query is not
>>> supported yet. By default, tumbling window operators use the default
>>> parallelism of the environment. Simple project and select operations have
>>> the same parallelism as the inputs they are applied on.
>>>
>>> I think the easiest solution so far is to explicitly set the parallelism
>>> of operators that are not part of the Table API and use the environment's
>>> parallelism to scale the SQL query.
>>>
>>> I hope that helps.
>>>
>>> Regards,
>>> Timo
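For reference, a minimal sketch of the approach Timo describes above might look like the following (Scala, Flink 1.4-era APIs). The Kafka 0.10 consumer, topic name, connection properties, and print sink are illustrative placeholders, not details taken from this thread:

    import java.util.Properties

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object ParallelismSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // The environment's default parallelism is what scales the SQL / Table API
        // operators (e.g. the tumbling-window aggregation), since their parallelism
        // cannot be set individually.
        env.setParallelism(4)

        val props = new Properties()
        props.setProperty("bootstrap.servers", "kafka:9092") // placeholder
        props.setProperty("group.id", "lines-consumer")      // placeholder

        // Operators that are not part of the Table API can have their parallelism
        // set explicitly, independently of the environment's default.
        val rawLines: DataStream[String] = env
          .addSource(new FlinkKafkaConsumer010[String]("lines", new SimpleStringSchema(), props))
          .setParallelism(2) // e.g. match the topic's partition count

        // (The Table API part of the pipeline, i.e. registering the table, running
        // the SQL query, and converting back with toAppendStream, is sketched
        // further below.)

        // A sink's parallelism can likewise be pinned explicitly.
        rawLines.print().setParallelism(1)

        env.execute("parallelism-sketch")
      }
    }

Note that, per Fabian's point 1) above, raising the environment's parallelism will not spread a windowed aggregation that groups on a constant key: all records for one key are processed by a single parallel instance of the window operator.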
>>> Am 12/9/17 um 3:06 AM schrieb Colin Williams:
>>>
>>> Hello,
>>>
>>> I've inherited some Flink application code.
>>>
>>> We're currently creating a table using a tumbling window SQL query similar
>>> to the first example in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>>>
>>> Each generated SQL query looks something like
>>>
>>> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
>>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>>>
>>> We are also using a UDFAGG function in some of the queries, which I think
>>> could be cleaned up and optimized a bit (it uses Scala types and is
>>> possibly not well implemented).
>>>
>>> We then turn the result table back into a DataStream using toAppendStream,
>>> and eventually add a derived stream to a sink. We've configured the
>>> TimeCharacteristic for event-time processing.
>>>
>>> In some streaming scenarios everything works fine with a parallelism of 1,
>>> but in others it appears that we can't keep up with the event source.
>>>
>>> We are therefore investigating how to enable parallelism specifically on
>>> the SQL table query or the aggregator.
>>>
>>> Can anyone suggest a good way to go about this? It wasn't clear from the
>>> documentation.
>>>
>>> Best,
>>>
>>> Colin Williams
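Along the same lines, a rough end-to-end sketch of the wiring described above (event time, a rowtime attribute, the tumbling-window SQL query, and toAppendStream back to a DataStream) could look like the following against the Flink 1.4 Table API. The three-string input tuples, the stub scalar UDFs, and the print sink are hypothetical placeholders, and the built-in MAX stands in for the P99 UDFAGG to keep the sketch small:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.types.Row

    object TumblingSqlSketch {

      // Stand-ins for the tag_AppId / field_Adds scalar UDFs (placeholder logic only).
      class TagAppId extends ScalarFunction {
        def eval(tagset: String): String = tagset
      }
      class FieldAdds extends ScalarFunction {
        def eval(fieldset: String): Double = fieldset.length.toDouble
      }

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Placeholder input: (measurement, tagset, fieldset) with ascending timestamps.
        val lines: DataStream[(String, String, String)] = env
          .fromElements(("m", "appId=42", "adds=7"), ("m", "appId=42", "adds=3"))
          .assignAscendingTimestamps(_ => System.currentTimeMillis())

        // Register the stream as the LINES table; 'rowtime.rowtime appends a rowtime
        // attribute backed by the stream's event-time timestamps.
        tEnv.registerDataStream("LINES", lines, 'measurement, 'tagset, 'fieldset, 'rowtime.rowtime)

        // Register the scalar UDFs referenced by the query. The real job would also
        // register its P99 / UDFAGG AggregateFunction here via registerFunction.
        tEnv.registerFunction("tag_AppId", new TagAppId)
        tEnv.registerFunction("field_Adds", new FieldAdds)

        // Same shape as the query above, with MAX standing in for P99.
        val result = tEnv.sqlQuery(
          """SELECT measurement, `tag_AppId`(tagset), MAX(`field_Adds`(fieldset)),
            |       TUMBLE_START(rowtime, INTERVAL '10' MINUTE)
            |FROM LINES
            |GROUP BY measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
            |""".stripMargin)

        // Convert the windowed result back to an append-only DataStream and attach a sink.
        val out: DataStream[Row] = result.toAppendStream[Row]
        out.print() // stand-in for the real sink

        env.execute("tumbling-window-sql-sketch")
      }
    }

As Timo notes above, the window aggregation in this query runs at the environment's default parallelism, while the parallelism of the surrounding DataStream operators (the source, any parsing, and the sink) can be set explicitly per operator.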