This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 6576e9c  feat(parquet/metadata): bloom filter implementation (#336)
6576e9c is described below

commit 6576e9cf6da9d9e5e7398242381abd5d800b96b2
Author: Matt Topol <[email protected]>
AuthorDate: Tue Apr 1 10:51:07 2025 -0400

    feat(parquet/metadata): bloom filter implementation (#336)
    
    ### Rationale for this change
    As with many other parquet readers/writers, we should add support for
    bloom filters.
    
    ### What changes are included in this PR?
    This only adds an implementation to the `metadata` package to represent
    bloom filters and process them for metadata reading and writing. This
    does not yet wire it up through the actual parquet file reader and
    writer. That will be done in a subsequent PR.
    
    ### Are these changes tested?
    Yes, unit tests are included.
    
    ### Are there any user-facing changes?
    Only the addition of the new functions that are exposed in the metadata
    package.
---
 .github/workflows/test.yml                         |   2 +-
 go.mod                                             |   1 +
 go.sum                                             |   2 +
 parquet/file/file_writer.go                        |   1 +
 parquet/file/page_writer.go                        |   4 +-
 parquet/internal/encryption/aes.go                 |   2 +
 parquet/metadata/Makefile                          |  64 +++
 parquet/metadata/_lib/arch.h                       |  29 ++
 parquet/metadata/_lib/bloom_filter_block.c         |  64 +++
 .../metadata/_lib/bloom_filter_block_avx2_amd64.s  | 295 +++++++++++
 .../metadata/_lib/bloom_filter_block_sse4_amd64.s  | 322 ++++++++++++
 parquet/metadata/adaptive_bloom_filter.go          | 224 ++++++++
 parquet/metadata/bloom_filter.go                   | 565 +++++++++++++++++++++
 parquet/metadata/bloom_filter_block.go             |  53 ++
 parquet/metadata/bloom_filter_block_amd64.go       |  36 ++
 parquet/metadata/bloom_filter_block_avx2_amd64.go  |  45 ++
 parquet/metadata/bloom_filter_block_avx2_amd64.s   | 151 ++++++
 parquet/metadata/bloom_filter_block_default.go     |  23 +
 parquet/metadata/bloom_filter_block_sse4_amd64.go  |  45 ++
 parquet/metadata/bloom_filter_block_sse4_amd64.s   | 176 +++++++
 parquet/metadata/bloom_filter_reader_test.go       | 275 ++++++++++
 parquet/metadata/bloom_filter_test.go              | 190 +++++++
 parquet/metadata/cleanup_bloom_filter.go           |  37 ++
 parquet/metadata/cleanup_bloom_filter_go1.23.go    |  35 ++
 parquet/metadata/column_chunk.go                   |  25 +-
 parquet/metadata/file.go                           |   6 +
 parquet/metadata/metadata_test.go                  |   8 +-
 parquet/metadata/row_group.go                      |  23 +-
 28 files changed, 2681 insertions(+), 22 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index f8ae5d5..fa45c52 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -63,7 +63,7 @@ jobs:
         with:
           submodules: recursive
       - name: Login to GitHub Container registry
-        uses: docker/login-action@v3
+        uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # 
v3.4.0
         with:
           registry: ghcr.io
           username: ${{ github.actor }}
diff --git a/go.mod b/go.mod
index d056843..cd0bff1 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ toolchain go1.23.2
 require (
        github.com/andybalholm/brotli v1.1.1
        github.com/apache/thrift v0.21.0
+       github.com/cespare/xxhash/v2 v2.3.0
        github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815
        github.com/goccy/go-json v0.10.5
        github.com/golang/snappy v1.0.0
diff --git a/go.sum b/go.sum
index fc2a1f7..6da30a0 100644
--- a/go.sum
+++ b/go.sum
@@ -24,6 +24,8 @@ github.com/antlr4-go/antlr/v4 v4.13.1/go.mod 
h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmO
 github.com/apache/thrift v0.21.0 
h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
 github.com/apache/thrift v0.21.0/go.mod 
h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
 github.com/atomicgo/cursor v0.0.1/go.mod 
h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
+github.com/cespare/xxhash/v2 v2.3.0 
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cockroachdb/apd/v3 v3.2.1 
h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg=
 github.com/cockroachdb/apd/v3 v3.2.1/go.mod 
h1:klXJcjp+FffLTHlhIG69tezTDvdP065naDsHzKhYSqc=
 github.com/containerd/console v1.0.3 
h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
diff --git a/parquet/file/file_writer.go b/parquet/file/file_writer.go
index 68b5eee..c9b913b 100644
--- a/parquet/file/file_writer.go
+++ b/parquet/file/file_writer.go
@@ -158,6 +158,7 @@ func (fw *Writer) startFile() {
                }
 
                fw.fileEncryptor = encryption.NewFileEncryptor(encryptionProps, 
fw.props.Allocator())
+               fw.metadata.SetFileEncryptor(fw.fileEncryptor)
                if encryptionProps.EncryptedFooter() {
                        magic = magicEBytes
                }
diff --git a/parquet/file/page_writer.go b/parquet/file/page_writer.go
index 1718292..8d22ef8 100644
--- a/parquet/file/page_writer.go
+++ b/parquet/file/page_writer.go
@@ -220,7 +220,7 @@ func (pw *serializedPageWriter) Close(hasDict, fallback 
bool) error {
                DataEncodingStats: pw.dataEncodingStats,
        }
        pw.FinishPageIndexes(0)
-       pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats, 
pw.metaEncryptor)
+       pw.metaData.Finish(chunkInfo, hasDict, fallback, encodingStats)
        _, err := pw.metaData.WriteTo(pw.sink)
        return err
 }
@@ -505,7 +505,7 @@ func (bw *bufferedPageWriter) Close(hasDict, fallback bool) 
error {
                DictEncodingStats: bw.pager.dictEncodingStats,
                DataEncodingStats: bw.pager.dataEncodingStats,
        }
-       bw.metadata.Finish(chunkInfo, hasDict, fallback, encodingStats, 
bw.pager.metaEncryptor)
+       bw.metadata.Finish(chunkInfo, hasDict, fallback, encodingStats)
        bw.pager.FinishPageIndexes(position)
 
        bw.metadata.WriteTo(bw.inMemSink)
diff --git a/parquet/internal/encryption/aes.go 
b/parquet/internal/encryption/aes.go
index 0c16fac..06c9c90 100644
--- a/parquet/internal/encryption/aes.go
+++ b/parquet/internal/encryption/aes.go
@@ -54,6 +54,8 @@ const (
        DictPageHeaderModule
        ColumnIndexModule
        OffsetIndexModule
+       BloomFilterHeaderModule
+       BloomFilterBitsetModule
 )
 
 type aesEncryptor struct {
diff --git a/parquet/metadata/Makefile b/parquet/metadata/Makefile
new file mode 100644
index 0000000..cfcc8ef
--- /dev/null
+++ b/parquet/metadata/Makefile
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# this converts rotate instructions from "ro[lr] <reg>" -> "ro[lr] <reg>, 1" 
for yasm compatibility
+PERL_FIXUP_ROTATE=perl -i -pe 's/(ro[rl]\s+\w{2,3})$$/\1, 1/'
+
+C2GOASM=c2goasm
+CC=clang-19
+C_FLAGS=-target x86_64-unknown-none -masm=intel -mno-red-zone -mstackrealign 
-mllvm -inline-threshold=1000 \
+                               -fno-asynchronous-unwind-tables -fno-exceptions 
-fno-rtti -O3 -fno-builtin -ffast-math -fno-jump-tables -I_lib
+ASM_FLAGS_AVX2=-mavx2 -mfma
+ASM_FLAGS_SSE4=-msse4
+ASM_FLAGS_BMI2=-mbmi2
+ASM_FLAGS_POPCNT=-mpopcnt
+
+C_FLAGS_NEON=-O3 -fvectorize -mllvm -force-vector-width=16 
-fno-asynchronous-unwind-tables -mno-red-zone -mstackrealign -fno-exceptions \
+       -fno-rtti -fno-builtin -ffast-math -fno-jump-tables -I_lib
+
+GO_SOURCES  := $(shell find . -path ./_lib -prune -o -name '*.go' -not -name 
'*_test.go')
+ALL_SOURCES := $(shell find . -path ./_lib -prune -o -name '*.go' -name '*.s' 
-not -name '*_test.go')
+
+.PHONY: assembly
+
+INTEL_SOURCES := \
+       bloom_filter_block_avx2_amd64.s bloom_filter_block_sse4_amd64.s 
+
+ARM_SOURCES := \
+       bloom_filter_block_neon_arm64.s
+
+
+assembly: $(INTEL_SOURCES)
+
+_lib/bloom_filter_block_avx2_amd64.s: _lib/bloom_filter_block.c
+       $(CC) -S $(C_FLAGS) $(ASM_FLAGS_AVX2) $^ -o $@ ; $(PERL_FIXUP_ROTATE) 
$@; perl -i -pe 's/mem(cpy|set)/clib·_mem\1(SB)/' $@
+
+_lib/bloom_filter_block_sse4_amd64.s: _lib/bloom_filter_block.c
+       $(CC) -S $(C_FLAGS) $(ASM_FLAGS_SSE4) $^ -o $@ ; $(PERL_FIXUP_ROTATE) 
$@; perl -i -pe 's/mem(cpy|set)/clib·_mem\1(SB)/' $@
+
+# neon not supported by c2goasm, will have to do it manually 
+#_lib/bloom_filter_block_neon.s: _lib/bloom_filter_block.c
+#      $(CC) -S $(C_FLAGS_NEON) $^ -o $@ ; $(PERL_FIXUP_ROTATE) $@
+
+bloom_filter_block_avx2_amd64.s: _lib/bloom_filter_block_avx2_amd64.s
+       $(C2GOASM) -a -f $^ $@
+
+bloom_filter_block_sse4_amd64.s: _lib/bloom_filter_block_sse4_amd64.s
+       $(C2GOASM) -a -f $^ $@
+
+clean:
+       rm -f $(INTEL_SOURCES)
+       rm -f $(addprefix _lib/,$(INTEL_SOURCES))
\ No newline at end of file
diff --git a/parquet/metadata/_lib/arch.h b/parquet/metadata/_lib/arch.h
new file mode 100644
index 0000000..165d120
--- /dev/null
+++ b/parquet/metadata/_lib/arch.h
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#undef FULL_NAME
+
+#if defined(__AVX2__)
+    #define FULL_NAME(x) x##_avx2
+#elif __SSE4_2__ == 1
+    #define FULL_NAME(x) x##_sse4
+#elif __SSE3__ == 1
+    #define FULL_NAME(x) x##_sse3
+#elif defined(__ARM_NEON) || defined(__ARM_NEON__)
+    #define FULL_NAME(x) x##_neon
+#else
+    #define FULL_NAME(x) x##_x86
+#endif
\ No newline at end of file
diff --git a/parquet/metadata/_lib/bloom_filter_block.c 
b/parquet/metadata/_lib/bloom_filter_block.c
new file mode 100644
index 0000000..dced68b
--- /dev/null
+++ b/parquet/metadata/_lib/bloom_filter_block.c
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "arch.h"
+#include <stdint.h>
+#include <stdbool.h>
+
+// algorithms defined in 
https://github.com/apache/parquet-format/blob/master/BloomFilter.md
+// describing the proper definitions for the bloom filter hash functions
+// to be compatible with the parquet format
+
+#define bitsSetPerBlock 8
+static const uint32_t SALT[bitsSetPerBlock] = {
+    0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+    0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+#define PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
+
+bool FULL_NAME(check_block)(const uint32_t blocks[], const int len, const 
uint64_t hash){
+    const uint32_t bucket_index =
+        (uint32_t)(((hash >> 32) * (uint64_t)(len/8)) >> 32);
+    const uint32_t key = (uint32_t)hash;
+
+    for (int i = 0; i < bitsSetPerBlock; ++i)
+    {
+        const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+        if (PREDICT_FALSE(0 == (blocks[bitsSetPerBlock * bucket_index + i] & 
mask)))
+        {
+            return false;
+        }
+    }
+    return true;
+}
+
+void FULL_NAME(insert_block)(uint32_t blocks[], const int len, const uint64_t 
hash) {
+    const uint32_t bucket_index =
+        (uint32_t)(((hash >> 32) * (uint64_t)(len/8)) >> 32);
+    const uint32_t key = (uint32_t)hash;
+
+    for (int i = 0; i < bitsSetPerBlock; ++i)
+    {
+        const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+        blocks[bitsSetPerBlock * bucket_index + i] |= mask;
+    }
+}
+
+void FULL_NAME(insert_bulk)(uint32_t blocks[], const int block_len, const 
uint64_t hashes[], const int num_hashes) {
+    for (int i = 0; i < num_hashes; ++i) {
+        FULL_NAME(insert_block)(blocks, block_len, hashes[i]);
+    }
+}
\ No newline at end of file
diff --git a/parquet/metadata/_lib/bloom_filter_block_avx2_amd64.s 
b/parquet/metadata/_lib/bloom_filter_block_avx2_amd64.s
new file mode 100644
index 0000000..4bb0877
--- /dev/null
+++ b/parquet/metadata/_lib/bloom_filter_block_avx2_amd64.s
@@ -0,0 +1,295 @@
+       .text
+       .intel_syntax noprefix
+       .file   "bloom_filter_block.c"
+       .globl  check_block_avx2                # -- Begin function 
check_block_avx2
+       .p2align        4, 0x90
+       .type   check_block_avx2,@function
+check_block_avx2:                       # @check_block_avx2
+# %bb.0:
+       push    rbp
+       mov     rbp, rsp
+       and     rsp, -8
+                                        # kill: def $esi killed $esi def $rsi
+       mov     rcx, rdx
+       shr     rcx, 32
+       lea     eax, [rsi + 7]
+       test    esi, esi
+       cmovns  eax, esi
+       sar     eax, 3
+       cdqe
+       imul    rax, rcx
+       shr     rax, 29
+       and     eax, -8
+       imul    ecx, edx, 1203114875
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.1:
+       imul    ecx, edx, 1150766481
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 4]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.2:
+       imul    ecx, edx, -2010862245
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 8]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.3:
+       imul    ecx, edx, -1565054819
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 12]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.4:
+       imul    ecx, edx, 1884591559
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 16]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.5:
+       imul    ecx, edx, 770785867
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 20]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.6:
+       imul    ecx, edx, -1627633337
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 24]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.7:
+       imul    ecx, edx, 1550580529
+       shr     ecx, 27
+       mov     eax, dword ptr [rdi + 4*rax + 28]
+       bt      eax, ecx
+       setb    al
+                                        # kill: def $al killed $al killed $eax
+       mov     rsp, rbp
+       pop     rbp
+       ret
+.LBB0_8:
+       xor     eax, eax
+                                        # kill: def $al killed $al killed $eax
+       mov     rsp, rbp
+       pop     rbp
+       ret
+.Lfunc_end0:
+       .size   check_block_avx2, .Lfunc_end0-check_block_avx2
+                                        # -- End function
+       .globl  check_bulk_avx2                 # -- Begin function 
check_bulk_avx2
+       .p2align        4, 0x90
+       .type   check_bulk_avx2,@function
+check_bulk_avx2:                        # @check_bulk_avx2
+# %bb.0:
+                                        # kill: def $esi killed $esi def $rsi
+       test    r8d, r8d
+       jle     .LBB1_19
+# %bb.1:
+       push    rbp
+       mov     rbp, rsp
+       push    rbx
+       and     rsp, -8
+       lea     eax, [rsi + 7]
+       test    esi, esi
+       cmovns  eax, esi
+       sar     eax, 3
+       cdqe
+       mov     esi, r8d
+       xor     r8d, r8d
+       .p2align        4, 0x90
+.LBB1_4:                                # =>This Inner Loop Header: Depth=1
+       mov     r10, qword ptr [rdx + 8*r8]
+       mov     r9, r10
+       shr     r9, 32
+       imul    r9, rax
+       shr     r9, 29
+       and     r9d, -8
+       imul    r11d, r10d, 1203114875
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.5:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, 1150766481
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 4]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.6:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, -2010862245
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 8]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.7:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, -1565054819
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 12]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.8:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, 1884591559
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 16]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.9:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, 770785867
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 20]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.10:                               #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, -1627633337
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 24]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.11:                               #   in Loop: Header=BB1_4 Depth=1
+       imul    r10d, r10d, 1550580529
+       shr     r10d, 27
+       mov     r9d, dword ptr [rdi + 4*r9 + 28]
+       bt      r9d, r10d
+       setb    r9b
+       mov     byte ptr [rcx + r8], r9b
+       inc     r8
+       cmp     rsi, r8
+       jne     .LBB1_4
+       jmp     .LBB1_18
+       .p2align        4, 0x90
+.LBB1_2:                                #   in Loop: Header=BB1_4 Depth=1
+       xor     r9d, r9d
+       mov     byte ptr [rcx + r8], r9b
+       inc     r8
+       cmp     rsi, r8
+       jne     .LBB1_4
+.LBB1_18:
+       # lea   rsp, [rbp - 8]
+       pop     rbx
+       pop     rbp
+.LBB1_19:
+       ret
+.Lfunc_end1:
+       .size   check_bulk_avx2, .Lfunc_end1-check_bulk_avx2
+                                        # -- End function
+       .section        .rodata.cst32,"aM",@progbits,32
+       .p2align        5, 0x0                          # -- Begin function 
insert_block_avx2
+.LCPI2_0:
+       .long   1203114875                      # 0x47b6137b
+       .long   1150766481                      # 0x44974d91
+       .long   2284105051                      # 0x8824ad5b
+       .long   2729912477                      # 0xa2b7289d
+       .long   1884591559                      # 0x705495c7
+       .long   770785867                       # 0x2df1424b
+       .long   2667333959                      # 0x9efc4947
+       .long   1550580529                      # 0x5c6bfb31
+       .section        .rodata.cst4,"aM",@progbits,4
+       .p2align        2, 0x0
+.LCPI2_1:
+       .long   1                               # 0x1
+       .text
+       .globl  insert_block_avx2
+       .p2align        4, 0x90
+       .type   insert_block_avx2,@function
+insert_block_avx2:                      # @insert_block_avx2
+# %bb.0:
+       push    rbp
+       mov     rbp, rsp
+       and     rsp, -8
+                                        # kill: def $esi killed $esi def $rsi
+       vmovd   xmm0, edx
+       shr     rdx, 32
+       lea     eax, [rsi + 7]
+       test    esi, esi
+       cmovns  eax, esi
+       sar     eax, 3
+       cdqe
+       imul    rax, rdx
+       shr     rax, 27
+       movabs  rcx, 17179869152
+       vpbroadcastd    ymm0, xmm0
+       vpmulld ymm0, ymm0, ymmword ptr [rip + .LCPI2_0]
+       and     rcx, rax
+       vpsrld  ymm0, ymm0, 27
+       vpbroadcastd    ymm1, dword ptr [rip + .LCPI2_1] # ymm1 = 
[1,1,1,1,1,1,1,1]
+       vpsllvd ymm0, ymm1, ymm0
+       vpor    ymm0, ymm0, ymmword ptr [rdi + rcx]
+       vmovdqu ymmword ptr [rdi + rcx], ymm0
+       mov     rsp, rbp
+       pop     rbp
+       vzeroupper
+       ret
+.Lfunc_end2:
+       .size   insert_block_avx2, .Lfunc_end2-insert_block_avx2
+                                        # -- End function
+       .section        .rodata.cst32,"aM",@progbits,32
+       .p2align        5, 0x0                          # -- Begin function 
insert_bulk_avx2
+.LCPI3_0:
+       .long   1203114875                      # 0x47b6137b
+       .long   1150766481                      # 0x44974d91
+       .long   2284105051                      # 0x8824ad5b
+       .long   2729912477                      # 0xa2b7289d
+       .long   1884591559                      # 0x705495c7
+       .long   770785867                       # 0x2df1424b
+       .long   2667333959                      # 0x9efc4947
+       .long   1550580529                      # 0x5c6bfb31
+       .section        .rodata.cst4,"aM",@progbits,4
+       .p2align        2, 0x0
+.LCPI3_1:
+       .long   1                               # 0x1
+       .text
+       .globl  insert_bulk_avx2
+       .p2align        4, 0x90
+       .type   insert_bulk_avx2,@function
+insert_bulk_avx2:                       # @insert_bulk_avx2
+# %bb.0:
+                                        # kill: def $esi killed $esi def $rsi
+       test    ecx, ecx
+       jle     .LBB3_4
+# %bb.1:
+       push    rbp
+       mov     rbp, rsp
+       and     rsp, -8
+       lea     eax, [rsi + 7]
+       test    esi, esi
+       cmovns  eax, esi
+       sar     eax, 3
+       cdqe
+       mov     ecx, ecx
+       xor     esi, esi
+       movabs  r8, 17179869152
+       vmovdqa ymm0, ymmword ptr [rip + .LCPI3_0] # ymm0 = 
[1203114875,1150766481,2284105051,2729912477,1884591559,770785867,2667333959,1550580529]
+       vpbroadcastd    ymm1, dword ptr [rip + .LCPI3_1] # ymm1 = 
[1,1,1,1,1,1,1,1]
+       .p2align        4, 0x90
+.LBB3_2:                                # =>This Inner Loop Header: Depth=1
+       mov     r9, qword ptr [rdx + 8*rsi]
+       vmovd   xmm2, r9d
+       shr     r9, 32
+       imul    r9, rax
+       shr     r9, 27
+       and     r9, r8
+       vpbroadcastd    ymm2, xmm2
+       vpmulld ymm2, ymm2, ymm0
+       vpsrld  ymm2, ymm2, 27
+       vpsllvd ymm2, ymm1, ymm2
+       vpor    ymm2, ymm2, ymmword ptr [rdi + r9]
+       vmovdqu ymmword ptr [rdi + r9], ymm2
+       inc     rsi
+       cmp     rcx, rsi
+       jne     .LBB3_2
+# %bb.3:
+       mov     rsp, rbp
+       pop     rbp
+.LBB3_4:
+       vzeroupper
+       ret
+.Lfunc_end3:
+       .size   insert_bulk_avx2, .Lfunc_end3-insert_bulk_avx2
+                                        # -- End function
+       .ident  "clang version 19.1.6 
(https://github.com/conda-forge/clangdev-feedstock 
a097c63bb6a9919682224023383a143d482c552e)"
+       .section        ".note.GNU-stack","",@progbits
+       .addrsig
\ No newline at end of file
diff --git a/parquet/metadata/_lib/bloom_filter_block_sse4_amd64.s 
b/parquet/metadata/_lib/bloom_filter_block_sse4_amd64.s
new file mode 100644
index 0000000..593b61b
--- /dev/null
+++ b/parquet/metadata/_lib/bloom_filter_block_sse4_amd64.s
@@ -0,0 +1,322 @@
+       .text
+       .intel_syntax noprefix
+       .file   "bloom_filter_block.c"
+       .globl  check_block_sse4                # -- Begin function 
check_block_sse4
+       .p2align        4, 0x90
+       .type   check_block_sse4,@function
+check_block_sse4:                       # @check_block_sse4
+# %bb.0:
+       push    rbp
+       mov     rbp, rsp
+       and     rsp, -8
+                                        # kill: def $esi killed $esi def $rsi
+       mov     rcx, rdx
+       shr     rcx, 32
+       lea     eax, [rsi + 7]
+       test    esi, esi
+       cmovns  eax, esi
+       sar     eax, 3
+       cdqe
+       imul    rax, rcx
+       shr     rax, 29
+       and     eax, -8
+       imul    ecx, edx, 1203114875
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.1:
+       imul    ecx, edx, 1150766481
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 4]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.2:
+       imul    ecx, edx, -2010862245
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 8]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.3:
+       imul    ecx, edx, -1565054819
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 12]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.4:
+       imul    ecx, edx, 1884591559
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 16]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.5:
+       imul    ecx, edx, 770785867
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 20]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.6:
+       imul    ecx, edx, -1627633337
+       shr     ecx, 27
+       mov     esi, dword ptr [rdi + 4*rax + 24]
+       bt      esi, ecx
+       jae     .LBB0_8
+# %bb.7:
+       imul    ecx, edx, 1550580529
+       shr     ecx, 27
+       mov     eax, dword ptr [rdi + 4*rax + 28]
+       bt      eax, ecx
+       setb    al
+                                        # kill: def $al killed $al killed $eax
+       mov     rsp, rbp
+       pop     rbp
+       ret
+.LBB0_8:
+       xor     eax, eax
+                                        # kill: def $al killed $al killed $eax
+       mov     rsp, rbp
+       pop     rbp
+       ret
+.Lfunc_end0:
+       .size   check_block_sse4, .Lfunc_end0-check_block_sse4
+                                        # -- End function
+       .globl  check_bulk_sse4                 # -- Begin function 
check_bulk_sse4
+       .p2align        4, 0x90
+       .type   check_bulk_sse4,@function
+check_bulk_sse4:                        # @check_bulk_sse4
+# %bb.0:
+                                        # kill: def $esi killed $esi def $rsi
+       test    r8d, r8d
+       jle     .LBB1_19
+# %bb.1:
+       push    rbp
+       mov     rbp, rsp
+       push    rbx
+       and     rsp, -8
+       lea     eax, [rsi + 7]
+       test    esi, esi
+       cmovns  eax, esi
+       sar     eax, 3
+       cdqe
+       mov     esi, r8d
+       xor     r8d, r8d
+       .p2align        4, 0x90
+.LBB1_4:                                # =>This Inner Loop Header: Depth=1
+       mov     r10, qword ptr [rdx + 8*r8]
+       mov     r9, r10
+       shr     r9, 32
+       imul    r9, rax
+       shr     r9, 29
+       and     r9d, -8
+       imul    r11d, r10d, 1203114875
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.5:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, 1150766481
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 4]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.6:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, -2010862245
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 8]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.7:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, -1565054819
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 12]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.8:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, 1884591559
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 16]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.9:                                #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, 770785867
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 20]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.10:                               #   in Loop: Header=BB1_4 Depth=1
+       imul    r11d, r10d, -1627633337
+       shr     r11d, 27
+       mov     ebx, dword ptr [rdi + 4*r9 + 24]
+       bt      ebx, r11d
+       jae     .LBB1_2
+# %bb.11:                               #   in Loop: Header=BB1_4 Depth=1
+       imul    r10d, r10d, 1550580529
+       shr     r10d, 27
+       mov     r9d, dword ptr [rdi + 4*r9 + 28]
+       bt      r9d, r10d
+       setb    r9b
+       mov     byte ptr [rcx + r8], r9b
+       inc     r8
+       cmp     rsi, r8
+       jne     .LBB1_4
+       jmp     .LBB1_18
+       .p2align        4, 0x90
+.LBB1_2:                                #   in Loop: Header=BB1_4 Depth=1
+       xor     r9d, r9d
+       mov     byte ptr [rcx + r8], r9b
+       inc     r8
+       cmp     rsi, r8
+       jne     .LBB1_4
+.LBB1_18:
+       # lea   rsp, [rbp - 8]
+       pop     rbx
+       pop     rbp
+.LBB1_19:
+       ret
+.Lfunc_end1:
+       .size   check_bulk_sse4, .Lfunc_end1-check_bulk_sse4
+                                        # -- End function
	.section        .rodata.cst16,"aM",@progbits,16
	.p2align        4, 0x0                          # -- Begin function insert_block_sse4
