This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6576e9c feat(parquet/metadata): bloom filter implementation (#336)
6576e9c is described below
commit 6576e9cf6da9d9e5e7398242381abd5d800b96b2
Author: Matt Topol <[email protected]>
AuthorDate: Tue Apr 1 10:51:07 2025 -0400
feat(parquet/metadata): bloom filter implementation (#336)
### Rationale for this change
As with many other Parquet readers/writers, we should add support for
bloom filters.
### What changes are included in this PR?
This only adds an implementation to the `metadata` package to represent
bloom filters and process them for metadata reading and writing. This
does not yet wire it up through the actual parquet file reader and
writer. That will be done in a subsequent PR.
### Are these changes tested?
Yes, unit tests are included.
### Are there any user-facing changes?
Only the addition of the new functions that are exposed in the metadata
package.
---
.github/workflows/test.yml | 2 +-
go.mod | 1 +
go.sum | 2 +
parquet/file/file_writer.go | 1 +
parquet/file/page_writer.go | 4 +-
parquet/internal/encryption/aes.go | 2 +
parquet/metadata/Makefile | 64 +++
parquet/metadata/_lib/arch.h | 29 ++
parquet/metadata/_lib/bloom_filter_block.c | 64 +++
.../metadata/_lib/bloom_filter_block_avx2_amd64.s | 295 +++++++++++
.../metadata/_lib/bloom_filter_block_sse4_amd64.s | 322 ++++++++++++
parquet/metadata/adaptive_bloom_filter.go | 224 ++++++++
parquet/metadata/bloom_filter.go | 565 +++++++++++++++++++++
parquet/metadata/bloom_filter_block.go | 53 ++
parquet/metadata/bloom_filter_block_amd64.go | 36 ++
parquet/metadata/bloom_filter_block_avx2_amd64.go | 45 ++
parquet/metadata/bloom_filter_block_avx2_amd64.s | 151 ++++++
parquet/metadata/bloom_filter_block_default.go | 23 +
parquet/metadata/bloom_filter_block_sse4_amd64.go | 45 ++
parquet/metadata/bloom_filter_block_sse4_amd64.s | 176 +++++++
parquet/metadata/bloom_filter_reader_test.go | 275 ++++++++++
parquet/metadata/bloom_filter_test.go | 190 +++++++
parquet/metadata/cleanup_bloom_filter.go | 37 ++
parquet/metadata/cleanup_bloom_filter_go1.23.go | 35 ++
parquet/metadata/column_chunk.go | 25 +-
parquet/metadata/file.go | 6 +
parquet/metadata/metadata_test.go | 8 +-
parquet/metadata/row_group.go | 23 +-
28 files changed, 2681 insertions(+), 22 deletions(-)
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index f8ae5d5..fa45c52 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -63,7 +63,7 @@ jobs:
with:
submodules: recursive
- name: Login to GitHub Container registry
- uses: docker/login-action@v3
+ uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 #
v3.4.0
with:
registry: ghcr.io
username: ${{ github.actor }}
diff --git a/go.mod b/go.mod
index d056843..cd0bff1 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ toolchain go1.23.2
require (
github.com/andybalholm/brotli v1.1.1
github.com/apache/thrift v0.21.0
+ github.com/cespare/xxhash/v2 v2.3.0
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
github.com/goccy/go-json v0.10.5
github.com/golang/snappy v1.0.0
diff --git a/go.sum b/go.sum
index fc2a1f7..6da30a0 100644
--- a/go.sum
+++ b/go.sum
@@ -24,6 +24,8 @@ github.com/antlr4-go/antlr/v4 v4.13.1/go.mod
h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmO
github.com/apache/thrift v0.21.0
h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod
h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/atomicgo/cursor v0.0.1/go.mod
h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
+github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cockroachdb/apd/v3 v3.2.1
h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg=
github.com/cockroachdb/apd/v3 v3.2.1/go.mod
h1:klXJcjp+FffLTHlhIG69tezTDvdP065naDsHzKhYSqc=
github.com/containerd/console v1.0.3
h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
diff --git a/parquet/file/file_writer.go b/parquet/file/file_writer.go
index 68b5eee..c9b913b 100644
--- a/parquet/file/file_writer.go
+++ b/parquet/file/file_writer.go
@@ -158,6 +158,7 @@ func (fw *Writer) startFile() {
}
fw.fileEncryptor = encryption.NewFileEncryptor(encryptionProps,
fw.props.Allocator())
+ fw.metadata.SetFileEncryptor(fw.fileEncryptor)
if encryptionProps.EncryptedFooter() {
magic = magicEBytes
}
diff --git a/parquet/file/page_writer.go b/parquet/file/page_writer.go
index 1718292..8d22ef8 100644
--- a/parquet/file/page_writer.go
+++ b/parquet/file/page_writer.go
@@ -220,7 +220,7 @@ func (pw *serializedPageWriter) Close(hasDict, fallback
bool) error {
DataEncodingStats: pw.dataEncodingStats,
}
pw.FinishPageIndexes(0)
- pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats,
pw.metaEncryptor)
+ pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats)
_, err := pw.metaData.WriteTo(pw.sink)
return err
}
@@ -505,7 +505,7 @@ func (bw *bufferedPageWriter) Close(hasDict, fallback bool)
error {
DictEncodingStats: bw.pager.dictEncodingStats,
DataEncodingStats: bw.pager.dataEncodingStats,
}
- bw.metadata.Finish(chunkInfo, hasDict, fallback, encodingStats,
bw.pager.metaEncryptor)
+ bw.metadata.Finish(chunkInfo, hasDict, fallback, encodingStats)
bw.pager.FinishPageIndexes(position)
bw.metadata.WriteTo(bw.inMemSink)
diff --git a/parquet/internal/encryption/aes.go
b/parquet/internal/encryption/aes.go
index 0c16fac..06c9c90 100644
--- a/parquet/internal/encryption/aes.go
+++ b/parquet/internal/encryption/aes.go
@@ -54,6 +54,8 @@ const (
DictPageHeaderModule
ColumnIndexModule
OffsetIndexModule
+ BloomFilterHeaderModule
+ BloomFilterBitsetModule
)
type aesEncryptor struct {
diff --git a/parquet/metadata/Makefile b/parquet/metadata/Makefile
new file mode 100644
index 0000000..cfcc8ef
--- /dev/null
+++ b/parquet/metadata/Makefile
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# this converts rotate instructions from "ro[lr] <reg>" -> "ro[lr] <reg>, 1"
for yasm compatibility
+PERL_FIXUP_ROTATE=perl -i -pe 's/(ro[rl]\s+\w{2,3})$$/\1, 1/'
+
+C2GOASM=c2goasm
+CC=clang-19
+C_FLAGS=-target x86_64-unknown-none -masm=intel -mno-red-zone -mstackrealign
-mllvm -inline-threshold=1000 \
+ -fno-asynchronous-unwind-tables -fno-exceptions
-fno-rtti -O3 -fno-builtin -ffast-math -fno-jump-tables -I_lib
+ASM_FLAGS_AVX2=-mavx2 -mfma
+ASM_FLAGS_SSE4=-msse4
+ASM_FLAGS_BMI2=-mbmi2
+ASM_FLAGS_POPCNT=-mpopcnt
+
+C_FLAGS_NEON=-O3 -fvectorize -mllvm -force-vector-width=16
-fno-asynchronous-unwind-tables -mno-red-zone -mstackrealign -fno-exceptions \
+ -fno-rtti -fno-builtin -ffast-math -fno-jump-tables -I_lib
+
+GO_SOURCES := $(shell find . -path ./_lib -prune -o -name '*.go' -not -name
'*_test.go')
+ALL_SOURCES := $(shell find . -path ./_lib -prune -o -name '*.go' -name '*.s'
-not -name '*_test.go')
+
+.PHONY: assembly
+
+INTEL_SOURCES := \
+ bloom_filter_block_avx2_amd64.s bloom_filter_block_sse4_amd64.s
+
+ARM_SOURCES := \
+ bloom_filter_block_neon_arm64.s
+
+
+assembly: $(INTEL_SOURCES)
+
+_lib/bloom_filter_block_avx2_amd64.s: _lib/bloom_filter_block.c
+ $(CC) -S $(C_FLAGS) $(ASM_FLAGS_AVX2) $^ -o $@ ; $(PERL_FIXUP_ROTATE)
$@; perl -i -pe 's/mem(cpy|set)/clib·_mem\1(SB)/' $@
+
+_lib/bloom_filter_block_sse4_amd64.s: _lib/bloom_filter_block.c
+ $(CC) -S $(C_FLAGS) $(ASM_FLAGS_SSE4) $^ -o $@ ; $(PERL_FIXUP_ROTATE)
$@; perl -i -pe 's/mem(cpy|set)/clib·_mem\1(SB)/' $@
+
+# neon not supported by c2goasm, will have to do it manually
+#_lib/bloom_filter_block_neon.s: _lib/bloom_filter_block.c
+# $(CC) -S $(C_FLAGS_NEON) $^ -o $@ ; $(PERL_FIXUP_ROTATE) $@
+
+bloom_filter_block_avx2_amd64.s: _lib/bloom_filter_block_avx2_amd64.s
+ $(C2GOASM) -a -f $^ $@
+
+bloom_filter_block_sse4_amd64.s: _lib/bloom_filter_block_sse4_amd64.s
+ $(C2GOASM) -a -f $^ $@
+
+clean:
+ rm -f $(INTEL_SOURCES)
+ rm -f $(addprefix _lib/,$(INTEL_SOURCES))
\ No newline at end of file
diff --git a/parquet/metadata/_lib/arch.h b/parquet/metadata/_lib/arch.h
new file mode 100644
index 0000000..165d120
--- /dev/null
+++ b/parquet/metadata/_lib/arch.h
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#undef FULL_NAME
+
+#if defined(__AVX2__)
+ #define FULL_NAME(x) x##_avx2
+#elif __SSE4_2__ == 1
+ #define FULL_NAME(x) x##_sse4
+#elif __SSE3__ == 1
+ #define FULL_NAME(x) x##_sse3
+#elif defined(__ARM_NEON) || defined(__ARM_NEON__)
+ #define FULL_NAME(x) x##_neon
+#else
+ #define FULL_NAME(x) x##_x86
+#endif
\ No newline at end of file
diff --git a/parquet/metadata/_lib/bloom_filter_block.c
b/parquet/metadata/_lib/bloom_filter_block.c
new file mode 100644
index 0000000..dced68b
--- /dev/null
+++ b/parquet/metadata/_lib/bloom_filter_block.c
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "arch.h"
+#include <stdint.h>
+#include <stdbool.h>
+
+// algorithms defined in
https://github.com/apache/parquet-format/blob/master/BloomFilter.md
+// describing the proper definitions for the bloom filter hash functions
+// to be compatible with the parquet format
+
+#define bitsSetPerBlock 8
+static const uint32_t SALT[bitsSetPerBlock] = {
+ 0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+ 0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+#define PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
+
+bool FULL_NAME(check_block)(const uint32_t blocks[], const int len, const
uint64_t hash){
+ const uint32_t bucket_index =
+ (uint32_t)(((hash >> 32) * (uint64_t)(len/8)) >> 32);
+ const uint32_t key = (uint32_t)hash;
+
+ for (int i = 0; i < bitsSetPerBlock; ++i)
+ {
+ const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+ if (PREDICT_FALSE(0 == (blocks[bitsSetPerBlock * bucket_index + i] &
mask)))
+ {
+ return false;
+ }
+ }
+ return true;
+}
+
+void FULL_NAME(insert_block)(uint32_t blocks[], const int len, const uint64_t
hash) {
+ const uint32_t bucket_index =
+ (uint32_t)(((hash >> 32) * (uint64_t)(len/8)) >> 32);
+ const uint32_t key = (uint32_t)hash;
+
+ for (int i = 0; i < bitsSetPerBlock; ++i)
+ {
+ const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+ blocks[bitsSetPerBlock * bucket_index + i] |= mask;
+ }
+}
+
+void FULL_NAME(insert_bulk)(uint32_t blocks[], const int block_len, const
uint64_t hashes[], const int num_hashes) {
+ for (int i = 0; i < num_hashes; ++i) {
+ FULL_NAME(insert_block)(blocks, block_len, hashes[i]);
+ }
+}
\ No newline at end of file
diff --git a/parquet/metadata/_lib/bloom_filter_block_avx2_amd64.s
b/parquet/metadata/_lib/bloom_filter_block_avx2_amd64.s
new file mode 100644
index 0000000..4bb0877
--- /dev/null
+++ b/parquet/metadata/_lib/bloom_filter_block_avx2_amd64.s
@@ -0,0 +1,295 @@
+ .text
+ .intel_syntax noprefix
+ .file "bloom_filter_block.c"
+ .globl check_block_avx2 # -- Begin function
check_block_avx2
+ .p2align 4, 0x90
+ .type check_block_avx2,@function
+check_block_avx2: # @check_block_avx2
+# %bb.0:
+ push rbp
+ mov rbp, rsp
+ and rsp, -8
+ # kill: def $esi killed $esi def $rsi
+ mov rcx, rdx
+ shr rcx, 32
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ imul rax, rcx
+ shr rax, 29
+ and eax, -8
+ imul ecx, edx, 1203114875
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.1:
+ imul ecx, edx, 1150766481
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 4]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.2:
+ imul ecx, edx, -2010862245
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 8]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.3:
+ imul ecx, edx, -1565054819
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 12]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.4:
+ imul ecx, edx, 1884591559
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 16]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.5:
+ imul ecx, edx, 770785867
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 20]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.6:
+ imul ecx, edx, -1627633337
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 24]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.7:
+ imul ecx, edx, 1550580529
+ shr ecx, 27
+ mov eax, dword ptr [rdi + 4*rax + 28]
+ bt eax, ecx
+ setb al
+ # kill: def $al killed $al killed $eax
+ mov rsp, rbp
+ pop rbp
+ ret
+.LBB0_8:
+ xor eax, eax
+ # kill: def $al killed $al killed $eax
+ mov rsp, rbp
+ pop rbp
+ ret
+.Lfunc_end0:
+ .size check_block_avx2, .Lfunc_end0-check_block_avx2
+ # -- End function
+ .globl check_bulk_avx2 # -- Begin function
check_bulk_avx2
+ .p2align 4, 0x90
+ .type check_bulk_avx2,@function
+check_bulk_avx2: # @check_bulk_avx2
+# %bb.0:
+ # kill: def $esi killed $esi def $rsi
+ test r8d, r8d
+ jle .LBB1_19
+# %bb.1:
+ push rbp
+ mov rbp, rsp
+ push rbx
+ and rsp, -8
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ mov esi, r8d
+ xor r8d, r8d
+ .p2align 4, 0x90
+.LBB1_4: # =>This Inner Loop Header: Depth=1
+ mov r10, qword ptr [rdx + 8*r8]
+ mov r9, r10
+ shr r9, 32
+ imul r9, rax
+ shr r9, 29
+ and r9d, -8
+ imul r11d, r10d, 1203114875
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.5: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, 1150766481
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 4]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.6: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, -2010862245
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 8]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.7: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, -1565054819
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 12]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.8: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, 1884591559
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 16]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.9: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, 770785867
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 20]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.10: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, -1627633337
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 24]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.11: # in Loop: Header=BB1_4 Depth=1
+ imul r10d, r10d, 1550580529
+ shr r10d, 27
+ mov r9d, dword ptr [rdi + 4*r9 + 28]
+ bt r9d, r10d
+ setb r9b
+ mov byte ptr [rcx + r8], r9b
+ inc r8
+ cmp rsi, r8
+ jne .LBB1_4
+ jmp .LBB1_18
+ .p2align 4, 0x90
+.LBB1_2: # in Loop: Header=BB1_4 Depth=1
+ xor r9d, r9d
+ mov byte ptr [rcx + r8], r9b
+ inc r8
+ cmp rsi, r8
+ jne .LBB1_4
+.LBB1_18:
+ # lea rsp, [rbp - 8]
+ pop rbx
+ pop rbp
+.LBB1_19:
+ ret
+.Lfunc_end1:
+ .size check_bulk_avx2, .Lfunc_end1-check_bulk_avx2
+ # -- End function
+ .section .rodata.cst32,"aM",@progbits,32
+ .p2align 5, 0x0 # -- Begin function
insert_block_avx2
+.LCPI2_0:
+ .long 1203114875 # 0x47b6137b
+ .long 1150766481 # 0x44974d91
+ .long 2284105051 # 0x8824ad5b
+ .long 2729912477 # 0xa2b7289d
+ .long 1884591559 # 0x705495c7
+ .long 770785867 # 0x2df1424b
+ .long 2667333959 # 0x9efc4947
+ .long 1550580529 # 0x5c6bfb31
+ .section .rodata.cst4,"aM",@progbits,4
+ .p2align 2, 0x0
+.LCPI2_1:
+ .long 1 # 0x1
+ .text
+ .globl insert_block_avx2
+ .p2align 4, 0x90
+ .type insert_block_avx2,@function
+insert_block_avx2: # @insert_block_avx2
+# %bb.0:
+ push rbp
+ mov rbp, rsp
+ and rsp, -8
+ # kill: def $esi killed $esi def $rsi
+ vmovd xmm0, edx
+ shr rdx, 32
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ imul rax, rdx
+ shr rax, 27
+ movabs rcx, 17179869152
+ vpbroadcastd ymm0, xmm0
+ vpmulld ymm0, ymm0, ymmword ptr [rip + .LCPI2_0]
+ and rcx, rax
+ vpsrld ymm0, ymm0, 27
+ vpbroadcastd ymm1, dword ptr [rip + .LCPI2_1] # ymm1 =
[1,1,1,1,1,1,1,1]
+ vpsllvd ymm0, ymm1, ymm0
+ vpor ymm0, ymm0, ymmword ptr [rdi + rcx]
+ vmovdqu ymmword ptr [rdi + rcx], ymm0
+ mov rsp, rbp
+ pop rbp
+ vzeroupper
+ ret
+.Lfunc_end2:
+ .size insert_block_avx2, .Lfunc_end2-insert_block_avx2
+ # -- End function
+ .section .rodata.cst32,"aM",@progbits,32
+ .p2align 5, 0x0 # -- Begin function
insert_bulk_avx2
+.LCPI3_0:
+ .long 1203114875 # 0x47b6137b
+ .long 1150766481 # 0x44974d91
+ .long 2284105051 # 0x8824ad5b
+ .long 2729912477 # 0xa2b7289d
+ .long 1884591559 # 0x705495c7
+ .long 770785867 # 0x2df1424b
+ .long 2667333959 # 0x9efc4947
+ .long 1550580529 # 0x5c6bfb31
+ .section .rodata.cst4,"aM",@progbits,4
+ .p2align 2, 0x0
+.LCPI3_1:
+ .long 1 # 0x1
+ .text
+ .globl insert_bulk_avx2
+ .p2align 4, 0x90
+ .type insert_bulk_avx2,@function
+insert_bulk_avx2: # @insert_bulk_avx2
+# %bb.0:
+ # kill: def $esi killed $esi def $rsi
+ test ecx, ecx
+ jle .LBB3_4
+# %bb.1:
+ push rbp
+ mov rbp, rsp
+ and rsp, -8
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ mov ecx, ecx
+ xor esi, esi
+ movabs r8, 17179869152
+ vmovdqa ymm0, ymmword ptr [rip + .LCPI3_0] # ymm0 =
[1203114875,1150766481,2284105051,2729912477,1884591559,770785867,2667333959,1550580529]
+ vpbroadcastd ymm1, dword ptr [rip + .LCPI3_1] # ymm1 =
[1,1,1,1,1,1,1,1]
+ .p2align 4, 0x90
+.LBB3_2: # =>This Inner Loop Header: Depth=1
+ mov r9, qword ptr [rdx + 8*rsi]
+ vmovd xmm2, r9d
+ shr r9, 32
+ imul r9, rax
+ shr r9, 27
+ and r9, r8
+ vpbroadcastd ymm2, xmm2
+ vpmulld ymm2, ymm2, ymm0
+ vpsrld ymm2, ymm2, 27
+ vpsllvd ymm2, ymm1, ymm2
+ vpor ymm2, ymm2, ymmword ptr [rdi + r9]
+ vmovdqu ymmword ptr [rdi + r9], ymm2
+ inc rsi
+ cmp rcx, rsi
+ jne .LBB3_2
+# %bb.3:
+ mov rsp, rbp
+ pop rbp
+.LBB3_4:
+ vzeroupper
+ ret
+.Lfunc_end3:
+ .size insert_bulk_avx2, .Lfunc_end3-insert_bulk_avx2
+ # -- End function
+ .ident "clang version 19.1.6
(https://github.com/conda-forge/clangdev-feedstock
a097c63bb6a9919682224023383a143d482c552e)"
+ .section ".note.GNU-stack","",@progbits
+ .addrsig
\ No newline at end of file
diff --git a/parquet/metadata/_lib/bloom_filter_block_sse4_amd64.s
b/parquet/metadata/_lib/bloom_filter_block_sse4_amd64.s
new file mode 100644
index 0000000..593b61b
--- /dev/null
+++ b/parquet/metadata/_lib/bloom_filter_block_sse4_amd64.s
@@ -0,0 +1,322 @@
+ .text
+ .intel_syntax noprefix
+ .file "bloom_filter_block.c"
+ .globl check_block_sse4 # -- Begin function
check_block_sse4
+ .p2align 4, 0x90
+ .type check_block_sse4,@function
+check_block_sse4: # @check_block_sse4
+# %bb.0:
+ push rbp
+ mov rbp, rsp
+ and rsp, -8
+ # kill: def $esi killed $esi def $rsi
+ mov rcx, rdx
+ shr rcx, 32
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ imul rax, rcx
+ shr rax, 29
+ and eax, -8
+ imul ecx, edx, 1203114875
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.1:
+ imul ecx, edx, 1150766481
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 4]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.2:
+ imul ecx, edx, -2010862245
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 8]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.3:
+ imul ecx, edx, -1565054819
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 12]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.4:
+ imul ecx, edx, 1884591559
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 16]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.5:
+ imul ecx, edx, 770785867
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 20]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.6:
+ imul ecx, edx, -1627633337
+ shr ecx, 27
+ mov esi, dword ptr [rdi + 4*rax + 24]
+ bt esi, ecx
+ jae .LBB0_8
+# %bb.7:
+ imul ecx, edx, 1550580529
+ shr ecx, 27
+ mov eax, dword ptr [rdi + 4*rax + 28]
+ bt eax, ecx
+ setb al
+ # kill: def $al killed $al killed $eax
+ mov rsp, rbp
+ pop rbp
+ ret
+.LBB0_8:
+ xor eax, eax
+ # kill: def $al killed $al killed $eax
+ mov rsp, rbp
+ pop rbp
+ ret
+.Lfunc_end0:
+ .size check_block_sse4, .Lfunc_end0-check_block_sse4
+ # -- End function
+ .globl check_bulk_sse4 # -- Begin function
check_bulk_sse4
+ .p2align 4, 0x90
+ .type check_bulk_sse4,@function
+check_bulk_sse4: # @check_bulk_sse4
+# %bb.0:
+ # kill: def $esi killed $esi def $rsi
+ test r8d, r8d
+ jle .LBB1_19
+# %bb.1:
+ push rbp
+ mov rbp, rsp
+ push rbx
+ and rsp, -8
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ mov esi, r8d
+ xor r8d, r8d
+ .p2align 4, 0x90
+.LBB1_4: # =>This Inner Loop Header: Depth=1
+ mov r10, qword ptr [rdx + 8*r8]
+ mov r9, r10
+ shr r9, 32
+ imul r9, rax
+ shr r9, 29
+ and r9d, -8
+ imul r11d, r10d, 1203114875
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.5: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, 1150766481
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 4]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.6: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, -2010862245
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 8]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.7: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, -1565054819
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 12]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.8: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, 1884591559
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 16]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.9: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, 770785867
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 20]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.10: # in Loop: Header=BB1_4 Depth=1
+ imul r11d, r10d, -1627633337
+ shr r11d, 27
+ mov ebx, dword ptr [rdi + 4*r9 + 24]
+ bt ebx, r11d
+ jae .LBB1_2
+# %bb.11: # in Loop: Header=BB1_4 Depth=1
+ imul r10d, r10d, 1550580529
+ shr r10d, 27
+ mov r9d, dword ptr [rdi + 4*r9 + 28]
+ bt r9d, r10d
+ setb r9b
+ mov byte ptr [rcx + r8], r9b
+ inc r8
+ cmp rsi, r8
+ jne .LBB1_4
+ jmp .LBB1_18
+ .p2align 4, 0x90
+.LBB1_2: # in Loop: Header=BB1_4 Depth=1
+ xor r9d, r9d
+ mov byte ptr [rcx + r8], r9b
+ inc r8
+ cmp rsi, r8
+ jne .LBB1_4
+.LBB1_18:
+ # lea rsp, [rbp - 8]
+ pop rbx
+ pop rbp
+.LBB1_19:
+ ret
+.Lfunc_end1:
+ .size check_bulk_sse4, .Lfunc_end1-check_bulk_sse4
+ # -- End function
+ .section .rodata.cst16,"aM",@progbits,16
+ .p2align 4, 0x0 # -- Begin function
insert_block_sse4
+.LCPI2_0:
+ .long 1203114875 # 0x47b6137b
+ .long 1150766481 # 0x44974d91
+ .long 2284105051 # 0x8824ad5b
+ .long 2729912477 # 0xa2b7289d
+.LCPI2_1:
+ .long 1065353216 # 0x3f800000
+ .long 1065353216 # 0x3f800000
+ .long 1065353216 # 0x3f800000
+ .long 1065353216 # 0x3f800000
+.LCPI2_2:
+ .long 1884591559 # 0x705495c7
+ .long 770785867 # 0x2df1424b
+ .long 2667333959 # 0x9efc4947
+ .long 1550580529 # 0x5c6bfb31
+ .text
+ .globl insert_block_sse4
+ .p2align 4, 0x90
+ .type insert_block_sse4,@function
+insert_block_sse4: # @insert_block_sse4
+# %bb.0:
+ push rbp
+ mov rbp, rsp
+ and rsp, -8
+ # kill: def $esi killed $esi def $rsi
+ movd xmm0, edx
+ shr rdx, 32
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ movsxd rcx, eax
+ imul rcx, rdx
+ shr rcx, 27
+ movabs rax, 17179869152
+ and rax, rcx
+ pshufd xmm0, xmm0, 0 # xmm0 = xmm0[0,0,0,0]
+ movdqa xmm1, xmmword ptr [rip + .LCPI2_0] # xmm1 =
[1203114875,1150766481,2284105051,2729912477]
+ pmulld xmm1, xmm0
+ psrld xmm1, 27
+ pslld xmm1, 23
+ movdqa xmm2, xmmword ptr [rip + .LCPI2_1] # xmm2 =
[1065353216,1065353216,1065353216,1065353216]
+ paddd xmm1, xmm2
+ cvttps2dq xmm1, xmm1
+ movups xmm3, xmmword ptr [rdi + rax]
+ orps xmm3, xmm1
+ movups xmm1, xmmword ptr [rdi + rax + 16]
+ movups xmmword ptr [rdi + rax], xmm3
+ pmulld xmm0, xmmword ptr [rip + .LCPI2_2]
+ psrld xmm0, 27
+ pslld xmm0, 23
+ paddd xmm0, xmm2
+ cvttps2dq xmm0, xmm0
+ orps xmm0, xmm1
+ movups xmmword ptr [rdi + rax + 16], xmm0
+ mov rsp, rbp
+ pop rbp
+ ret
+.Lfunc_end2:
+ .size insert_block_sse4, .Lfunc_end2-insert_block_sse4
+ # -- End function
+ .section .rodata.cst16,"aM",@progbits,16
+ .p2align 4, 0x0 # -- Begin function
insert_bulk_sse4
+.LCPI3_0:
+ .long 1203114875 # 0x47b6137b
+ .long 1150766481 # 0x44974d91
+ .long 2284105051 # 0x8824ad5b
+ .long 2729912477 # 0xa2b7289d
+.LCPI3_1:
+ .long 1065353216 # 0x3f800000
+ .long 1065353216 # 0x3f800000
+ .long 1065353216 # 0x3f800000
+ .long 1065353216 # 0x3f800000
+.LCPI3_2:
+ .long 1884591559 # 0x705495c7
+ .long 770785867 # 0x2df1424b
+ .long 2667333959 # 0x9efc4947
+ .long 1550580529 # 0x5c6bfb31
+ .text
+ .globl insert_bulk_sse4
+ .p2align 4, 0x90
+ .type insert_bulk_sse4,@function
+insert_bulk_sse4: # @insert_bulk_sse4
+# %bb.0:
+ # kill: def $esi killed $esi def $rsi
+ test ecx, ecx
+ jle .LBB3_4
+# %bb.1:
+ push rbp
+ mov rbp, rsp
+ and rsp, -8
+ lea eax, [rsi + 7]
+ test esi, esi
+ cmovns eax, esi
+ sar eax, 3
+ cdqe
+ mov ecx, ecx
+ xor esi, esi
+ movabs r8, 17179869152
+ movdqa xmm0, xmmword ptr [rip + .LCPI3_0] # xmm0 =
[1203114875,1150766481,2284105051,2729912477]
+ movdqa xmm1, xmmword ptr [rip + .LCPI3_1] # xmm1 =
[1065353216,1065353216,1065353216,1065353216]
+ movdqa xmm2, xmmword ptr [rip + .LCPI3_2] # xmm2 =
[1884591559,770785867,2667333959,1550580529]
+ .p2align 4, 0x90
+.LBB3_2: # =>This Inner Loop Header: Depth=1
+ mov r9, qword ptr [rdx + 8*rsi]
+ movd xmm3, r9d
+ shr r9, 32
+ imul r9, rax
+ shr r9, 27
+ and r9, r8
+ pshufd xmm3, xmm3, 0 # xmm3 = xmm3[0,0,0,0]
+ movdqa xmm4, xmm3
+ pmulld xmm4, xmm0
+ psrld xmm4, 27
+ pslld xmm4, 23
+ paddd xmm4, xmm1
+ cvttps2dq xmm4, xmm4
+ movups xmm5, xmmword ptr [rdi + r9]
+ orps xmm5, xmm4
+ movups xmm4, xmmword ptr [rdi + r9 + 16]
+ movups xmmword ptr [rdi + r9], xmm5
+ pmulld xmm3, xmm2
+ psrld xmm3, 27
+ pslld xmm3, 23
+ paddd xmm3, xmm1
+ cvttps2dq xmm3, xmm3
+ orps xmm3, xmm4
+ movups xmmword ptr [rdi + r9 + 16], xmm3
+ inc rsi
+ cmp rcx, rsi
+ jne .LBB3_2
+# %bb.3:
+ mov rsp, rbp
+ pop rbp
+.LBB3_4:
+ ret
+.Lfunc_end3:
+ .size insert_bulk_sse4, .Lfunc_end3-insert_bulk_sse4
+ # -- End function
+ .ident "clang version 19.1.6
(https://github.com/conda-forge/clangdev-feedstock
a097c63bb6a9919682224023383a143d482c552e)"
+ .section ".note.GNU-stack","",@progbits
+ .addrsig
\ No newline at end of file
diff --git a/parquet/metadata/adaptive_bloom_filter.go
b/parquet/metadata/adaptive_bloom_filter.go
new file mode 100644
index 0000000..c040645
--- /dev/null
+++ b/parquet/metadata/adaptive_bloom_filter.go
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+ "io"
+ "runtime"
+ "slices"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/bitutil"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+ format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+ "github.com/apache/arrow-go/v18/parquet/schema"
+)
+
+type bloomFilterCandidate struct {
+ bloomFilter blockSplitBloomFilter
+ expectedNDV uint32
+}
+
+func newBloomFilterCandidate(expectedNDV, numBytes, minBytes, maxBytes uint32,
h Hasher, mem memory.Allocator) *bloomFilterCandidate {
+ if numBytes < minBytes {
+ numBytes = minBytes
+ }
+
+ if numBytes > maxBytes {
+ numBytes = maxBytes
+ }
+
+ // get next power of 2 if it's not a power of 2
+ if (numBytes & (numBytes - 1)) != 0 {
+ numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
+ }
+
+ buf := memory.NewResizableBuffer(mem)
+ buf.ResizeNoShrink(int(numBytes))
+ bf := blockSplitBloomFilter{
+ data: buf,
+ bitset32: arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
+ hasher: h,
+ algorithm: defaultAlgorithm,
+ hashStrategy: defaultHashStrategy,
+ compression: defaultCompression,
+ }
+ runtime.SetFinalizer(&bf, func(f *blockSplitBloomFilter) {
+ f.data.Release()
+ })
+ return &bloomFilterCandidate{bloomFilter: bf, expectedNDV: expectedNDV}
+}
+
+type adaptiveBlockSplitBloomFilter struct {
+ mem memory.Allocator
+ candidates []*bloomFilterCandidate
+ largestCandidate *bloomFilterCandidate
+ numDistinct int64
+ finalized bool
+
+ maxBytes, minBytes uint32
+ minCandidateNDV int
+ hasher Hasher
+ hashStrategy format.BloomFilterHash
+ algorithm format.BloomFilterAlgorithm
+ compression format.BloomFilterCompression
+
+ column *schema.Column
+}
+
+func NewAdaptiveBlockSplitBloomFilter(maxBytes uint32, numCandidates int, fpp
float64, column *schema.Column, mem memory.Allocator) BloomFilterBuilder {
+ ret := &adaptiveBlockSplitBloomFilter{
+ mem: mem,
+ maxBytes: min(maximumBloomFilterBytes, maxBytes),
+ minBytes: minimumBloomFilterBytes,
+ minCandidateNDV: 16,
+ hasher: xxhasher{},
+ column: column,
+ hashStrategy: defaultHashStrategy,
+ algorithm: defaultAlgorithm,
+ compression: defaultCompression,
+ }
+
+ ret.initCandidates(maxBytes, numCandidates, fpp)
+ return ret
+}
+
+func (b *adaptiveBlockSplitBloomFilter) getAlg() *format.BloomFilterAlgorithm {
+ return &b.algorithm
+}
+
+func (b *adaptiveBlockSplitBloomFilter) getHashStrategy()
*format.BloomFilterHash {
+ return &b.hashStrategy
+}
+
+func (b *adaptiveBlockSplitBloomFilter) getCompression()
*format.BloomFilterCompression {
+ return &b.compression
+}
+
+func (b *adaptiveBlockSplitBloomFilter) optimalCandidate()
*bloomFilterCandidate {
+ return slices.MinFunc(b.candidates, func(a, b *bloomFilterCandidate)
int {
+ return int(b.bloomFilter.Size() - a.bloomFilter.Size())
+ })
+}
+
+func (b *adaptiveBlockSplitBloomFilter) Hasher() Hasher { return b.hasher }
+
+func (b *adaptiveBlockSplitBloomFilter) InsertHash(hash uint64) {
+ if b.finalized {
+ panic("adaptive bloom filter has been marked finalized, no more
data allowed")
+ }
+
+ if !b.largestCandidate.bloomFilter.CheckHash(hash) {
+ b.numDistinct++
+ }
+
+ b.candidates = slices.DeleteFunc(b.candidates, func(c
*bloomFilterCandidate) bool {
+ return c.expectedNDV < uint32(b.numDistinct) && c !=
b.largestCandidate
+ })
+
+ for _, c := range b.candidates {
+ c.bloomFilter.InsertHash(hash)
+ }
+}
+
+func (b *adaptiveBlockSplitBloomFilter) InsertBulk(hashes []uint64) {
+ if b.finalized {
+ panic("adaptive bloom filter has been marked finalized, no more
data allowed")
+ }
+
+ for _, h := range hashes {
+ if !b.largestCandidate.bloomFilter.CheckHash(h) {
+ b.numDistinct++
+ }
+ }
+
+ b.candidates = slices.DeleteFunc(b.candidates, func(c
*bloomFilterCandidate) bool {
+ return c.expectedNDV < uint32(b.numDistinct) && c !=
b.largestCandidate
+ })
+
+ for _, c := range b.candidates {
+ c.bloomFilter.InsertBulk(hashes)
+ }
+}
+
// Size returns the bitset size in bytes of the candidate selected by
// optimalCandidate.
func (b *adaptiveBlockSplitBloomFilter) Size() int64 {
	return b.optimalCandidate().bloomFilter.Size()
}

// CheckHash probes the largestCandidate filter for hash.
func (b *adaptiveBlockSplitBloomFilter) CheckHash(hash uint64) bool {
	return b.largestCandidate.bloomFilter.CheckHash(hash)
}

// WriteTo marks the filter finalized (subsequent inserts panic) and
// serializes the bitset of the candidate selected by optimalCandidate to w,
// encrypted when enc is non-nil.
func (b *adaptiveBlockSplitBloomFilter) WriteTo(w io.Writer, enc encryption.Encryptor) (int, error) {
	b.finalized = true

	return b.optimalCandidate().bloomFilter.WriteTo(w, enc)
}
+
+func (b *adaptiveBlockSplitBloomFilter) initCandidates(maxBytes uint32,
numCandidates int, fpp float64) {
+ b.candidates = make([]*bloomFilterCandidate, 0, numCandidates)
+ candidateByteSize := b.calcBoundedPowerOf2(maxBytes)
+ for range numCandidates {
+ candidateExpectedNDV := b.expectedNDV(candidateByteSize, fpp)
+ if candidateExpectedNDV <= 0 {
+ break
+ }
+
+ b.candidates = append(b.candidates,
newBloomFilterCandidate(uint32(candidateExpectedNDV),
+ candidateByteSize, b.minBytes, b.maxBytes, b.hasher,
b.mem))
+ candidateByteSize = b.calcBoundedPowerOf2(candidateByteSize / 2)
+ }
+
+ if len(b.candidates) == 0 {
+ // maxBytes is too small, but at least one candidate will be
generated
+ b.candidates = append(b.candidates,
newBloomFilterCandidate(uint32(b.minCandidateNDV),
+ b.minBytes, b.minBytes, b.maxBytes, b.hasher, b.mem))
+ }
+
+ b.largestCandidate = slices.MaxFunc(b.candidates, func(a, b
*bloomFilterCandidate) int {
+ return int(b.bloomFilter.Size() - a.bloomFilter.Size())
+ })
+}
+
+func (b *adaptiveBlockSplitBloomFilter) expectedNDV(numBytes uint32, fpp
float64) int {
+ var (
+ expectedNDV, optimalBytes uint32
+ )
+
+ const ndvStep = 500
+ for optimalBytes < numBytes {
+ expectedNDV += ndvStep
+ optimalBytes = optimalNumBytes(expectedNDV, fpp)
+ }
+
+ // make sure it is slightly smaller than what numBytes supports
+ expectedNDV -= ndvStep
+ return int(max(0, expectedNDV))
+}
+
+func (b *adaptiveBlockSplitBloomFilter) calcBoundedPowerOf2(numBytes uint32)
uint32 {
+ if numBytes < b.minBytes {
+ numBytes = b.minBytes
+ }
+
+ if numBytes&(numBytes-1) != 0 {
+ numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
+ }
+
+ return max(min(numBytes, b.maxBytes), b.minBytes)
+}
diff --git a/parquet/metadata/bloom_filter.go b/parquet/metadata/bloom_filter.go
new file mode 100644
index 0000000..cb21809
--- /dev/null
+++ b/parquet/metadata/bloom_filter.go
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "sync"
+ "unsafe"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/bitutil"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/bitutils"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/internal/debug"
+ "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+ format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+ "github.com/apache/arrow-go/v18/parquet/internal/thrift"
+ "github.com/apache/arrow-go/v18/parquet/internal/utils"
+ "github.com/apache/arrow-go/v18/parquet/schema"
+ "github.com/cespare/xxhash/v2"
+)
+
const (
	// each filter block is eight 32-bit words = 32 bytes
	bytesPerFilterBlock = 32
	// number of bits set per inserted hash: one per word in a block
	bitsSetPerBlock = 8
	// a filter must hold at least one full block
	minimumBloomFilterBytes = bytesPerFilterBlock
	// currently using 128MB as maximum size, should probably be
	// reconsidered
	maximumBloomFilterBytes = 128 * 1024 * 1024
)
+
+var (
+ salt = [bitsSetPerBlock]uint32{
+ 0x476137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+ 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}
+
+ defaultHashStrategy = format.BloomFilterHash{XXHASH: &format.XxHash{}}
+ defaultAlgorithm = format.BloomFilterAlgorithm{BLOCK:
&format.SplitBlockAlgorithm{}}
+ defaultCompression = format.BloomFilterCompression{UNCOMPRESSED:
&format.Uncompressed{}}
+)
+
// optimalNumBytes returns the optimal bloom filter size in bytes for the
// given distinct-value count and false-positive probability, derived from
// optimalNumBits (which always yields a multiple of 8, asserted here).
func optimalNumBytes(ndv uint32, fpp float64) uint32 {
	optimalBits := optimalNumBits(ndv, fpp)
	debug.Assert(bitutil.IsMultipleOf8(int64(optimalBits)), "optimal bits should be multiple of 8")
	return optimalBits >> 3
}
+
+func optimalNumBits(ndv uint32, fpp float64) uint32 {
+ debug.Assert(fpp > 0 && fpp < 1, "false positive prob must be in (0,
1)")
+ var (
+ m = -8 * float64(ndv) / math.Log(1-math.Pow(fpp, 1.0/8.0))
+ numBits uint32
+ )
+
+ if m < 0 || m > maximumBloomFilterBytes>>3 {
+ numBits = maximumBloomFilterBytes << 3
+ } else {
+ numBits = uint32(m)
+ }
+
+ // round up to lower bound
+ if numBits < minimumBloomFilterBytes<<3 {
+ numBits = minimumBloomFilterBytes << 3
+ }
+
+ // get next power of 2 if bits is not power of 2
+ if (numBits & (numBits - 1)) != 0 {
+ numBits = uint32(bitutil.NextPowerOf2(int(numBits)))
+ }
+ return numBits
+}
+
// Hasher computes 64-bit hashes of byte slices, singly or in batches.
type Hasher interface {
	// Sum64 returns the 64-bit hash of b.
	Sum64(b []byte) uint64
	// Sum64s returns the 64-bit hash of each slice in b, in order.
	Sum64s(b [][]byte) []uint64
}

// xxhasher implements Hasher using xxHash64, the hash function used by the
// parquet bloom filter format.
type xxhasher struct{}

func (xxhasher) Sum64(b []byte) uint64 {
	return xxhash.Sum64(b)
}

func (xxhasher) Sum64s(b [][]byte) (vals []uint64) {
	vals = make([]uint64, len(b))
	for i, v := range b {
		vals[i] = xxhash.Sum64(v)
	}
	return
}
+
// GetHash hashes a single column value with h after converting it to its
// raw byte representation (see getBytes).
func GetHash[T parquet.ColumnTypes](h Hasher, v T) uint64 {
	return h.Sum64(getBytes(v))
}

// GetHashes hashes a slice of column values with h (see getBytesSlice).
func GetHashes[T parquet.ColumnTypes](h Hasher, vals []T) []uint64 {
	return h.Sum64s(getBytesSlice(vals))
}
+
// GetSpacedHashes hashes only the valid (non-null) entries of vals as
// indicated by the validity bitmap, returning the hashes in order.
// validBitsOffset is the bit offset of vals[0] within validBits; numValid is
// used only to pre-size the result.
func GetSpacedHashes[T parquet.ColumnTypes](h Hasher, numValid int64, vals []T, validBits []byte, validBitsOffset int64) []uint64 {
	if numValid == 0 {
		return []uint64{}
	}

	out := make([]uint64, 0, numValid)

	// TODO: replace with bitset run reader pool
	setReader := bitutils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(vals)))
	for {
		run := setReader.NextRun()
		if run.Length == 0 {
			break
		}

		// hash each contiguous run of valid values as one batch
		out = append(out, h.Sum64s(getBytesSlice(vals[run.Pos:run.Pos+run.Length]))...)
	}
	return out
}
+
// getBytes returns the raw bytes of v for hashing. Byte-array column types
// alias their existing storage; every other column type is viewed in place
// via unsafe (native in-memory layout, no copy).
func getBytes[T parquet.ColumnTypes](v T) []byte {
	switch v := any(v).(type) {
	case parquet.ByteArray:
		return v
	case parquet.FixedLenByteArray:
		return v
	case parquet.Int96:
		return v[:]
	}

	// fixed-size scalar: reinterpret the value's bytes directly
	return unsafe.Slice((*byte)(unsafe.Pointer(&v)), unsafe.Sizeof(v))
}
+
// getBytesSlice returns one byte slice per element of v, for batch hashing.
// Byte-array types alias their existing storage; fixed-size scalar types are
// carved out of a single unsafe view over the slice's backing array (no
// copies in either case).
func getBytesSlice[T parquet.ColumnTypes](v []T) [][]byte {
	b := make([][]byte, len(v))
	switch v := any(v).(type) {
	case []parquet.ByteArray:
		for i, vv := range v {
			b[i] = vv
		}
		return b
	case []parquet.FixedLenByteArray:
		for i, vv := range v {
			b[i] = vv
		}
		return b
	case []parquet.Int96:
		for i, vv := range v {
			b[i] = vv[:]
		}
		return b
	}

	// fixed-size scalars: view the whole backing array as bytes, then slice
	// it into per-element windows of sizeof(T)
	var z T
	sz, ptr := int(unsafe.Sizeof(z)), unsafe.SliceData(v)
	raw := unsafe.Slice((*byte)(unsafe.Pointer(ptr)), sz*len(v))
	for i := range b {
		b[i] = raw[i*sz : (i+1)*sz]
	}

	return b
}
+
// blockSplitBloomFilter is a split-block bloom filter per the parquet spec:
// the bitset is divided into 32-byte blocks and each hash sets/checks one
// bit in each of a block's eight 32-bit words.
type blockSplitBloomFilter struct {
	data     *memory.Buffer // backing storage for the bitset
	bitset32 []uint32       // data viewed as 32-bit words

	hasher       Hasher
	algorithm    format.BloomFilterAlgorithm
	hashStrategy format.BloomFilterHash
	compression  format.BloomFilterCompression
}

