Rishabh Maurya created FLINK-29186:
--------------------------------------
Summary: Store summarized results in internal nodes of BKD for
time series points
Key: FLINK-29186
URL: https://issues.apache.org/jira/browse/FLINK-29186
Project: Flink
Issue Type: New Feature
Components: API / Core
Reporter: Rishabh Maurya
Time series points have a timestamp, a measurement, and dimensions associated with
them. The common queries are range queries on the timestamp, metric aggregations
on the measurement, and grouping on dimensions, or similar queries with a
histogram on the timestamp.
*Proposal:*
Prototype can be found [here|https://github.com/rishabhmaurya/lucene/pull/1]
1. Introduce a new time series point as a field in Lucene, `TSPoint`, which can
be added as:
```java
Document doc = new Document();
doc.add(new TSIntPoint("tsid1", "cpu", timestamp, measurement));
```
`tsid1` is the time series ID of the point. It is the unit of storage for
time series points, and for the prototype each TSID is represented as a unique
field in Lucene.
`timestamp` is the actual point in the BKD tree on which the index is created.
The full definition can be found
[here.|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/document/TSIntPoint.java]
2. An interface for decomposable aggregate functions, which can be defined as
part of the index configuration. For example, a sum function:
```java
new BKDSummaryWriter.SummaryMergeFunction<Integer>() {
    @Override
    public int getSummarySize() {
        return Integer.BYTES;
    }

    @Override
    public void merge(byte[] a, byte[] b, byte[] c) {
        // c = a + b, all three in packed (sortable-bytes) form
        packBytes(unpackBytes(a) + unpackBytes(b), c);
    }

    @Override
    public Integer unpackBytes(byte[] val) {
        return NumericUtils.sortableBytesToInt(val, 0);
    }

    @Override
    public void packBytes(Integer val, byte[] res) {
        NumericUtils.intToSortableBytes(val, res, 0);
    }
};
```
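As a self-contained illustration of the merge contract above, the sketch below reimplements the sortable-bytes pack/unpack (mirroring the documented behavior of Lucene's `NumericUtils`, i.e. big-endian bytes with the sign bit flipped) so the decomposable sum can be exercised without a Lucene dependency. The class name `SumMergeDemo` is hypothetical:

```java
import java.nio.ByteBuffer;

public class SumMergeDemo {
    // Mirrors NumericUtils.intToSortableBytes (an assumption based on Lucene's
    // documented encoding): flip the sign bit so unsigned byte order matches
    // signed int order, then write big-endian.
    static void packBytes(int val, byte[] res) {
        ByteBuffer.wrap(res).putInt(val ^ 0x80000000);
    }

    static int unpackBytes(byte[] val) {
        return ByteBuffer.wrap(val).getInt() ^ 0x80000000;
    }

    // The merge contract from the proposal: c = a + b, all in packed form.
    static void merge(byte[] a, byte[] b, byte[] c) {
        packBytes(unpackBytes(a) + unpackBytes(b), c);
    }

    public static void main(String[] args) {
        byte[] a = new byte[4], b = new byte[4], c = new byte[4];
        packBytes(3, a);
        packBytes(-7, b);
        merge(a, b, c);
        System.out.println(unpackBytes(c)); // prints -4
    }
}
```

Because merging operates on packed bytes only, the writer can combine child summaries into a parent summary without knowing the value type.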
3. A new query per `LeafReader` to perform range queries on a TSPoint and
retrieve summarized results:
```java
LeafReader leafReader;
PointValues points = leafReader.getPointValues("tsid1");
TSPointQuery tsPointQuery = new TSPointQuery("tsid1", lowerBoundTimestamp, upperBoundTimestamp);
byte[] res = tsPointQuery.getSummary((BKDWithSummaryReader.BKDSummaryTree) points.getPointTree(), mergeFunction);
```
Instead of BKDReader and BKDWriter, we will be using
[BKDSummaryWriter|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/util/bkd/BKDSummaryWriter.java]
and
[BKDWithSummaryReader|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWithSummaryReader.java],
which support writing summaries into the internal nodes of the tree.
Changes to the IntersectVisitor interface are
[here.|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/index/PointValues.java#L320-L337]
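The payoff of storing summaries in internal nodes can be sketched with a toy structure (not the actual `BKDWithSummaryReader` API): a tree over timestamp-sorted points where each internal node precomputes the sum of its subtree. A range query then mirrors the usual cell-outside / cell-inside / cell-crosses traversal, and fully covered subtrees contribute their stored summary in O(1) instead of iterating their leaves. All names below are hypothetical:

```java
// Toy sketch: a segment-tree layout over points sorted by timestamp, with the
// sum of each subtree precomputed at every internal node.
public class SummaryTreeDemo {
    final long[] timestamps;   // sorted ascending
    final long[] measurements; // measurement for each timestamp
    final long[] subtreeSum;   // precomputed summary per tree node

    SummaryTreeDemo(long[] ts, long[] ms) {
        timestamps = ts;
        measurements = ms;
        subtreeSum = new long[4 * ts.length];
        build(1, 0, ts.length - 1);
    }

    long build(int node, int lo, int hi) {
        if (lo == hi) return subtreeSum[node] = measurements[lo];
        int mid = (lo + hi) >>> 1;
        return subtreeSum[node] = build(2 * node, lo, mid) + build(2 * node + 1, mid + 1, hi);
    }

    // Sum of measurements with timestamp in [minTs, maxTs].
    long rangeSum(long minTs, long maxTs) {
        return rangeSum(1, 0, timestamps.length - 1, minTs, maxTs);
    }

    long rangeSum(int node, int lo, int hi, long minTs, long maxTs) {
        if (timestamps[hi] < minTs || timestamps[lo] > maxTs) {
            return 0;                       // cell entirely outside the query
        }
        if (minTs <= timestamps[lo] && timestamps[hi] <= maxTs) {
            return subtreeSum[node];        // cell inside: use the precomputed summary
        }
        int mid = (lo + hi) >>> 1;          // cell crosses: recurse into children
        return rangeSum(2 * node, lo, mid, minTs, maxTs)
             + rangeSum(2 * node + 1, mid + 1, hi, minTs, maxTs);
    }
}
```

Only the cells crossing the range boundaries are descended, which is what makes the disk-access count largely independent of the number of matching docs.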
h4. Comparison with DocValues
Below is a comparison of running the unit test for the
[DocValue|https://github.com/rishabhmaurya/lucene/pull/1/commits/157215cd2c4787787748625e10b58001583c6e6e#diff-87c31ac3b1cd1ef45b15c84d9cf1c5bab03feb94f199256f859f14ed4747abd2R129]
approach vs the
[TSPoint|https://github.com/rishabhmaurya/lucene/pull/1/commits/157215cd2c4787787748625e10b58001583c6e6e#diff-87c31ac3b1cd1ef45b15c84d9cf1c5bab03feb94f199256f859f14ed4747abd2R52]
approach.
The test ingests {{10000000}} docs against a given TSID and performs a range
query on timestamp 100 times against the same TSID. The merge function used is
{{sum}}.
||DocValues approach||TSPoint approach||
|Indexing took: 42948 ms|Indexing took: 32985 ms|
|Matching docs count: 1304624 \| Segments: 3 \| DiskAccess: 1304624|Matching docs count: 8784032 \| Segments: 10 \| DiskAccess: 302|
|Search took: 12382 ms|Search took: 50 ms|
This is not an apples-to-apples comparison, since the number of segments is 3
in the DocValues approach whereas it is 10 in the TSPoint approach.
h4. Limitations of this feature
* Doc deletion is currently not supported. We need to evaluate how important
it is and possibly find a way to support it in the future.
* Only
[decomposable|https://en.wikipedia.org/wiki/Aggregate_function#Decomposable_aggregate_functions]
aggregation functions can be supported, e.g. min, max, sum, avg, count.
* Range queries will only be supported on {{timestamp}}.
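Of the functions listed, avg is only decomposable when carried as a (sum, count) pair: the pairs merge component-wise, and the division is deferred until the final result is needed. A minimal sketch (the class name `AvgSummary` is hypothetical):

```java
// avg as a decomposable aggregate: keep (sum, count) per node, merge
// component-wise, divide only at the very end.
public class AvgSummary {
    final long sum;
    final long count;

    AvgSummary(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    static AvgSummary merge(AvgSummary a, AvgSummary b) {
        return new AvgSummary(a.sum + b.sum, a.count + b.count);
    }

    double finish() {
        return (double) sum / count;
    }
}
```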
h4. TODOs
* Implementation for multiple TSIDs. For now, we need to create a new field
with the same name as the TSID for a time series.
* Segment merge for BKD with summaries. Currently, the UTs disable merging,
perform the search across multiple segments, and accumulate the results.
* Pluggable merge function to merge two {{TSPoint}}s. Currently it's hardcoded
in {{FieldInfo.java}}, which isn't the right place to define it.
* Measurement compression in BKD. I'm thinking of using delta encoding to
store measurement values and summaries while packing the summaries associated
with nodes of the tree.
* Persist the first and last docID in internal nodes of the BKD with summaries
in an efficient way. This will be useful for using precomputed summaries and
skipping over batches of documents when iterating with a DocIDSetIterator.
* Benchmark against a real time series dataset.
** Compare against the SortedDocValues approach.
** Compare against other time series databases.
* Evaluate support for deletion of a document/time series/batch of documents
(matching a timestamp range).
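The delta-encoding TODO above could look roughly like the sketch below: store each value as a varint-encoded zig-zag delta from its predecessor, so sorted timestamps and slowly varying measurements both produce small codes. This illustrates the general technique, not the proposed on-disk format; `DeltaCodec` is a hypothetical name:

```java
import java.io.ByteArrayOutputStream;

public class DeltaCodec {
    static byte[] encode(long[] values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long prev = 0;
        for (long v : values) {
            long delta = v - prev;
            long zz = (delta << 1) ^ (delta >> 63); // zig-zag: small magnitudes -> small codes
            while ((zz & ~0x7FL) != 0) {            // varint: 7 payload bits per byte
                out.write((int) (zz & 0x7F) | 0x80);
                zz >>>= 7;
            }
            out.write((int) zz);
            prev = v;
        }
        return out.toByteArray();
    }

    static long[] decode(byte[] bytes, int n) {
        long[] values = new long[n];
        long prev = 0;
        int pos = 0;
        for (int i = 0; i < n; i++) {
            long zz = 0;
            int shift = 0;
            long b;
            do {                                    // read varint
                b = bytes[pos++] & 0xFF;
                zz |= (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            prev += (zz >>> 1) ^ -(zz & 1);         // undo zig-zag, accumulate delta
            values[i] = prev;
        }
        return values;
    }
}
```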
--
This message was sent by Atlassian Jira
(v8.20.10#820010)