# First four split-block bloom filter salt constants (parquet spec); one per
# 32-bit word of the low half of a 256-bit block.
.LCPI2_0:
	.long   1203114875                      # 0x47b6137b
	.long   1150766481                      # 0x44974d91
	.long   2284105051                      # 0x8824ad5b
	.long   2729912477                      # 0xa2b7289d
# Four copies of 1.0f (0x3f800000). Adding (k << 23) to the bit pattern of
# 1.0f yields the float 2^k; truncating back to integer gives a (1 << k)
# mask without per-lane variable shifts (SSE4 has none).
.LCPI2_1:
	.long   1065353216                      # 0x3f800000
	.long   1065353216                      # 0x3f800000
	.long   1065353216                      # 0x3f800000
	.long   1065353216                      # 0x3f800000
# Remaining four salt constants for the high half of the block.
.LCPI2_2:
	.long   1884591559                      # 0x705495c7
	.long   770785867                       # 0x2df1424b
	.long   2667333959                      # 0x9efc4947
	.long   1550580529                      # 0x5c6bfb31
	.text
	.globl  insert_block_sse4
	.p2align        4, 0x90
	.type   insert_block_sse4,@function
insert_block_sse4:                      # @insert_block_sse4
# clang-generated from _lib/bloom_filter_block.c — regenerate rather than
# hand-edit. NOTE(review): rdi appears to be the bitset base pointer, esi the
# bitset length in 32-bit words, rdx the 64-bit hash; confirm against the C
# source.
# %bb.0:
	push    rbp
	mov     rbp, rsp
	and     rsp, -8
                                        # kill: def $esi killed $esi def $rsi
	movd    xmm0, edx                       # low 32 bits of hash = key
	shr     rdx, 32                         # high 32 bits select the block
	lea     eax, [rsi + 7]
	test    esi, esi
	cmovns  eax, esi
	sar     eax, 3                          # len/8 = number of 256-bit blocks
	movsxd  rcx, eax
	imul    rcx, rdx
	shr     rcx, 27
	movabs  rax, 17179869152                # 0x3fffffe0: ((h>>32)*nblk)>>32 << 5, low 5 bits cleared = block byte offset
	and     rax, rcx
	pshufd  xmm0, xmm0, 0                   # xmm0 = xmm0[0,0,0,0]
	movdqa  xmm1, xmmword ptr [rip + .LCPI2_0] # xmm1 = [1203114875,1150766481,2284105051,2729912477]
	pmulld  xmm1, xmm0                      # key * salt[0..3]
	psrld   xmm1, 27                        # top 5 bits = bit index k per lane
	pslld   xmm1, 23                        # move k into the float exponent field
	movdqa  xmm2, xmmword ptr [rip + .LCPI2_1] # xmm2 = [1065353216,1065353216,1065353216,1065353216]
	paddd   xmm1, xmm2                      # bit pattern of 2.0^k
	cvttps2dq       xmm1, xmm1              # integer 1 << k masks
	movups  xmm3, xmmword ptr [rdi + rax]
	orps    xmm3, xmm1                      # set bits in low half of the block
	movups  xmm1, xmmword ptr [rdi + rax + 16]
	movups  xmmword ptr [rdi + rax], xmm3
	pmulld  xmm0, xmmword ptr [rip + .LCPI2_2]
	psrld   xmm0, 27
	pslld   xmm0, 23
	paddd   xmm0, xmm2
	cvttps2dq       xmm0, xmm0
	orps    xmm0, xmm1                      # set bits in high half of the block
	movups  xmmword ptr [rdi + rax + 16], xmm0
	mov     rsp, rbp
	pop     rbp
	ret
