Hi ZhaoChun:
I have some questions:

1. How does TableFunctionScanNode work with other ScanNodes? For example, in the following SQL:

   SELECT * FROM A LEFT JOIN LATERAL (SELECT * FROM B WHERE B.c1 > A.c1)

   what ScanNodes will be generated for the subquery in the FROM clause? A single TableFunctionScanNode? Or an OlapScanNode followed by a TableFunctionScanNode?

2. Can TableFunctionScanNode replace part of the GROUPING SETS implementation?

--
Best Regards
陈明雨 Mingyu Chen
Email: chenmin...@apache.org

At 2019-09-30 18:00:34, "Zhao Chun" <zh...@apache.org> wrote:
>Hi, all
>
>I want to support UDTF (user-defined table function) in Doris. The detailed design follows.
>
>## Motivation
>
>In some scenarios, users need to expand one row of data into multiple rows. For example, suppose a field stores a list of tags and the user wants to count how many times each tag appears:
>
>| id | tags |
>| --- | --- |
>| 1 | tagA, tagC |
>| 2 | tagA, tagB |
>| 3 | tagB, tagD |
>| 4 | tagE, tagF |
>
>If we could expand the table above by tag as follows, the count of each tag could be obtained with a simple query such as `SELECT tag, count(*) FROM tbl GROUP BY tag`.
>
>| id | tag |
>| --- | --- |
>| 1 | tagA |
>| 1 | tagC |
>| 2 | tagA |
>| 2 | tagB |
>| 3 | tagB |
>| 3 | tagD |
>| 4 | tagE |
>| 4 | tagF |
>
>However, Doris currently lacks such functionality, so this kind of analysis is difficult and inefficient.
>
>Besides making it easier for Doris users to analyze their data, UDTF will also let Doris display and analyze nested data types better in the future. For example, we could expand a JSON field into rows, as follows.
>
>We can change
>
>```
>{"a": "foo", "b": "bar"}
>```
>
>into rows like
>
>| a | b |
>| --- | --- |
>| foo | bar |
>
>or rows like
>
>| key | value |
>| --- | --- |
>| a | foo |
>| b | bar |
>
>## Summary
>
>This work may be split into three phases.
>
>1. Implement analysis and execution of table functions. After this phase, users can use built-in table functions. However, lateral join will not be supported yet, so a table function's arguments can only be constants that do not change within a query, which still limits what table functions can do.
>2. Support user-defined table functions. After this phase, developers can create new table functions to solve their own problems.
>3. Support lateral join. After this phase, table functions will be much more complete, and users can use them to solve most problems. However, how to implement this is not yet clear, so it remains TODO.
>
>## Phase 1: Support Built-in Table Function
>
>### Query Plan
>
>To let users call table functions, we need to support the corresponding syntax. Doris currently supports table references and subqueries in the SQL FROM clause; table functions should be supported there as well. The SQL analyzer must be able to identify a table function and check that it is used properly. For example, a table function cannot be used in the WHERE or GROUP BY clause.
>
>After the analyzer has validated the table function's usage, the SQL planner should generate an execution plan for it. The coordinator then schedules this plan to some backend for execution.
>
>### Table Function Execution
>
>To execute table functions in the backend, we should implement a new `ExecNode` named `TableFunctionScanNode`. Its responsibility is to execute one type of table function, collect the function's results, and provide them to the data pipeline of this execution instance.
>
>This node's main responsibilities are as follows. First, it prepares the environment and parameters required by the table function.
>This node computes all of them before each function call. The node may be executed multiple times, because the parameters change with the input values; to achieve this, we need a `ReOpen` or `ReScan` interface that restarts the scan with different parameters. We will postpone this until we support lateral join. Second, the node calls the table function and collects its result. Third, it converts the function's result into a form that other `ExecNode`s can recognize and handle. Because the parent of this node fetches data from it in batches, this node should also return results in batches.
>
>### UDTF Execution
>
>To keep the user experience consistent, we continue to use the UDF implementation interface. To implement a table function, the user implements the three functions below. The `process` function is required; `prepare` and `close` are optional and exist to optimize execution. For example, a context can be created in `prepare` and saved in `ctx`, then reused by every `process` call, which avoids creating the context on each call.
>
>```
>// Optional: called before process(); create reusable state here and save it in ctx.
>void func_prepare(doris_udf::FunctionContext* ctx, doris_udf::FunctionContext::FunctionStateScope scope);
>
>// Required: computes all result rows for one set of arguments
>// (xxxVal stands for the argument's value type, e.g. IntVal or StringVal).
>void func_process(doris_udf::FunctionContext* ctx, const doris_udf::xxxVal& arg1 /* [, ...] */);
>
>// Optional: releases the state created in func_prepare().
>void func_close(doris_udf::FunctionContext* ctx, doris_udf::FunctionContext::FunctionStateScope scope);
>```
>
>Unlike other functions, a table function returns multiple rows for a single call. This version assumes that each call returns a modest number of rows, for example fewer than 10000, so a single `process` call is used to obtain the function's entire result.
>If a function needs to return a large amount of data in the future, we can add an incremental interface such as `get_next()`; the function's result would then be obtained by calling `get_next()` repeatedly.
>
>The data generated by `process` is stored in a `RecordStore`, which can be obtained through `ctx->record_store()`. Memory used by the function's result should be allocated from the `RecordStore`.
>
>The `RecordStore` interface is defined as follows.
>
>```
>#include <cstddef>
>
>class Record;
>class RecordStoreImpl;
>
>class RecordStore {
>public:
>    // Allocate a record to store data.
>    // The returned record can be added to this store by calling
>    // append_record(). If it is not added back, the client should
>    // call free_record() to free it.
>    Record* allocate_record();
>
>    // Append a record to this store. The input record must have been
>    // returned by allocate_record() of this RecordStore; otherwise the
>    // behavior is undefined.
>    void append_record(Record* record);
>
>    // Free an unused record created by allocate_record().
>    void free_record(Record* record);
>
>    // Allocate memory for a variable-length field in a record, such as a
>    // string. The client need not free this memory; it is freed when this
>    // store is destroyed.
>    void* allocate(size_t size);
>private:
>    RecordStoreImpl* _impl;
>};
>```
>
>The interface of each `Record` is as follows.
>
>```
>#include <cstdint>
>
>class Record {
>public:
>    // Set field idx to null.
>    void set_null(int idx);
>
>    // Set field idx to val as an int.
>    void set_int(int idx, int val);
>
>    // Set field idx to the string at ptr with length len. The buffer is
>    // used directly without copying, so the client should allocate it
>    // from the RecordStore.
>    void set_string(int idx, const uint8_t* ptr, int len);
>};
>```
>
>Below is a simple example of implementing a table function.
>
>```
>#include <cstdint>
>#include <cstring>
>
>// Duplicate the input argument `times` times. All results are saved
>// in the RecordStore.
>void duplicate(FunctionContext* context, const IntVal& times, const StringVal& val) {
>    RecordStore* store = context->record_store();
>    for (int i = 0; i < times.val; ++i) {
>        Record* record = store->allocate_record();
>
>        // Column 0: the index of this copy.
>        record->set_int(0, i);
>
>        // Column 1: a copy of the value. The buffer comes from the store
>        // because set_string() keeps the pointer without copying.
>        uint8_t* ptr = static_cast<uint8_t*>(store->allocate(val.len));
>        memcpy(ptr, val.ptr, val.len);
>        record->set_string(1, ptr, val.len);
>
>        store->append_record(record);
>    }
>}
>```
>
>The implementation requires two sets of code: one inside Doris and one in the SDK. The reason is execution performance: other execution nodes should be able to consume the table function's result directly, without a memory copy. At the same time, the table function should stay decoupled from Doris's internals, which need not be exposed to UDTF authors; this lets the internal format change in the future while existing UDTFs keep working. So the interface is implemented twice, keeping the SDK separate from Doris's internal implementation.
>
>After the above work, Doris can support built-in table functions.
>
>## Phase 2: Support User-Defined Table Function
>
>### SQL Syntax
>
>The syntax to create a UDTF is as follows. Compared with a UDF, a UDTF must additionally declare the schema of the table it returns; everything else is the same as for a UDF.
>
>```
>CREATE FUNCTION func_name (arg_type [, arg_type])
>RETURNS TABLE (col_name col_type [, ...])
>[PROPERTIES ("key" = "value" [, ...]) ]
>```
>
>### Meta Store
>
>Doris needs to persist UDTF-related metadata internally.
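The Meta Store paragraph leaves the catalog format open. As a minimal sketch, assuming only what the `CREATE FUNCTION ... RETURNS TABLE` syntax above declares, a UDTF entry could keep one field per clause. `ColumnDef`, `UdtfMeta`, `matches`, and `create_statement` are hypothetical names, not existing Doris classes, and C++ is used only for consistency with the other code in this thread.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical: one column of the RETURNS TABLE clause.
struct ColumnDef {
    std::string name;  // col_name
    std::string type;  // col_type, e.g. "INT" or "VARCHAR"
};

// Hypothetical catalog entry for one UDTF, one field per clause of
// CREATE FUNCTION ... RETURNS TABLE ... [PROPERTIES (...)].
struct UdtfMeta {
    std::string name;                               // func_name
    std::vector<std::string> arg_types;             // (arg_type [, arg_type])
    std::vector<ColumnDef> return_columns;          // RETURNS TABLE (...)
    std::map<std::string, std::string> properties;  // PROPERTIES (...)

    // Exact-match lookup an analyzer could use to resolve a call in FROM.
    // (Exact types only; implicit casts are ignored in this sketch.)
    bool matches(const std::string& fn,
                 const std::vector<std::string>& args) const {
        return fn == name && args == arg_types;
    }

    // Render the entry back into the CREATE FUNCTION syntax, on one line.
    std::string create_statement() const {
        std::string sql = "CREATE FUNCTION " + name + " (";
        for (std::size_t i = 0; i < arg_types.size(); ++i) {
            sql += (i > 0 ? ", " : "") + arg_types[i];
        }
        sql += ") RETURNS TABLE (";
        for (std::size_t i = 0; i < return_columns.size(); ++i) {
            sql += (i > 0 ? ", " : "") + return_columns[i].name + " " +
                   return_columns[i].type;
        }
        sql += ")";
        if (!properties.empty()) {
            sql += " PROPERTIES (";
            bool first = true;
            for (const auto& kv : properties) {
                if (!first) {
                    sql += ", ";
                }
                sql += "\"" + kv.first + "\" = \"" + kv.second + "\"";
                first = false;
            }
            sql += ")";
        }
        return sql;
    }
};
```

For an entry describing a hypothetical `split_tags` function with one `VARCHAR` argument, one returned `tag VARCHAR` column, and a single `"key" = "value"` property, `create_statement()` returns `CREATE FUNCTION split_tags (VARCHAR) RETURNS TABLE (tag VARCHAR) PROPERTIES ("key" = "value")`. Keeping the returned columns as a separate list is what distinguishes this entry from a scalar UDF's single return type.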
>
>## Phase 3: Support Lateral Join
>
>After lateral join is supported, a table function can be called multiple times during a single query execution, for example:
>
>```
>SELECT * FROM tbl AS a, LATERAL table_func(a.c1, a.c2);
>```
>
>What we need to support:
>
>1. Query parsing supports the LATERAL syntax.
>2. The planner can correctly plan a lateral join.
>3. Either a lateral join node, or a parameter mechanism that works across the entire query.
>
>How to achieve this is still TODO.
>
>If you have any thoughts on this, please let me know. Looking forward to your feedback.
>
>I have already created an issue[1] on GitHub too.
>
>1. https://github.com/apache/incubator-doris/issues/1922
>
>Thanks,
>Zhao Chun
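The Phase 3 semantics can be illustrated without any planner machinery. For `SELECT * FROM tbl AS a, LATERAL table_func(a.c1, a.c2)`, each outer row is paired with every row the table function returns for that row's arguments: in effect a nested loop that re-invokes the function per outer row, which is why the `ReOpen`/`ReScan` interface is needed. The C++ below is a minimal in-memory sketch under that reading, not Doris code: `InputRow`, `OutputRow`, `TableFunc`, `split_tags`, and `lateral_join` are all hypothetical, and `split_tags` stands in for a table function that expands the tags column from the Motivation example.

```cpp
#include <functional>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, simplified row types used only for this sketch.
struct InputRow {
    int id;
    std::string tags;  // comma-separated list, e.g. "tagA, tagC"
};

struct OutputRow {
    int id;           // copied from the outer row
    std::string tag;  // one row produced by the table function
};

// A table function maps one argument value to zero or more result values.
using TableFunc = std::function<std::vector<std::string>(const std::string&)>;

// Hypothetical table function: split "tagA, tagC" into {"tagA", "tagC"},
// trimming spaces and skipping empty items.
std::vector<std::string> split_tags(const std::string& list) {
    std::vector<std::string> out;
    std::stringstream ss(list);
    std::string item;
    while (std::getline(ss, item, ',')) {
        std::string::size_type begin = item.find_first_not_of(' ');
        if (begin == std::string::npos) {
            continue;  // item was empty or all spaces
        }
        std::string::size_type end = item.find_last_not_of(' ');
        out.push_back(item.substr(begin, end - begin + 1));
    }
    return out;
}

// Comma-style (inner) lateral join: re-invoke the table function once per
// outer row with that row's value, then pair the outer row with each result.
// Outer rows for which the function returns nothing produce no output.
std::vector<OutputRow> lateral_join(const std::vector<InputRow>& outer,
                                    const TableFunc& fn) {
    std::vector<OutputRow> result;
    for (const InputRow& row : outer) {
        // New parameters for every outer row: the "ReScan" step.
        for (const std::string& value : fn(row.tags)) {
            result.push_back({row.id, value});
        }
    }
    return result;
}
```

Applied to the four-row tags table, `lateral_join(tbl, split_tags)` yields the eight `(id, tag)` rows of the expanded table, and grouping those by tag gives the counts the Motivation section asks for. The comma form is an inner lateral join; a `LEFT JOIN LATERAL`, as in the first question above, would instead keep outer rows that produce no results, padding the function's columns with NULL.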