Hi Fabian,

Thanks for your detailed response, and sorry for my late reply. Your points all make sense to me. Here are some thoughts on your open questions:
- Regarding tables without sufficient statistics, especially "dynamic"
tables derived from some arbitrary DataSet whose statistics cannot be
analyzed beforehand: I think in the first version we can just provide
some fixed default statistics to let the process work. Another approach
is to save the DataSet as an intermediate result table and run the
statistics analysis before further operations. In the future, a more
advanced and ideal way would be to keep collecting statistics while the
job is running, together with a way to dynamically modify the plan
during job execution.
- Regarding parallelism control, I think it is a good use case for
statistics. Once we have a good cost estimation and know how the user
expects the job to perform, we can definitely do some auto-tuning for
them. (I put a rough, purely illustrative sketch of both ideas at the
very end of this mail.)

I have opened a JIRA to track the status and the detailed implementation
steps for this issue: https://issues.apache.org/jira/browse/FLINK-5565.
Anyone interested in this topic can continue the discussion there,
either in the parent JIRA or in the sub-tasks.

Best,
Kurt

On Wed, Jan 11, 2017 at 5:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Kurt,
>
> thanks for starting this discussion!
> Although we use Calcite's cost-based optimizer, we do not use its full
> potential. As you correctly identified, this is mainly due to the lack
> of reliable statistics.
> Moreover, we use Calcite only for logical optimization, i.e., the
> optimizer basically rewrites the query and pushes down filters and
> projections (no join reordering yet).
> For batch queries, the logically optimized plan is translated into a
> DataSet program and the DataSet optimizer chooses the physical
> execution plan (shipping strategies, hash vs. merge join, etc.).
> It would be great if we could improve this situation for batch tables
> (stats on streaming tables might change while a query is executed) by
> doing the complete optimization (logical & physical) in Calcite.
> However, this will be a long way.
>
> I agree with your first steps to design a catalog / stats store
> component that can store and provide table and column statistics.
> Once we have stats about tables, we can start to improve the
> optimization step by step.
> The first thing should be to improve the logical optimization and
> enable join reordering.
> Once we have that, we can start to choose execution plans for operators
> by using the optimizer hints of the DataSet optimizer. This will also
> involve tracking the physical properties of intermediate results
> (sorting, partitioning, etc.) in Calcite.
>
> I would also recommend keeping the cost model as simple as possible.
> A detailed cost model is hard to reason about and does not really help
> if its parameters are imprecise.
> There are just too many numbers to get wrong, like input cardinalities,
> selectivities, or the cost ratio of disk to network IO.
>
> A few open questions remain:
> - How do we handle cases where there are not sufficient statistics for
> all tables? For example, if we have a query on a Table which was
> derived from a DataSet (no stats) and which is joined with some
> external tables that have stats.
> - Should we control the parallelism of operators based on cardinality
> information?
>
>
> Best,
> Fabian
>
> 2017-01-10 15:22 GMT+01:00 Kurt Young <ykt...@gmail.com>:
>
> > Hi,
> >
> > Currently Flink already uses a cost-based optimizer, but since we
> > don't have accurate statistics and the cost model is simple, we
> > actually don't gain much from this framework.
> > I proposed some improvements and a rough implementation plan in the
> > following document:
> > https://docs.google.com/document/d/1X7pEWHmuVywbJ8xFtiUY8Cpyw5A1u-4c4tKeODp-W-0/
> >
> > Hope to hear some feedback from you.
> >
> > best,
> > Kurt
> >
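
PS: To make the two ideas at the top of this mail a bit more concrete, here
is a minimal, purely hypothetical sketch in plain Java. None of the class or
method names below exist in Flink, and the default numbers are arbitrary
placeholders. It only shows (a) falling back to fixed default statistics when
a table has not been analyzed, and (b) deriving an operator parallelism from
an estimated row count.

    // Hypothetical sketch only -- none of these classes exist in Flink.
    import java.util.Optional;

    public class StatsSketch {

        /** Minimal table statistics: just a row count and an average row size. */
        static final class TableStats {
            final long rowCount;
            final int avgRowSizeBytes;

            TableStats(long rowCount, int avgRowSizeBytes) {
                this.rowCount = rowCount;
                this.avgRowSizeBytes = avgRowSizeBytes;
            }
        }

        // Fixed defaults used when a table (e.g. one derived from an arbitrary
        // DataSet) has never been analyzed. The numbers are placeholders.
        static final TableStats DEFAULT_STATS = new TableStats(1_000_000L, 100);

        /** Return analyzed stats if available, otherwise fall back to the defaults. */
        static TableStats statsOrDefault(Optional<TableStats> analyzed) {
            return analyzed.orElse(DEFAULT_STATS);
        }

        /**
         * Derive an operator parallelism from the estimated output cardinality:
         * aim for a target number of rows per parallel task, clamped to
         * [1, maxParallelism].
         */
        static int deriveParallelism(long estimatedRows, long targetRowsPerTask, int maxParallelism) {
            long tasks = (estimatedRows + targetRowsPerTask - 1) / targetRowsPerTask; // ceiling division
            return (int) Math.max(1, Math.min(maxParallelism, tasks));
        }

        public static void main(String[] args) {
            TableStats stats = statsOrDefault(Optional.empty());   // unanalyzed table -> defaults
            int p = deriveParallelism(stats.rowCount, 250_000L, 64);
            System.out.println("estimated rows=" + stats.rowCount + ", parallelism=" + p);
        }
    }

In a real implementation the fallback statistics would of course come from
the catalog / stats store Fabian mentioned, and the rows-per-task target
would probably be a configurable option rather than a constant.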