.Lfunc_end2:
	.size   insert_block_sse4, .Lfunc_end2-insert_block_sse4
                                        # -- End function
	.section        .rodata.cst16,"aM",@progbits,16
	.p2align        4, 0x0                          # -- Begin function insert_bulk_sse4
# Same constant layout as insert_block_sse4: salts 0..3, four copies of 1.0f
# (for the 1<<k float-exponent trick), salts 4..7.
.LCPI3_0:
	.long   1203114875                      # 0x47b6137b
	.long   1150766481                      # 0x44974d91
	.long   2284105051                      # 0x8824ad5b
	.long   2729912477                      # 0xa2b7289d
.LCPI3_1:
	.long   1065353216                      # 0x3f800000
	.long   1065353216                      # 0x3f800000
	.long   1065353216                      # 0x3f800000
	.long   1065353216                      # 0x3f800000
.LCPI3_2:
	.long   1884591559                      # 0x705495c7
	.long   770785867                       # 0x2df1424b
	.long   2667333959                      # 0x9efc4947
	.long   1550580529                      # 0x5c6bfb31
	.text
	.globl  insert_bulk_sse4
	.p2align        4, 0x90
	.type   insert_bulk_sse4,@function
insert_bulk_sse4:                       # @insert_bulk_sse4
# clang-generated bulk variant: applies the insert_block operation to every
# hash in an array. NOTE(review): rdi = bitset, esi = bitset length in 32-bit
# words, rdx = hash array, ecx = hash count; confirm against the C source.
# %bb.0:
                                        # kill: def $esi killed $esi def $rsi
	test    ecx, ecx
	jle     .LBB3_4                         # nothing to do for count <= 0
# %bb.1:
	push    rbp
	mov     rbp, rsp
	and     rsp, -8
	lea     eax, [rsi + 7]
	test    esi, esi
	cmovns  eax, esi
	sar     eax, 3                          # len/8 = number of 256-bit blocks
	cdqe
	mov     ecx, ecx                        # zero-extend hash count
	xor     esi, esi                        # loop index
	movabs  r8, 17179869152                 # 0x3fffffe0 block byte-offset mask
	movdqa  xmm0, xmmword ptr [rip + .LCPI3_0] # xmm0 = [1203114875,1150766481,2284105051,2729912477]
	movdqa  xmm1, xmmword ptr [rip + .LCPI3_1] # xmm1 = [1065353216,1065353216,1065353216,1065353216]
	movdqa  xmm2, xmmword ptr [rip + .LCPI3_2] # xmm2 = [1884591559,770785867,2667333959,1550580529]
	.p2align        4, 0x90
.LBB3_2:                                # =>This Inner Loop Header: Depth=1
	mov     r9, qword ptr [rdx + 8*rsi]     # next hash
	movd    xmm3, r9d
	shr     r9, 32
	imul    r9, rax
	shr     r9, 27
	and     r9, r8                          # byte offset of target block
	pshufd  xmm3, xmm3, 0                   # xmm3 = xmm3[0,0,0,0]
	movdqa  xmm4, xmm3
	pmulld  xmm4, xmm0
	psrld   xmm4, 27                        # per-lane bit index k
	pslld   xmm4, 23                        # 1<<k via float-exponent trick
	paddd   xmm4, xmm1
	cvttps2dq       xmm4, xmm4
	movups  xmm5, xmmword ptr [rdi + r9]
	orps    xmm5, xmm4                      # set bits, low half of block
	movups  xmm4, xmmword ptr [rdi + r9 + 16]
	movups  xmmword ptr [rdi + r9], xmm5
	pmulld  xmm3, xmm2
	psrld   xmm3, 27
	pslld   xmm3, 23
	paddd   xmm3, xmm1
	cvttps2dq       xmm3, xmm3
	orps    xmm3, xmm4                      # set bits, high half of block
	movups  xmmword ptr [rdi + r9 + 16], xmm3
	inc     rsi
	cmp     rcx, rsi
	jne     .LBB3_2
# %bb.3:
	mov     rsp, rbp
	pop     rbp
.LBB3_4:
	ret
.Lfunc_end3:
	.size   insert_bulk_sse4, .Lfunc_end3-insert_bulk_sse4
                                        # -- End function
+       .ident  "clang version 19.1.6 
(https://github.com/conda-forge/clangdev-feedstock 
a097c63bb6a9919682224023383a143d482c552e)"
+       .section        ".note.GNU-stack","",@progbits
+       .addrsig
\ No newline at end of file
diff --git a/parquet/metadata/adaptive_bloom_filter.go 
b/parquet/metadata/adaptive_bloom_filter.go
new file mode 100644
index 0000000..c040645
--- /dev/null
+++ b/parquet/metadata/adaptive_bloom_filter.go
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+       "io"
+       "runtime"
+       "slices"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/bitutil"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+       format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow-go/v18/parquet/schema"
+)
+
// bloomFilterCandidate pairs a concrete block-split bloom filter with the
// number of distinct values it was sized to hold.
type bloomFilterCandidate struct {
	bloomFilter blockSplitBloomFilter
	expectedNDV uint32 // NDV capacity this candidate was sized for
}
+
+func newBloomFilterCandidate(expectedNDV, numBytes, minBytes, maxBytes uint32, 
h Hasher, mem memory.Allocator) *bloomFilterCandidate {
+       if numBytes < minBytes {
+               numBytes = minBytes
+       }
+
+       if numBytes > maxBytes {
+               numBytes = maxBytes
+       }
+
+       // get next power of 2 if it's not a power of 2
+       if (numBytes & (numBytes - 1)) != 0 {
+               numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
+       }
+
+       buf := memory.NewResizableBuffer(mem)
+       buf.ResizeNoShrink(int(numBytes))
+       bf := blockSplitBloomFilter{
+               data:         buf,
+               bitset32:     arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
+               hasher:       h,
+               algorithm:    defaultAlgorithm,
+               hashStrategy: defaultHashStrategy,
+               compression:  defaultCompression,
+       }
+       runtime.SetFinalizer(&bf, func(f *blockSplitBloomFilter) {
+               f.data.Release()
+       })
+       return &bloomFilterCandidate{bloomFilter: bf, expectedNDV: expectedNDV}
+}
+
// adaptiveBlockSplitBloomFilter maintains several candidate filters of
// decreasing size. As distinct values are observed, candidates whose expected
// NDV has been exceeded are pruned, and the smallest surviving candidate is
// the one ultimately serialized.
type adaptiveBlockSplitBloomFilter struct {
	mem              memory.Allocator
	candidates       []*bloomFilterCandidate // live candidates, pruned as numDistinct grows
	largestCandidate *bloomFilterCandidate   // reference filter used to estimate the distinct count
	numDistinct      int64                   // estimated number of distinct hashes seen so far
	finalized        bool                    // set by WriteTo; inserts afterwards panic

	maxBytes, minBytes uint32
	minCandidateNDV    int
	hasher             Hasher
	hashStrategy       format.BloomFilterHash
	algorithm          format.BloomFilterAlgorithm
	compression        format.BloomFilterCompression

	column *schema.Column
}
+
+func NewAdaptiveBlockSplitBloomFilter(maxBytes uint32, numCandidates int, fpp 
float64, column *schema.Column, mem memory.Allocator) BloomFilterBuilder {
+       ret := &adaptiveBlockSplitBloomFilter{
+               mem:             mem,
+               maxBytes:        min(maximumBloomFilterBytes, maxBytes),
+               minBytes:        minimumBloomFilterBytes,
+               minCandidateNDV: 16,
+               hasher:          xxhasher{},
+               column:          column,
+               hashStrategy:    defaultHashStrategy,
+               algorithm:       defaultAlgorithm,
+               compression:     defaultCompression,
+       }
+
+       ret.initCandidates(maxBytes, numCandidates, fpp)
+       return ret
+}
+
// getAlg returns the algorithm descriptor written into the serialized header.
func (b *adaptiveBlockSplitBloomFilter) getAlg() *format.BloomFilterAlgorithm {
	return &b.algorithm
}

// getHashStrategy returns the hash descriptor written into the serialized header.
func (b *adaptiveBlockSplitBloomFilter) getHashStrategy() *format.BloomFilterHash {
	return &b.hashStrategy
}

