Hi, all

I want to support UDTFs (user-defined table functions) in Doris. The
detailed design follows.

## Motivation

In some scenarios, users need to expand one row of data into multiple
rows. For example, suppose a user's field stores a tag list and he wants to
count how many times each tag appears, as in the following table.

| id | tags |
| --- | --- |
| 1 | tagA, tagC |
| 2 | tagA, tagB |
| 3 | tagB, tagD |
| 4 | tagE, tagF |

If we can expand the above table by tag as follows, it is easy to count
how many times each tag appears with a simple SQL statement like
`SELECT tag, count(*) FROM tbl GROUP BY tag`.

| id | tag |
| --- | --- |
| 1 | tagA |
| 1 | tagC |
| 2 | tagA |
| 2 | tagB |
| 3 | tagB |
| 3 | tagD |
| 4 | tagE |
| 4 | tagF |

However, Doris currently has no such functionality, so this kind of
expansion is difficult and inefficient to achieve.

Besides making it easier for Doris users to analyze their data, UDTFs will
let Doris better display and analyze nested-type data in the future. For
example, we can expand a JSON field into rows as follows.

We can change

```
{"a": "foo", "b": "bar"}
```

into rows like

| a | b |
| --- | --- |
| foo | bar |

or rows like

| key | value |
| --- | --- |
| a | foo |
| b | bar |

## Summary

This work can be split into 3 phases.

1. Implement analysis and execution of table functions. After this work
finishes, users can use built-in table functions. However, lateral join is
not supported yet, so a table function's arguments can only be constants,
which cannot change within one query. The ability of table functions is
therefore still very limited.
2. Support user-defined table functions. After this work, developers can
create new table functions to solve their own problems.
3. Support lateral join. After this work, table functions will be more
complete, and users can use them to solve most problems. How to implement
this is not completely clear yet, so it is still to do.

## Phase 1: Support Built-in Table Function

### Query Plan

To let users call table functions, we need to support the corresponding
syntax. Doris currently supports table references and subqueries in the SQL
FROM clause; table functions should be supported there as well. The SQL
analyzer must be able to identify a table function and check that it is
used properly. For example, a table function cannot be used in the WHERE or
GROUP BY clause.

After the analyzer has analyzed the table function's usage, the SQL
planner should generate an execution plan for it. The coordinator then
schedules this plan to some backend for execution.

### Table Function Execution

To execute a table function in the backend, we should implement a new
`ExecNode` named `TableFunctionScanNode`. Its responsibility is to execute
a table function, collect its result, and provide it to the data pipeline
of the execution instance.

This node's main work has three parts. First, it prepares the environment
and parameters required by the table function, computing them before each
function call. Once parameters can depend on input values, the node may be
executed multiple times, so we will need a `ReOpen` or `ReScan` interface
to restart the scan with different parameters. We will postpone this until
we support lateral join. Second, the node calls the table function and
collects its result. Third, the node converts the result into a form that
other `ExecNode`s can recognize and handle. The parent of this node fetches
data from it in batches, so this node should return its result in batches
as well.
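
As a rough illustration of the last two steps, here is a minimal sketch of
such a node. All names in it (`TableFunctionScanNodeSketch`, `Row`,
`RowBatch`, `TableFunction`) are hypothetical stand-ins and not Doris
classes. It assumes phase 1, where the arguments are constant and the
function is called once.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for this sketch; these are not Doris types.
using Row = std::vector<std::string>;
using RowBatch = std::vector<Row>;
using TableFunction = std::function<std::vector<Row>()>;

class TableFunctionScanNodeSketch {
public:
    TableFunctionScanNodeSketch(TableFunction fn, std::size_t batch_size)
            : _fn(std::move(fn)), _batch_size(batch_size) {
        assert(_batch_size > 0);
    }

    // Arguments are constant in phase 1, so they are already bound inside
    // _fn. Call the function once and collect its complete result.
    void open() {
        _rows = _fn();
        _pos = 0;
    }

    // Hand the collected rows to the parent one batch at a time.
    // Returns false once every row has been returned.
    bool get_next(RowBatch* batch) {
        batch->clear();
        const std::size_t end = std::min(_pos + _batch_size, _rows.size());
        for (; _pos < end; ++_pos) {
            batch->push_back(std::move(_rows[_pos]));
        }
        return !batch->empty();
    }

private:
    TableFunction _fn;
    std::size_t _batch_size;
    std::vector<Row> _rows;
    std::size_t _pos = 0;
};
```

A parent would call `open()` once and then call `get_next()` in a loop
until it returns false. A later `ReOpen` could simply call `open()` again
with a function bound to new parameters.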

### UDTF Execution

To keep the user experience consistent, we continue to use the UDF
implementation interface. To implement a table function, the user
implements up to three functions, as follows. The `process` function must
be provided; the other two, `prepare` and `close`, are optional and are
used to optimize execution. For example, some context can be created in
`prepare` and saved in `ctx`, so that every `process` call can reuse it
instead of creating the context again.
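
To make the lifecycle concrete, here is a small sketch of the
prepare/process/close pattern. `ContextSketch` is a hypothetical,
simplified stand-in for `doris_udf::FunctionContext`, and `SplitState` and
the `split_*` functions are made up for illustration only.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified stand-in for doris_udf::FunctionContext.
struct ContextSketch {
    void* state = nullptr;  // slot for state shared between calls
};

// State built once in prepare and reused by every process call.
struct SplitState {
    char delimiter;
    int calls;
};

void split_prepare(ContextSketch* ctx) {
    ctx->state = new SplitState{',', 0};
}

// Counts the pieces of `input`. A real table function would emit the
// pieces as rows instead of returning a count.
int split_process(ContextSketch* ctx, const std::string& input) {
    auto* state = static_cast<SplitState*>(ctx->state);
    assert(state != nullptr);  // prepare must have run first
    ++state->calls;
    int pieces = 1;
    for (char c : input) {
        if (c == state->delimiter) ++pieces;
    }
    return pieces;
}

void split_close(ContextSketch* ctx) {
    delete static_cast<SplitState*>(ctx->state);
    ctx->state = nullptr;
}
```

The point is only that the state is created once, reused by every
`process` call, and released in `close`.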

```
void func_prepare(doris_udf::FunctionContext* ctx,
                  doris_udf::FunctionContext::FunctionStateScope scope);
void func_process(doris_udf::FunctionContext* ctx,
                  const doris_udf::xxxVal& arg1 [, ...]);
void func_close(doris_udf::FunctionContext* ctx,
                doris_udf::FunctionContext::FunctionStateScope scope);
```

Unlike other functions, a table function returns multiple rows for a
single call. This version assumes that the number of rows returned by each
call is not too large, for example fewer than 10000 rows, so a single
`process` call produces the function's complete result. If functions need
to return large amounts of data in the future, we can add an incremental
interface such as `get_next()`, and the result would then be fetched by
calling `get_next()` repeatedly.
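
One possible shape for such an incremental interface is sketched below.
Only the name `get_next` comes from this proposal; the signature, the
`max_rows` parameter, and the `RangeFunctionSketch` class are assumptions
made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical incremental table function that yields the values
// 0 .. times-1 in chunks instead of all at once.
class RangeFunctionSketch {
public:
    explicit RangeFunctionSketch(int times) : _times(times) {
        assert(times >= 0);
    }

    // Fill `out` with at most `max_rows` values.
    // Returns false once the function is exhausted.
    bool get_next(std::size_t max_rows, std::vector<int>* out) {
        out->clear();
        while (_next < _times && out->size() < max_rows) {
            out->push_back(_next++);
        }
        return !out->empty();
    }

private:
    int _times;
    int _next = 0;
};
```

With this shape the caller controls how many rows are in memory at once,
at the cost of the function having to keep its own cursor between calls.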

The data generated by `process` is stored in a `RecordStore`, which is
obtained through `ctx->record_store()`. Memory used by the function's
result should be allocated from the `RecordStore`.

The specific interface corresponding to `RecordStore` is defined as follows.

```
class RecordStore {
public:
    // Allocate a record to store data.
    // The returned record can be added to this store by calling
    // append_record. If the returned record is not added back,
    // the client should call free_record to free it.
    Record* allocate_record();

    // Append a record to this store. The input record must have been
    // returned by allocate_record of this RecordStore. Otherwise the
    // behavior is undefined.
    void append_record(Record* record);

    // Free an unused record created by allocate_record.
    void free_record(Record* record);

    // Allocate memory for a variable-length field in a record, such as a
    // string. The client does not need to free this memory; it is freed
    // when this store is destroyed.
    void* allocate(size_t size);
private:
    RecordStoreImpl* _impl;
};
```

The interface of each Record is provided as follows.

```
class Record {
public:
    // Set field idx to null.
    void set_null(int idx);

    // Set field idx to val as an int.
    void set_int(int idx, int val);

    // Set field idx to the string at ptr with length len. This function
    // uses the input buffer directly without copying it, so the client
    // should allocate the buffer from the RecordStore.
    void set_string(int idx, const uint8_t* ptr, int len);
};
```

Below is a simple example to implement a table function.

```
#include <cstdint>
#include <cstring>

// Duplicate the input argument `times` times. All results are saved
// in the RecordStore.
void duplicate(FunctionContext* context, const IntVal& times,
               const StringVal& val) {
    RecordStore* store = context->record_store();
    for (int i = 0; i < times.val; ++i) {
        Record* record = store->allocate_record();

        // set index
        record->set_int(0, i);

        // set value; the buffer is owned by the store, so it is not freed here
        uint8_t* ptr = static_cast<uint8_t*>(store->allocate(val.len));
        memcpy(ptr, val.ptr, val.len);
        record->set_string(1, ptr, val.len);

        store->append_record(record);
    }
}
```
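
To try the interface outside Doris, here is a self-contained sketch with
in-memory stand-ins for `Record` and `RecordStore`. The stand-ins, the
`Field` struct, the `num_fields` constructor argument, the `records()`
accessor, and the `explode_tags` function are all assumptions made for
illustration; they are not the proposed implementation. The example
expands a tag list like the one in the Motivation section.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

// Hypothetical in-memory stand-ins, just enough to run the example.
struct Field {
    bool is_null = true;
    int int_val = 0;
    const uint8_t* ptr = nullptr;
    int len = 0;
};

class Record {
public:
    explicit Record(int num_fields) : _fields(num_fields) {}
    void set_null(int idx) { _fields.at(idx) = Field(); }
    void set_int(int idx, int val) {
        Field& f = _fields.at(idx);
        f.is_null = false;
        f.int_val = val;
    }
    // Stores the pointer without copying, as the proposal describes.
    void set_string(int idx, const uint8_t* ptr, int len) {
        Field& f = _fields.at(idx);
        f.is_null = false;
        f.ptr = ptr;
        f.len = len;
    }
    const Field& field(int idx) const { return _fields.at(idx); }
private:
    std::vector<Field> _fields;
};

class RecordStore {
public:
    explicit RecordStore(int num_fields) : _num_fields(num_fields) {}
    Record* allocate_record() { return new Record(_num_fields); }
    // Takes ownership of a record returned by allocate_record.
    void append_record(Record* record) { _records.emplace_back(record); }
    void free_record(Record* record) { delete record; }
    // Buffers live until the store is destroyed.
    void* allocate(std::size_t size) {
        _buffers.emplace_back(new uint8_t[size]);
        return _buffers.back().get();
    }
    const std::vector<std::unique_ptr<Record>>& records() const {
        return _records;
    }
private:
    int _num_fields;
    std::vector<std::unique_ptr<Record>> _records;
    std::vector<std::unique_ptr<uint8_t[]>> _buffers;
};

// Emit one (id, tag) record per comma-separated tag in `tags`.
void explode_tags(RecordStore* store, int id, const std::string& tags) {
    std::size_t begin = 0;
    while (begin <= tags.size()) {
        std::size_t end = tags.find(',', begin);
        if (end == std::string::npos) end = tags.size();
        // Trim spaces around the tag.
        std::size_t first = begin;
        while (first < end && tags[first] == ' ') ++first;
        std::size_t last = end;
        while (last > first && tags[last - 1] == ' ') --last;
        if (last > first) {
            const std::size_t len = last - first;
            Record* record = store->allocate_record();
            record->set_int(0, id);
            auto* buf = static_cast<uint8_t*>(store->allocate(len));
            std::memcpy(buf, tags.data() + first, len);
            record->set_string(1, buf, static_cast<int>(len));
            store->append_record(record);
        }
        begin = end + 1;
    }
}
```

Calling `explode_tags(&store, 1, "tagA, tagC")` on a two-field store leaves
two records in it, one per tag, each carrying the id in field 0.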

The framework requires two sets of code, one internal to Doris and the
other for the SDK. Why two? The main reason is execution performance: I
want a table function's result to be processed by other execution nodes
directly, without a memory copy. At the same time, to keep table functions
decoupled from the internal implementation, the internal implementation
should not be exposed to UDTF users. This allows the internal format to
change in the future while existing UDTFs continue to work. So we need to
implement the interface twice, separating the SDK from Doris internals.
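
One common way to get this separation is an opaque implementation pointer,
which is what the `_impl` member of `RecordStore` suggests. The sketch
below shows the idea only; `StoreSketch` and `StoreImplSketch` are
hypothetical names, and in a real build the two halves would live in
separate files.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// ---- SDK side: what a UDTF author would see ----
class StoreImplSketch;  // opaque; its layout is hidden from UDTF code

class StoreSketch {
public:
    explicit StoreSketch(StoreImplSketch* impl) : _impl(impl) {}
    void* allocate(std::size_t size);  // forwards to the internal side
private:
    StoreImplSketch* _impl;
};

// ---- Internal side: free to change without touching UDTF code ----
class StoreImplSketch {
public:
    void* allocate(std::size_t size) {
        _chunks.emplace_back(new char[size]);
        _bytes += size;
        return _chunks.back().get();
    }
    std::size_t allocated_bytes() const { return _bytes; }
private:
    std::vector<std::unique_ptr<char[]>> _chunks;
    std::size_t _bytes = 0;
};

// Defined where the internal type is complete.
void* StoreSketch::allocate(std::size_t size) {
    assert(_impl != nullptr);
    return _impl->allocate(size);
}
```

Because the SDK class holds only a pointer, memory handed out through it is
the internal memory itself, so no copy is needed when other nodes consume
the result.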

After the above work, we can support built-in table functions in Doris.

## Phase 2: Support User-Defined Table Function

### SQL Syntax

The syntax to create a UDTF is as follows. Compared to a UDF, a UDTF also
needs to define the schema of the returned table; everything else is the
same as for a UDF.

```
CREATE FUNCTION func_name (arg_type [, arg_type])
RETURNS TABLE (col_name col_type [, ...])
[PROPERTIES ("key" = "value" [, ...]) ]
```

### Meta Store

Internally, Doris needs to save UDTF-related metadata.

## Phase 3: Support Lateral Join

After lateral join is supported, a table function can be called multiple
times during one query execution, in the following way:

```
SELECT * from tbl as a, LATERAL table_func(a.c1, a.c2);
```
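
The intended semantics of the query above can be sketched as a nested
loop: for each row of `tbl`, call the table function with that row and join
the row with every row the function returns. This is only a model of the
semantics under a made-up row type, not a proposed execution strategy;
`Row`, `TableFunc`, and `lateral_join` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical row model: every row is a list of ints.
using Row = std::vector<int>;
using TableFunc = std::function<std::vector<Row>(const Row&)>;

// For each left row, call the table function with that row and append
// each produced row to a copy of the left row. Left rows for which the
// function returns nothing produce no output.
std::vector<Row> lateral_join(const std::vector<Row>& left,
                              const TableFunc& fn) {
    assert(static_cast<bool>(fn));
    std::vector<Row> out;
    for (const Row& l : left) {
        for (const Row& r : fn(l)) {
            Row joined = l;
            joined.insert(joined.end(), r.begin(), r.end());
            out.push_back(std::move(joined));
        }
    }
    return out;
}
```

The table function is called once per left row here, which is why the
scan node needs some way to be restarted with new parameters.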

What we should support is as follows.

1. Query parsing supports the LATERAL syntax.
2. The planner can correctly plan a lateral join.
3. Execution supports a lateral join node, or a parameter system that
spans the entire query.

How to achieve this is still TODO.

If you have any thoughts on this, please let me know. Looking forward to
your feedback.

I have already created an issue[1] on GitHub too.

1. https://github.com/apache/incubator-doris/issues/1922

Thanks,
Zhao Chun
