Hi all,

I want to add support for UDTF (user-defined table function) in Doris. The detailed design follows.
## Motivation

In some scenarios, users need to expand one row of data into multiple rows. For example, suppose a field stores a list of tags and the user wants to count how many times each tag appears:

| id | tags |
| --- | --- |
| 1 | tagA, tagC |
| 2 | tagA, tagB |
| 3 | tagB, tagD |
| 4 | tagE, tagF |

If the table above could be expanded by tag as follows, the number of occurrences of each tag could be obtained with a simple query such as `SELECT tag, count(*) FROM tbl GROUP BY tag`.

| id | tag |
| --- | --- |
| 1 | tagA |
| 1 | tagC |
| 2 | tagA |
| 2 | tagB |
| 3 | tagB |
| 3 | tagD |
| 4 | tagE |
| 4 | tagF |

However, Doris lacks such functionality, so this kind of analysis is currently difficult and inefficient. Besides making it easier for Doris users to analyze their data, UDTF can also be used in the future to better display and analyze nested data types. For example, we could expand a JSON field such as

```
{"a": "foo", "b": "bar"}
```

into rows like

| a | b |
| --- | --- |
| foo | bar |

or into rows like

| key | value |
| --- | --- |
| a | foo |
| b | bar |

## Summary

This work may be split into three phases.

1. Implement analysis and execution of table functions. After this phase, users can use built-in table functions. However, since lateral join is not supported yet, a table function's arguments can only be constants that do not change within a query, so table functions are still quite limited.
2. Support user-defined table functions. After this phase, developers can create new table functions to solve their own problems.
3. Support lateral join. After this phase, table functions will be more complete, and users can use them to solve most problems. However, how to implement this is not yet clear, so it remains TODO.
## Phase 1: Support Built-in Table Function

### Query Plan

To let users call table functions, the SQL syntax must support them. Doris currently supports table references and subqueries in the FROM clause; table functions should be supported there as well. The SQL analyzer must be able to identify a table function and check that it is used properly. For example, a table function cannot be used in a WHERE or GROUP BY clause.

Once the analyzer has validated the table function's usage, the SQL planner should generate an execution plan for it. The coordinator then schedules this plan to a backend for execution.

### Table Function Execution

To execute table functions in the backend, we will implement a new `ExecNode` named `TableFunctionScanNode`. Its responsibility is to execute a table function, collect the function's results, and feed them into the data pipeline of this execution instance. The node's main tasks are:

1. Prepare the environment and parameters required by the table function. The node computes all of these before each function call. Because the parameters can change with the input values, the node may be executed multiple times; to support this, we need a `ReOpen` or `ReScan` interface that restarts the scan with different parameters. We will postpone this until we support lateral join.
2. Call the table function and collect its results.
3. Convert the function's results into a format that other `ExecNode`s can recognize and handle. The parent of this node fetches data from it in batches, so this node must return its results in batches as well.

### UDTF Execution

To keep the user experience consistent, we continue to use the UDF implementation interface. To implement a table function, the user provides up to three functions, as follows.
Only the `process` function is mandatory; `prepare` and `close` are optional. They exist to optimize execution. For example, some context can be created in `prepare` and saved in `ctx`, where every `process` call can reuse it. This avoids creating the context on each call of `process`.

```
void func_prepare(doris_udf::FunctionContext* ctx, doris_udf::FunctionContext::FunctionStateScope scope);
void func_process(doris_udf::FunctionContext* ctx, const doris_udf::xxxVal& arg1 [, ...]);
void func_close(doris_udf::FunctionContext* ctx, doris_udf::FunctionContext::FunctionStateScope scope);
```

Unlike other functions, a table function returns multiple rows of data for a single call. This version assumes that the number of rows returned by each call is not too large, for example fewer than 10000 rows, so a single `process` call returns all of the function's output. If functions need to return large amounts of data in the future, we can add an incremental interface such as `get_next()`; the function's result would then be fetched by calling `get_next()` repeatedly.

The data generated by the `process` function is stored in a `RecordStore`, which can be obtained via `ctx->record_store()`. Memory used by the function's results should be allocated from the `RecordStore`. The `RecordStore` interface is defined as follows.

```
class RecordStore {
public:
    // Allocate a record to store data.
    // The returned record can be added to this store by calling
    // append_record. If the returned record is not added back,
    // the client should call free_record to free it.
    Record* allocate_record();

    // Append a record to this store. The input record must have been
    // returned by allocate_record of this RecordStore; otherwise the
    // behavior is undefined.
    void append_record(Record* record);

    // Free an unused record created by allocate_record.
    void free_record(Record* record);

    // Allocate memory for a variable-length field in a record, such as a
    // string. The allocated memory need not be freed by the client; it is
    // freed when this store is destroyed.
    void* allocate(size_t size);

private:
    RecordStoreImpl* _impl;
};
```

The interface of each `Record` is as follows.

```
class Record {
public:
    // Set field idx to null.
    void set_null(int idx);

    // Set field idx to val as an int.
    void set_int(int idx, int val);

    // Set field idx to the string at ptr with length len. This function
    // uses the input buffer directly without copying, so the client should
    // allocate that memory from the RecordStore.
    void set_string(int idx, const uint8_t* ptr, int len);
};
```

Below is a simple example of a table function.

```
#include <cstring>  // std::memcpy

// Duplicate the input argument `times` times. Each output record holds
// the index in field 0 and a copy of val in field 1. All results are
// saved in the RecordStore.
void duplicate(FunctionContext* context, const IntVal& times, const StringVal& val) {
    if (times.is_null || val.is_null) {
        return;  // emit no rows for NULL input
    }
    RecordStore* store = context->record_store();
    for (int i = 0; i < times.val; ++i) {
        Record* record = store->allocate_record();
        // set index
        record->set_int(0, i);
        // set value: copy it into memory owned by the store
        uint8_t* ptr = static_cast<uint8_t*>(store->allocate(val.len));
        std::memcpy(ptr, val.ptr, val.len);
        record->set_string(1, ptr, val.len);
        store->append_record(record);
    }
}
```

The framework requires two sets of code: one inside Doris and the other in the SDK. Why implement it twice? The simple reason is execution performance: we want the table function's results to be processed directly by other execution nodes without a memory copy. However, to keep table functions decoupled from the internal implementation, that implementation should not be exposed to UDTF developers.
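To make the proposed interfaces concrete, here is a minimal, self-contained sketch of a UDTF that performs the tag expansion from the Motivation section. Because `ctx->record_store()` and the `Record`/`RecordStore` classes are only proposed here, the block defines hypothetical mocks of `StringVal`, `Record`, `RecordStore`, and `FunctionContext`: storage is a plain `std::vector`, there is no `RecordStoreImpl` indirection, and the reader-side accessors `field()` and `at()` exist only so the output can be inspected. `explode_tags` is a hypothetical function name, and splitting on commas while trimming spaces is an assumption about the tag format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

// ---- Hypothetical mocks of the proposed SDK types (not real Doris code) ----
struct StringVal { bool is_null = false; const uint8_t* ptr = nullptr; int len = 0; };

struct Field { bool is_null = true; int int_val = 0; const uint8_t* ptr = nullptr; int len = 0; };

class Record {
public:
    explicit Record(int num_fields) : _fields(num_fields) {}
    void set_null(int idx) { _fields[idx] = Field{}; }
    void set_int(int idx, int val) { _fields[idx].is_null = false; _fields[idx].int_val = val; }
    // Stores the pointer as-is (no copy), matching the proposed contract.
    void set_string(int idx, const uint8_t* ptr, int len) {
        _fields[idx].is_null = false; _fields[idx].ptr = ptr; _fields[idx].len = len;
    }
    const Field& field(int idx) const { return _fields[idx]; }  // reader side, sketch only
private:
    std::vector<Field> _fields;
};

class RecordStore {
public:
    explicit RecordStore(int num_fields) : _num_fields(num_fields) {}
    Record* allocate_record() { return new Record(_num_fields); }
    void append_record(Record* record) { _records.emplace_back(record); }  // takes ownership
    void free_record(Record* record) { delete record; }
    void* allocate(std::size_t size) {  // freed when the store is destroyed
        _buffers.emplace_back(new uint8_t[size]);
        return _buffers.back().get();
    }
    std::size_t size() const { return _records.size(); }
    const Record& at(std::size_t i) const { return *_records[i]; }
private:
    int _num_fields;
    std::vector<std::unique_ptr<Record>> _records;
    std::vector<std::unique_ptr<uint8_t[]>> _buffers;
};

class FunctionContext {
public:
    explicit FunctionContext(RecordStore* store) : _store(store) {}
    RecordStore* record_store() { return _store; }
private:
    RecordStore* _store;
};

// ---- A UDTF written against the mocks: one output row per comma-separated tag ----
void explode_tags(FunctionContext* ctx, const StringVal& tags) {
    if (tags.is_null) return;  // NULL input produces no rows
    RecordStore* store = ctx->record_store();
    int start = 0;
    for (int i = 0; i <= tags.len; ++i) {
        if (i < tags.len && tags.ptr[i] != ',') continue;
        int b = start, e = i;  // current item is [start, i)
        while (b < e && tags.ptr[b] == ' ') ++b;      // trim leading spaces
        while (e > b && tags.ptr[e - 1] == ' ') --e;  // trim trailing spaces
        start = i + 1;
        if (b == e) continue;  // skip empty items
        Record* record = store->allocate_record();
        auto n = static_cast<std::size_t>(e - b);
        auto* buf = static_cast<uint8_t*>(store->allocate(n));
        std::memcpy(buf, tags.ptr + b, n);  // copy into store-owned memory
        record->set_string(0, buf, e - b);
        store->append_record(record);
    }
}
```

Calling `explode_tags` on `"tagA, tagC"` appends two single-column records, `tagA` and `tagC`, each pointing at memory owned by the store. In this mock the SDK classes hold the data themselves; in the proposed design, the SDK-facing `Record` and `RecordStore` would instead be thin wrappers (via `RecordStoreImpl`) over Doris's internal representation, so that a UDTF never depends on the internal layout.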
Keeping the SDK separate from Doris's internal implementation also means the internal format can change in the future while existing UDTFs continue to work. That is why we implement it twice: to separate the SDK from Doris's internals.

After the above work, Doris can support built-in table functions.

## Phase 2: Support User-Defined Table Function

### SQL Syntax

The syntax to create a UDTF is as follows. Compared with a UDF, a UDTF also needs to declare the schema of the table it returns; everything else is the same as for a UDF.

```
CREATE FUNCTION func_name (arg_type [, arg_type])
RETURNS TABLE (col_name col_type [, ...])
[PROPERTIES ("key" = "value" [, ...])]
```

### Meta Store

Internally, Doris needs to persist UDTF-related metadata.

## Phase 3: Support Lateral Join

Once lateral join is supported, a table function can be called multiple times within a single query, with arguments taken from other tables in the query, as in the following:

```
SELECT * FROM tbl AS a, LATERAL table_func(a.c1, a.c2);
```

We need to support the following:

1. Parsing of the LATERAL syntax.
2. Correct planning of lateral joins.
3. A lateral join node, or a parameter-passing mechanism that works across the entire query.

How to achieve this is still TODO. If you have any thoughts on this, please let me know.

Looking forward to your feedback. I have also created an issue [1] on GitHub.

[1] https://github.com/apache/incubator-doris/issues/1922

Thanks,
Zhao Chun