// getCompression returns the compression descriptor written into the serialized header.
func (b *adaptiveBlockSplitBloomFilter) getCompression() *format.BloomFilterCompression {
	return &b.compression
}
+
+func (b *adaptiveBlockSplitBloomFilter) optimalCandidate() 
*bloomFilterCandidate {
+       return slices.MinFunc(b.candidates, func(a, b *bloomFilterCandidate) 
int {
+               return int(b.bloomFilter.Size() - a.bloomFilter.Size())
+       })
+}
+
// Hasher returns the hasher shared by every candidate filter.
func (b *adaptiveBlockSplitBloomFilter) Hasher() Hasher { return b.hasher }
+
+func (b *adaptiveBlockSplitBloomFilter) InsertHash(hash uint64) {
+       if b.finalized {
+               panic("adaptive bloom filter has been marked finalized, no more 
data allowed")
+       }
+
+       if !b.largestCandidate.bloomFilter.CheckHash(hash) {
+               b.numDistinct++
+       }
+
+       b.candidates = slices.DeleteFunc(b.candidates, func(c 
*bloomFilterCandidate) bool {
+               return c.expectedNDV < uint32(b.numDistinct) && c != 
b.largestCandidate
+       })
+
+       for _, c := range b.candidates {
+               c.bloomFilter.InsertHash(hash)
+       }
+}
+
+func (b *adaptiveBlockSplitBloomFilter) InsertBulk(hashes []uint64) {
+       if b.finalized {
+               panic("adaptive bloom filter has been marked finalized, no more 
data allowed")
+       }
+
+       for _, h := range hashes {
+               if !b.largestCandidate.bloomFilter.CheckHash(h) {
+                       b.numDistinct++
+               }
+       }
+
+       b.candidates = slices.DeleteFunc(b.candidates, func(c 
*bloomFilterCandidate) bool {
+               return c.expectedNDV < uint32(b.numDistinct) && c != 
b.largestCandidate
+       })
+
+       for _, c := range b.candidates {
+               c.bloomFilter.InsertBulk(hashes)
+       }
+}
+
// Size returns the size in bytes of the current optimal candidate — the
// filter that would be serialized if WriteTo were called now.
func (b *adaptiveBlockSplitBloomFilter) Size() int64 {
	return b.optimalCandidate().bloomFilter.Size()
}

// CheckHash probes the largest candidate, which has the lowest
// false-positive rate of the set.
func (b *adaptiveBlockSplitBloomFilter) CheckHash(hash uint64) bool {
	return b.largestCandidate.bloomFilter.CheckHash(hash)
}

// WriteTo serializes the optimal candidate's bitset to w (encrypted when enc
// is non-nil) and marks the filter finalized; subsequent inserts panic.
func (b *adaptiveBlockSplitBloomFilter) WriteTo(w io.Writer, enc encryption.Encryptor) (int, error) {
	b.finalized = true

	return b.optimalCandidate().bloomFilter.WriteTo(w, enc)
}
+
+func (b *adaptiveBlockSplitBloomFilter) initCandidates(maxBytes uint32, 
numCandidates int, fpp float64) {
+       b.candidates = make([]*bloomFilterCandidate, 0, numCandidates)
+       candidateByteSize := b.calcBoundedPowerOf2(maxBytes)
+       for range numCandidates {
+               candidateExpectedNDV := b.expectedNDV(candidateByteSize, fpp)
+               if candidateExpectedNDV <= 0 {
+                       break
+               }
+
+               b.candidates = append(b.candidates, 
newBloomFilterCandidate(uint32(candidateExpectedNDV),
+                       candidateByteSize, b.minBytes, b.maxBytes, b.hasher, 
b.mem))
+               candidateByteSize = b.calcBoundedPowerOf2(candidateByteSize / 2)
+       }
+
+       if len(b.candidates) == 0 {
+               // maxBytes is too small, but at least one candidate will be 
generated
+               b.candidates = append(b.candidates, 
newBloomFilterCandidate(uint32(b.minCandidateNDV),
+                       b.minBytes, b.minBytes, b.maxBytes, b.hasher, b.mem))
+       }
+
+       b.largestCandidate = slices.MaxFunc(b.candidates, func(a, b 
*bloomFilterCandidate) int {
+               return int(b.bloomFilter.Size() - a.bloomFilter.Size())
+       })
+}
+
+func (b *adaptiveBlockSplitBloomFilter) expectedNDV(numBytes uint32, fpp 
float64) int {
+       var (
+               expectedNDV, optimalBytes uint32
+       )
+
+       const ndvStep = 500
+       for optimalBytes < numBytes {
+               expectedNDV += ndvStep
+               optimalBytes = optimalNumBytes(expectedNDV, fpp)
+       }
+
+       // make sure it is slightly smaller than what numBytes supports
+       expectedNDV -= ndvStep
+       return int(max(0, expectedNDV))
+}
+
+func (b *adaptiveBlockSplitBloomFilter) calcBoundedPowerOf2(numBytes uint32) 
uint32 {
+       if numBytes < b.minBytes {
+               numBytes = b.minBytes
+       }
+
+       if numBytes&(numBytes-1) != 0 {
+               numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
+       }
+
+       return max(min(numBytes, b.maxBytes), b.minBytes)
+}
diff --git a/parquet/metadata/bloom_filter.go b/parquet/metadata/bloom_filter.go
new file mode 100644
index 0000000..cb21809
--- /dev/null
+++ b/parquet/metadata/bloom_filter.go
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+       "errors"
+       "fmt"
+       "io"
+       "math"
+       "sync"
+       "unsafe"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/bitutil"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/bitutils"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/internal/debug"
+       "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+       format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
+       "github.com/apache/arrow-go/v18/parquet/internal/thrift"
+       "github.com/apache/arrow-go/v18/parquet/internal/utils"
+       "github.com/apache/arrow-go/v18/parquet/schema"
+       "github.com/cespare/xxhash/v2"
+)
+
const (
	bytesPerFilterBlock     = 32 // each split-block filter block is 256 bits
	bitsSetPerBlock         = 8  // one bit set per 32-bit word of a block
	minimumBloomFilterBytes = bytesPerFilterBlock
	// currently using 128MB as maximum size, should probably be reconsidered
	maximumBloomFilterBytes = 128 * 1024 * 1024
)
+
+var (
+       salt = [bitsSetPerBlock]uint32{
+               0x476137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+               0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}
+
+       defaultHashStrategy = format.BloomFilterHash{XXHASH: &format.XxHash{}}
+       defaultAlgorithm    = format.BloomFilterAlgorithm{BLOCK: 
&format.SplitBlockAlgorithm{}}
+       defaultCompression  = format.BloomFilterCompression{UNCOMPRESSED: 
&format.Uncompressed{}}
+)
+
// optimalNumBytes returns the filter size in bytes needed to hold ndv
// distinct values at false-positive probability fpp.
func optimalNumBytes(ndv uint32, fpp float64) uint32 {
	optimalBits := optimalNumBits(ndv, fpp)
	debug.Assert(bitutil.IsMultipleOf8(int64(optimalBits)), "optimal bits should be multiple of 8")
	return optimalBits >> 3 // bits -> bytes
}
+
+func optimalNumBits(ndv uint32, fpp float64) uint32 {
+       debug.Assert(fpp > 0 && fpp < 1, "false positive prob must be in (0, 
1)")
+       var (
+               m       = -8 * float64(ndv) / math.Log(1-math.Pow(fpp, 1.0/8.0))
+               numBits uint32
+       )
+
+       if m < 0 || m > maximumBloomFilterBytes>>3 {
+               numBits = maximumBloomFilterBytes << 3
+       } else {
+               numBits = uint32(m)
+       }
+
+       // round up to lower bound
+       if numBits < minimumBloomFilterBytes<<3 {
+               numBits = minimumBloomFilterBytes << 3
+       }
+
+       // get next power of 2 if bits is not power of 2
+       if (numBits & (numBits - 1)) != 0 {
+               numBits = uint32(bitutil.NextPowerOf2(int(numBits)))
+       }
+       return numBits
+}
+
// Hasher computes the 64-bit hashes that are inserted into and probed
// against a bloom filter.
type Hasher interface {
	// Sum64 returns the 64-bit hash of b.
	Sum64(b []byte) uint64
	// Sum64s returns the 64-bit hash of each element of b.
	Sum64s(b [][]byte) []uint64
}
+
+type xxhasher struct{}
+
+func (xxhasher) Sum64(b []byte) uint64 {
+       return xxhash.Sum64(b)
+}
+
+func (xxhasher) Sum64s(b [][]byte) (vals []uint64) {
+       vals = make([]uint64, len(b))
+       for i, v := range b {
+               vals[i] = xxhash.Sum64(v)
+       }
+       return
+}
+
// GetHash returns the bloom filter hash of a single column value.
func GetHash[T parquet.ColumnTypes](h Hasher, v T) uint64 {
	return h.Sum64(getBytes(v))
}

// GetHashes returns the bloom filter hash of each value in vals.
func GetHashes[T parquet.ColumnTypes](h Hasher, vals []T) []uint64 {
	return h.Sum64s(getBytesSlice(vals))
}
+
+func GetSpacedHashes[T parquet.ColumnTypes](h Hasher, numValid int64, vals 
[]T, validBits []byte, validBitsOffset int64) []uint64 {
+       if numValid == 0 {
+               return []uint64{}
+       }
+
+       out := make([]uint64, 0, numValid)
+
+       // TODO: replace with bitset run reader pool
+       setReader := bitutils.NewSetBitRunReader(validBits, validBitsOffset, 
int64(len(vals)))
+       for {
+               run := setReader.NextRun()
+               if run.Length == 0 {
+                       break
+               }
+
+               out = append(out, 
h.Sum64s(getBytesSlice(vals[run.Pos:run.Pos+run.Length]))...)
+       }
+       return out
+}
+
// getBytes returns the raw bytes of v without copying. Byte-array types
// alias their underlying storage directly; all other (fixed-size) column
// types are viewed in place via unsafe, in native byte order.
func getBytes[T parquet.ColumnTypes](v T) []byte {
	switch v := any(v).(type) {
	case parquet.ByteArray:
		return v
	case parquet.FixedLenByteArray:
		return v
	case parquet.Int96:
		return v[:]
	}

	// v is this function's own copy of the argument, so the returned slice
	// aliases a heap value whose lifetime is tracked by the GC.
	return unsafe.Slice((*byte)(unsafe.Pointer(&v)), unsafe.Sizeof(v))
}

// getBytesSlice returns per-element byte views of v without copying element
// data. For fixed-size types each entry is a window into v's backing array,
// so the result aliases v and must not outlive it.
func getBytesSlice[T parquet.ColumnTypes](v []T) [][]byte {
	b := make([][]byte, len(v))
	switch v := any(v).(type) {
	case []parquet.ByteArray:
		for i, vv := range v {
			b[i] = vv
		}
		return b
	case []parquet.FixedLenByteArray:
		for i, vv := range v {
			b[i] = vv
		}
		return b
	case []parquet.Int96:
		for i, vv := range v {
			b[i] = vv[:]
		}
		return b
	}

	// fixed-size numeric types: reinterpret the whole slice as raw bytes and
	// carve it into sizeof(T)-wide windows, one per element
	var z T
	sz, ptr := int(unsafe.Sizeof(z)), unsafe.SliceData(v)
	raw := unsafe.Slice((*byte)(unsafe.Pointer(ptr)), sz*len(v))
	for i := range b {
		b[i] = raw[i*sz : (i+1)*sz]
	}

	return b
}
+
// blockSplitBloomFilter is the parquet split-block bloom filter: a bitset
// divided into 256-bit blocks, with eight bits (one per 32-bit word of a
// block) set per inserted value.
type blockSplitBloomFilter struct {
	data     *memory.Buffer // backing storage for the bitset
	bitset32 []uint32       // data viewed as 32-bit words

	hasher       Hasher
	algorithm    format.BloomFilterAlgorithm
	hashStrategy format.BloomFilterHash
	compression  format.BloomFilterCompression
}

// getAlg returns the algorithm descriptor written into the serialized header.
func (b *blockSplitBloomFilter) getAlg() *format.BloomFilterAlgorithm {
	return &b.algorithm
}

// getHashStrategy returns the hash descriptor written into the serialized header.
func (b *blockSplitBloomFilter) getHashStrategy() *format.BloomFilterHash {
	return &b.hashStrategy
}

// getCompression returns the compression descriptor written into the serialized header.
func (b *blockSplitBloomFilter) getCompression() *format.BloomFilterCompression {
	return &b.compression
}

// CheckHash reports whether hash may have been inserted. False positives are
// possible; false negatives are not.
func (b *blockSplitBloomFilter) CheckHash(hash uint64) bool {
	return checkHash(b.bitset32, hash)
}

// CheckBulk probes the filter for each hash, returning one result per input.
func (b *blockSplitBloomFilter) CheckBulk(hashes []uint64) []bool {
	results := make([]bool, len(hashes))
	checkBulk(b.bitset32, hashes, results)
	return results
}

// InsertHash adds a single hash to the filter.
func (b *blockSplitBloomFilter) InsertHash(hash uint64) {
	insertHash(b.bitset32, hash)
}

// InsertBulk adds a batch of hashes to the filter.
func (b *blockSplitBloomFilter) InsertBulk(hashes []uint64) {
	insertBulk(b.bitset32, hashes)
}

// Hasher returns the hasher used to produce values for this filter.
func (b *blockSplitBloomFilter) Hasher() Hasher {
	return b.hasher
}

// Size returns the bitset size in bytes.
func (b *blockSplitBloomFilter) Size() int64 {
	return int64(len(b.bitset32) * 4)
}
+
+func (b *blockSplitBloomFilter) WriteTo(w io.Writer, enc encryption.Encryptor) 
(int, error) {
+       if enc != nil {
+               n := enc.Encrypt(w, b.data.Bytes())
+               return n, nil
+       }
+       return w.Write(b.data.Bytes())
+}
+
+func NewBloomFilter(numBytes, maxBytes uint32, mem memory.Allocator) 
BloomFilterBuilder {
+       if numBytes < minimumBloomFilterBytes {
+               numBytes = minimumBloomFilterBytes
+       }
+
+       if maxBytes > maximumBloomFilterBytes {
+               maxBytes = maximumBloomFilterBytes
+       }
+
+       if numBytes > maxBytes {
+               numBytes = maxBytes
+       }
+
+       // get next power of 2 if it's not a power of 2
+       if (numBytes & (numBytes - 1)) != 0 {
+               numBytes = uint32(bitutil.NextPowerOf2(int(numBytes)))
+       }
+
+       buf := memory.NewResizableBuffer(mem)
+       buf.ResizeNoShrink(int(numBytes))
+       bf := &blockSplitBloomFilter{
+               data:         buf,
+               bitset32:     arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
+               hasher:       xxhasher{},
+               algorithm:    format.BloomFilterAlgorithm{BLOCK: 
&format.SplitBlockAlgorithm{}},
+               hashStrategy: format.BloomFilterHash{XXHASH: &format.XxHash{}},
+               compression:  format.BloomFilterCompression{UNCOMPRESSED: 
&format.Uncompressed{}},
+       }
+       addCleanup(bf, nil)
+       return bf
+}
+
+func NewBloomFilterFromNDVAndFPP(ndv uint32, fpp float64, maxBytes int64, mem 
memory.Allocator) BloomFilterBuilder {
+       numBytes := optimalNumBytes(ndv, fpp)
+       if numBytes > uint32(maxBytes) {
+               numBytes = uint32(maxBytes)
+       }
+
+       buf := memory.NewResizableBuffer(mem)
+       buf.ResizeNoShrink(int(numBytes))
+       bf := &blockSplitBloomFilter{
+               data:         buf,
+               bitset32:     arrow.Uint32Traits.CastFromBytes(buf.Bytes()),
+               hasher:       xxhasher{},
+               algorithm:    format.BloomFilterAlgorithm{BLOCK: 
&format.SplitBlockAlgorithm{}},
+               hashStrategy: format.BloomFilterHash{XXHASH: &format.XxHash{}},
+               compression:  format.BloomFilterCompression{UNCOMPRESSED: 
&format.Uncompressed{}},
+       }
+       addCleanup(bf, nil)
+       return bf
+}
+
// BloomFilterBuilder is the writer-side interface for constructing a bloom
// filter and serializing it into a parquet file.
type BloomFilterBuilder interface {
	Hasher() Hasher
	Size() int64
	InsertHash(hash uint64)
	InsertBulk(hashes []uint64)
	WriteTo(io.Writer, encryption.Encryptor) (int, error)

	// descriptors for the serialized thrift header
	getAlg() *format.BloomFilterAlgorithm
	getHashStrategy() *format.BloomFilterHash
	getCompression() *format.BloomFilterCompression
}

// BloomFilter is the reader-side interface for probing a deserialized filter.
type BloomFilter interface {
	Hasher() Hasher
	// CheckHash reports whether hash may be present; false positives are
	// possible, false negatives are not.
	CheckHash(hash uint64) bool
	Size() int64
}

// TypedBloomFilter wraps a BloomFilter with a type-safe Check method.
type TypedBloomFilter[T parquet.ColumnTypes] struct {
	BloomFilter
}

// Check hashes v with the filter's own hasher and probes the filter.
func (b *TypedBloomFilter[T]) Check(v T) bool {
	h := b.Hasher()
	return b.CheckHash(h.Sum64(getBytes(v)))
}
+
+func validateBloomFilterHeader(hdr *format.BloomFilterHeader) error {
+       if hdr == nil {
+               return errors.New("bloom filter header must not be nil")
+       }
+
+       if !hdr.Algorithm.IsSetBLOCK() {
+               return fmt.Errorf("unsupported bloom filter algorithm: %s", 
hdr.Algorithm)
+       }
+
+       if !hdr.Compression.IsSetUNCOMPRESSED() {
+               return fmt.Errorf("unsupported bloom filter compression: %s", 
hdr.Compression)
+       }
+
+       if !hdr.Hash.IsSetXXHASH() {
+               return fmt.Errorf("unsupported bloom filter hash strategy: %s", 
hdr.Hash)
+       }
+
+       if hdr.NumBytes < minimumBloomFilterBytes || hdr.NumBytes > 
maximumBloomFilterBytes {
+               return fmt.Errorf("invalid bloom filter size: %d", hdr.NumBytes)
+       }
+
+       return nil
+}
+
// BloomFilterReader provides access to the bloom filters stored in a parquet
// file, one row group at a time.
type BloomFilterReader struct {
	Input         parquet.ReaderAtSeeker
	FileMetadata  *FileMetaData
	Props         *parquet.ReaderProperties
	FileDecryptor encryption.FileDecryptor
	BufferPool    *sync.Pool // pool of *memory.Buffer reused across filter reads
}

// RowGroup returns a bloom filter reader scoped to row group i.
func (r *BloomFilterReader) RowGroup(i int) (*RowGroupBloomFilterReader, error) {
	if i < 0 || i >= len(r.FileMetadata.RowGroups) {
		return nil, fmt.Errorf("row group index %d out of range", i)
	}

	rgMeta := r.FileMetadata.RowGroup(i)
	return &RowGroupBloomFilterReader{
		input:          r.Input,
		rgMeta:         rgMeta,
		fileDecryptor:  r.FileDecryptor,
		rgOrdinal:      int16(i),
		bufferPool:     r.BufferPool,
		sourceFileSize: r.FileMetadata.sourceFileSize,
	}, nil
}

// RowGroupBloomFilterReader reads the column bloom filters of a single row
// group.
type RowGroupBloomFilterReader struct {
	input          parquet.ReaderAtSeeker
	rgMeta         *RowGroupMetaData
	fileDecryptor  encryption.FileDecryptor
	rgOrdinal      int16 // row group index, used when building decryption AADs
	sourceFileSize int64

	bufferPool *sync.Pool
}
+
+func (r *RowGroupBloomFilterReader) GetColumnBloomFilter(i int) (BloomFilter, 
error) {
+       if i < 0 || i >= r.rgMeta.NumColumns() {
+               return nil, fmt.Errorf("column index %d out of range", i)
+       }
+
+       col, err := r.rgMeta.ColumnChunk(i)
+       if err != nil {
+               return nil, err
+       }
+
+       var (
+               decryptor           encryption.Decryptor
+               header              format.BloomFilterHeader
+               offset              int64
+               bloomFilterReadSize int32 = 256
+       )
+
+       if offset = col.BloomFilterOffset(); offset <= 0 {
+               return nil, nil
+       }
+
+       if col.BloomFilterLength() > 0 {
+               bloomFilterReadSize = col.BloomFilterLength()
+       }
+
+       sectionRdr := io.NewSectionReader(r.input, offset, 
r.sourceFileSize-offset)
+       cryptoMetadata := col.CryptoMetadata()
+       if cryptoMetadata != nil {
+               decryptor, err = 
encryption.GetColumnMetaDecryptor(cryptoMetadata, r.fileDecryptor)
+               if err != nil {
+                       return nil, err
+               }
+
+               encryption.UpdateDecryptor(decryptor, r.rgOrdinal, int16(i),
+                       encryption.BloomFilterHeaderModule)
+               hdr := decryptor.DecryptFrom(sectionRdr)
+               if _, err = thrift.DeserializeThrift(&header, hdr); err != nil {
+                       return nil, err
+               }
+
+               if err = validateBloomFilterHeader(&header); err != nil {
+                       return nil, err
+               }
+
+               encryption.UpdateDecryptor(decryptor, r.rgOrdinal, int16(i),
+                       encryption.BloomFilterBitsetModule)
+               bitset := decryptor.DecryptFrom(sectionRdr)
+               if len(bitset) != int(header.NumBytes) {
+                       return nil, fmt.Errorf("wrong length of decrypted bloom 
filter bitset: %d vs %d",
+                               len(bitset), header.NumBytes)
+               }
+
+               return &blockSplitBloomFilter{
+                       data:         memory.NewBufferBytes(bitset),
+                       bitset32:     arrow.Uint32Traits.CastFromBytes(bitset),
+                       hasher:       xxhasher{},
+                       algorithm:    *header.Algorithm,
+                       hashStrategy: *header.Hash,
+                       compression:  *header.Compression,
+               }, nil
+       }
+
+       headerBuf := r.bufferPool.Get().(*memory.Buffer)
+       headerBuf.ResizeNoShrink(int(bloomFilterReadSize))
+       defer func() {
+               if headerBuf != nil {
+                       headerBuf.ResizeNoShrink(0)
+                       r.bufferPool.Put(headerBuf)
+               }
+       }()
+
+       if _, err = sectionRdr.Read(headerBuf.Bytes()); err != nil {
+               return nil, err
+       }
+
+       remaining, err := thrift.DeserializeThrift(&header, headerBuf.Bytes())
+       if err != nil {
+               return nil, err
+       }
+       headerSize := len(headerBuf.Bytes()) - int(remaining)
+
+       if err = validateBloomFilterHeader(&header); err != nil {
+               return nil, err
+       }
+
+       bloomFilterSz := header.NumBytes
+       var bitset []byte
+       if int(bloomFilterSz)+headerSize <= len(headerBuf.Bytes()) {
+               // bloom filter data is entirely contained in the buffer we 
just read
+               bitset = headerBuf.Bytes()[headerSize : 
headerSize+int(bloomFilterSz)]
+       } else {
+               buf := r.bufferPool.Get().(*memory.Buffer)
+               buf.ResizeNoShrink(int(bloomFilterSz))
+               filterBytesInHeader := headerBuf.Len() - headerSize
+               if filterBytesInHeader > 0 {
+                       copy(buf.Bytes(), headerBuf.Bytes()[headerSize:])
+               }
+
+               if _, err = sectionRdr.Read(buf.Bytes()[filterBytesInHeader:]); 
err != nil {
+                       return nil, err
+               }
+               bitset = buf.Bytes()
+               headerBuf.ResizeNoShrink(0)
+               r.bufferPool.Put(headerBuf)
+               headerBuf = buf
+       }
+
+       bf := &blockSplitBloomFilter{
+               data:         headerBuf,
+               bitset32:     arrow.GetData[uint32](bitset),
+               hasher:       xxhasher{},
+               algorithm:    *header.Algorithm,
+               hashStrategy: *header.Hash,
+               compression:  *header.Compression,
+       }
+       headerBuf = nil
+       addCleanup(bf, r.bufferPool)
+       return bf, nil
+}
+
// FileBloomFilterBuilder accumulates the bloom filters of every row group of
// a file and serializes them, recording each filter's offset and length back
// into the corresponding column chunk metadata.
type FileBloomFilterBuilder struct {
	Schema    *schema.Schema
	Encryptor encryption.FileEncryptor

	rgMetaBldrs  []*RowGroupMetaDataBuilder       // one per appended row group
	bloomFilters []map[string]BloomFilterBuilder  // column path -> filter, parallel to rgMetaBldrs
}

// AppendRowGroup registers a row group's metadata builder together with the
// bloom filters built for its columns (keyed by column path).
func (f *FileBloomFilterBuilder) AppendRowGroup(rgMeta *RowGroupMetaDataBuilder, filters map[string]BloomFilterBuilder) {
	f.rgMetaBldrs = append(f.rgMetaBldrs, rgMeta)
	f.bloomFilters = append(f.bloomFilters, filters)
}

// WriteTo writes every registered bloom filter to w as a thrift header
// followed by the bitset, encrypting both (as separate modules with distinct
// AADs) when a file encryptor is configured. The offset and total length of
// each filter are stored into the column chunk metadata for the footer.
func (f *FileBloomFilterBuilder) WriteTo(w utils.WriterTell) error {
	if len(f.rgMetaBldrs) == 0 || len(f.bloomFilters) == 0 {
		return nil
	}

	var (
		hdr        format.BloomFilterHeader // reused; all fields reassigned per filter
		serializer = thrift.NewThriftSerializer()
	)
	for rg, rgMeta := range f.rgMetaBldrs {
		if len(f.bloomFilters[rg]) == 0 {
			continue
		}

		for c, col := range rgMeta.colBuilders {
			colPath := col.column.Path()
			bf, ok := f.bloomFilters[rg][colPath]
			if !ok || bf == nil {
				continue
			}

			// record where this filter starts so the footer can point at it
			offset := w.Tell()
			col.chunk.MetaData.BloomFilterOffset = &offset
			var encryptor encryption.Encryptor
			if f.Encryptor != nil {
				encryptor = f.Encryptor.GetColumnMetaEncryptor(colPath)
			}

			// header and bitset are encrypted as separate modules, each with
			// its own AAD derived from row group and column ordinals
			if encryptor != nil {
				encryptor.UpdateAad(encryption.CreateModuleAad(
					encryptor.FileAad(), encryption.BloomFilterHeaderModule,
					int16(rg), int16(c), encryption.NonPageOrdinal))
			}

			hdr.NumBytes = int32(bf.Size())
			hdr.Algorithm = bf.getAlg()
			hdr.Hash = bf.getHashStrategy()
			hdr.Compression = bf.getCompression()

			_, err := serializer.Serialize(&hdr, w, encryptor)
			if err != nil {
				return err
			}

			if encryptor != nil {
				encryptor.UpdateAad(encryption.CreateModuleAad(
					encryptor.FileAad(), encryption.BloomFilterBitsetModule,
					int16(rg), int16(c), encryption.NonPageOrdinal))
			}

			if _, err = bf.WriteTo(w, encryptor); err != nil {
				return err
			}

			// length covers header + bitset as actually written (including
			// any encryption overhead)
			dataWritten := int32(w.Tell() - offset)
			col.chunk.MetaData.BloomFilterLength = &dataWritten
		}
	}
	return nil
}
diff --git a/parquet/metadata/bloom_filter_block.go 
b/parquet/metadata/bloom_filter_block.go
new file mode 100644
index 0000000..bbd8426
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
// Function-pointer dispatch table for the split-block bloom filter kernels.
// Platform-specific init functions (bloom_filter_block_amd64.go,
// bloom_filter_block_default.go) install SIMD implementations when the CPU
// supports them, otherwise the portable Go versions defined below.
var (
	checkHash func([]uint32, uint64) bool
	// NOTE(review): checkBulk is declared but none of the visible init
	// functions assign it — confirm an assignment (or caller-side nil
	// check) exists elsewhere, otherwise invoking it will panic.
	checkBulk  func([]uint32, []uint64, []bool)
	insertHash func([]uint32, uint64)
	insertBulk func([]uint32, []uint64)
)
+
+func checkHashGo(bitset32 []uint32, hash uint64) bool {
+       bucketIdx := uint32(((hash >> 32) * uint64(len(bitset32)/8)) >> 32)
+       key := uint32(hash)
+
+       for i := range bitsSetPerBlock {
+               mask := uint32(1) << ((key * salt[i]) >> 27)
+               if bitset32[bitsSetPerBlock*bucketIdx+uint32(i)]&mask == 0 {
+                       return false
+               }
+       }
+       return true
+}
+
+func insertHashGo(bitset32 []uint32, hash uint64) {
+       bucketIdx := uint32(((hash >> 32) * uint64(len(bitset32)/8)) >> 32)
+       key := uint32(hash)
+
+       for i := range bitsSetPerBlock {
+               mask := uint32(1) << ((key * salt[i]) >> 27)
+               bitset32[bitsSetPerBlock*bucketIdx+uint32(i)] |= mask
+       }
+}
+
+func insertBulkGo(bitset32 []uint32, hash []uint64) {
+       for _, h := range hash {
+               insertHash(bitset32, h)
+       }
+}
diff --git a/parquet/metadata/bloom_filter_block_amd64.go 
b/parquet/metadata/bloom_filter_block_amd64.go
new file mode 100644
index 0000000..463e696
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_amd64.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !noasm
+
+package metadata
+
+import (
+       "golang.org/x/sys/cpu"
+)
+
+func init() {
+       if cpu.X86.HasAVX2 {
+               checkHash = checkBlockAvx2
+               insertHash, insertBulk = insertBlockAvx2, insertBulkAvx2
+       } else if cpu.X86.HasSSE42 {
+               checkHash = checkBlockSSE4
+               insertHash, insertBulk = insertBlockSSE4, insertBulkSSE4
+       } else {
+               checkHash = checkHashGo
+               insertHash, insertBulk = insertHashGo, insertBulkGo
+       }
+}
diff --git a/parquet/metadata/bloom_filter_block_avx2_amd64.go 
b/parquet/metadata/bloom_filter_block_avx2_amd64.go
new file mode 100644
index 0000000..01655d0
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_avx2_amd64.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !noasm
+
+package metadata
+
+import (
+       "unsafe"
+)
+
// _check_block_avx2 is implemented in bloom_filter_block_avx2_amd64.s
// (c2goasm-generated from _lib/bloom_filter_block.c).
//
//go:noescape
func _check_block_avx2(bitset32 unsafe.Pointer, len int, hash uint64) (result bool)

// checkBlockAvx2 reports whether every salted bit for hash is set in its
// block of the bitset, using the AVX2 assembly kernel.
func checkBlockAvx2(bitset32 []uint32, hash uint64) bool {
	return _check_block_avx2(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32), hash)
}