// getAlg returns the algorithm descriptor for the thrift header.
func (b *blockSplitBloomFilter) getAlg() *format.BloomFilterAlgorithm {
	return &b.algorithm
}

// getHashStrategy returns the hash strategy descriptor for the thrift header.
func (b *blockSplitBloomFilter) getHashStrategy() *format.BloomFilterHash {
	return &b.hashStrategy
}

// getCompression returns the compression descriptor for the thrift header.
func (b *blockSplitBloomFilter) getCompression() *format.BloomFilterCompression {
	return &b.compression
}
+
// CheckHash reports whether hash may have been inserted (false positives
// possible, false negatives not).
func (b *blockSplitBloomFilter) CheckHash(hash uint64) bool {
	return checkHash(b.bitset32, hash)
}

// CheckBulk performs CheckHash for each element of hashes, returning a
// parallel slice of results.
func (b *blockSplitBloomFilter) CheckBulk(hashes []uint64) []bool {
	results := make([]bool, len(hashes))
	checkBulk(b.bitset32, hashes, results)
	return results
}

// InsertHash sets the bits for hash in the bitset.
func (b *blockSplitBloomFilter) InsertHash(hash uint64) {
	insertHash(b.bitset32, hash)
}

// InsertBulk sets the bits for every element of hashes.
func (b *blockSplitBloomFilter) InsertBulk(hashes []uint64) {
	insertBulk(b.bitset32, hashes)
}

