This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch tdigest in repository https://gitbox.apache.org/repos/asf/datasketches-bigquery.git
commit b13dc37880c6703ce42432c81d289ee0dd196bff Author: AlexanderSaydakov <[email protected]> AuthorDate: Wed Oct 23 16:17:32 2024 -0700 tdigest functions --- Makefile | 2 +- README.md | 63 ++++++++++++- README_template.md | 13 ++- tdigest/Makefile | 64 +++++++++++++ tdigest/sqlx/tdigest_double_build.sqlx | 36 ++++++++ tdigest/sqlx/tdigest_double_build_k.sqlx | 106 ++++++++++++++++++++++ tdigest/sqlx/tdigest_double_get_max_value.sqlx | 49 ++++++++++ tdigest/sqlx/tdigest_double_get_min_value.sqlx | 49 ++++++++++ tdigest/sqlx/tdigest_double_get_quantile.sqlx | 51 +++++++++++ tdigest/sqlx/tdigest_double_get_rank.sqlx | 51 +++++++++++ tdigest/sqlx/tdigest_double_get_total_weight.sqlx | 49 ++++++++++ tdigest/sqlx/tdigest_double_merge.sqlx | 36 ++++++++ tdigest/sqlx/tdigest_double_merge_k.sqlx | 106 ++++++++++++++++++++++ tdigest/sqlx/tdigest_double_to_string.sqlx | 49 ++++++++++ tdigest/tdigest_double.cpp | 61 +++++++++++++ tdigest/test/tdigest_double_test.sql | 53 +++++++++++ 16 files changed, 834 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 3768f1b..eb508ed 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -MODULES := theta tuple cpc hll kll fi +MODULES := theta tuple cpc hll kll fi tdigest $(MODULES): $(MAKE) -C $@ diff --git a/README.md b/README.md index 30bb7ff..0bc141c 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ page for how to contact us. 
## Requirements - Requires [Emscripten (emcc compiler)](https://emscripten.org/) -- Requires a link to **/datasketches-cpp** in this repository +- Requires a link to **datasketches-cpp** in this repository - Requires make utility - Requires [Google Cloud CLI](https://cloud.google.com/sdk/docs/install) @@ -71,6 +71,7 @@ This package includes BigQuery UD(A)Fs for the following Sketch types: | [**KLL Sketch**](#kll-sketch-functions) | Estimates the distribution of values, allowing you to find quantiles (like median, percentiles) without storing all the data. | | [**Theta Sketch**](#theta-sketch-functions) | Estimates unique items and supports set operations (union, intersection, difference) on those items. | | [**Tuple Sketch**](#tuple-sketch-functions) | Similar to Theta Sketch but allows associating values with each unique item, enabling operations like sum, min, max on those values. | +| [**TDigest**](#tdigest-functions) | Another algorithm and very compact data structure for estimating quantiles and ranks of numeric values. | ## CPC Sketch Functions @@ -790,3 +791,63 @@ select `$BQ_DATASET`.tuple_sketch_int64_to_string_seed( 111 ) from unnest(["a", "b", "c", "c"]) as key; ``` + + +## TDigest Functions + +**Description:** Similar to KLL sketch, estimates distributions of numeric values, +provides approximate quantiles and ranks. 
+ +| Function Name | Function Type | Signature | Description | +|---|---|---|---| +| [tdigest_double_build](tdigest/sqlx/tdigest_double_build.sqlx) | AGGREGATE | (value FLOAT64) -> BYTES | Creates a sketch that represents the distribution of the given column.<br><br>Param value: the column of FLOAT64 values.<br>Defaults: k = 200.<br>Returns: a t\-Digest, as bytes.<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_merge](tdigest/sqlx/tdigest_double_merge.sqlx) | AGGREGATE | (sketch BYTES) -> BYTES | Merges sketches from the given column.<br><br>Param sketch: the column of values.<br>Defaults: k = 200.<br>Returns: a serialized t\-Digest as BYTES.<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_merge_k](tdigest/sqlx/tdigest_double_merge_k.sqlx) | AGGREGATE | (sketch BYTES, k INT NOT AGGREGATE) -> BYTES | Merges sketches from the given column.<br><br>Param sketch: the column of values.<br>Param k: the sketch accuracy/size parameter as an integer in the range \[10, 65535\].<br>Returns: a serialized t\-Digest as BYTES.<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_build_k](tdigest/sqlx/tdigest_double_build_k.sqlx) | AGGREGATE | (value FLOAT64, k INT NOT AGGREGATE) -> BYTES | Creates a sketch that represents the distribution of the given column.<br><br>Param value: the column of FLOAT64 values.<br>Param k: the sketch accuracy/size parameter as an INT in the range \[10, 65535\].<br>Returns: a t\-Digest, as bytes.<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_get_max_value](tdigest/sqlx/tdigest_double_get_max_value.sqlx) | SCALAR | (sketch BYTES) -> FLOAT64 | Returns the maximum value of the input stream.<br><br>Param sketch: the given sketch as BYTES.<br>Returns: max value as FLOAT64<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| 
[tdigest_double_to_string](tdigest/sqlx/tdigest_double_to_string.sqlx) | SCALAR | (sketch BYTES) -> STRING | Returns a summary string that represents the state of the given sketch.<br><br>Param sketch: the given sketch as sketch encoded bytes.<br>Returns: a string that represents the state of the given sketch.<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_get_total_weight](tdigest/sqlx/tdigest_double_get_total_weight.sqlx) | SCALAR | (sketch BYTES) -> INT64 | Returns the total weight of the input stream.<br><br>Param sketch: the given sketch as BYTES.<br>Returns: total weight as INT64<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_get_min_value](tdigest/sqlx/tdigest_double_get_min_value.sqlx) | SCALAR | (sketch BYTES) -> FLOAT64 | Returns the minimum value of the input stream.<br><br>Param sketch: the given sketch as BYTES.<br>Returns: min value as FLOAT64<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_get_rank](tdigest/sqlx/tdigest_double_get_rank.sqlx) | SCALAR | (sketch BYTES, value FLOAT64) -> FLOAT64 | Returns an approximation to the normalized rank, on the interval \[0.0, 1.0\], of the given value.<br><br>Param sketch: the given sketch in serialized form.<br>Param value: value to be ranked.<br>Returns: an approximate rank of the given value.<br><br>For more information:<br> \- https://github.com/tdunning/t\-digest | +| [tdigest_double_get_quantile](tdigest/sqlx/tdigest_double_get_quantile.sqlx) | SCALAR | (sketch BYTES, rank FLOAT64) -> FLOAT64 | Returns a value from the sketch that is the best approximation to a value from the original stream with the given rank.<br><br>Param sketch: the given sketch in serialized form.<br>Param rank: rank of a value in the hypothetical sorted stream.<br>Returns: an approximate quantile associated with the given rank.<br><br>For more information:<br> \- https://gith [...] 
+ +**Examples:** + +```sql + +create or replace table `$BQ_DATASET`.tdigest_double(sketch bytes); + +# using default +insert into `$BQ_DATASET`.tdigest_double +(select `$BQ_DATASET`.tdigest_double_build(value) from unnest([1,2,3,4,5,6,7,8,9,10]) as value); + +# using full signature +insert into `$BQ_DATASET`.tdigest_double +(select `$BQ_DATASET`.tdigest_double_build_k(value, 100) from unnest([11,12,13,14,15,16,17,18,19,20]) as value); + +select `$BQ_DATASET`.tdigest_double_to_string(sketch) from `$BQ_DATASET`.tdigest_double; + +# using default +select `$BQ_DATASET`.tdigest_double_to_string(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double; + +# using full signature +select `$BQ_DATASET`.tdigest_double_to_string(`$BQ_DATASET`.tdigest_double_merge_k(sketch, 100)) from `$BQ_DATASET`.tdigest_double; + +# expected 0.5 +select `$BQ_DATASET`.tdigest_double_get_rank(`$BQ_DATASET`.tdigest_double_merge(sketch), 10) from `$BQ_DATASET`.tdigest_double; + +# expected 10 +select `$BQ_DATASET`.tdigest_double_get_quantile(`$BQ_DATASET`.tdigest_double_merge(sketch), 0.5) from `$BQ_DATASET`.tdigest_double; + +# expected 20 +select `$BQ_DATASET`.tdigest_double_get_total_weight(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double; + +# expected 1 +select `$BQ_DATASET`.tdigest_double_get_min_value(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double; + +# expected 20 +select `$BQ_DATASET`.tdigest_double_get_max_value(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double; + +drop table `$BQ_DATASET`.tdigest_double; +``` + diff --git a/README_template.md b/README_template.md index f7d6489..62e2760 100644 --- a/README_template.md +++ b/README_template.md @@ -34,7 +34,7 @@ page for how to contact us. 
## Requirements - Requires [Emscripten (emcc compiler)](https://emscripten.org/) -- Requires a link to **/datasketches-cpp** in this repository +- Requires a link to **datasketches-cpp** in this repository - Requires make utility - Requires [Google Cloud CLI](https://cloud.google.com/sdk/docs/install) @@ -71,6 +71,7 @@ This package includes BigQuery UD(A)Fs for the following Sketch types: | [**KLL Sketch**](#kll-sketch-functions) | Estimates the distribution of values, allowing you to find quantiles (like median, percentiles) without storing all the data. | | [**Theta Sketch**](#theta-sketch-functions) | Estimates unique items and supports set operations (union, intersection, difference) on those items. | | [**Tuple Sketch**](#tuple-sketch-functions) | Similar to Theta Sketch but allows associating values with each unique item, enabling operations like sum, min, max on those values. | +| [**TDigest**](#tdigest-functions) | Another algorithm and very compact data structure for estimating quantiles and ranks of numeric values. | ## CPC Sketch Functions @@ -129,4 +130,12 @@ enables calculations like the sum, minimum, or maximum of values associated with the distinct items. | Function Name | Function Type | Signature | Description | -|---|---|---|---| \ No newline at end of file +|---|---|---|---| + +## TDigest Functions + +**Description:** Similar to KLL sketch, estimates distributions of numeric values, +provides approximate quantiles and ranks. + +| Function Name | Function Type | Signature | Description | +|---|---|---|---| diff --git a/tdigest/Makefile b/tdigest/Makefile new file mode 100644 index 0000000..be14f58 --- /dev/null +++ b/tdigest/Makefile @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +EMCC=emcc +EMCFLAGS=-I../datasketches-cpp/common/include \ + -I../datasketches-cpp/tdigest/include \ + --no-entry \ + -sWASM_BIGINT=1 \ + -sEXPORTED_FUNCTIONS=[_malloc,_free] \ + -sENVIRONMENT=shell \ + -sTOTAL_MEMORY=1024MB \ + -O3 \ + --bind + +ARTIFACTS=tdigest_double.mjs tdigest_double.js tdigest_double.wasm + +all: $(ARTIFACTS) + +%.mjs: %.cpp + $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@ + +# this rule creates a non-es6 loadable library +%.js: %.cpp + $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@ + +%.wasm: %.cpp + $(EMCC) $< $(EMCFLAGS) -sSTANDALONE_WASM=1 -o $@ + +clean: + $(RM) $(ARTIFACTS) + +upload: all + @for file in $(ARTIFACTS); do \ + gcloud storage cp $$file $(JS_BUCKET)/ ; \ + done + +create: + @for file in $(wildcard sqlx/*.sqlx); do \ + echo creating $$file; \ + ../substitute_and_run.sh $$file ; \ + done + +install: upload create + +test: + @for file in $(wildcard test/*sql); do \ + ../substitute_and_run.sh $$file ; \ + done + +.PHONY: all clean install upload create test diff --git a/tdigest/sqlx/tdigest_double_build.sqlx b/tdigest/sqlx/tdigest_double_build.sqlx new file mode 100644 index 0000000..8710ed0 --- /dev/null +++ b/tdigest/sqlx/tdigest_double_build.sqlx @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE AGGREGATE FUNCTION ${self()}(value FLOAT64) +RETURNS BYTES +OPTIONS ( + description = '''Creates a sketch that represents the distribution of the given column. + +Param value: the column of FLOAT64 values. +Defaults: k = 200. +Returns: a t-Digest, as bytes. + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS ( + ${ref("tdigest_double_build_k")}(value, NULL) +); diff --git a/tdigest/sqlx/tdigest_double_build_k.sqlx b/tdigest/sqlx/tdigest_double_build_k.sqlx new file mode 100644 index 0000000..f13f9fd --- /dev/null +++ b/tdigest/sqlx/tdigest_double_build_k.sqlx @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE AGGREGATE FUNCTION ${self()}(value FLOAT64, k INT NOT AGGREGATE) +RETURNS BYTES +LANGUAGE js +OPTIONS ( + library=["${JS_BUCKET}/tdigest_double.mjs"], + description = '''Creates a sketch that represents the distribution of the given column. + +Param value: the column of FLOAT64 values. +Param k: the sketch accuracy/size parameter as an INT in the range [10, 65535]. +Returns: a t-Digest, as bytes. + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS R""" +import ModuleFactory from "${JS_BUCKET}/tdigest_double.mjs"; +var Module = await ModuleFactory(); +const default_k = Number(Module.DEFAULT_K); + +// UDAF interface +export function initialState(k) { + return { + k: k == null ? 
default_k : Number(k) + }; +} + +export function aggregate(state, value) { + try { + if (state.sketch == null) { + state.sketch = new Module.tdigest_double(state.k); + } + state.sketch.update(value); + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); + } +} + +export function serialize(state) { + if (state.sketch == null) return state; // for transition deserialize-serialize + try { + // for prior transition deserialize-aggregate + // merge aggregated and serialized state + if (state.sketch != null && state.serialized != null) { + sketch.merge(state.serialized); + } + return { + k: state.k, + serialized: state.sketch.serializeAsUint8Array() + }; + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); + } finally { + state.sketch.delete(); + } +} + +export function deserialize(serialized) { + return serialized; +} + +export function merge(state, other_state) { + try { + if (state.sketch == null) { + state.sketch = new Module.tdigest_double(state.k); + } + if (state.serialized != null) { + state.sketch.merge(state.serialized); + delete state.serialized; + } + if (other_state.serialized != null) { + state.sketch.merge(other_state.serialized); + delete other_state.serialized; + } + } catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); + } +} + +export function finalize(state) { + return serialize(state).serialized; +} +"""; diff --git a/tdigest/sqlx/tdigest_double_get_max_value.sqlx b/tdigest/sqlx/tdigest_double_get_max_value.sqlx new file mode 100644 index 0000000..328538d --- /dev/null +++ b/tdigest/sqlx/tdigest_double_get_max_value.sqlx @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE FUNCTION ${self()}(sketch BYTES) +RETURNS FLOAT64 +LANGUAGE js +OPTIONS ( + library=["${JS_BUCKET}/tdigest_double.js"], + js_parameter_encoding_mode='STANDARD', + description = '''Returns the maximum value of the input stream. + +Param sketch: the given sketch as BYTES. +Returns: max value as FLOAT64 + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS R""" +try { + var sketchObject = null; + try { + sketchObject = Module.tdigest_double.deserialize(sketch); + return sketchObject.getMaxValue(); + } finally { + if (sketchObject != null) sketchObject.delete(); + } +} catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); +} +"""; diff --git a/tdigest/sqlx/tdigest_double_get_min_value.sqlx b/tdigest/sqlx/tdigest_double_get_min_value.sqlx new file mode 100644 index 0000000..98e6ce8 --- /dev/null +++ b/tdigest/sqlx/tdigest_double_get_min_value.sqlx @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE FUNCTION ${self()}(sketch BYTES) +RETURNS FLOAT64 +LANGUAGE js +OPTIONS ( + library=["${JS_BUCKET}/tdigest_double.js"], + js_parameter_encoding_mode='STANDARD', + description = '''Returns the minimum value of the input stream. + +Param sketch: the given sketch as BYTES. +Returns: min value as FLOAT64 + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS R""" +try { + var sketchObject = null; + try { + sketchObject = Module.tdigest_double.deserialize(sketch); + return sketchObject.getMinValue(); + } finally { + if (sketchObject != null) sketchObject.delete(); + } +} catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); +} +"""; diff --git a/tdigest/sqlx/tdigest_double_get_quantile.sqlx b/tdigest/sqlx/tdigest_double_get_quantile.sqlx new file mode 100644 index 0000000..b5e15db --- /dev/null +++ b/tdigest/sqlx/tdigest_double_get_quantile.sqlx @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE FUNCTION ${self()}(sketch BYTES, rank FLOAT64) +RETURNS FLOAT64 +LANGUAGE js +OPTIONS ( + library=["${JS_BUCKET}/tdigest_double.js"], + js_parameter_encoding_mode='STANDARD', + description = '''Returns a value from the sketch that is the best approximation to a value from the original stream with the given rank. + +Param sketch: the given sketch in serialized form. +Param rank: rank of a value in the hypothetical sorted stream. +Returns: an approximate quantile associated with the given rank. + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS R""" +try { + var sketchObject = null; + try { + sketchObject = Module.tdigest_double.deserialize(sketch); + if (sketchObject.isEmpty()) return null; + return sketchObject.getQuantile(rank); + } finally { + if (sketchObject != null) sketchObject.delete(); + } +} catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); +} +"""; diff --git a/tdigest/sqlx/tdigest_double_get_rank.sqlx b/tdigest/sqlx/tdigest_double_get_rank.sqlx new file mode 100644 index 0000000..25d5117 --- /dev/null +++ b/tdigest/sqlx/tdigest_double_get_rank.sqlx @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE FUNCTION ${self()}(sketch BYTES, value FLOAT64) +RETURNS FLOAT64 +LANGUAGE js +OPTIONS ( + library=["${JS_BUCKET}/tdigest_double.js"], + js_parameter_encoding_mode='STANDARD', + description = '''Returns an approximation to the normalized rank, on the interval [0.0, 1.0], of the given value. + +Param sketch: the given sketch in serialized form. +Param value: value to be ranked. +Returns: an approximate rank of the given value. + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS R""" +try { + var sketchObject = null; + try { + sketchObject = Module.tdigest_double.deserialize(sketch); + if (sketchObject.isEmpty()) return null; + return sketchObject.getRank(value); + } finally { + if (sketchObject != null) sketchObject.delete(); + } +} catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); +} +"""; diff --git a/tdigest/sqlx/tdigest_double_get_total_weight.sqlx b/tdigest/sqlx/tdigest_double_get_total_weight.sqlx new file mode 100644 index 0000000..78162d8 --- /dev/null +++ b/tdigest/sqlx/tdigest_double_get_total_weight.sqlx @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE FUNCTION ${self()}(sketch BYTES) +RETURNS INT64 +LANGUAGE js +OPTIONS ( + library=["${JS_BUCKET}/tdigest_double.js"], + js_parameter_encoding_mode='STANDARD', + description = '''Returns the total weight of the input stream. + +Param sketch: the given sketch as BYTES. +Returns: total weight as INT64 + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS R""" +try { + var sketchObject = null; + try { + sketchObject = Module.tdigest_double.deserialize(sketch); + return sketchObject.getTotalWeight(); + } finally { + if (sketchObject != null) sketchObject.delete(); + } +} catch (e) { + if (e.message != null) throw e; + throw new Error(Module.getExceptionMessage(e)); +} +"""; diff --git a/tdigest/sqlx/tdigest_double_merge.sqlx b/tdigest/sqlx/tdigest_double_merge.sqlx new file mode 100644 index 0000000..44ad4f6 --- /dev/null +++ b/tdigest/sqlx/tdigest_double_merge.sqlx @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +config { hasOutput: true } + +CREATE OR REPLACE AGGREGATE FUNCTION ${self()}(sketch BYTES) +RETURNS BYTES +OPTIONS ( + description = '''Merges sketches from the given column. + +Param sketch: the column of values. +Defaults: k = 200. +Returns: a serialized t-Digest as BYTES. + +For more information: + - https://github.com/tdunning/t-digest +''' +) AS ( + ${ref("tdigest_double_merge_k")}(sketch, NULL) +); diff --git a/tdigest/sqlx/tdigest_double_merge_k.sqlx b/tdigest/sqlx/tdigest_double_merge_k.sqlx new file mode 100644 index 0000000..47f72ad --- /dev/null +++ b/tdigest/sqlx/tdigest_double_merge_k.sqlx @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+config { hasOutput: true }
+
+CREATE OR REPLACE AGGREGATE FUNCTION ${self()}(sketch BYTES, k INT NOT AGGREGATE)
+RETURNS BYTES
+LANGUAGE js
+OPTIONS (
+  library=["${JS_BUCKET}/tdigest_double.mjs"],
+  description = '''Merges sketches from the given column.
+
+Param sketch: the column of serialized t-Digest sketches to merge.
+Param k: the sketch accuracy/size parameter as an integer in the range [10, 65535].
+Returns: a serialized t-Digest as BYTES.
+
+For more information:
+ - https://github.com/tdunning/t-digest
+'''
+) AS R"""
+import ModuleFactory from "${JS_BUCKET}/tdigest_double.mjs";
+var Module = await ModuleFactory();
+const default_k = Number(Module.DEFAULT_K);
+
+// UDAF interface: state is { k, sketch?, serialized? } (live embind object and/or serialized bytes)
+export function initialState(k) {
+  return {
+    k: k == null ? default_k : Number(k), // a NULL k falls back to the library default
+  };
+}
+
+export function aggregate(state, sketch) {
+  try {
+    if (state.sketch == null) {
+      state.sketch = new Module.tdigest_double(state.k); // created lazily on the first input row
+    }
+    state.sketch.merge(sketch); // the binding deserializes the input bytes and merges them
+  } catch (e) {
+    if (e.message != null) throw e; // already a JS Error
+    throw new Error(Module.getExceptionMessage(e)); // C++ exception pointer -> readable message
+  }
+}
+
+export function serialize(state) {
+  if (state.sketch == null) return state; // for transition deserialize-serialize
+  try {
+    // for prior transition deserialize-aggregate:
+    // fold the still-serialized bytes into the live sketch before re-serializing
+    if (state.sketch != null && state.serialized != null) {
+      state.sketch.merge(state.serialized); // fixed: was bare `sketch`, which is not defined in this scope
+    }
+    return {
+      k: state.k,
+      serialized: state.sketch.serializeAsUint8Array()
+    };
+  } catch (e) {
+    if (e.message != null) throw e;
+    throw new Error(Module.getExceptionMessage(e));
+  } finally {
+    state.sketch.delete(); // release the embind-owned C++ object; the returned state holds bytes only
+  }
+}
+
+export function deserialize(serialized) {
+  return serialized; // kept as { k, serialized }; the bytes are folded in later by merge() or serialize()
+}
+
+export function merge(state, other_state) {
+  try {
+    if (state.sketch == null) {
+      state.sketch = new Module.tdigest_double(state.k);
+    }
+    if (state.serialized != null) {
+      state.sketch.merge(state.serialized);
+      delete state.serialized; // now represented by state.sketch; avoid merging it twice
+    }
+    if (other_state.serialized != null) {
+      state.sketch.merge(other_state.serialized);
+      delete other_state.serialized;
+    }
+  } catch (e) {
+    if (e.message != null) throw e;
+    throw new Error(Module.getExceptionMessage(e));
+  }
+}
+
+export function finalize(state) {
+  return serialize(state).serialized; // BYTES result of the aggregate
+}
+""";
diff --git a/tdigest/sqlx/tdigest_double_to_string.sqlx b/tdigest/sqlx/tdigest_double_to_string.sqlx
new file mode 100644
index 0000000..8d8017a
--- /dev/null
+++ b/tdigest/sqlx/tdigest_double_to_string.sqlx
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+config { hasOutput: true }
+
+CREATE OR REPLACE FUNCTION ${self()}(sketch BYTES)
+RETURNS STRING
+LANGUAGE js
+OPTIONS (
+  library=["${JS_BUCKET}/tdigest_double.js"],
+  js_parameter_encoding_mode='STANDARD',
+  description = '''Returns a summary string that represents the state of the given sketch.
+
+Param sketch: the given sketch as sketch encoded bytes.
+Returns: a string that represents the state of the given sketch.
+
+For more information:
+ - https://github.com/tdunning/t-digest
+'''
+) AS R"""
+try {
+  var sketchObject = null;
+  try {
+    sketchObject = Module.tdigest_double.deserialize(sketch);
+    return sketchObject.toString();
+  } finally {
+    if (sketchObject != null) sketchObject.delete();
+  }
+} catch (e) {
+  if (e.message != null) throw e;
+  throw new Error(Module.getExceptionMessage(e));
+}
+""";
diff --git a/tdigest/tdigest_double.cpp b/tdigest/tdigest_double.cpp
new file mode 100644
index 0000000..763d88d
--- /dev/null
+++ b/tdigest/tdigest_double.cpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <emscripten/bind.h>
+
+#include <tdigest.hpp>
+
+using tdigest_double = datasketches::tdigest_double;
+
+const emscripten::val Uint8Array = emscripten::val::global("Uint8Array");
+
+EMSCRIPTEN_BINDINGS(tdigest_double) {
+  emscripten::function("getExceptionMessage", emscripten::optional_override([](intptr_t ptr) {
+    return std::string(reinterpret_cast<std::exception*>(ptr)->what());
+  }));
+
+  emscripten::constant("DEFAULT_K", tdigest_double::DEFAULT_K);
+
+  emscripten::class_<tdigest_double>("tdigest_double")
+    .constructor(emscripten::optional_override([](uint16_t k) {
+      return new tdigest_double(k);
+    }))
+    .function("isEmpty", &tdigest_double::is_empty)
+    .function("update", &tdigest_double::update)
+    .function("merge", emscripten::optional_override([](tdigest_double& self, const std::string& bytes) {
+      auto td = tdigest_double::deserialize(bytes.data(), bytes.size());
+      self.merge(td);
+    }))
+    .function("serializeAsUint8Array", emscripten::optional_override([](const tdigest_double& self) {
+      const auto bytes = self.serialize();
+      return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data()));
+    }))
+    .class_function("deserialize", emscripten::optional_override([](const std::string& bytes) {
+      return new tdigest_double(tdigest_double::deserialize(bytes.data(), bytes.size()));
+    }), emscripten::allow_raw_pointers())
+    .function("getTotalWeight", &tdigest_double::get_total_weight)
+    .function("getMinValue", &tdigest_double::get_min_value)
+    .function("getMaxValue", &tdigest_double::get_max_value)
+    .function("getRank", &tdigest_double::get_rank)
+    .function("getQuantile", &tdigest_double::get_quantile)
+    .function("toString", emscripten::optional_override([](const tdigest_double& self) {
+      return self.to_string();
+    }))
+    ;
+}
diff --git a/tdigest/test/tdigest_double_test.sql b/tdigest/test/tdigest_double_test.sql
new file mode 100644
index 0000000..e3d79e9
--- /dev/null
+++ b/tdigest/test/tdigest_double_test.sql
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+create or replace table `$BQ_DATASET`.tdigest_double(sketch bytes);
+
+# using default
+insert into `$BQ_DATASET`.tdigest_double
+(select `$BQ_DATASET`.tdigest_double_build(value) from unnest([1,2,3,4,5,6,7,8,9,10]) as value);
+
+# using full signature
+insert into `$BQ_DATASET`.tdigest_double
+(select `$BQ_DATASET`.tdigest_double_build_k(value, 100) from unnest([11,12,13,14,15,16,17,18,19,20]) as value);
+
+select `$BQ_DATASET`.tdigest_double_to_string(sketch) from `$BQ_DATASET`.tdigest_double;
+
+# using default
+select `$BQ_DATASET`.tdigest_double_to_string(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double;
+
+# using full signature
+select `$BQ_DATASET`.tdigest_double_to_string(`$BQ_DATASET`.tdigest_double_merge_k(sketch, 100)) from `$BQ_DATASET`.tdigest_double;
+
+# expected 0.5
+select `$BQ_DATASET`.tdigest_double_get_rank(`$BQ_DATASET`.tdigest_double_merge(sketch), 10) from `$BQ_DATASET`.tdigest_double;
+
+# expected 10
+select `$BQ_DATASET`.tdigest_double_get_quantile(`$BQ_DATASET`.tdigest_double_merge(sketch), 0.5) from `$BQ_DATASET`.tdigest_double;
+
+# expected 20
+select `$BQ_DATASET`.tdigest_double_get_total_weight(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double;
+
+# expected 1
+select `$BQ_DATASET`.tdigest_double_get_min_value(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double;
+
+# expected 20
+select `$BQ_DATASET`.tdigest_double_get_max_value(`$BQ_DATASET`.tdigest_double_merge(sketch)) from `$BQ_DATASET`.tdigest_double;
+
+drop table `$BQ_DATASET`.tdigest_double;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