// _insert_block_avx2 is implemented in bloom_filter_block_avx2_amd64.s.
//
//go:noescape
func _insert_block_avx2(bitset32 unsafe.Pointer, len int, hash uint64)

// insertBlockAvx2 sets the salted bits for hash in its block of the bitset,
// using the AVX2 assembly kernel.
func insertBlockAvx2(bitset32 []uint32, hash uint64) {
	_insert_block_avx2(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32), hash)
}

// _insert_bulk_avx2 is implemented in bloom_filter_block_avx2_amd64.s.
//
//go:noescape
func _insert_bulk_avx2(bitset32 unsafe.Pointer, block_len int, hashes unsafe.Pointer, hash_len int)

// insertBulkAvx2 inserts a batch of hashes into the bitset in a single
// call into the AVX2 assembly kernel.
func insertBulkAvx2(bitset32 []uint32, hashes []uint64) {
	_insert_bulk_avx2(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32),
		unsafe.Pointer(unsafe.SliceData(hashes)), len(hashes))
}
diff --git a/parquet/metadata/bloom_filter_block_avx2_amd64.s 
b/parquet/metadata/bloom_filter_block_avx2_amd64.s
new file mode 100644
index 0000000..c176800
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_avx2_amd64.s
@@ -0,0 +1,151 @@
+//+build !noasm !appengine
+// AUTO-GENERATED BY C2GOASM -- DO NOT EDIT
+
+DATA LCDATA1<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA1<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA1<>+0x010(SB)/8, $0x2df1424b705495c7
+DATA LCDATA1<>+0x018(SB)/8, $0x5c6bfb319efc4947
+DATA LCDATA1<>+0x020(SB)/8, $0x0000000000000001
+GLOBL LCDATA1<>(SB), 8, $40
+
+TEXT ·_check_block_avx2(SB), $0-32
+
+       MOVQ bitset32+0(FP), DI
+       MOVQ len+8(FP), SI
+       MOVQ hash+16(FP), DX
+       LEAQ LCDATA1<>(SB), BP
+
+       WORD $0x8948; BYTE $0xd1       // mov    rcx, rdx
+       LONG $0x20e9c148               // shr    rcx, 32
+       WORD $0x468d; BYTE $0x07       // lea    eax, [rsi + 7]
+       WORD $0xf685                   // test    esi, esi
+       WORD $0x490f; BYTE $0xc6       // cmovns    eax, esi
+       WORD $0xf8c1; BYTE $0x03       // sar    eax, 3
+       WORD $0x9848                   // cdqe
+       LONG $0xc1af0f48               // imul    rax, rcx
+       LONG $0x1de8c148               // shr    rax, 29
+       WORD $0xe083; BYTE $0xf8       // and    eax, -8
+       LONG $0x137bca69; WORD $0x47b6 // imul    ecx, edx, 1203114875
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       WORD $0x348b; BYTE $0x87       // mov    esi, dword [rdi + 4*rax]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x4d91ca69; WORD $0x4497 // imul    ecx, edx, 1150766481
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x0487748b               // mov    esi, dword [rdi + 4*rax + 4]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0xad5bca69; WORD $0x8824 // imul    ecx, edx, -2010862245
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x0887748b               // mov    esi, dword [rdi + 4*rax + 8]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x289dca69; WORD $0xa2b7 // imul    ecx, edx, -1565054819
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x0c87748b               // mov    esi, dword [rdi + 4*rax + 12]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x95c7ca69; WORD $0x7054 // imul    ecx, edx, 1884591559
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1087748b               // mov    esi, dword [rdi + 4*rax + 16]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x424bca69; WORD $0x2df1 // imul    ecx, edx, 770785867
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1487748b               // mov    esi, dword [rdi + 4*rax + 20]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x4947ca69; WORD $0x9efc // imul    ecx, edx, -1627633337
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1887748b               // mov    esi, dword [rdi + 4*rax + 24]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0xfb31ca69; WORD $0x5c6b // imul    ecx, edx, 1550580529
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1c87448b               // mov    eax, dword [rdi + 4*rax + 28]
+       WORD $0xa30f; BYTE $0xc8       // bt    eax, ecx
+       WORD $0x920f; BYTE $0xd0       // setb    al
+       MOVQ AX, result+24(FP)
+       RET
+
+LBB0_8:
+       WORD $0xc031           // xor    eax, eax
+       MOVQ AX, result+24(FP)
+       RET
+
+TEXT ·_insert_block_avx2(SB), $0-24
+
+       MOVQ bitset32+0(FP), DI
+       MOVQ len+8(FP), SI
+       MOVQ hash+16(FP), DX
+       LEAQ LCDATA1<>(SB), BP
+
+       LONG $0xc26ef9c5                       // vmovd    xmm0, edx
+       LONG $0x20eac148                       // shr    rdx, 32
+       WORD $0x468d; BYTE $0x07               // lea    eax, [rsi + 7]
+       WORD $0xf685                           // test    esi, esi
+       WORD $0x490f; BYTE $0xc6               // cmovns    eax, esi
+       WORD $0xf8c1; BYTE $0x03               // sar    eax, 3
+       WORD $0x9848                           // cdqe
+       LONG $0xc2af0f48                       // imul    rax, rdx
+       LONG $0x1be8c148                       // shr    rax, 27
+       QUAD $0x0003ffffffe0b948; WORD $0x0000 // mov    rcx, 17179869152
+       LONG $0x587de2c4; BYTE $0xc0           // vpbroadcastd    ymm0, xmm0
+       LONG $0x407de2c4; WORD $0x0045         // vpmulld    ymm0, ymm0, yword 
0[rbp] /* [rip + .LCPI2_0] */
+       WORD $0x2148; BYTE $0xc1               // and    rcx, rax
+       LONG $0xd072fdc5; BYTE $0x1b           // vpsrld    ymm0, ymm0, 27
+       LONG $0x587de2c4; WORD $0x204d         // vpbroadcastd    ymm1, dword 
32[rbp] /* [rip + .LCPI2_1] */
+       LONG $0x4775e2c4; BYTE $0xc0           // vpsllvd    ymm0, ymm1, ymm0
+       LONG $0x04ebfdc5; BYTE $0x0f           // vpor    ymm0, ymm0, yword 
[rdi + rcx]
+       LONG $0x047ffec5; BYTE $0x0f           // vmovdqu    yword [rdi + rcx], 
ymm0
+       VZEROUPPER
+       RET
+
+DATA LCDATA2<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA2<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA2<>+0x010(SB)/8, $0x2df1424b705495c7
+DATA LCDATA2<>+0x018(SB)/8, $0x5c6bfb319efc4947
+DATA LCDATA2<>+0x020(SB)/8, $0x0000000000000001
+GLOBL LCDATA2<>(SB), 8, $40
+
+TEXT ·_insert_bulk_avx2(SB), $0-32
+
+       MOVQ bitset32+0(FP), DI
+       MOVQ block_len+8(FP), SI
+       MOVQ hashes+16(FP), DX
+       MOVQ hash_len+24(FP), CX
+       LEAQ LCDATA2<>(SB), BP
+
+       WORD $0xc985                           // test    ecx, ecx
+       JLE  LBB3_4
+       WORD $0x468d; BYTE $0x07               // lea    eax, [rsi + 7]
+       WORD $0xf685                           // test    esi, esi
+       WORD $0x490f; BYTE $0xc6               // cmovns    eax, esi
+       WORD $0xf8c1; BYTE $0x03               // sar    eax, 3
+       WORD $0x9848                           // cdqe
+       WORD $0xc989                           // mov    ecx, ecx
+       WORD $0xf631                           // xor    esi, esi
+       QUAD $0x0003ffffffe0b849; WORD $0x0000 // mov    r8, 17179869152
+       LONG $0x456ffdc5; BYTE $0x00           // vmovdqa    ymm0, yword 0[rbp] 
/* [rip + .LCPI3_0] */
+       LONG $0x587de2c4; WORD $0x204d         // vpbroadcastd    ymm1, dword 
32[rbp] /* [rip + .LCPI3_1] */
+
+LBB3_2:
+       LONG $0xf20c8b4c               // mov    r9, qword [rdx + 8*rsi]
+       LONG $0x6e79c1c4; BYTE $0xd1   // vmovd    xmm2, r9d
+       LONG $0x20e9c149               // shr    r9, 32
+       LONG $0xc8af0f4c               // imul    r9, rax
+       LONG $0x1be9c149               // shr    r9, 27
+       WORD $0x214d; BYTE $0xc1       // and    r9, r8
+       LONG $0x587de2c4; BYTE $0xd2   // vpbroadcastd    ymm2, xmm2
+       LONG $0x406de2c4; BYTE $0xd0   // vpmulld    ymm2, ymm2, ymm0
+       LONG $0xd272edc5; BYTE $0x1b   // vpsrld    ymm2, ymm2, 27
+       LONG $0x4775e2c4; BYTE $0xd2   // vpsllvd    ymm2, ymm1, ymm2
+       LONG $0xeb6da1c4; WORD $0x0f14 // vpor    ymm2, ymm2, yword [rdi + r9]
+       LONG $0x7f7ea1c4; WORD $0x0f14 // vmovdqu    yword [rdi + r9], ymm2
+       WORD $0xff48; BYTE $0xc6       // inc    rsi
+       WORD $0x3948; BYTE $0xf1       // cmp    rcx, rsi
+       JNE  LBB3_2
+
+LBB3_4:
+       VZEROUPPER
+       RET
diff --git a/parquet/metadata/bloom_filter_block_default.go 
b/parquet/metadata/bloom_filter_block_default.go
new file mode 100644
index 0000000..ca1ec75
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_default.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build noasm || !amd64
+
+package metadata
+
+func init() {
+       checkHash, insertHash, insertBulk = checkHashGo, insertHashGo, 
insertBulkGo
+}
diff --git a/parquet/metadata/bloom_filter_block_sse4_amd64.go 
b/parquet/metadata/bloom_filter_block_sse4_amd64.go
new file mode 100644
index 0000000..7c94bf7
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_sse4_amd64.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !noasm
+
+package metadata
+
+import (
+       "unsafe"
+)
+
// _check_block_sse4 is implemented in bloom_filter_block_sse4_amd64.s
// (c2goasm-generated from _lib/bloom_filter_block.c).
//
//go:noescape
func _check_block_sse4(bitset32 unsafe.Pointer, len int, hash uint64) (result bool)