// Hasher returns the hasher used to map values to 64-bit hashes.
func (b *blockSplitBloomFilter) Hasher() Hasher {
	return b.hasher
}

// Size returns the bitset size in bytes.
func (b *blockSplitBloomFilter) Size() int64 {
	return int64(len(b.bitset32) * 4)
}

// WriteTo writes the raw bitset to w, encrypted when enc is non-nil, and
// returns the number of bytes written.
// NOTE(review): the encrypted path cannot surface write failures — Encrypt
// returns only a count — so errors from w are silent there; confirm that is
// intended.
func (b *blockSplitBloomFilter) WriteTo(w io.Writer, enc encryption.Encryptor) (int, error) {
	if enc != nil {
		n := enc.Encrypt(w, b.data.Bytes())
		return n, nil
	}
	return w.Write(b.data.Bytes())
}
+
+func NewBloomFilter(numBytes, maxBytes uint32, mem memory.Allocator)
BloomFilterBuilder {
+ if numBytes < minimumBloomFilterBytes {
+ numBytes = minimumBloomFilterBytes
+ }
+
+ if maxBytes > maximumBloomFilterBytes {
+ maxBytes = maximumBloomFilterBytes
+ }
+
+ if numBytes > maxBytes {
+ numBytes = maxBytes
+ }
+
+ // get next power of 2 if it's not a power of 2
+ if (numBytes & (numBytes - 1)) != 0 {
+ numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
+ }
+
+ buf := memory.NewResizableBuffer(mem)
+ buf.ResizeNoShrink(int(numBytes))
+ bf := &blockSplitBloomFilter{
+ data: buf,
+ bitset32: arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
+ hasher: xxhasher{},
+ algorithm: format.BloomFilterAlgorithm{BLOCK:
&format.SplitBlockAlgorithm{}},
+ hashStrategy: format.BloomFilterHash{XXHASH: &format.XxHash{}},
+ compression: format.BloomFilterCompression{UNCOMPRESSED:
&format.Uncompressed{}},
+ }
+ addCleanup(bf, nil)
+ return bf
+}
+
+func NewBloomFilterFromNDVAndFPP(ndv uint32, fpp float64, maxBytes int64, mem
memory.Allocator) BloomFilterBuilder {
+ numBytes := optimalNumBytes(ndv, fpp)
+ if numBytes > uint32(maxBytes) {
+ numBytes = uint32(maxBytes)
+ }
+
+ buf := memory.NewResizableBuffer(mem)
+ buf.ResizeNoShrink(int(numBytes))
+ bf := &blockSplitBloomFilter{
+ data: buf,
+ bitset32: arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
+ hasher: xxhasher{},
+ algorithm: format.BloomFilterAlgorithm{BLOCK:
&format.SplitBlockAlgorithm{}},
+ hashStrategy: format.BloomFilterHash{XXHASH: &format.XxHash{}},
+ compression: format.BloomFilterCompression{UNCOMPRESSED:
&format.Uncompressed{}},
+ }
+ addCleanup(bf, nil)
+ return bf
+}
+
// BloomFilterBuilder is the writer-side interface for constructing a bloom
// filter and serializing it to a file.
type BloomFilterBuilder interface {
	// Hasher returns the hasher used to produce hashes for Insert calls.
	Hasher() Hasher
	// Size returns the current bitset size in bytes.
	Size() int64
	// InsertHash adds a single hash to the filter.
	InsertHash(hash uint64)
	// InsertBulk adds a batch of hashes to the filter.
	InsertBulk(hashes []uint64)
	// WriteTo serializes the bitset to the writer, encrypting when the
	// encryptor is non-nil.
	WriteTo(io.Writer, encryption.Encryptor) (int, error)

	// accessors for the thrift BloomFilterHeader fields
	getAlg() *format.BloomFilterAlgorithm
	getHashStrategy() *format.BloomFilterHash
	getCompression() *format.BloomFilterCompression
}