// checkBlockSSE4 reports whether every salted bit for hash is set in its
// block of the bitset, using the SSE4 assembly kernel.
func checkBlockSSE4(bitset32 []uint32, hash uint64) bool {
	return _check_block_sse4(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32), hash)
}

// _insert_block_sse4 is implemented in bloom_filter_block_sse4_amd64.s.
//
//go:noescape
func _insert_block_sse4(bitset32 unsafe.Pointer, len int, hash uint64)

// insertBlockSSE4 sets the salted bits for hash in its block of the bitset,
// using the SSE4 assembly kernel.
func insertBlockSSE4(bitset32 []uint32, hash uint64) {
	_insert_block_sse4(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32), hash)
}

// _insert_bulk_sse4 is implemented in bloom_filter_block_sse4_amd64.s.
//
//go:noescape
func _insert_bulk_sse4(bitset32 unsafe.Pointer, block_len int, hashes unsafe.Pointer, hash_len int)

// insertBulkSSE4 inserts a batch of hashes into the bitset in a single
// call into the SSE4 assembly kernel.
func insertBulkSSE4(bitset32 []uint32, hashes []uint64) {
	_insert_bulk_sse4(unsafe.Pointer(unsafe.SliceData(bitset32)), len(bitset32),
		unsafe.Pointer(unsafe.SliceData(hashes)), len(hashes))
}
diff --git a/parquet/metadata/bloom_filter_block_sse4_amd64.s 
b/parquet/metadata/bloom_filter_block_sse4_amd64.s
new file mode 100644
index 0000000..ac5ff12
--- /dev/null
+++ b/parquet/metadata/bloom_filter_block_sse4_amd64.s
@@ -0,0 +1,176 @@
+//+build !noasm !appengine
+// AUTO-GENERATED BY C2GOASM -- DO NOT EDIT
+
+DATA LCDATA1<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA1<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA1<>+0x010(SB)/8, $0x3f8000003f800000
+DATA LCDATA1<>+0x018(SB)/8, $0x3f8000003f800000
+DATA LCDATA1<>+0x020(SB)/8, $0x2df1424b705495c7
+DATA LCDATA1<>+0x028(SB)/8, $0x5c6bfb319efc4947
+GLOBL LCDATA1<>(SB), 8, $48
+
+TEXT ·_check_block_sse4(SB), $0-32
+
+       MOVQ bitset32+0(FP), DI
+       MOVQ len+8(FP), SI
+       MOVQ hash+16(FP), DX
+       LEAQ LCDATA1<>(SB), BP
+
+       WORD $0x8948; BYTE $0xd1       // mov    rcx, rdx
+       LONG $0x20e9c148               // shr    rcx, 32
+       WORD $0x468d; BYTE $0x07       // lea    eax, [rsi + 7]
+       WORD $0xf685                   // test    esi, esi
+       WORD $0x490f; BYTE $0xc6       // cmovns    eax, esi
+       WORD $0xf8c1; BYTE $0x03       // sar    eax, 3
+       WORD $0x9848                   // cdqe
+       LONG $0xc1af0f48               // imul    rax, rcx
+       LONG $0x1de8c148               // shr    rax, 29
+       WORD $0xe083; BYTE $0xf8       // and    eax, -8
+       LONG $0x137bca69; WORD $0x47b6 // imul    ecx, edx, 1203114875
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       WORD $0x348b; BYTE $0x87       // mov    esi, dword [rdi + 4*rax]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x4d91ca69; WORD $0x4497 // imul    ecx, edx, 1150766481
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x0487748b               // mov    esi, dword [rdi + 4*rax + 4]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0xad5bca69; WORD $0x8824 // imul    ecx, edx, -2010862245
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x0887748b               // mov    esi, dword [rdi + 4*rax + 8]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x289dca69; WORD $0xa2b7 // imul    ecx, edx, -1565054819
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x0c87748b               // mov    esi, dword [rdi + 4*rax + 12]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x95c7ca69; WORD $0x7054 // imul    ecx, edx, 1884591559
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1087748b               // mov    esi, dword [rdi + 4*rax + 16]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x424bca69; WORD $0x2df1 // imul    ecx, edx, 770785867
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1487748b               // mov    esi, dword [rdi + 4*rax + 20]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0x4947ca69; WORD $0x9efc // imul    ecx, edx, -1627633337
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1887748b               // mov    esi, dword [rdi + 4*rax + 24]
+       WORD $0xa30f; BYTE $0xce       // bt    esi, ecx
+       JAE  LBB0_8
+       LONG $0xfb31ca69; WORD $0x5c6b // imul    ecx, edx, 1550580529
+       WORD $0xe9c1; BYTE $0x1b       // shr    ecx, 27
+       LONG $0x1c87448b               // mov    eax, dword [rdi + 4*rax + 28]
+       WORD $0xa30f; BYTE $0xc8       // bt    eax, ecx
+       WORD $0x920f; BYTE $0xd0       // setb    al
+       MOVQ AX, result+24(FP)
+       RET
+
+LBB0_8:
+       WORD $0xc031           // xor    eax, eax
+       MOVQ AX, result+24(FP)
+       RET
+
+TEXT ·_insert_block_sse4(SB), $0-24
+
+       MOVQ bitset32+0(FP), DI
+       MOVQ len+8(FP), SI
+       MOVQ hash+16(FP), DX
+       LEAQ LCDATA1<>(SB), BP
+
+       LONG $0xc26e0f66                       // movd    xmm0, edx
+       LONG $0x20eac148                       // shr    rdx, 32
+       WORD $0x468d; BYTE $0x07               // lea    eax, [rsi + 7]
+       WORD $0xf685                           // test    esi, esi
+       WORD $0x490f; BYTE $0xc6               // cmovns    eax, esi
+       WORD $0xf8c1; BYTE $0x03               // sar    eax, 3
+       WORD $0x6348; BYTE $0xc8               // movsxd    rcx, eax
+       LONG $0xcaaf0f48                       // imul    rcx, rdx
+       LONG $0x1be9c148                       // shr    rcx, 27
+       QUAD $0x0003ffffffe0b848; WORD $0x0000 // mov    rax, 17179869152
+       WORD $0x2148; BYTE $0xc8               // and    rax, rcx
+       LONG $0xc0700f66; BYTE $0x00           // pshufd    xmm0, xmm0, 0
+       LONG $0x4d6f0f66; BYTE $0x00           // movdqa    xmm1, oword 0[rbp] 
/* [rip + .LCPI2_0] */
+       LONG $0x40380f66; BYTE $0xc8           // pmulld    xmm1, xmm0
+       LONG $0xd1720f66; BYTE $0x1b           // psrld    xmm1, 27
+       LONG $0xf1720f66; BYTE $0x17           // pslld    xmm1, 23
+       LONG $0x556f0f66; BYTE $0x10           // movdqa    xmm2, oword 16[rbp] 
/* [rip + .LCPI2_1] */
+       LONG $0xcafe0f66                       // paddd    xmm1, xmm2
+       LONG $0xc95b0ff3                       // cvttps2dq    xmm1, xmm1
+       LONG $0x071c100f                       // movups    xmm3, oword [rdi + 
rax]
+       WORD $0x560f; BYTE $0xd9               // orps    xmm3, xmm1
+       LONG $0x074c100f; BYTE $0x10           // movups    xmm1, oword [rdi + 
rax + 16]
+       LONG $0x071c110f                       // movups    oword [rdi + rax], 
xmm3
+       LONG $0x40380f66; WORD $0x2045         // pmulld    xmm0, oword 32[rbp] 
/* [rip + .LCPI2_2] */
+       LONG $0xd0720f66; BYTE $0x1b           // psrld    xmm0, 27
+       LONG $0xf0720f66; BYTE $0x17           // pslld    xmm0, 23
+       LONG $0xc2fe0f66                       // paddd    xmm0, xmm2
+       LONG $0xc05b0ff3                       // cvttps2dq    xmm0, xmm0
+       WORD $0x560f; BYTE $0xc1               // orps    xmm0, xmm1
+       LONG $0x0744110f; BYTE $0x10           // movups    oword [rdi + rax + 
16], xmm0
+       RET
+
+DATA LCDATA2<>+0x000(SB)/8, $0x44974d9147b6137b
+DATA LCDATA2<>+0x008(SB)/8, $0xa2b7289d8824ad5b
+DATA LCDATA2<>+0x010(SB)/8, $0x3f8000003f800000
+DATA LCDATA2<>+0x018(SB)/8, $0x3f8000003f800000
+DATA LCDATA2<>+0x020(SB)/8, $0x2df1424b705495c7
+DATA LCDATA2<>+0x028(SB)/8, $0x5c6bfb319efc4947
+GLOBL LCDATA2<>(SB), 8, $48
+
+TEXT ·_insert_bulk_sse4(SB), $0-32
+
+       MOVQ bitset32+0(FP), DI
+       MOVQ block_len+8(FP), SI
+       MOVQ hashes+16(FP), DX
+       MOVQ hash_len+24(FP), CX
+       LEAQ LCDATA2<>(SB), BP
+
+       WORD $0xc985                           // test    ecx, ecx
+       JLE  LBB3_4
+       WORD $0x468d; BYTE $0x07               // lea    eax, [rsi + 7]
+       WORD $0xf685                           // test    esi, esi
+       WORD $0x490f; BYTE $0xc6               // cmovns    eax, esi
+       WORD $0xf8c1; BYTE $0x03               // sar    eax, 3
+       WORD $0x9848                           // cdqe
+       WORD $0xc989                           // mov    ecx, ecx
+       WORD $0xf631                           // xor    esi, esi
+       QUAD $0x0003ffffffe0b849; WORD $0x0000 // mov    r8, 17179869152
+       LONG $0x456f0f66; BYTE $0x00           // movdqa    xmm0, oword 0[rbp] 
/* [rip + .LCPI3_0] */
+       LONG $0x4d6f0f66; BYTE $0x10           // movdqa    xmm1, oword 16[rbp] 
/* [rip + .LCPI3_1] */
+       LONG $0x556f0f66; BYTE $0x20           // movdqa    xmm2, oword 32[rbp] 
/* [rip + .LCPI3_2] */
+
+LBB3_2:
+       LONG $0xf20c8b4c               // mov    r9, qword [rdx + 8*rsi]
+       LONG $0x6e0f4166; BYTE $0xd9   // movd    xmm3, r9d
+       LONG $0x20e9c149               // shr    r9, 32
+       LONG $0xc8af0f4c               // imul    r9, rax
+       LONG $0x1be9c149               // shr    r9, 27
+       WORD $0x214d; BYTE $0xc1       // and    r9, r8
+       LONG $0xdb700f66; BYTE $0x00   // pshufd    xmm3, xmm3, 0
+       LONG $0xe36f0f66               // movdqa    xmm4, xmm3
+       LONG $0x40380f66; BYTE $0xe0   // pmulld    xmm4, xmm0
+       LONG $0xd4720f66; BYTE $0x1b   // psrld    xmm4, 27
+       LONG $0xf4720f66; BYTE $0x17   // pslld    xmm4, 23
+       LONG $0xe1fe0f66               // paddd    xmm4, xmm1
+       LONG $0xe45b0ff3               // cvttps2dq    xmm4, xmm4
+       LONG $0x2c100f42; BYTE $0x0f   // movups    xmm5, oword [rdi + r9]
+       WORD $0x560f; BYTE $0xec       // orps    xmm5, xmm4
+       LONG $0x64100f42; WORD $0x100f // movups    xmm4, oword [rdi + r9 + 16]
+       LONG $0x2c110f42; BYTE $0x0f   // movups    oword [rdi + r9], xmm5
+       LONG $0x40380f66; BYTE $0xda   // pmulld    xmm3, xmm2
+       LONG $0xd3720f66; BYTE $0x1b   // psrld    xmm3, 27
+       LONG $0xf3720f66; BYTE $0x17   // pslld    xmm3, 23
+       LONG $0xd9fe0f66               // paddd    xmm3, xmm1
+       LONG $0xdb5b0ff3               // cvttps2dq    xmm3, xmm3
+       WORD $0x560f; BYTE $0xdc       // orps    xmm3, xmm4
+       LONG $0x5c110f42; WORD $0x100f // movups    oword [rdi + r9 + 16], xmm3
+       WORD $0xff48; BYTE $0xc6       // inc    rsi
+       WORD $0x3948; BYTE $0xf1       // cmp    rcx, rsi
+       JNE  LBB3_2
+
+LBB3_4:
+       RET
diff --git a/parquet/metadata/bloom_filter_reader_test.go 
b/parquet/metadata/bloom_filter_reader_test.go
new file mode 100644
index 0000000..25d9505
--- /dev/null
+++ b/parquet/metadata/bloom_filter_reader_test.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata_test
+
+import (
+       "bytes"
+       "runtime"
+       "sync"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+       "github.com/apache/arrow-go/v18/parquet/internal/utils"
+       "github.com/apache/arrow-go/v18/parquet/metadata"
+       "github.com/apache/arrow-go/v18/parquet/schema"
+       "github.com/stretchr/testify/suite"
+)
+
// BloomFilterBuilderSuite exercises writing bloom filters alongside file
// metadata and reading them back through a BloomFilterReader, without
// encryption.
type BloomFilterBuilderSuite struct {
	suite.Suite

	sc    *schema.Schema            // schema for the row group under test
	props *parquet.WriterProperties // default writer properties (no encryption)
	mem   *memory.CheckedAllocator  // checked allocator to detect buffer leaks
	buf   bytes.Buffer              // in-memory sink standing in for the parquet file
}
+
+func (suite *BloomFilterBuilderSuite) SetupTest() {
+       suite.props = parquet.NewWriterProperties()
+       suite.mem = memory.NewCheckedAllocator(memory.NewGoAllocator())
+       suite.buf.Reset()
+}
+
// TearDownTest forces finalizers to run so filter buffers are released,
// then asserts that no allocator bytes leaked.
func (suite *BloomFilterBuilderSuite) TearDownTest() {
	runtime.GC() // we use setfinalizer to clean up the buffers, so run the GC
	suite.mem.AssertSize(suite.T(), 0)
}
+
// TestSingleRowGroup writes bloom filters for two of three columns in one
// row group, serializes them via FileBloomFilterBuilder.WriteTo, and reads
// them back with BloomFilterReader, verifying membership results and that
// the column with no filter yields nil.
func (suite *BloomFilterBuilderSuite) TestSingleRowGroup() {
	suite.sc = schema.NewSchema(schema.MustGroup(
		schema.NewGroupNode("schema", parquet.Repetitions.Repeated,
			schema.FieldList{
				schema.NewByteArrayNode("c1", parquet.Repetitions.Optional, -1),
				schema.NewByteArrayNode("c2", parquet.Repetitions.Optional, -1),
				schema.NewByteArrayNode("c3", parquet.Repetitions.Optional, -1),
			}, -1)))

	bldr := metadata.FileBloomFilterBuilder{Schema: suite.sc}

	metaBldr := metadata.NewFileMetadataBuilder(suite.sc, suite.props, nil)

	{
		rgMeta := metaBldr.AppendRowGroup()
		filterMap := make(map[string]metadata.BloomFilterBuilder)
		bldr.AppendRowGroup(rgMeta, filterMap)

		// c1 gets a fixed-size filter, c3 an adaptive block-split filter;
		// c2 intentionally gets none so the reader should return nil for it.
		bf1 := metadata.NewBloomFilter(32, 1024, suite.mem)
		bf2 := metadata.NewAdaptiveBlockSplitBloomFilter(1024, 5, 0.01, suite.sc.Column(2), suite.mem)

		h1, h2 := bf1.Hasher(), bf2.Hasher()

		rgMeta.NextColumnChunk()
		rgMeta.NextColumnChunk()
		rgMeta.NextColumnChunk()
		rgMeta.Finish(0, 0)

		bf1.InsertHash(metadata.GetHash(h1, parquet.ByteArray("Hello")))
		bf2.InsertHash(metadata.GetHash(h2, parquet.ByteArray("World")))
		filterMap["c1"] = bf1
		filterMap["c3"] = bf2

		wr := &utils.TellWrapper{Writer: &suite.buf}
		wr.Write([]byte("PAR1")) // offset of 0 means unset, so write something
		// to force the offset to be set as a non-zero value
		suite.Require().NoError(bldr.WriteTo(wr))
	}
	runtime.GC() // filters fall out of scope here; let finalizers release their buffers

	finalMeta, err := metaBldr.Finish()
	suite.Require().NoError(err)
	{
		// pool hands out resizable buffers; finalizers release them on GC
		bufferPool := &sync.Pool{
			New: func() interface{} {
				buf := memory.NewResizableBuffer(suite.mem)
				runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
					obj.Release()
				})
				return buf
			},
		}

		rdr := metadata.BloomFilterReader{
			Input:        bytes.NewReader(suite.buf.Bytes()),
			FileMetadata: finalMeta,
			BufferPool:   bufferPool,
		}

		bfr, err := rdr.RowGroup(0)
		suite.Require().NoError(err)
		suite.Require().NotNil(bfr)

		{
			// c1: only "Hello" was inserted
			bf1, err := bfr.GetColumnBloomFilter(0)
			suite.Require().NoError(err)
			suite.Require().NotNil(bf1)
			suite.False(bf1.CheckHash(metadata.GetHash(bf1.Hasher(), parquet.ByteArray("World"))))
			suite.True(bf1.CheckHash(metadata.GetHash(bf1.Hasher(), parquet.ByteArray("Hello"))))
		}
		runtime.GC() // force GC to run to put the buffer back into the pool
		{
			// c2 had no filter written, so the reader returns nil
			bf2, err := bfr.GetColumnBloomFilter(1)
			suite.Require().NoError(err)
			suite.Require().Nil(bf2)
		}
		{
			// c3: only "World" was inserted
			bf3, err := bfr.GetColumnBloomFilter(2)
			suite.Require().NoError(err)
			suite.Require().NotNil(bf3)
			suite.False(bf3.CheckHash(metadata.GetHash(bf3.Hasher(), parquet.ByteArray("Hello"))))
			suite.True(bf3.CheckHash(metadata.GetHash(bf3.Hasher(), parquet.ByteArray("World"))))
		}
		runtime.GC() // we're using setfinalizer, so force release
	}
	runtime.GC()
}
+
// Fixed 128-bit keys and key identifiers used by the encrypted
// bloom filter round-trip tests.
const (
	FooterEncryptionKey    = "0123456789012345"
	ColumnEncryptionKey1   = "1234567890123450"
	ColumnEncryptionKey2   = "1234567890123451"
	FooterEncryptionKeyID  = "kf"
	ColumnEncryptionKey1ID = "kc1"
	ColumnEncryptionKey2ID = "kc2"
)
+
// EncryptedBloomFilterBuilderSuite exercises the bloom filter write/read
// round trip with footer and per-column encryption enabled.
type EncryptedBloomFilterBuilderSuite struct {
	suite.Suite

	sc           *schema.Schema                    // schema for the row group under test
	props        *parquet.WriterProperties         // writer properties carrying encryption config
	decryptProps *parquet.FileDecryptionProperties // matching decryption properties for read-back
	mem          *memory.CheckedAllocator          // checked allocator to detect buffer leaks
	buf          bytes.Buffer                      // in-memory sink standing in for the parquet file
}
+
// SetupTest configures encryption for columns c1 and c2 plus the footer,
// builds the matching decryption properties, and resets the allocator
// and output buffer.
func (suite *EncryptedBloomFilterBuilderSuite) SetupTest() {
	// per-column encryption: c1 and c2 get their own keys; c3 stays plaintext
	encryptedCols := parquet.ColumnPathToEncryptionPropsMap{
		"c1": parquet.NewColumnEncryptionProperties("c1",
			parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID(ColumnEncryptionKey1ID)),
		"c2": parquet.NewColumnEncryptionProperties("c2",
			parquet.WithKey(ColumnEncryptionKey2), parquet.WithKeyID(ColumnEncryptionKey2ID)),
	}

	encProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey,
		parquet.WithFooterKeyID(FooterEncryptionKeyID),
		parquet.WithEncryptedColumns(encryptedCols))

	// decryption properties mirror the keys used for writing
	suite.decryptProps = parquet.NewFileDecryptionProperties(
		parquet.WithFooterKey(FooterEncryptionKey),
		parquet.WithColumnKeys(parquet.ColumnPathToDecryptionPropsMap{
			"c1": parquet.NewColumnDecryptionProperties("c1", parquet.WithDecryptKey(ColumnEncryptionKey1)),
			"c2": parquet.NewColumnDecryptionProperties("c2", parquet.WithDecryptKey(ColumnEncryptionKey2)),
		}))

	suite.props = parquet.NewWriterProperties(parquet.WithEncryptionProperties(encProps))
	suite.mem = memory.NewCheckedAllocator(memory.NewGoAllocator())
	suite.buf.Reset()
}
+
// TearDownTest forces finalizers to run so filter buffers are released,
// then asserts that no allocator bytes leaked.
func (suite *EncryptedBloomFilterBuilderSuite) TearDownTest() {
	runtime.GC() // we use setfinalizer to clean up the buffers, so run the GC
	suite.mem.AssertSize(suite.T(), 0)
}
+
// TestEncryptedBloomFilters writes bloom filters for the two encrypted
// columns through a file encryptor, then reads them back with a matching
// decryptor and verifies membership on the first column.
func (suite *EncryptedBloomFilterBuilderSuite) TestEncryptedBloomFilters() {
	suite.sc = schema.NewSchema(schema.MustGroup(
		schema.NewGroupNode("schema", parquet.Repetitions.Repeated,
			schema.FieldList{
				schema.NewByteArrayNode("c1", parquet.Repetitions.Optional, -1),
				schema.NewByteArrayNode("c2", parquet.Repetitions.Optional, -1),
				schema.NewByteArrayNode("c3", parquet.Repetitions.Optional, -1),
			}, -1)))

	encryptor := encryption.NewFileEncryptor(suite.props.FileEncryptionProperties(), suite.mem)
	metaBldr := metadata.NewFileMetadataBuilder(suite.sc, suite.props, nil)
	metaBldr.SetFileEncryptor(encryptor)
	bldr := metadata.FileBloomFilterBuilder{Schema: suite.sc, Encryptor: encryptor}
	{
		rgMeta := metaBldr.AppendRowGroup()
		filterMap := make(map[string]metadata.BloomFilterBuilder)
		bldr.AppendRowGroup(rgMeta, filterMap)

		// filters for the two encrypted columns only
		bf1 := metadata.NewBloomFilter(32, 1024, suite.mem)
		bf2 := metadata.NewAdaptiveBlockSplitBloomFilter(1024, 5, 0.01, suite.sc.Column(1), suite.mem)
		h1, h2 := bf1.Hasher(), bf2.Hasher()

		bf1.InsertHash(metadata.GetHash(h1, parquet.ByteArray("Hello")))
		bf2.InsertHash(metadata.GetHash(h2, parquet.ByteArray("World")))
		filterMap["c1"] = bf1
		filterMap["c2"] = bf2

		// each chunk is finished (with empty info) so the metadata exists
		// for the row group before the filters are serialized
		colChunk1 := rgMeta.NextColumnChunk()
		colChunk1.Finish(metadata.ChunkMetaInfo{}, false, false, metadata.EncodingStats{})

		colChunk2 := rgMeta.NextColumnChunk()
		colChunk2.Finish(metadata.ChunkMetaInfo{}, false, false, metadata.EncodingStats{})

		colChunk3 := rgMeta.NextColumnChunk()
		colChunk3.Finish(metadata.ChunkMetaInfo{}, false, false, metadata.EncodingStats{})

		wr := &utils.TellWrapper{Writer: &suite.buf}
		wr.Write([]byte("PAR1")) // offset of 0 means unset, so write something
		// to force the offset to be set as a non-zero value
		suite.Require().NoError(bldr.WriteTo(wr))

		rgMeta.Finish(0, 0)
	}

	finalMeta, err := metaBldr.Finish()
	suite.Require().NoError(err)
	// attach a decryptor so the reader can decrypt the serialized filters
	finalMeta.FileDecryptor = encryption.NewFileDecryptor(suite.decryptProps,
		suite.props.FileEncryptionProperties().FileAad(),
		suite.props.FileEncryptionProperties().Algorithm().Algo, "", suite.mem)
	{
		// pool hands out resizable buffers; finalizers release them on GC
		bufferPool := &sync.Pool{
			New: func() interface{} {
				buf := memory.NewResizableBuffer(suite.mem)
				runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
					obj.Release()
				})
				return buf
			},
		}
		defer runtime.GC()

		rdr := metadata.BloomFilterReader{
			Input:         bytes.NewReader(suite.buf.Bytes()),
			FileMetadata:  finalMeta,
			BufferPool:    bufferPool,
			FileDecryptor: finalMeta.FileDecryptor,
		}

		bfr, err := rdr.RowGroup(0)
		suite.Require().NoError(err)
		suite.Require().NotNil(bfr)

		{
			// c1: only "Hello" was inserted
			bf1, err := bfr.GetColumnBloomFilter(0)
			suite.Require().NoError(err)
			suite.Require().NotNil(bf1)
			suite.False(bf1.CheckHash(metadata.GetHash(bf1.Hasher(), parquet.ByteArray("World"))))
			suite.True(bf1.CheckHash(metadata.GetHash(bf1.Hasher(), parquet.ByteArray("Hello"))))
		}
	}
}
+
+func TestBloomFilterRoundTrip(t *testing.T) {
+       suite.Run(t, new(BloomFilterBuilderSuite))
+       suite.Run(t, new(EncryptedBloomFilterBuilderSuite))
+}
diff --git a/parquet/metadata/bloom_filter_test.go 
b/parquet/metadata/bloom_filter_test.go
new file mode 100644
index 0000000..1f8a601
--- /dev/null
+++ b/parquet/metadata/bloom_filter_test.go
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metadata
+
+import (
+       "fmt"
+       "math/rand/v2"
+       "runtime"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow/bitutil"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestSplitBlockFilter(t *testing.T) {
+       const N = 1000
+       const S = 3
+       const P = 0.01
+
+       bf := blockSplitBloomFilter{
+               bitset32: make([]uint32, optimalNumBytes(N, P)),
+       }
+
+       p := rand.New(rand.NewPCG(S, S))
+       for i := 0; i < N; i++ {
+               bf.InsertHash(p.Uint64())
+       }
+
+       falsePositives := 0
+       p = rand.New(rand.NewPCG(S, S))
+       for i := 0; i < N; i++ {
+               x := p.Uint64()
+
+               if !bf.CheckHash(x) {
+                       t.Fatalf("bloom filter block does not contain value #%d 
that was inserted %d", i, x)
+               }
+
+               if bf.CheckHash(^x) {
+                       falsePositives++
+               }
+       }
+
+       if r := (float64(falsePositives) / N); r > P {
+               t.Fatalf("false positive rate is too high: %f", r)
+       }
+}
+
+func testHash[T parquet.ColumnTypes](t assert.TestingT, h Hasher, vals []T) {
+       results := GetHashes(h, vals)
+       assert.Len(t, results, len(vals))
+       for i, v := range vals {
+               assert.Equal(t, GetHash(h, v), results[i])
+       }
+
+       var (
+               nvalid     = int64(len(vals))
+               validBits  = make([]byte, bitutil.BytesForBits(2*nvalid))
+               spacedVals = make([]T, 2*nvalid)
+       )
+
+       for i, v := range vals {
+               spacedVals[i*2] = v
+               bitutil.SetBit(validBits, i*2)
+
+       }
+
+       results = GetSpacedHashes(h, nvalid, spacedVals, validBits, 0)
+       assert.Len(t, results, len(vals))
+       for i, v := range vals {
+               assert.Equal(t, GetHash(h, v), results[i])
+       }
+}
+
+func TestGetHashes(t *testing.T) {
+       var (
+               h      xxhasher
+               valsBA = []parquet.ByteArray{
+                       []byte("hello"),
+                       []byte("world"),
+               }
+
+               valsFLBA = []parquet.FixedLenByteArray{
+                       []byte("hello"),
+                       []byte("world"),
+               }
+
+               valsI32 = []int32{42, 43}
+       )
+
+       assert.Len(t, GetSpacedHashes[int32](h, 0, nil, nil, 0), 0)
+
+       testHash(t, h, valsBA)
+       testHash(t, h, valsFLBA)
+       testHash(t, h, valsI32)
+}
+
// TestNewBloomFilter verifies NewBloomFilterFromNDVAndFPP sizing behavior
// across the table cases: the small/zero-budget case, capping at
// maximumBloomFilterBytes, and rounding up to a power of two. It also
// asserts the checked allocator reports zero bytes after cleanup runs.
func TestNewBloomFilter(t *testing.T) {
	tests := []struct {
		ndv           uint32  // number of distinct values the filter is sized for
		fpp           float64 // target false-positive probability
		maxBytes      int64   // upper bound on filter size
		expectedBytes int64   // expected resulting filter size
	}{
		{1, 0.09, 0, 0},
		// cap at maximumBloomFilterBytes
		{1024 * 1024 * 128, 0.9, maximumBloomFilterBytes + 1, maximumBloomFilterBytes},
		// round to power of 2
		{1024 * 1024, 0.01, maximumBloomFilterBytes, 1 << 21},
	}

	for _, tt := range tests {
		t.Run(fmt.Sprintf("ndv=%d,fpp=%0.3f", tt.ndv, tt.fpp), func(t *testing.T) {
			mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
			defer mem.AssertSize(t, 0)

			// inner scope so the filter becomes unreachable before the GC below
			{
				bf := NewBloomFilterFromNDVAndFPP(tt.ndv, tt.fpp, tt.maxBytes, mem)
				assert.EqualValues(t, tt.expectedBytes, bf.Size())
				runtime.GC()
			}
			runtime.GC() // force GC to run and do the cleanup routines
		})
	}
}
+
// BenchmarkFilterInsert measures single-hash insertion into a small filter.
func BenchmarkFilterInsert(b *testing.B) {
	bf := blockSplitBloomFilter{bitset32: make([]uint32, 8)}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bf.InsertHash(uint64(i))
	}
	b.SetBytes(bytesPerFilterBlock)
}
+
// BenchmarkFilterCheck measures a single-hash membership check for a hash
// that is present in the filter.
func BenchmarkFilterCheck(b *testing.B) {
	bf := blockSplitBloomFilter{bitset32: make([]uint32, 8)}
	bf.InsertHash(42)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bf.CheckHash(42)
	}
	b.SetBytes(bytesPerFilterBlock)
}
+
// BenchmarkFilterCheckBulk measures the bulk membership check over a batch
// of 16 random hashes that have all been inserted.
func BenchmarkFilterCheckBulk(b *testing.B) {
	bf := blockSplitBloomFilter{bitset32: make([]uint32, 99*bitsSetPerBlock)}
	x := make([]uint64, 16)
	r := rand.New(rand.NewPCG(0, 0))
	for i := range x {
		x[i] = r.Uint64()
	}

	bf.InsertBulk(x)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bf.CheckBulk(x)
	}
	b.SetBytes(bytesPerFilterBlock * int64(len(x)))
}
+
// BenchmarkFilterInsertBulk measures bulk insertion of a batch of 16
// random hashes.
func BenchmarkFilterInsertBulk(b *testing.B) {
	bf := blockSplitBloomFilter{bitset32: make([]uint32, 99*bitsSetPerBlock)}
	x := make([]uint64, 16)
	r := rand.New(rand.NewPCG(0, 0))
	for i := range x {
		x[i] = r.Uint64()
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		bf.InsertBulk(x)
	}
	b.SetBytes(bytesPerFilterBlock * int64(len(x)))
}
diff --git a/parquet/metadata/cleanup_bloom_filter.go 
b/parquet/metadata/cleanup_bloom_filter.go
new file mode 100644
index 0000000..ed835c3
--- /dev/null
+++ b/parquet/metadata/cleanup_bloom_filter.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build go1.24
+
+package metadata
+
+import (
+       "runtime"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow/memory"
+)
+
// addCleanup registers a cleanup (runtime.AddCleanup, available from
// go1.24) that runs once the filter becomes unreachable: its data buffer
// is either reset and returned to the provided pool, or released outright
// when no pool was supplied.
func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
	runtime.AddCleanup(bf, func(data *memory.Buffer) {
		if bufferPool != nil {
			data.ResizeNoShrink(0)
			bufferPool.Put(data)
		} else {
			data.Release()
		}
	}, bf.data)
}
diff --git a/parquet/metadata/cleanup_bloom_filter_go1.23.go 
b/parquet/metadata/cleanup_bloom_filter_go1.23.go
new file mode 100644
index 0000000..b4bffbe
--- /dev/null
+++ b/parquet/metadata/cleanup_bloom_filter_go1.23.go
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !go1.24
+
+package metadata
+
+import (
+       "runtime"
+       "sync"
+)
+
// addCleanup is the pre-go1.24 fallback using runtime.SetFinalizer: when
// the filter is collected, its data buffer is either reset and returned to
// the provided pool, or released outright when no pool was supplied.
func addCleanup(bf *blockSplitBloomFilter, bufferPool *sync.Pool) {
	runtime.SetFinalizer(bf, func(f *blockSplitBloomFilter) {
		if bufferPool != nil {
			f.data.ResizeNoShrink(0)
			bufferPool.Put(f.data)
		} else {
			f.data.Release()
		}
	})
}
diff --git a/parquet/metadata/column_chunk.go b/parquet/metadata/column_chunk.go
index 22c9b9e..848d20f 100644
--- a/parquet/metadata/column_chunk.go
+++ b/parquet/metadata/column_chunk.go
@@ -218,6 +218,13 @@ func (c *ColumnChunkMetaData) BloomFilterOffset() int64 {
        return c.columnMeta.GetBloomFilterOffset()
 }
 