// BloomFilter is the reader-side interface for probing a deserialized filter.
type BloomFilter interface {
	Hasher() Hasher
	CheckHash(hash uint64) bool
	Size() int64
}

// TypedBloomFilter wraps a BloomFilter with a typed Check helper for a
// specific parquet column type.
type TypedBloomFilter[T parquet.ColumnTypes] struct {
	BloomFilter
}

// Check hashes v with the filter's hasher and probes the filter; a false
// result means v was definitely never inserted.
func (b *TypedBloomFilter[T]) Check(v T) bool {
	h := b.Hasher()
	return b.CheckHash(h.Sum64(getBytes(v)))
}
+
+func validateBloomFilterHeader(hdr *format.BloomFilterHeader) error {
+ if hdr == nil {
+ return errors.New("bloom filter header must not be nil")
+ }
+
+ if !hdr.Algorithm.IsSetBLOCK() {
+ return fmt.Errorf("unsupported bloom filter algorithm: %s",
hdr.Algorithm)
+ }
+
+ if !hdr.Compression.IsSetUNCOMPRESSED() {
+ return fmt.Errorf("unsupported bloom filter compression: %s",
hdr.Compression)
+ }
+
+ if !hdr.Hash.IsSetXXHASH() {
+ return fmt.Errorf("unsupported bloom filter hash strategy: %s",
hdr.Hash)
+ }
+
+ if hdr.NumBytes < minimumBloomFilterBytes || hdr.NumBytes >
maximumBloomFilterBytes {
+ return fmt.Errorf("invalid bloom filter size: %d", hdr.NumBytes)
+ }
+
+ return nil
+}
+
// BloomFilterReader provides access to the bloom filters of a parquet file,
// one row group at a time via RowGroup.
type BloomFilterReader struct {
	Input         parquet.ReaderAtSeeker
	FileMetadata  *FileMetaData
	Props         *parquet.ReaderProperties
	FileDecryptor encryption.FileDecryptor
	// BufferPool supplies reusable *memory.Buffer values for filter reads
	BufferPool *sync.Pool
}
+
+func (r *BloomFilterReader) RowGroup(i int) (*RowGroupBloomFilterReader,
error) {
+ if i < 0 || i >= len(r.FileMetadata.RowGroups) {
+ return nil, fmt.Errorf("row group index %d out of range", i)
+ }
+
+ rgMeta := r.FileMetadata.RowGroup(i)
+ return &RowGroupBloomFilterReader{
+ input: r.Input,
+ rgMeta: rgMeta,
+ fileDecryptor: r.FileDecryptor,
+ rgOrdinal: int16(i),
+ bufferPool: r.BufferPool,
+ sourceFileSize: r.FileMetadata.sourceFileSize,
+ }, nil
+}
+
// RowGroupBloomFilterReader reads the bloom filters of a single row group's
// column chunks.
type RowGroupBloomFilterReader struct {
	input          parquet.ReaderAtSeeker
	rgMeta         *RowGroupMetaData
	fileDecryptor  encryption.FileDecryptor
	rgOrdinal      int16 // row group index, used for decryption AADs
	sourceFileSize int64 // bounds section reads at the end of the file

	bufferPool *sync.Pool
}
+
// GetColumnBloomFilter reads and deserializes the bloom filter of column i
// in this row group. It returns (nil, nil) when the column chunk records no
// bloom filter offset. Encrypted filters are decrypted with per-module AADs;
// plaintext filters are read through a pooled buffer, reading the filter
// bitset together with the header when it fits in one read.
func (r *RowGroupBloomFilterReader) GetColumnBloomFilter(i int) (BloomFilter, error) {
	if i < 0 || i >= r.rgMeta.NumColumns() {
		return nil, fmt.Errorf("column index %d out of range", i)
	}

	col, err := r.rgMeta.ColumnChunk(i)
	if err != nil {
		return nil, err
	}

	var (
		decryptor encryption.Decryptor
		header    format.BloomFilterHeader
		offset    int64
		// default guess for header+bitset size when the metadata does not
		// record a BloomFilterLength
		bloomFilterReadSize int32 = 256
	)

	// no recorded offset means no bloom filter for this column
	if offset = col.BloomFilterOffset(); offset <= 0 {
		return nil, nil
	}

	if col.BloomFilterLength() > 0 {
		bloomFilterReadSize = col.BloomFilterLength()
	}

	// view of the file from the filter's offset to EOF
	sectionRdr := io.NewSectionReader(r.input, offset, r.sourceFileSize-offset)
	cryptoMetadata := col.CryptoMetadata()
	if cryptoMetadata != nil {
		decryptor, err = encryption.GetColumnMetaDecryptor(cryptoMetadata, r.fileDecryptor)
		if err != nil {
			return nil, err
		}

		// decrypt the thrift header with the header-module AAD
		encryption.UpdateDecryptor(decryptor, r.rgOrdinal, int16(i),
			encryption.BloomFilterHeaderModule)
		hdr := decryptor.DecryptFrom(sectionRdr)
		if _, err = thrift.DeserializeThrift(&header, hdr); err != nil {
			return nil, err
		}

		if err = validateBloomFilterHeader(&header); err != nil {
			return nil, err
		}

		// then decrypt the bitset with the bitset-module AAD
		encryption.UpdateDecryptor(decryptor, r.rgOrdinal, int16(i),
			encryption.BloomFilterBitsetModule)
		bitset := decryptor.DecryptFrom(sectionRdr)
		if len(bitset) != int(header.NumBytes) {
			return nil, fmt.Errorf("wrong length of decrypted bloom filter bitset: %d vs %d",
				len(bitset), header.NumBytes)
		}

		return &blockSplitBloomFilter{
			data:         memory.NewBufferBytes(bitset),
			bitset32:     arrow.Uint32Traits.CastFromBytes(bitset),
			hasher:       xxhasher{},
			algorithm:    *header.Algorithm,
			hashStrategy: *header.Hash,
			compression:  *header.Compression,
		}, nil
	}

	// plaintext path: read header (and ideally the bitset too) into a
	// pooled buffer; the deferred cleanup returns it unless ownership is
	// transferred to the returned filter (headerBuf set to nil below)
	headerBuf := r.bufferPool.Get().(*memory.Buffer)
	headerBuf.ResizeNoShrink(int(bloomFilterReadSize))
	defer func() {
		if headerBuf != nil {
			headerBuf.ResizeNoShrink(0)
			r.bufferPool.Put(headerBuf)
		}
	}()

	// NOTE(review): a single Read may legally return fewer bytes than
	// len(p) without error; io.ReadFull semantics look intended here and on
	// the second read below — confirm against the writer's layout.
	if _, err = sectionRdr.Read(headerBuf.Bytes()); err != nil {
		return nil, err
	}

	remaining, err := thrift.DeserializeThrift(&header, headerBuf.Bytes())
	if err != nil {
		return nil, err
	}
	// bytes consumed by the thrift header within the buffer
	headerSize := len(headerBuf.Bytes()) - int(remaining)

	if err = validateBloomFilterHeader(&header); err != nil {
		return nil, err
	}

	bloomFilterSz := header.NumBytes
	var bitset []byte
	if int(bloomFilterSz)+headerSize <= len(headerBuf.Bytes()) {
		// bloom filter data is entirely contained in the buffer we just read
		bitset = headerBuf.Bytes()[headerSize : headerSize+int(bloomFilterSz)]
	} else {
		// need a second buffer sized for the full bitset; splice in the
		// bitset prefix already read past the header, then read the rest
		buf := r.bufferPool.Get().(*memory.Buffer)
		buf.ResizeNoShrink(int(bloomFilterSz))
		filterBytesInHeader := headerBuf.Len() - headerSize
		if filterBytesInHeader > 0 {
			copy(buf.Bytes(), headerBuf.Bytes()[headerSize:])
		}

		if _, err = sectionRdr.Read(buf.Bytes()[filterBytesInHeader:]); err != nil {
			return nil, err
		}
		bitset = buf.Bytes()
		// return the header buffer and keep the bitset buffer instead
		headerBuf.ResizeNoShrink(0)
		r.bufferPool.Put(headerBuf)
		headerBuf = buf
	}

	bf := &blockSplitBloomFilter{
		data:         headerBuf,
		bitset32:     arrow.GetData[uint32](bitset),
		hasher:       xxhasher{},
		algorithm:    *header.Algorithm,
		hashStrategy: *header.Hash,
		compression:  *header.Compression,
	}
	// ownership transferred to bf; disarm the deferred pool return
	headerBuf = nil
	addCleanup(bf, r.bufferPool)
	return bf, nil
}
+
// FileBloomFilterBuilder accumulates the bloom filters for every row group
// of a file so WriteTo can serialize them together and record their
// offsets/lengths in the column chunk metadata.
type FileBloomFilterBuilder struct {
	Schema    *schema.Schema
	Encryptor encryption.FileEncryptor

	rgMetaBldrs  []*RowGroupMetaDataBuilder
	bloomFilters []map[string]BloomFilterBuilder // per row group, keyed by column path
}

// AppendRowGroup registers a row group's metadata builder together with the
// bloom filters (keyed by column path) built for its columns.
func (f *FileBloomFilterBuilder) AppendRowGroup(rgMeta *RowGroupMetaDataBuilder, filters map[string]BloomFilterBuilder) {
	f.rgMetaBldrs = append(f.rgMetaBldrs, rgMeta)
	f.bloomFilters = append(f.bloomFilters, filters)
}
+
// WriteTo writes every registered bloom filter (thrift header followed by
// bitset) to w, recording each filter's file offset and serialized length in
// the corresponding column chunk metadata. When a file encryptor is
// configured, the header and bitset are encrypted under module-specific
// AADs.
func (f *FileBloomFilterBuilder) WriteTo(w utils.WriterTell) error {
	if len(f.rgMetaBldrs) == 0 || len(f.bloomFilters) == 0 {
		return nil
	}

	var (
		hdr        format.BloomFilterHeader
		serializer = thrift.NewThriftSerializer()
	)
	for rg, rgMeta := range f.rgMetaBldrs {
		if len(f.bloomFilters[rg]) == 0 {
			continue
		}

		for c, col := range rgMeta.colBuilders {
			colPath := col.column.Path()
			bf, ok := f.bloomFilters[rg][colPath]
			if !ok || bf == nil {
				// no bloom filter was built for this column
				continue
			}

			// record where this filter begins in the file
			offset := w.Tell()
			col.chunk.MetaData.BloomFilterOffset = &offset
			var encryptor encryption.Encryptor
			if f.Encryptor != nil {
				encryptor = f.Encryptor.GetColumnMetaEncryptor(colPath)
			}

			if encryptor != nil {
				// AAD for the bloom filter header module
				encryptor.UpdateAad(encryption.CreateModuleAad(
					encryptor.FileAad(), encryption.BloomFilterHeaderModule,
					int16(rg), int16(c), encryption.NonPageOrdinal))
			}

			hdr.NumBytes = int32(bf.Size())
			hdr.Algorithm = bf.getAlg()
			hdr.Hash = bf.getHashStrategy()
			hdr.Compression = bf.getCompression()

			_, err := serializer.Serialize(&hdr, w, encryptor)
			if err != nil {
				return err
			}

			if encryptor != nil {
				// switch the AAD to the bitset module before the data
				encryptor.UpdateAad(encryption.CreateModuleAad(
					encryptor.FileAad(), encryption.BloomFilterBitsetModule,
					int16(rg), int16(c), encryption.NonPageOrdinal))
			}

			if _, err = bf.WriteTo(w, encryptor); err != nil {
				return err
			}

			// header + bitset bytes written, for BloomFilterLength
			dataWritten := int32(w.Tell() - offset)
			col.chunk.MetaData.BloomFilterLength = &dataWritten
		}
	}
	return nil
}
diff --git a/parquet/metadata/bloom_filter_block.go
b/parquet/metadata/bloom_filter_block.go
new file mode 100644
index 0000000..bbd8426
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
var (
	// per-platform kernel dispatch for split-block bloom filter operations;
	// assigned by the init functions in bloom_filter_block_amd64.go and
	// bloom_filter_block_default.go
	checkHash  func([]uint32, uint64) bool
	checkBulk  func([]uint32, []uint64, []bool)
	insertHash func([]uint32, uint64)
	insertBulk func([]uint32, []uint64)
)
+
+func checkHashGo(bitset32 []uint32, hash uint64) bool {
+ bucketIdx := uint32(((hash >> 32) * uint64(len(bitset32)/8)) >> 32)
+ key := uint32(hash)
+
+ for i := range bitsSetPerBlock {
+ mask := uint32(1) << ((key * salt[i]) >> 27)
+ if bitset32[bitsSetPerBlock*bucketIdx+uint32(i)]&mask == 0 {
+ return false
+ }
+ }
+ return true
+}
+
+func insertHashGo(bitset32 []uint32, hash uint64) {
+ bucketIdx := uint32(((hash >> 32) * uint64(len(bitset32)/8)) >> 32)
+ key := uint32(hash)
+
+ for i := range bitsSetPerBlock {
+ mask := uint32(1) << ((key * salt[i]) >> 27)
+ bitset32[bitsSetPerBlock*bucketIdx+uint32(i)] |= mask
+ }
+}
+
+func insertBulkGo(bitset32 []uint32, hash []uint64) {
+ for _, h := range hash {
+ insertHash(bitset32, h)
+ }
+}
diff --git a/parquet/metadata/bloom_filter_block_amd64.go
b/parquet/metadata/bloom_filter_block_amd64.go
new file mode 100644
index 0000000..463e696
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_amd64.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !noasm
+
+package metadata
+
+import (
+ "golang.org/x/sys/cpu"
+)
+
+func init() {
+ if cpu.X86.HasAVX2 {
+ checkHash = checkBlockAvx2
+ insertHash, insertBulk = insertBlockAvx2, insertBulkAvx2
+ } else if cpu.X86.HasSSE42 {
+ checkHash = checkBlockSSE4
+ insertHash, insertBulk = insertBlockSSE4, insertBulkSSE4
+ } else {
+ checkHash = checkHashGo
+ insertHash, insertBulk = insertHashGo, insertBulkGo
+ }
+}
diff --git a/parquet/metadata/bloom_filter_block_avx2_amd64.go
b/parquet/metadata/bloom_filter_block_avx2_amd64.go
new file mode 100644
index 0000000..01655d0
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_avx2_amd64.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !noasm
+
+package metadata
+
+import (
+ "unsafe"
+)
+
// _check_block_avx2 is implemented in bloom_filter_block_avx2_amd64.s
// (c2goasm output): it probes the salted bit positions of hash within one
// 32-byte block of the bitset.
//
//go:noescape
func _check_block_avx2(bitset32 unsafe.Pointer, len int, hash uint64) (result bool)

// checkBlockAvx2 adapts Go slice arguments to the raw pointer/length ABI of
// the assembly kernel.
func checkBlockAvx2(bitset32 []uint32, hash uint64) bool {
	return _check_block_avx2(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32), hash)
}

// _insert_block_avx2 is implemented in bloom_filter_block_avx2_amd64.s: it
// sets the salted bit positions of hash within one 32-byte block.
//
//go:noescape
func _insert_block_avx2(bitset32 unsafe.Pointer, len int, hash uint64)

// insertBlockAvx2 adapts Go slice arguments to the raw pointer/length ABI of
// the assembly kernel.
func insertBlockAvx2(bitset32 []uint32, hash uint64) {
	_insert_block_avx2(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32), hash)
}

// _insert_bulk_avx2 is implemented in bloom_filter_block_avx2_amd64.s: the
// block insert applied to each of hash_len hashes.
//
//go:noescape
func _insert_bulk_avx2(bitset32 unsafe.Pointer, block_len int, hashes unsafe.Pointer, hash_len int)