// BloomFilterLength is the length of the serialized bloom filter including
// the serialized bloom filter header. This field was only added in format
// version 2.10 so it may not exist, returning 0 in that case.
func (c *ColumnChunkMetaData) BloomFilterLength() int32 {
	return c.columnMeta.GetBloomFilterLength()
}
+
 // StatsSet returns true only if there are statistics set in the metadata and 
the column
 // descriptor has a sort order that is not SortUnknown
 //
@@ -267,6 +274,7 @@ type ColumnChunkMetaDataBuilder struct {
 
        compressedSize   int64
        uncompressedSize int64
+       fileOffset       int64
 }
 
 func NewColumnChunkMetaDataBuilder(props *parquet.WriterProperties, column 
*schema.Column) *ColumnChunkMetaDataBuilder {
@@ -347,14 +355,15 @@ type EncodingStats struct {
 }
 
 // Finish finalizes the metadata with the given offsets,
-// flushes any compression that needs to be done, and performs
-// any encryption if an encryptor is provided.
-func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict, 
dictFallback bool, encStats EncodingStats, metaEncryptor encryption.Encryptor) 
error {
+// flushes any compression that needs to be done.
+// Encryption will be performed by calling PopulateCryptoData
+// after this function is called.
+func (c *ColumnChunkMetaDataBuilder) Finish(info ChunkMetaInfo, hasDict, 
dictFallback bool, encStats EncodingStats) error {
        if info.DictPageOffset > 0 {
                c.chunk.MetaData.DictionaryPageOffset = &info.DictPageOffset
-               c.chunk.FileOffset = info.DictPageOffset + info.CompressedSize
+               c.fileOffset = info.DictPageOffset
        } else {
-               c.chunk.FileOffset = info.DataPageOffset + info.CompressedSize
+               c.fileOffset = info.DataPageOffset
        }
 
        c.chunk.MetaData.NumValues = info.NumValues
@@ -411,6 +420,10 @@ func (c *ColumnChunkMetaDataBuilder) Finish(info 
ChunkMetaInfo, hasDict, dictFal
        }
        c.chunk.MetaData.EncodingStats = thriftEncodingStats
 
+       return nil
+}
+
+func (c *ColumnChunkMetaDataBuilder) PopulateCryptoData(encryptor 
encryption.Encryptor) error {
        encryptProps := c.props.ColumnEncryptionProperties(c.column.Path())
        if encryptProps != nil && encryptProps.IsEncrypted() {
                ccmd := format.NewColumnCryptoMetaData()
@@ -436,7 +449,7 @@ func (c *ColumnChunkMetaDataBuilder) Finish(info 
ChunkMetaInfo, hasDict, dictFal
                                return err
                        }
                        var buf bytes.Buffer
-                       metaEncryptor.Encrypt(&buf, data)
+                       encryptor.Encrypt(&buf, data)
                        c.chunk.EncryptedColumnMetadata = buf.Bytes()
 
                        if encryptedFooter {
diff --git a/parquet/metadata/file.go b/parquet/metadata/file.go
index 39a3192..95d3813 100644
--- a/parquet/metadata/file.go
+++ b/parquet/metadata/file.go
@@ -47,6 +47,7 @@ type FileMetaDataBuilder struct {
        currentRgBldr  *RowGroupMetaDataBuilder
        kvmeta         KeyValueMetadata
        cryptoMetadata *format.FileCryptoMetaData
+       fileEncryptor  encryption.FileEncryptor
 }
 
 // NewFileMetadataBuilder will use the default writer properties if nil is 
passed for
@@ -65,6 +66,10 @@ func NewFileMetadataBuilder(schema *schema.Schema, props 
*parquet.WriterProperti
        }
 }
 
// SetFileEncryptor sets the file encryptor that will be propagated to row
// group builders created by AppendRowGroup, so column metadata can be
// encrypted when each row group is finalized.
func (f *FileMetaDataBuilder) SetFileEncryptor(encryptor encryption.FileEncryptor) {
	f.fileEncryptor = encryptor
}
+
 // GetFileCryptoMetaData returns the cryptographic information for encrypting/
 // decrypting the file.
 func (f *FileMetaDataBuilder) GetFileCryptoMetaData() *FileCryptoMetadata {
@@ -92,6 +97,7 @@ func (f *FileMetaDataBuilder) AppendRowGroup() 
*RowGroupMetaDataBuilder {
        rg := format.NewRowGroup()
        f.rowGroups = append(f.rowGroups, rg)
        f.currentRgBldr = NewRowGroupMetaDataBuilder(f.props, f.schema, rg)
+       f.currentRgBldr.fileEncryptor = f.fileEncryptor
        return f.currentRgBldr
 }
 
diff --git a/parquet/metadata/metadata_test.go 
b/parquet/metadata/metadata_test.go
index 2f35d63..fccfbe4 100644
--- a/parquet/metadata/metadata_test.go
+++ b/parquet/metadata/metadata_test.go
@@ -46,8 +46,8 @@ func generateTableMetaData(schema *schema.Schema, props 
*parquet.WriterPropertie
        statsFloat.Signed = true
        col2Builder.SetStats(statsFloat)
 
-       col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 4, 0, 10, 
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, 
dataEncodingStats}, nil)
-       col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 24, 0, 30, 
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, 
dataEncodingStats}, nil)
+       col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 4, 0, 10, 
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, 
dataEncodingStats})
+       col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 24, 0, 30, 
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, 
dataEncodingStats})
 
        rg1Builder.SetNumRows(nrows / 2)
        rg1Builder.Finish(1024, -1)
@@ -60,8 +60,8 @@ func generateTableMetaData(schema *schema.Schema, props 
*parquet.WriterPropertie
        col1Builder.SetStats(statsInt)
        col2Builder.SetStats(statsFloat)
        dictEncodingStats = make(map[parquet.Encoding]int32)
-       col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 0 
/*dictionary page offset*/, 0, 10, 512, 600}, false /* has dictionary */, 
false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
-       col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 16, 0, 26, 
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, 
dataEncodingStats}, nil)
+       col1Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 0 
/*dictionary page offset*/, 0, 10, 512, 600}, false /* has dictionary */, 
false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats})
+       col2Builder.Finish(metadata.ChunkMetaInfo{int64(nrows) / 2, 16, 0, 26, 
512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, 
dataEncodingStats})
 
        rg2Builder.SetNumRows(nrows / 2)
        rg2Builder.Finish(1024, -1)
diff --git a/parquet/metadata/row_group.go b/parquet/metadata/row_group.go
index e4ec4c7..5ccd2e6 100644
--- a/parquet/metadata/row_group.go
+++ b/parquet/metadata/row_group.go
@@ -99,6 +99,8 @@ type RowGroupMetaDataBuilder struct {
        schema      *schema.Schema
        colBuilders []*ColumnChunkMetaDataBuilder
        nextCol     int
+
+       fileEncryptor encryption.FileEncryptor
 }
 
 // NewRowGroupMetaDataBuilder returns a builder using the given properties and 
underlying thrift object.
@@ -166,21 +168,24 @@ func (r *RowGroupMetaDataBuilder) Finish(_ int64, ordinal 
int16) error {
                totalUncompressed int64
        )
 
-       for idx, col := range r.rg.Columns {
-               if col.FileOffset < 0 {
-                       return fmt.Errorf("parquet: Column %d is not complete", 
idx)
-               }
+       for idx := range r.rg.Columns {
                if idx == 0 {
-                       if col.MetaData.IsSetDictionaryPageOffset() && 
col.MetaData.GetDictionaryPageOffset() > 0 {
-                               fileOffset = 
col.MetaData.GetDictionaryPageOffset()
-                       } else {
-                               fileOffset = col.MetaData.DataPageOffset
-                       }
+                       fileOffset = r.colBuilders[idx].fileOffset
                }
+
                // sometimes column metadata is encrypted and not available to 
read
                // so we must get total compressed size from column builder
                totalCompressed += r.colBuilders[idx].TotalCompressedSize()
                totalUncompressed += r.colBuilders[idx].TotalUncompressedSize()
+
+               if r.fileEncryptor != nil {
+                       enc := 
r.fileEncryptor.GetColumnMetaEncryptor(r.colBuilders[idx].Descr().Path())
+                       if enc != nil {
+                               
enc.UpdateAad(encryption.CreateModuleAad(enc.FileAad(), 
encryption.ColumnMetaModule,
+                                       ordinal, int16(idx), -1))
+                               r.colBuilders[idx].PopulateCryptoData(enc)
+                       }
+               }
        }
 
        if len(r.props.SortingColumns()) > 0 {

Reply via email to