// insertBulkAvx2 adapts Go slice arguments to the raw pointer/length ABI of
// the assembly kernel.
func insertBulkAvx2(bitset32 []uint32, hashes []uint64) {
	_insert_bulk_avx2(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32),
		unsafe.Pointer(unsafe.SliceData(hashes)), len(hashes))
}
diff --git a/parquet/metadata/bloom_filter_block_avx2_amd64.s
b/parquet/metadata/bloom_filter_block_avx2_amd64.s
new file mode 100644
index 0000000..c176800
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_avx2_amd64.s
@@ -0,0 +1,151 @@
+//+build !noasm !appengine
+// AUTO-GENERATED BY C2GOASM -- DO NOT EDIT
+
+DATA LCDATA1<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA1<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA1<>+0x010(SB)/8, $0x2df1424b705495c7
+DATA LCDATA1<>+0x018(SB)/8, $0x5c6bfb319efc4947
+DATA LCDATA1<>+0x020(SB)/8, $0x0000000000000001
+GLOBL LCDATA1<>(SB), 8, $40
+
+TEXT ·_check_block_avx2(SB), $0-32
+
+ MOVQ bitset32+0(FP), DI
+ MOVQ len+8(FP), SI
+ MOVQ hash+16(FP), DX
+ LEAQ LCDATA1<>(SB), BP
+
+ WORD $0x8948; BYTE $0xd1 // mov rcx, rdx
+ LONG $0x20e9c148 // shr rcx, 32
+ WORD $0x468d; BYTE $0x07 // lea eax, [rsi + 7]
+ WORD $0xf685 // test esi, esi
+ WORD $0x490f; BYTE $0xc6 // cmovns eax, esi
+ WORD $0xf8c1; BYTE $0x03 // sar eax, 3
+ WORD $0x9848 // cdqe
+ LONG $0xc1af0f48 // imul rax, rcx
+ LONG $0x1de8c148 // shr rax, 29
+ WORD $0xe083; BYTE $0xf8 // and eax, -8
+ LONG $0x137bca69; WORD $0x47b6 // imul ecx, edx, 1203114875
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ WORD $0x348b; BYTE $0x87 // mov esi, dword [rdi + 4*rax]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x4d91ca69; WORD $0x4497 // imul ecx, edx, 1150766481
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x0487748b // mov esi, dword [rdi + 4*rax + 4]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0xad5bca69; WORD $0x8824 // imul ecx, edx, -2010862245
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x0887748b // mov esi, dword [rdi + 4*rax + 8]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x289dca69; WORD $0xa2b7 // imul ecx, edx, -1565054819
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x0c87748b // mov esi, dword [rdi + 4*rax + 12]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x95c7ca69; WORD $0x7054 // imul ecx, edx, 1884591559
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1087748b // mov esi, dword [rdi + 4*rax + 16]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x424bca69; WORD $0x2df1 // imul ecx, edx, 770785867
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1487748b // mov esi, dword [rdi + 4*rax + 20]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x4947ca69; WORD $0x9efc // imul ecx, edx, -1627633337
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1887748b // mov esi, dword [rdi + 4*rax + 24]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0xfb31ca69; WORD $0x5c6b // imul ecx, edx, 1550580529
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1c87448b // mov eax, dword [rdi + 4*rax + 28]
+ WORD $0xa30f; BYTE $0xc8 // bt eax, ecx
+ WORD $0x920f; BYTE $0xd0 // setb al
+ MOVQ AX, result+24(FP)
+ RET
+
+LBB0_8:
+ WORD $0xc031 // xor eax, eax
+ MOVQ AX, result+24(FP)
+ RET
+
+TEXT ·_insert_block_avx2(SB), $0-24
+
+ MOVQ bitset32+0(FP), DI
+ MOVQ len+8(FP), SI
+ MOVQ hash+16(FP), DX
+ LEAQ LCDATA1<>(SB), BP
+
+ LONG $0xc26ef9c5 // vmovd xmm0, edx
+ LONG $0x20eac148 // shr rdx, 32
+ WORD $0x468d; BYTE $0x07 // lea eax, [rsi + 7]
+ WORD $0xf685 // test esi, esi
+ WORD $0x490f; BYTE $0xc6 // cmovns eax, esi
+ WORD $0xf8c1; BYTE $0x03 // sar eax, 3
+ WORD $0x9848 // cdqe
+ LONG $0xc2af0f48 // imul rax, rdx
+ LONG $0x1be8c148 // shr rax, 27
+ QUAD $0x0003ffffffe0b948; WORD $0x0000 // mov rcx, 17179869152
+ LONG $0x587de2c4; BYTE $0xc0 // vpbroadcastd ymm0, xmm0
+ LONG $0x407de2c4; WORD $0x0045 // vpmulld ymm0, ymm0, yword
0[rbp] /* [rip + .LCPI2_0] */
+ WORD $0x2148; BYTE $0xc1 // and rcx, rax
+ LONG $0xd072fdc5; BYTE $0x1b // vpsrld ymm0, ymm0, 27
+ LONG $0x587de2c4; WORD $0x204d // vpbroadcastd ymm1, dword
32[rbp] /* [rip + .LCPI2_1] */
+ LONG $0x4775e2c4; BYTE $0xc0 // vpsllvd ymm0, ymm1, ymm0
+ LONG $0x04ebfdc5; BYTE $0x0f // vpor ymm0, ymm0, yword
[rdi + rcx]
+ LONG $0x047ffec5; BYTE $0x0f // vmovdqu yword [rdi + rcx],
ymm0
+ VZEROUPPER
+ RET
+
+DATA LCDATA2<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA2<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA2<>+0x010(SB)/8, $0x2df1424b705495c7
+DATA LCDATA2<>+0x018(SB)/8, $0x5c6bfb319efc4947
+DATA LCDATA2<>+0x020(SB)/8, $0x0000000000000001
+GLOBL LCDATA2<>(SB), 8, $40
+
+TEXT ·_insert_bulk_avx2(SB), $0-32
+
+ MOVQ bitset32+0(FP), DI
+ MOVQ block_len+8(FP), SI
+ MOVQ hashes+16(FP), DX
+ MOVQ hash_len+24(FP), CX
+ LEAQ LCDATA2<>(SB), BP
+
+ WORD $0xc985 // test ecx, ecx
+ JLE LBB3_4
+ WORD $0x468d; BYTE $0x07 // lea eax, [rsi + 7]
+ WORD $0xf685 // test esi, esi
+ WORD $0x490f; BYTE $0xc6 // cmovns eax, esi
+ WORD $0xf8c1; BYTE $0x03 // sar eax, 3
+ WORD $0x9848 // cdqe
+ WORD $0xc989 // mov ecx, ecx
+ WORD $0xf631 // xor esi, esi
+ QUAD $0x0003ffffffe0b849; WORD $0x0000 // mov r8, 17179869152
+ LONG $0x456ffdc5; BYTE $0x00 // vmovdqa ymm0, yword 0[rbp]
/* [rip + .LCPI3_0] */
+ LONG $0x587de2c4; WORD $0x204d // vpbroadcastd ymm1, dword
32[rbp] /* [rip + .LCPI3_1] */
+
+LBB3_2:
+ LONG $0xf20c8b4c // mov r9, qword [rdx + 8*rsi]
+ LONG $0x6e79c1c4; BYTE $0xd1 // vmovd xmm2, r9d
+ LONG $0x20e9c149 // shr r9, 32
+ LONG $0xc8af0f4c // imul r9, rax
+ LONG $0x1be9c149 // shr r9, 27
+ WORD $0x214d; BYTE $0xc1 // and r9, r8
+ LONG $0x587de2c4; BYTE $0xd2 // vpbroadcastd ymm2, xmm2
+ LONG $0x406de2c4; BYTE $0xd0 // vpmulld ymm2, ymm2, ymm0
+ LONG $0xd272edc5; BYTE $0x1b // vpsrld ymm2, ymm2, 27
+ LONG $0x4775e2c4; BYTE $0xd2 // vpsllvd ymm2, ymm1, ymm2
+ LONG $0xeb6da1c4; WORD $0x0f14 // vpor ymm2, ymm2, yword [rdi + r9]
+ LONG $0x7f7ea1c4; WORD $0x0f14 // vmovdqu yword [rdi + r9], ymm2
+ WORD $0xff48; BYTE $0xc6 // inc rsi
+ WORD $0x3948; BYTE $0xf1 // cmp rcx, rsi
+ JNE LBB3_2
+
+LBB3_4:
+ VZEROUPPER
+ RET
diff --git a/parquet/metadata/bloom_filter_block_default.go
b/parquet/metadata/bloom_filter_block_default.go
new file mode 100644
index 0000000..ca1ec75
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_default.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build noasm || !amd64
+
+package metadata
+
+func init() {
+ checkHash, insertHash, insertBulk = checkHashGo, insertHashGo,
insertBulkGo
+}
diff --git a/parquet/metadata/bloom_filter_block_sse4_amd64.go
b/parquet/metadata/bloom_filter_block_sse4_amd64.go
new file mode 100644
index 0000000..7c94bf7
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_sse4_amd64.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !noasm
+
+package metadata
+
+import (
+ "unsafe"
+)
+
+//go:noescape
+func _check_block_sse4(bitset32 unsafe.Pointer, len int, hash uint64) (result
bool)
+
+func checkBlockSSE4(bitset32 []uint32, hash uint64) bool {
+ return _check_block_sse4(unsafe.Pointer(unsafe.SliceData(bitset32)),
len(bitset32), hash)
+}
+
+//go:noescape
+func _insert_block_sse4(bitset32 unsafe.Pointer, len int, hash uint64)
+
+func insertBlockSSE4(bitset32 []uint32, hash uint64) {
+ _insert_block_sse4(unsafe.Pointer(unsafe.SliceData(bitset32)),
len(bitset32), hash)
+}
+
+//go:noescape
+func _insert_bulk_sse4(bitset32 unsafe.Pointer, block_len int, hashes
unsafe.Pointer, hash_len int)
+
+func insertBulkSSE4(bitset32 []uint32, hashes []uint64) {
+ _insert_bulk_sse4(unsafe.Pointer(unsafe.SliceData(bitset32)),
len(bitset32),
+ unsafe.Pointer(unsafe.SliceData(hashes)), len(hashes))
+}
diff --git a/parquet/metadata/bloom_filter_block_sse4_amd64.s
b/parquet/metadata/bloom_filter_block_sse4_amd64.s
new file mode 100644
index 0000000..ac5ff12
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_sse4_amd64.s
@@ -0,0 +1,176 @@
+//+build !noasm !appengine
+// AUTO-GENERATED BY C2GOASM -- DO NOT EDIT
+
+DATA LCDATA1<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA1<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA1<>+0x010(SB)/8, $0x3f8000003f800000
+DATA LCDATA1<>+0x018(SB)/8, $0x3f8000003f800000
+DATA LCDATA1<>+0x020(SB)/8, $0x2df1424b705495c7
+DATA LCDATA1<>+0x028(SB)/8, $0x5c6bfb319efc4947
+GLOBL LCDATA1<>(SB), 8, $48
+
+TEXT ·_check_block_sse4(SB), $0-32
+
+ MOVQ bitset32+0(FP), DI
+ MOVQ len+8(FP), SI
+ MOVQ hash+16(FP), DX
+ LEAQ LCDATA1<>(SB), BP
+
+ WORD $0x8948; BYTE $0xd1 // mov rcx, rdx
+ LONG $0x20e9c148 // shr rcx, 32
+ WORD $0x468d; BYTE $0x07 // lea eax, [rsi + 7]
+ WORD $0xf685 // test esi, esi
+ WORD $0x490f; BYTE $0xc6 // cmovns eax, esi
+ WORD $0xf8c1; BYTE $0x03 // sar eax, 3
+ WORD $0x9848 // cdqe
+ LONG $0xc1af0f48 // imul rax, rcx
+ LONG $0x1de8c148 // shr rax, 29
+ WORD $0xe083; BYTE $0xf8 // and eax, -8
+ LONG $0x137bca69; WORD $0x47b6 // imul ecx, edx, 1203114875
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ WORD $0x348b; BYTE $0x87 // mov esi, dword [rdi + 4*rax]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x4d91ca69; WORD $0x4497 // imul ecx, edx, 1150766481
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x0487748b // mov esi, dword [rdi + 4*rax + 4]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0xad5bca69; WORD $0x8824 // imul ecx, edx, -2010862245
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x0887748b // mov esi, dword [rdi + 4*rax + 8]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x289dca69; WORD $0xa2b7 // imul ecx, edx, -1565054819
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x0c87748b // mov esi, dword [rdi + 4*rax + 12]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x95c7ca69; WORD $0x7054 // imul ecx, edx, 1884591559
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1087748b // mov esi, dword [rdi + 4*rax + 16]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x424bca69; WORD $0x2df1 // imul ecx, edx, 770785867
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1487748b // mov esi, dword [rdi + 4*rax + 20]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0x4947ca69; WORD $0x9efc // imul ecx, edx, -1627633337
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1887748b // mov esi, dword [rdi + 4*rax + 24]
+ WORD $0xa30f; BYTE $0xce // bt esi, ecx
+ JAE LBB0_8
+ LONG $0xfb31ca69; WORD $0x5c6b // imul ecx, edx, 1550580529
+ WORD $0xe9c1; BYTE $0x1b // shr ecx, 27
+ LONG $0x1c87448b // mov eax, dword [rdi + 4*rax + 28]
+ WORD $0xa30f; BYTE $0xc8 // bt eax, ecx
+ WORD $0x920f; BYTE $0xd0 // setb al
+ MOVQ AX, result+24(FP)
+ RET
+
+LBB0_8:
+ WORD $0xc031 // xor eax, eax
+ MOVQ AX, result+24(FP)
+ RET
+
+TEXT ·_insert_block_sse4(SB), $0-24
+
+ MOVQ bitset32+0(FP), DI
+ MOVQ len+8(FP), SI
+ MOVQ hash+16(FP), DX
+ LEAQ LCDATA1<>(SB), BP
+
+ LONG $0xc26e0f66 // movd xmm0, edx
+ LONG $0x20eac148 // shr rdx, 32
+ WORD $0x468d; BYTE $0x07 // lea eax, [rsi + 7]
+ WORD $0xf685 // test esi, esi
+ WORD $0x490f; BYTE $0xc6 // cmovns eax, esi
+ WORD $0xf8c1; BYTE $0x03 // sar eax, 3
+ WORD $0x6348; BYTE $0xc8 // movsxd rcx, eax
+ LONG $0xcaaf0f48 // imul rcx, rdx
+ LONG $0x1be9c148 // shr rcx, 27
+ QUAD $0x0003ffffffe0b848; WORD $0x0000 // mov rax, 17179869152
+ WORD $0x2148; BYTE $0xc8 // and rax, rcx
+ LONG $0xc0700f66; BYTE $0x00 // pshufd xmm0, xmm0, 0
+ LONG $0x4d6f0f66; BYTE $0x00 // movdqa xmm1, oword 0[rbp]
/* [rip + .LCPI2_0] */
+ LONG $0x40380f66; BYTE $0xc8 // pmulld xmm1, xmm0
+ LONG $0xd1720f66; BYTE $0x1b // psrld xmm1, 27
+ LONG $0xf1720f66; BYTE $0x17 // pslld xmm1, 23
+ LONG $0x556f0f66; BYTE $0x10 // movdqa xmm2, oword 16[rbp]
/* [rip + .LCPI2_1] */
+ LONG $0xcafe0f66 // paddd xmm1, xmm2
+ LONG $0xc95b0ff3 // cvttps2dq xmm1, xmm1
+ LONG $0x071c100f // movups xmm3, oword [rdi +
rax]
+ WORD $0x560f; BYTE $0xd9 // orps xmm3, xmm1
+ LONG $0x074c100f; BYTE $0x10 // movups xmm1, oword [rdi +
rax + 16]
+ LONG $0x071c110f // movups oword [rdi + rax],
xmm3
+ LONG $0x40380f66; WORD $0x2045 // pmulld xmm0, oword 32[rbp]
/* [rip + .LCPI2_2] */
+ LONG $0xd0720f66; BYTE $0x1b // psrld xmm0, 27
+ LONG $0xf0720f66; BYTE $0x17 // pslld xmm0, 23
+ LONG $0xc2fe0f66 // paddd xmm0, xmm2
+ LONG $0xc05b0ff3 // cvttps2dq xmm0, xmm0
+ WORD $0x560f; BYTE $0xc1 // orps xmm0, xmm1
+ LONG $0x0744110f; BYTE $0x10 // movups oword [rdi + rax +
16], xmm0
+ RET
+
+DATA LCDATA2<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA2<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA2<>+0x010(SB)/8, $0x3f8000003f800000
+DATA LCDATA2<>+0x018(SB)/8, $0x3f8000003f800000
+DATA LCDATA2<>+0x020(SB)/8, $0x2df1424b705495c7
+DATA LCDATA2<>+0x028(SB)/8, $0x5c6bfb319efc4947
+GLOBL LCDATA2<>(SB), 8, $48
+
+TEXT ·_insert_bulk_sse4(SB), $0-32
+
+ MOVQ bitset32+0(FP), DI
+ MOVQ block_len+8(FP), SI
+ MOVQ hashes+16(FP), DX
+ MOVQ hash_len+24(FP), CX
+ LEAQ LCDATA2<>(SB), BP
+
+ WORD $0xc985 // test ecx, ecx
+ JLE LBB3_4
+ WORD $0x468d; BYTE $0x07 // lea eax, [rsi + 7]
+ WORD $0xf685 // test esi, esi
+ WORD $0x490f; BYTE $0xc6 // cmovns eax, esi
+ WORD $0xf8c1; BYTE $0x03 // sar eax, 3
+ WORD $0x9848 // cdqe
+ WORD $0xc989 // mov ecx, ecx
+ WORD $0xf631 // xor esi, esi
+ QUAD $0x0003ffffffe0b849; WORD $0x0000 // mov r8, 17179869152
+ LONG $0x456f0f66; BYTE $0x00 // movdqa xmm0, oword 0[rbp]
/* [rip + .LCPI3_0] */
+ LONG $0x4d6f0f66; BYTE $0x10 // movdqa xmm1, oword 16[rbp]
/* [rip + .LCPI3_1] */
+ LONG $0x556f0f66; BYTE $0x20 // movdqa xmm2, oword 32[rbp]
/* [rip + .LCPI3_2] */
+
+LBB3_2:
+ LONG $0xf20c8b4c // mov r9, qword [rdx + 8*rsi]
+ LONG $0x6e0f4166; BYTE $0xd9 // movd xmm3, r9d
+ LONG $0x20e9c149 // shr r9, 32
+ LONG $0xc8af0f4c // imul r9, rax
+ LONG $0x1be9c149 // shr r9, 27
+ WORD $0x214d; BYTE $0xc1 // and r9, r8
+ LONG $0xdb700f66; BYTE $0x00 // pshufd xmm3, xmm3, 0
+ LONG $0xe36f0f66 // movdqa xmm4, xmm3
+ LONG $0x40380f66; BYTE $0xe0 // pmulld xmm4, xmm0
+ LONG $0xd4720f66; BYTE $0x1b // psrld xmm4, 27
+ LONG $0xf4720f66; BYTE $0x17 // pslld xmm4, 23
+ LONG $0xe1fe0f66 // paddd xmm4, xmm1
+ LONG $0xe45b0ff3 // cvttps2dq xmm4, xmm4
+ LONG $0x2c100f42; BYTE $0x0f // movups xmm5, oword [rdi + r9]
+ WORD $0x560f; BYTE $0xec // orps xmm5, xmm4
+ LONG $0x64100f42; WORD $0x100f // movups xmm4, oword [rdi + r9 + 16]
+ LONG $0x2c110f42; BYTE $0x0f // movups oword [rdi + r9], xmm5
+ LONG $0x40380f66; BYTE $0xda // pmulld xmm3, xmm2
+ LONG $0xd3720f66; BYTE $0x1b // psrld xmm3, 27
+ LONG $0xf3720f66; BYTE $0x17 // pslld xmm3, 23
+ LONG $0xd9fe0f66 // paddd xmm3, xmm1
+ LONG $0xdb5b0ff3 // cvttps2dq xmm3, xmm3
+ WORD $0x560f; BYTE $0xdc // orps xmm3, xmm4
+ LONG $0x5c110f42; WORD $0x100f // movups oword [rdi + r9 + 16], xmm3
+ WORD $0xff48; BYTE $0xc6 // inc rsi
+ WORD $0x3948; BYTE $0xf1 // cmp rcx, rsi
+ JNE LBB3_2
+
+LBB3_4:
+ RET
diff --git a/parquet/metadata/bloom_filter_reader_test.go
b/parquet/metadata/bloom_filter_reader_test.go
new file mode 100644
index 0000000..25d9505
--- /dev/null
+++ b/parquet/metadata/bloom_filter_reader_test.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata_test
+
+import (
+ "bytes"
+ "runtime"
+ "sync"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+ "github.com/apache/arrow-go/v18/parquet/internal/utils"
+ "github.com/apache/arrow-go/v18/parquet/metadata"
+ "github.com/apache/arrow-go/v18/parquet/schema"
+ "github.com/stretchr/testify/suite"
+)
+
+type BloomFilterBuilderSuite struct {
+ suite.Suite
+
+ sc *schema.Schema
+ props *parquet.WriterProperties
+ mem *memory.CheckedAllocator
+ buf bytes.Buffer
+}
+
+func (suite *BloomFilterBuilderSuite) SetupTest() {
+ suite.props = parquet.NewWriterProperties()
+ suite.mem = memory.NewCheckedAllocator(memory.NewGoAllocator())
+ suite.buf.Reset()
+}
+
+func (suite *BloomFilterBuilderSuite) TearDownTest() {
+ runtime.GC() // we use setfinalizer to clean up the buffers, so run the
GC
+ suite.mem.AssertSize(suite.T(), 0)
+}
+
+func (suite *BloomFilterBuilderSuite) TestSingleRowGroup() {
+ suite.sc = schema.NewSchema(schema.MustGroup(
+ schema.NewGroupNode("schema", parquet.Repetitions.Repeated,
+ schema.FieldList{
+ schema.NewByteArrayNode("c1",
parquet.Repetitions.Optional, -1),
+ schema.NewByteArrayNode("c2",
parquet.Repetitions.Optional, -1),
+ schema.NewByteArrayNode("c3",
parquet.Repetitions.Optional, -1),
+ }, -1)))
+
+ bldr := metadata.FileBloomFilterBuilder{Schema: suite.sc}
+
+ metaBldr := metadata.NewFileMetadataBuilder(suite.sc, suite.props, nil)
+
+ {
+ rgMeta := metaBldr.AppendRowGroup()
+ filterMap := make(map[string]metadata.BloomFilterBuilder)
+ bldr.AppendRowGroup(rgMeta, filterMap)
+
+ bf1 := metadata.NewBloomFilter(32, 1024, suite.mem)
+ bf2 := metadata.NewAdaptiveBlockSplitBloomFilter(1024, 5, 0.01,
suite.sc.Column(2), suite.mem)
+
+ h1, h2 := bf1.Hasher(), bf2.Hasher()
+
+ rgMeta.NextColumnChunk()
+ rgMeta.NextColumnChunk()
+ rgMeta.NextColumnChunk()
+ rgMeta.Finish(0, 0)
+
+ bf1.InsertHash(metadata.GetHash(h1, parquet.ByteArray("Hello")))
+ bf2.InsertHash(metadata.GetHash(h2, parquet.ByteArray("World")))
+ filterMap["c1"] = bf1
+ filterMap["c3"] = bf2
+
+ wr := &utils.TellWrapper{Writer: &suite.buf}
+ wr.Write([]byte("PAR1")) // offset of 0 means unset, so write
something
+ // to force the offset to be set as a non-zero value
+ suite.Require().NoError(bldr.WriteTo(wr))
+ }
+ runtime.GC()
+
+ finalMeta, err := metaBldr.Finish()
+ suite.Require().NoError(err)
+ {
+ bufferPool := &sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(suite.mem)
+ runtime.SetFinalizer(buf, func(obj
*memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
+
+ rdr := metadata.BloomFilterReader{
+ Input: bytes.NewReader(suite.buf.Bytes()),
+ FileMetadata: finalMeta,
+ BufferPool: bufferPool,
+ }
+
+ bfr, err := rdr.RowGroup(0)
+ suite.Require().NoError(err)
+ suite.Require().NotNil(bfr)
+
+ {
+ bf1, err := bfr.GetColumnBloomFilter(0)
+ suite.Require().NoError(err)
+ suite.Require().NotNil(bf1)
+
suite.False(bf1.CheckHash(metadata.GetHash(bf1.Hasher(),
parquet.ByteArray("World"))))
+ suite.True(bf1.CheckHash(metadata.GetHash(bf1.Hasher(),
parquet.ByteArray("Hello"))))
+ }
+ runtime.GC() // force GC to run to put the buffer back into the
pool
+ {
+ bf2, err := bfr.GetColumnBloomFilter(1)
+ suite.Require().NoError(err)
+ suite.Require().Nil(bf2)
+ }
+ {
+ bf3, err := bfr.GetColumnBloomFilter(2)
+ suite.Require().NoError(err)
+ suite.Require().NotNil(bf3)
+
suite.False(bf3.CheckHash(metadata.GetHash(bf3.Hasher(),
parquet.ByteArray("Hello"))))
+ suite.True(bf3.CheckHash(metadata.GetHash(bf3.Hasher(),
parquet.ByteArray("World"))))
+ }
+ runtime.GC() // we're using setfinalizer, so force release
+ }
+ runtime.GC()
+}
+
+const (
+ FooterEncryptionKey = "0123456789012345"
+ ColumnEncryptionKey1 = "1234567890123450"
+ ColumnEncryptionKey2 = "1234567890123451"
+ FooterEncryptionKeyID = "kf"
+ ColumnEncryptionKey1ID = "kc1"
+ ColumnEncryptionKey2ID = "kc2"
+)
+
+type EncryptedBloomFilterBuilderSuite struct {
+ suite.Suite
+
+ sc *schema.Schema
+ props *parquet.WriterProperties
+ decryptProps *parquet.FileDecryptionProperties
+ mem *memory.CheckedAllocator
+ buf bytes.Buffer
+}
+
+func (suite *EncryptedBloomFilterBuilderSuite) SetupTest() {
+ encryptedCols := parquet.ColumnPathToEncryptionPropsMap{
+ "c1": parquet.NewColumnEncryptionProperties("c1",
+ parquet.WithKey(ColumnEncryptionKey1),
parquet.WithKeyID(ColumnEncryptionKey1ID)),
+ "c2": parquet.NewColumnEncryptionProperties("c2",
+ parquet.WithKey(ColumnEncryptionKey2),
parquet.WithKeyID(ColumnEncryptionKey2ID)),
+ }
+
+ encProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey,
+ parquet.WithFooterKeyID(FooterEncryptionKeyID),
+ parquet.WithEncryptedColumns(encryptedCols))
+
+ suite.decryptProps = parquet.NewFileDecryptionProperties(
+ parquet.WithFooterKey(FooterEncryptionKey),
+ parquet.WithColumnKeys(parquet.ColumnPathToDecryptionPropsMap{
+ "c1": parquet.NewColumnDecryptionProperties("c1",
parquet.WithDecryptKey(ColumnEncryptionKey1)),
+ "c2": parquet.NewColumnDecryptionProperties("c2",
parquet.WithDecryptKey(ColumnEncryptionKey2)),
+ }))
+
+ suite.props =
parquet.NewWriterProperties(parquet.WithEncryptionProperties(encProps))
+ suite.mem = memory.NewCheckedAllocator(memory.NewGoAllocator())
+ suite.buf.Reset()
+}
+
+func (suite *EncryptedBloomFilterBuilderSuite) TearDownTest() {
+ runtime.GC() // we use setfinalizer to clean up the buffers, so run the
GC
+ suite.mem.AssertSize(suite.T(), 0)
+}
+
+func (suite *EncryptedBloomFilterBuilderSuite) TestEncryptedBloomFilters() {
+ suite.sc = schema.NewSchema(schema.MustGroup(
+ schema.NewGroupNode("schema", parquet.Repetitions.Repeated,
+ schema.FieldList{
+ schema.NewByteArrayNode("c1",
parquet.Repetitions.Optional, -1),
+ schema.NewByteArrayNode("c2",
parquet.Repetitions.Optional, -1),
+ schema.NewByteArrayNode("c3",
parquet.Repetitions.Optional, -1),
+ }, -1)))
+
+ encryptor :=
encryption.NewFileEncryptor(suite.props.FileEncryptionProperties(), suite.mem)
+ metaBldr := metadata.NewFileMetadataBuilder(suite.sc, suite.props, nil)
+ metaBldr.SetFileEncryptor(encryptor)
+ bldr := metadata.FileBloomFilterBuilder{Schema: suite.sc, Encryptor:
encryptor}
+ {
+ rgMeta := metaBldr.AppendRowGroup()
+ filterMap := make(map[string]metadata.BloomFilterBuilder)
+ bldr.AppendRowGroup(rgMeta, filterMap)
+
+ bf1 := metadata.NewBloomFilter(32, 1024, suite.mem)
+ bf2 := metadata.NewAdaptiveBlockSplitBloomFilter(1024, 5, 0.01,
suite.sc.Column(1), suite.mem)
+ h1, h2 := bf1.Hasher(), bf2.Hasher()
+
+ bf1.InsertHash(metadata.GetHash(h1, parquet.ByteArray("Hello")))
+ bf2.InsertHash(metadata.GetHash(h2, parquet.ByteArray("World")))
+ filterMap["c1"] = bf1
+ filterMap["c2"] = bf2
+
+ colChunk1 := rgMeta.NextColumnChunk()
+ colChunk1.Finish(metadata.ChunkMetaInfo{}, false, false,
metadata.EncodingStats{})
+
+ colChunk2 := rgMeta.NextColumnChunk()
+ colChunk2.Finish(metadata.ChunkMetaInfo{}, false, false,
metadata.EncodingStats{})
+
+ colChunk3 := rgMeta.NextColumnChunk()
+ colChunk3.Finish(metadata.ChunkMetaInfo{}, false, false,
metadata.EncodingStats{})
+
+ wr := &utils.TellWrapper{Writer: &suite.buf}
+ wr.Write([]byte("PAR1")) // offset of 0 means unset, so write
something
+ // to force the offset to be set as a non-zero value
+ suite.Require().NoError(bldr.WriteTo(wr))
+
+ rgMeta.Finish(0, 0)
+ }
+
+ finalMeta, err := metaBldr.Finish()
+ suite.Require().NoError(err)
+ finalMeta.FileDecryptor =
encryption.NewFileDecryptor(suite.decryptProps,
+ suite.props.FileEncryptionProperties().FileAad(),
+ suite.props.FileEncryptionProperties().Algorithm().Algo, "",
suite.mem)
+ {
+ bufferPool := &sync.Pool{
+ New: func() interface{} {
+ buf := memory.NewResizableBuffer(suite.mem)
+ runtime.SetFinalizer(buf, func(obj
*memory.Buffer) {
+ obj.Release()
+ })
+ return buf
+ },
+ }
+ defer runtime.GC()
+
+ rdr := metadata.BloomFilterReader{
+ Input: bytes.NewReader(suite.buf.Bytes()),
+ FileMetadata: finalMeta,
+ BufferPool: bufferPool,
+ FileDecryptor: finalMeta.FileDecryptor,
+ }
+
+ bfr, err := rdr.RowGroup(0)
+ suite.Require().NoError(err)
+ suite.Require().NotNil(bfr)
+
+ {
+ bf1, err := bfr.GetColumnBloomFilter(0)
+ suite.Require().NoError(err)
+ suite.Require().NotNil(bf1)
+
suite.False(bf1.CheckHash(metadata.GetHash(bf1.Hasher(),
parquet.ByteArray("World"))))
+ suite.True(bf1.CheckHash(metadata.GetHash(bf1.Hasher(),
parquet.ByteArray("Hello"))))
+ }
+ }
+}
+
+func TestBloomFilterRoundTrip(t *testing.T) {
+ suite.Run(t, new(BloomFilterBuilderSuite))
+ suite.Run(t, new(EncryptedBloomFilterBuilderSuite))
+}
diff --git a/parquet/metadata/bloom_filter_test.go
b/parquet/metadata/bloom_filter_test.go
new file mode 100644
index 0000000..1f8a601
--- /dev/null
+++ b/parquet/metadata/bloom_filter_test.go
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+ "fmt"
+ "math/rand/v2"
+ "runtime"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow/bitutil"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestSplitBlockFilter(t *testing.T) {
+ const N = 1000
+ const S = 3
+ const P = 0.01
+
+ bf := blockSplitBloomFilter{
+ bitset32: make([]uint32, optimalNumBytes(N, P)),
+ }
+
+ p := rand.New(rand.NewPCG(S, S))
+ for i := 0; i < N; i++ {
+ bf.InsertHash(p.Uint64())
+ }
+
+ falsePositives := 0
+ p = rand.New(rand.NewPCG(S, S))
+ for i := 0; i < N; i++ {
+ x := p.Uint64()
+
+ if !bf.CheckHash(x) {
+ t.Fatalf("bloom filter block does not contain value #%d
that was inserted %d", i, x)
+ }
+
+ if bf.CheckHash(^x) {
+ falsePositives++
+ }
+ }
+
+ if r := (float64(falsePositives) / N); r > P {
+ t.Fatalf("false positive rate is too high: %f", r)
+ }
+}
+
+func testHash[T parquet.ColumnTypes](t assert.TestingT, h Hasher, vals []T) {
+ results := GetHashes(h, vals)
+ assert.Len(t, results, len(vals))
+ for i, v := range vals {
+ assert.Equal(t, GetHash(h, v), results[i])
+ }
+
+ var (
+ nvalid = int64(len(vals))
+ validBits = make([]byte, bitutil.BytesForBits(2*nvalid))
+ spacedVals = make([]T, 2*nvalid)
+ )
+
+ for i, v := range vals {
+ spacedVals[i*2] = v
+ bitutil.SetBit(validBits, i*2)
+
+ }
+
+ results = GetSpacedHashes(h, nvalid, spacedVals, validBits, 0)
+ assert.Len(t, results, len(vals))
+ for i, v := range vals {
+ assert.Equal(t, GetHash(h, v), results[i])
+ }
+}
+
+func TestGetHashes(t *testing.T) {
+ var (
+ h xxhasher
+ valsBA = []parquet.ByteArray{
+ []byte("hello"),
+ []byte("world"),
+ }
+
+ valsFLBA = []parquet.FixedLenByteArray{
+ []byte("hello"),
+ []byte("world"),
+ }
+
+ valsI32 = []int32{42, 43}
+ )
+
+ assert.Len(t, GetSpacedHashes[int32](h, 0, nil, nil, 0), 0)
+
+ testHash(t, h, valsBA)
+ testHash(t, h, valsFLBA)
+ testHash(t, h, valsI32)
+}
+
+func TestNewBloomFilter(t *testing.T) {
+ tests := []struct {
+ ndv uint32
+ fpp float64
+ maxBytes int64
+ expectedBytes int64
+ }{
+ {1, 0.09, 0, 0},
+ // cap at maximumBloomFilterBytes
+ {1024 * 1024 * 128, 0.9, maximumBloomFilterBytes + 1,
maximumBloomFilterBytes},
+ // round to power of 2
+ {1024 * 1024, 0.01, maximumBloomFilterBytes, 1 << 21},
+ }
+
+ for _, tt := range tests {
+ t.Run(fmt.Sprintf("ndv=%d,fpp=%0.3f", tt.ndv, tt.fpp), func(t
*testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ {
+ bf := NewBloomFilterFromNDVAndFPP(tt.ndv,
tt.fpp, tt.maxBytes, mem)
+ assert.EqualValues(t, tt.expectedBytes,
bf.Size())
+ runtime.GC()
+ }
+ runtime.GC() // force GC to run and do the cleanup
routines
+ })
+ }
+}
+
+func BenchmarkFilterInsert(b *testing.B) {
+ bf := blockSplitBloomFilter{bitset32: make([]uint32, 8)}
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bf.InsertHash(uint64(i))
+ }
+ b.SetBytes(bytesPerFilterBlock)
+}
+
+func BenchmarkFilterCheck(b *testing.B) {
+ bf := blockSplitBloomFilter{bitset32: make([]uint32, 8)}
+ bf.InsertHash(42)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bf.CheckHash(42)
+ }
+ b.SetBytes(bytesPerFilterBlock)
+}
+
+func BenchmarkFilterCheckBulk(b *testing.B) {
+ bf := blockSplitBloomFilter{bitset32: make([]uint32,
99*bitsSetPerBlock)}
+ x := make([]uint64, 16)
+ r := rand.New(rand.NewPCG(0, 0))
+ for i := range x {
+ x[i] = r.Uint64()
+ }
+
+ bf.InsertBulk(x)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bf.CheckBulk(x)
+ }
+ b.SetBytes(bytesPerFilterBlock * int64(len(x)))
+}
+
+func BenchmarkFilterInsertBulk(b *testing.B) {
+ bf := blockSplitBloomFilter{bitset32: make([]uint32,
99*bitsSetPerBlock)}
+ x := make([]uint64, 16)
+ r := rand.New(rand.NewPCG(0, 0))
+ for i := range x {
+ x[i] = r.Uint64()
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bf.InsertBulk(x)
+ }
+ b.SetBytes(bytesPerFilterBlock * int64(len(x)))
+}
diff --git a/parquet/metadata/cleanup_bloom_filter.go
b/parquet/metadata/cleanup_bloom_filter.go
new file mode 100644
index 0000000..ed835c3
--- /dev/null
+++ b/parquet/metadata/cleanup_bloom_filter.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.24
+
+package metadata
+
+import (
+ "runtime"
+ "sync"
+
+ "github.com/apache/arrow-go/v18/arrow/memory"
+)
+
+func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
+ runtime.AddCleanup(bf, func(data *memory.Buffer) {
+ if bufferPool != nil {
+ data.ResizeNoShrink(0)
+ bufferPool.Put(data)
+ } else {
+ data.Release()
+ }
+ }, bf.data)
+}
diff --git a/parquet/metadata/cleanup_bloom_filter_go1.23.go
b/parquet/metadata/cleanup_bloom_filter_go1.23.go
new file mode 100644
index 0000000..b4bffbe
--- /dev/null
+++ b/parquet/metadata/cleanup_bloom_filter_go1.23.go
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !go1.24
+
+package metadata
+
+import (
+ "runtime"
+ "sync"
+)
+
+func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
+ runtime.SetFinalizer(bf, func(f *blockSplitBloomFilter) {
+ if bufferPool != nil {
+ f.data.ResizeNoShrink(0)
+ bufferPool.Put(f.data)
+ } else {
+ f.data.Release()
+ }
+ })
+}
diff --git a/parquet/metadata/column_chunk.go b/parquet/metadata/column_chunk.go
index 22c9b9e..848d20f 100644
--- a/parquet/metadata/column_chunk.go
+++ b/parquet/metadata/column_chunk.go
@@ -218,6 +218,13 @@ func (c *ColumnChunkMetaData) BloomFilterOffset() int64 {
return c.columnMeta.GetBloomFilterOffset()
}
+// BloomFilterLength is the length of the serialized bloomfilter including the
+// serialized bloom filter header. This was only added in 2.10 so it may not
exist,
+// returning 0 in that case.
+func (c *ColumnChunkMetaData) BloomFilterLength() int32 {
+ return c.columnMeta.GetBloomFilterLength()
+}
+
// StatsSet returns true only if there are statistics set in the metadata and
the column
// descriptor has a sort order that is not SortUnknown
//
@@ -267,6 +274,7 @@ type ColumnChunkMetaDataBuilder struct {
compressedSize int64
uncompressedSize int64
+ fileOffset int64
}
func NewColumnChunkMetaDataBuilder(props *parquet.WriterProperties, column
*schema.Column) *ColumnChunkMetaDataBuilder {
@@ -347,14 +355,15 @@ type EncodingStats struct {
}
// Finish finalizes the metadata with the given offsets,
-// flushes any compression that needs to be done, and performs
-// any encryption if an encryptor is provided.
-func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict,
dictFallback bool, encStats EncodingStats, metaEncryptor encryption.Encryptor)
error {
+// flushes any compression that needs to be done.
+// Encryption will be performed by calling PopulateCryptoData
+// after this function is called.
+func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict,
dictFallback bool, encStats EncodingStats) error {
if info.DictPageOffset > 0 {
c.chunk.MetaData.DictionaryPageOffset = &info.DictPageOffset
- c.chunk.FileOffset = info.DictPageOffset + info.CompressedSize
+ c.fileOffset = info.DictPageOffset
} else {
- c.chunk.FileOffset = info.DataPageOffset + info.CompressedSize
+ c.fileOffset = info.DataPageOffset
}
c.chunk.MetaData.NumValues = info.NumValues
@@ -411,6 +420,10 @@ func (c *ColumnChunkMetaDataBuilder) Finish(info
ChunkMetaInfo, hasDict, dictFal
}
c.chunk.MetaData.EncodingStats = thriftEncodingStats
+ return nil
+}
+
+func (c *ColumnChunkMetaDataBuilder) PopulateCryptoData(encryptor
encryption.Encryptor) error {
encryptProps := c.props.ColumnEncryptionProperties(c.column.Path())
if encryptProps != nil && encryptProps.IsEncrypted() {
ccmd := format.NewColumnCryptoMetaData()
@@ -436,7 +449,7 @@ func (c *ColumnChunkMetaDataBuilder) Finish(info
ChunkMetaInfo, hasDict, dictFal
return err
}
var buf bytes.Buffer
- metaEncryptor.Encrypt(&buf, data)
+ encryptor.Encrypt(&buf, data)
c.chunk.EncryptedColumnMetadata = buf.Bytes()
if encryptedFooter {
diff --git a/parquet/metadata/file.go b/parquet/metadata/file.go
index 39a3192..95d3813 100644
--- a/parquet/metadata/file.go
+++ b/parquet/metadata/file.go
@@ -47,6 +47,7 @@ type FileMetaDataBuilder struct {
currentRgBldr *RowGroupMetaDataBuilder
kvmeta KeyValueMetadata
cryptoMetadata *format.FileCryptoMetaData
+ fileEncryptor encryption.FileEncryptor
}
// NewFileMetadataBuilder will use the default writer properties if nil is
passed for
@@ -65,6 +66,10 @@ func NewFileMetadataBuilder(schema *schema.Schema, props
*parquet.WriterProperti
}
}
+func (f *FileMetaDataBuilder) SetFileEncryptor(encryptor
encryption.FileEncryptor) {
+ f.fileEncryptor = encryptor
+}
+
// GetFileCryptoMetaData returns the cryptographic information for encrypting/
// decrypting the file.
func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata {
@@ -92,6 +97,7 @@ func (f *FileMetaDataBuilder) AppendRowGroup()
*RowGroupMetaDataBuilder {
rg := format.NewRowGroup()
f.rowGroups = append(f.rowGroups, rg)
f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg)
+ f.currentRgBldr.fileEncryptor = f.fileEncryptor
return f.currentRgBldr
}
diff --git a/parquet/metadata/metadata_test.go
b/parquet/metadata/metadata_test.go
index 2f35d63..fccfbe4 100644
--- a/parquet/metadata/metadata_test.go
+++ b/parquet/metadata/metadata_test.go
@@ -46,8 +46,8 @@ func generateTableMetaData(schema *schema.Schema, props
*parquet.WriterPropertie
statsFloat.Signed = true
col2Builder.SetStats(statsFloat)
- col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 4, 0, 10,
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats,
dataEncodingStats}, nil)
- col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 24, 0, 30,
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats,
dataEncodingStats}, nil)
+ col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 4, 0, 10,
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats,
dataEncodingStats})
+ col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 24, 0, 30,
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats,
dataEncodingStats})
rg1Builder.SetNumRows(nrows / 2)
rg1Builder.Finish(1024, -1)
@@ -60,8 +60,8 @@ func generateTableMetaData(schema *schema.Schema, props
*parquet.WriterPropertie
col1Builder.SetStats(statsInt)
col2Builder.SetStats(statsFloat)
dictEncodingStats = make(map[parquet.Encoding]int32)
- col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 0
/*dictionary page offset*/, 0, 10, 512, 600}, false /* has dictionary */,
false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
- col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 16, 0, 26,
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats,
dataEncodingStats}, nil)
+ col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 0
/*dictionary page offset*/, 0, 10, 512, 600}, false /* has dictionary */,
false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats})
+ col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 16, 0, 26,
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats,
dataEncodingStats})
rg2Builder.SetNumRows(nrows / 2)
rg2Builder.Finish(1024, -1)
diff --git a/parquet/metadata/row_group.go b/parquet/metadata/row_group.go
index e4ec4c7..5ccd2e6 100644
--- a/parquet/metadata/row_group.go
+++ b/parquet/metadata/row_group.go
@@ -99,6 +99,8 @@ type RowGroupMetaDataBuilder struct {
schema *schema.Schema
colBuilders []*ColumnChunkMetaDataBuilder
nextCol int
+
+ fileEncryptor encryption.FileEncryptor
}
// NewRowGroupMetaDataBuilder returns a builder using the given properties and
underlying thrift object.
@@ -166,21 +168,24 @@ func (r *RowGroupMetaDataBuilder) Finish(_ int64, ordinal
int16) error {
totalUncompressed int64
)
- for idx, col := range r.rg.Columns {
- if col.FileOffset < 0 {
- return fmt.Errorf("parquet: Column %d is not complete",
idx)
- }
+ for idx := range r.rg.Columns {
if idx == 0 {
- if col.MetaData.IsSetDictionaryPageOffset() &&
col.MetaData.GetDictionaryPageOffset() > 0 {
- fileOffset =
col.MetaData.GetDictionaryPageOffset()
- } else {
- fileOffset = col.MetaData.DataPageOffset
- }
+ fileOffset = r.colBuilders[idx].fileOffset
}
+
// sometimes column metadata is encrypted and not available to
read
// so we must get total compressed size from column builder
totalCompressed += r.colBuilders[idx].TotalCompressedSize()
totalUncompressed += r.colBuilders[idx].TotalUncompressedSize()
+
+ if r.fileEncryptor != nil {
+ enc :=
r.fileEncryptor.GetColumnMetaEncryptor(r.colBuilders[idx].Descr().Path())
+ if enc != nil {
+
enc.UpdateAad(encryption.CreateModuleAad(enc.FileAad(),
encryption.ColumnMetaModule,
+ ordinal, int16(idx), -1))
+ r.colBuilders[idx].PopulateCryptoData(enc)
+ }
+ }
}
if len(r.props.SortingColumns()) > 0 {