This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e7be6ddd1 feat(server): support A2A protocol (#2656)
e7be6ddd1 is described below
commit e7be6ddd1f5dff848c0ccc087cfb37d4119e6aaa
Author: Tyooughtul <[email protected]>
AuthorDate: Fri Apr 10 16:43:26 2026 +0800
feat(server): support A2A protocol (#2656)
A2A protocol requires JWKS support to enable secure agent authentication
with multiple identity providers. This change allows agents from
different tenants to authenticate using their own public keys, and
supports key rotation without requiring server restarts.
Closes #1762
---
.typos.toml | 2 +
Cargo.lock | 174 +++++++----
Cargo.toml | 3 +-
DEPENDENCIES.md | 35 ++-
core/common/src/error/iggy_error.rs | 2 +
.../http_config/http_client_config.rs | 4 +
.../http_config/http_client_config_builder.rs | 6 +
core/configs/src/server_config/defaults.rs | 1 +
core/configs/src/server_config/http.rs | 11 +
core/harness_derive/src/attrs.rs | 55 ++++
core/harness_derive/src/codegen.rs | 32 +-
core/integration/Cargo.toml | 2 +
.../src/harness/config/jwks.rs} | 16 +-
core/integration/src/harness/config/mod.rs | 2 +
core/integration/src/harness/handle/server.rs | 2 +-
core/integration/src/harness/mod.rs | 2 +-
.../src/harness/orchestrator/builder.rs | 13 +-
.../src/harness/orchestrator/harness.rs | 44 ++-
core/integration/tests/server/a2a_jwt/config.toml | 66 ++++
core/integration/tests/server/a2a_jwt/jwt_tests.rs | 331 +++++++++++++++++++++
.../tests/server/a2a_jwt}/mod.rs | 10 +-
.../server/a2a_jwt/wiremock/__files/jwks.json | 10 +
.../server/a2a_jwt/wiremock/mappings/jwks.json | 13 +
core/integration/tests/server/mod.rs | 1 +
core/sdk/Cargo.toml | 2 +-
core/sdk/src/client_provider.rs | 1 +
core/sdk/src/clients/client_builder.rs | 6 +
core/sdk/src/clients/consumer.rs | 3 +
core/sdk/src/http/http_client.rs | 4 +-
core/server/Cargo.toml | 1 +
core/server/config.toml | 6 +
core/server/src/http/jwt/json_web_token.rs | 109 ++++++-
core/server/src/http/jwt/jwks.rs | 316 ++++++++++++++++++++
core/server/src/http/jwt/jwt_manager.rs | 200 +++++++++++--
core/server/src/http/jwt/middleware.rs | 12 +-
core/server/src/http/jwt/mod.rs | 1 +
36 files changed, 1379 insertions(+), 119 deletions(-)
diff --git a/.typos.toml b/.typos.toml
index bf6e17ec9..f310651c3 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -142,4 +142,6 @@ extend-exclude = [
"go.work",
# Benchmark dashboard frontend build output
"core/bench/dashboard/frontend/dist",
+ # WireMock test fixtures with base64-encoded JWT data
+ "**/wiremock/__files/*.json",
]
diff --git a/Cargo.lock b/Cargo.lock
index cb11aa7ff..c7416b066 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -436,9 +436,9 @@ checksum =
"c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1"
[[package]]
name = "arc-swap"
-version = "1.9.0"
+version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6"
+checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207"
dependencies = [
"rustversion",
]
@@ -589,7 +589,7 @@ dependencies = [
"arrow-schema",
"chrono",
"half",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"itoa",
"lexical-core",
"memchr",
@@ -704,6 +704,16 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "assert-json-diff"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "assert_cmd"
version = "2.2.0"
@@ -1554,7 +1564,7 @@ dependencies = [
"getrandom 0.2.17",
"getrandom 0.3.4",
"hex",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"js-sys",
"once_cell",
"rand 0.9.2",
@@ -2148,9 +2158,9 @@ dependencies = [
[[package]]
name = "compio-tls"
-version = "0.9.0"
+version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e462f3f836226cc293795c87d8e7df783ca7f88811e433ee79a9a2eace0b253"
+checksum = "3a7056da226af42cda4c83b00a021cce3e1ee5f4cffc8a0ff8801381e618cf1c"
dependencies = [
"compio-buf",
"compio-io",
@@ -2161,9 +2171,9 @@ dependencies = [
[[package]]
name = "compio-ws"
-version = "0.3.0"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32b0174d0a3da33ac73efddbe62a3fb046a9bc3a58124b2f8c1d2e0354e54222"
+checksum = "99d45f47c6e64babcaa6b8df1dffced56012e60e58401255e679f428ddbe9fb6"
dependencies = [
"compio-buf",
"compio-io",
@@ -2927,6 +2937,24 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "deadpool"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b"
+dependencies = [
+ "deadpool-runtime",
+ "lazy_static",
+ "num_cpus",
+ "tokio",
+]
+
+[[package]]
+name = "deadpool-runtime"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
+
[[package]]
name = "debugid"
version = "0.8.0"
@@ -2957,7 +2985,7 @@ dependencies = [
"deno_path_util",
"deno_unsync",
"futures",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"libc",
"parking_lot",
"percent-encoding",
@@ -3012,7 +3040,7 @@ version = "0.227.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bab1eaf578a8cc0ae6fb933e91dc3388b41df22e5974d5891c17ba66b3a0bbb"
dependencies = [
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"proc-macro-rules",
"proc-macro2",
"quote",
@@ -3715,11 +3743,11 @@ dependencies = [
[[package]]
name = "fastrand"
-version = "2.3.0"
+version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
+checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
dependencies = [
- "getrandom 0.2.17",
+ "getrandom 0.3.4",
]
[[package]]
@@ -4248,9 +4276,9 @@ dependencies = [
[[package]]
name = "gif"
-version = "0.14.1"
+version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f5df2ba84018d80c213569363bdcd0c64e6933c67fe4c1d60ecf822971a3c35e"
+checksum = "ee8cfcc411d9adbbaba82fb72661cc1bcca13e8bba98b364e62b2dba8f960159"
dependencies = [
"color_quant",
"weezl",
@@ -4719,7 +4747,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.12",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"slab",
"tokio",
"tokio-util",
@@ -4738,7 +4766,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http 1.4.0",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"slab",
"tokio",
"tokio-util",
@@ -4844,6 +4872,12 @@ dependencies = [
"foldhash 0.2.0",
]
+[[package]]
+name = "hashbrown"
+version = "0.17.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51"
+
[[package]]
name = "hashlink"
version = "0.10.0"
@@ -5434,7 +5468,7 @@ checksum =
"cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb"
[[package]]
name = "iggy"
-version = "0.10.0"
+version = "0.10.1"
dependencies = [
"async-broadcast",
"async-dropper",
@@ -5517,7 +5551,7 @@ dependencies = [
"tracing-subscriber",
"uuid",
"walkdir",
- "zip 8.5.0",
+ "zip 8.5.1",
]
[[package]]
@@ -6006,7 +6040,7 @@ dependencies = [
"byteorder-lite",
"color_quant",
"exr",
- "gif 0.14.1",
+ "gif 0.14.2",
"image-webp",
"moxcms",
"num-traits",
@@ -6055,7 +6089,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "1689b939ee35e3a075b0834b5672efd43aec8a6e81a1c6002b76a5ca2f211ae0"
dependencies = [
"implicit-clone-derive",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
]
[[package]]
@@ -6081,12 +6115,12 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "2.13.1"
+version = "2.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff"
+checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9"
dependencies = [
"equivalent",
- "hashbrown 0.16.1",
+ "hashbrown 0.17.0",
"serde",
"serde_core",
]
@@ -6160,6 +6194,7 @@ dependencies = [
"iggy_binary_protocol",
"iggy_common",
"iggy_connector_sdk",
+ "jsonwebtoken",
"keyring",
"lazy_static",
"libc",
@@ -6189,7 +6224,8 @@ dependencies = [
"tracing-subscriber",
"twox-hash",
"uuid",
- "zip 8.5.0",
+ "wiremock",
+ "zip 8.5.1",
]
[[package]]
@@ -6690,9 +6726,9 @@ dependencies = [
[[package]]
name = "liblzma-sys"
-version = "0.4.5"
+version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f2db66f3268487b5033077f266da6777d057949b8f93c8ad82e441df25e6186"
+checksum = "1a60851d15cd8c5346eca4ab8babff585be2ae4bc8097c067291d3ffe2add3b6"
dependencies = [
"cc",
"libc",
@@ -6717,14 +6753,14 @@ dependencies = [
[[package]]
name = "libredox"
-version = "0.1.15"
+version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08"
+checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c"
dependencies = [
"bitflags 2.11.0",
"libc",
"plain",
- "redox_syscall 0.7.3",
+ "redox_syscall 0.7.4",
]
[[package]]
@@ -6739,9 +6775,9 @@ dependencies = [
[[package]]
name = "libz-sys"
-version = "1.1.25"
+version = "1.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1"
+checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22"
dependencies = [
"cc",
"libc",
@@ -7708,9 +7744,9 @@ checksum =
"7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
[[package]]
name = "openssl-src"
-version = "300.5.5+3.5.5"
+version = "300.6.0+3.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f1787d533e03597a7934fd0a765f0d28e94ecc5fb7789f8053b1e699a56f709"
+checksum = "a8e8cbfd3a4a8c8f089147fd7aaa33cf8c7450c4d09f8f80698a0cf093abeff4"
dependencies = [
"cc",
]
@@ -8980,9 +9016,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
-version = "0.7.3"
+version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16"
+checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a"
dependencies = [
"bitflags 2.11.0",
]
@@ -9840,9 +9876,9 @@ dependencies = [
[[package]]
name = "semver"
-version = "1.0.27"
+version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
+checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd"
dependencies = [
"serde",
"serde_core",
@@ -9940,7 +9976,7 @@ version = "1.0.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86"
dependencies = [
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"itoa",
"memchr",
"serde",
@@ -10024,7 +10060,7 @@ dependencies = [
"chrono",
"hex",
"indexmap 1.9.3",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"schemars 0.9.0",
"schemars 1.2.1",
"serde_core",
@@ -10051,7 +10087,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4db627b98b36d4203a7b458cf3573730f2bb591b28871d916dfa9efabfd41f"
dependencies = [
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"itoa",
"ryu",
"serde",
@@ -10139,6 +10175,7 @@ dependencies = [
"secrecy",
"send_wrapper",
"serde",
+ "serde_json",
"slab",
"socket2 0.6.3",
"strum 0.28.0",
@@ -10564,7 +10601,7 @@ dependencies = [
"futures-util",
"hashbrown 0.15.5",
"hashlink",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"log",
"memchr",
"once_cell",
@@ -11333,9 +11370,9 @@ checksum =
"1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.51.0"
+version = "1.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd"
+checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c"
dependencies = [
"bytes",
"libc",
@@ -11445,7 +11482,7 @@ version = "1.1.2+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee"
dependencies = [
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"serde_core",
"serde_spanned 1.1.1",
"toml_datetime 1.1.1+spec-1.1.0",
@@ -11478,7 +11515,7 @@ version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"toml_datetime 0.6.11",
"winnow 0.5.40",
]
@@ -11489,7 +11526,7 @@ version = "0.22.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
dependencies = [
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"serde",
"serde_spanned 0.6.9",
"toml_datetime 0.6.11",
@@ -11579,7 +11616,7 @@ checksum =
"ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"pin-project-lite",
"slab",
"sync_wrapper",
@@ -12397,7 +12434,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"wasm-encoder",
"wasmparser",
]
@@ -12446,7 +12483,7 @@ checksum =
"47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags 2.11.0",
"hashbrown 0.15.5",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"semver",
]
@@ -13114,6 +13151,29 @@ version = "0.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904"
+[[package]]
+name = "wiremock"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031"
+dependencies = [
+ "assert-json-diff",
+ "base64 0.22.1",
+ "deadpool",
+ "futures",
+ "http 1.4.0",
+ "http-body-util",
+ "hyper",
+ "hyper-util",
+ "log",
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tokio",
+ "url",
+]
+
[[package]]
name = "wit-bindgen"
version = "0.51.0"
@@ -13142,7 +13202,7 @@ checksum =
"b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"prettyplease",
"syn 2.0.117",
"wasm-metadata",
@@ -13173,7 +13233,7 @@ checksum =
"9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags 2.11.0",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"log",
"serde",
"serde_derive",
@@ -13192,7 +13252,7 @@ checksum =
"ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"log",
"semver",
"serde",
@@ -13282,7 +13342,7 @@ dependencies = [
"futures",
"gloo 0.11.0",
"implicit-clone",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"js-sys",
"rustversion",
"serde",
@@ -13472,13 +13532,13 @@ dependencies = [
[[package]]
name = "zip"
-version = "8.5.0"
+version = "8.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2726508a48f38dceb22b35ecbbd2430efe34ff05c62bd3285f965d7911b33464"
+checksum = "dcab981e19633ebcf0b001ddd37dd802996098bc1864f90b7c5d970ce76c1d59"
dependencies = [
"crc32fast",
"flate2",
- "indexmap 2.13.1",
+ "indexmap 2.14.0",
"memchr",
"typed-path",
"zopfli",
diff --git a/Cargo.toml b/Cargo.toml
index 6539453b2..320c827eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -166,7 +166,7 @@ hwlocality = "1.0.0-alpha.12"
iceberg = "0.9.0"
iceberg-catalog-rest = "0.9.0"
iceberg-storage-opendal = "0.9.0"
-iggy = { path = "core/sdk", version = "0.10.0" }
+iggy = { path = "core/sdk", version = "0.10.1" }
iggy-cli = { path = "core/cli", version = "0.13.0" }
iggy_binary_protocol = { path = "core/binary_protocol", version = "0.10.0" }
iggy_common = { path = "core/common", version = "0.10.0" }
@@ -293,6 +293,7 @@ trait-variant = "0.1.2"
tungstenite = "0.29.0"
twox-hash = { version = "2.1.2", features = ["xxhash32"] }
ulid = "1.2.1"
+ureq = "2.10"
uuid = { version = "1.23.0", features = ["v4", "v7", "fast-rng", "serde",
"zerocopy"] }
vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc",
"si"] }
walkdir = "2.5.0"
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 5c5b78edf..739266563 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -33,7 +33,7 @@ anstyle-wincon: 3.0.11, "Apache-2.0 OR MIT",
anyhow: 1.0.102, "Apache-2.0 OR MIT",
apache-avro: 0.21.0, "Apache-2.0",
arbitrary: 1.4.2, "Apache-2.0 OR MIT",
-arc-swap: 1.9.0, "Apache-2.0 OR MIT",
+arc-swap: 1.9.1, "Apache-2.0 OR MIT",
arg_enum_proc_macro: 0.3.4, "MIT",
argon2: 0.5.3, "Apache-2.0 OR MIT",
array-init: 2.1.0, "Apache-2.0 OR MIT",
@@ -55,6 +55,7 @@ as-slice: 0.2.1, "Apache-2.0 OR MIT",
asn1-rs: 0.7.1, "Apache-2.0 OR MIT",
asn1-rs-derive: 0.6.0, "Apache-2.0 OR MIT",
asn1-rs-impl: 0.2.0, "Apache-2.0 OR MIT",
+assert-json-diff: 2.0.2, "MIT",
assert_cmd: 2.2.0, "Apache-2.0 OR MIT",
astral-tokio-tar: 0.6.0, "Apache-2.0 OR MIT",
async-broadcast: 0.7.2, "Apache-2.0 OR MIT",
@@ -180,8 +181,8 @@ compio-macros: 0.1.2, "MIT",
compio-net: 0.11.1, "MIT",
compio-quic: 0.7.2, "MIT",
compio-runtime: 0.11.0, "MIT",
-compio-tls: 0.9.0, "MIT",
-compio-ws: 0.3.0, "MIT",
+compio-tls: 0.9.1, "MIT",
+compio-ws: 0.3.1, "MIT",
compression-codecs: 0.4.37, "Apache-2.0 OR MIT",
compression-core: 0.4.31, "Apache-2.0 OR MIT",
concurrent-queue: 2.5.0, "Apache-2.0 OR MIT",
@@ -252,6 +253,8 @@ data-encoding: 2.10.0, "MIT",
data-url: 0.3.2, "Apache-2.0 OR MIT",
dbus: 0.9.10, "Apache-2.0 OR MIT",
dbus-secret-service: 4.1.0, "Apache-2.0 OR MIT",
+deadpool: 0.12.3, "Apache-2.0 OR MIT",
+deadpool-runtime: 0.1.4, "Apache-2.0 OR MIT",
debugid: 0.8.0, "Apache-2.0",
deno_core: 0.351.0, "MIT",
deno_core_icudata: 0.74.0, "MIT",
@@ -325,7 +328,7 @@ ext-trait-proc_macros: 1.0.1, "Apache-2.0 OR MIT OR Zlib",
extension-traits: 1.0.1, "Apache-2.0 OR MIT OR Zlib",
fastbloom: 0.14.1, "Apache-2.0 OR MIT",
fastnum: 0.7.4, "Apache-2.0 OR MIT",
-fastrand: 2.3.0, "Apache-2.0 OR MIT",
+fastrand: 2.4.1, "Apache-2.0 OR MIT",
fax: 0.2.6, "MIT",
fax_derive: 0.2.0, "MIT",
fdeflate: 0.3.7, "Apache-2.0 OR MIT",
@@ -379,7 +382,7 @@ getrandom: 0.4.2, "Apache-2.0 OR MIT",
ghash: 0.5.1, "Apache-2.0 OR MIT",
gherkin: 0.15.0, "Apache-2.0 OR MIT",
gif: 0.13.3, "Apache-2.0 OR MIT",
-gif: 0.14.1, "Apache-2.0 OR MIT",
+gif: 0.14.2, "Apache-2.0 OR MIT",
git2: 0.20.4, "Apache-2.0 OR MIT",
glob: 0.3.3, "Apache-2.0 OR MIT",
globset: 0.4.18, "MIT OR Unlicense",
@@ -425,6 +428,7 @@ hashbrown: 0.12.3, "Apache-2.0 OR MIT",
hashbrown: 0.14.5, "Apache-2.0 OR MIT",
hashbrown: 0.15.5, "Apache-2.0 OR MIT",
hashbrown: 0.16.1, "Apache-2.0 OR MIT",
+hashbrown: 0.17.0, "Apache-2.0 OR MIT",
hashlink: 0.10.0, "Apache-2.0 OR MIT",
heapless: 0.7.17, "Apache-2.0 OR MIT",
heck: 0.5.0, "Apache-2.0 OR MIT",
@@ -471,7 +475,7 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
if_chain: 1.0.3, "Apache-2.0 OR MIT",
-iggy: 0.10.0, "Apache-2.0",
+iggy: 0.10.1, "Apache-2.0",
iggy-bench: 0.5.0, "Apache-2.0",
iggy-bench-dashboard-server: 0.7.0, "Apache-2.0",
iggy-cli: 0.13.0, "Apache-2.0",
@@ -502,7 +506,7 @@ impl-more: 0.1.9, "Apache-2.0 OR MIT",
implicit-clone: 0.6.0, "Apache-2.0 OR MIT",
implicit-clone-derive: 0.1.2, "Apache-2.0 OR MIT",
indexmap: 1.9.3, "Apache-2.0 OR MIT",
-indexmap: 2.13.1, "Apache-2.0 OR MIT",
+indexmap: 2.14.0, "Apache-2.0 OR MIT",
inflections: 1.1.1, "MIT",
inlinable_string: 0.1.15, "Apache-2.0 OR MIT",
inotify: 0.11.1, "ISC",
@@ -560,12 +564,12 @@ libfuzzer-sys: 0.4.12, "(Apache-2.0 OR MIT) AND NCSA",
libgit2-sys: 0.18.3+1.9.2, "Apache-2.0 OR MIT",
libloading: 0.8.9, "ISC",
liblzma: 0.4.6, "Apache-2.0 OR MIT",
-liblzma-sys: 0.4.5, "Apache-2.0 OR MIT",
+liblzma-sys: 0.4.6, "Apache-2.0 OR MIT",
libm: 0.2.16, "MIT",
libmimalloc-sys: 0.1.44, "MIT",
-libredox: 0.1.15, "MIT",
+libredox: 0.1.16, "MIT",
libsqlite3-sys: 0.30.1, "MIT",
-libz-sys: 1.1.25, "Apache-2.0 OR MIT",
+libz-sys: 1.1.28, "Apache-2.0 OR MIT",
linked-hash-map: 0.5.6, "Apache-2.0 OR MIT",
linux-raw-sys: 0.4.15, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
linux-raw-sys: 0.12.1, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
@@ -657,7 +661,7 @@ opendal: 0.55.0, "Apache-2.0",
openssl: 0.10.76, "Apache-2.0",
openssl-macros: 0.1.1, "Apache-2.0 OR MIT",
openssl-probe: 0.2.1, "Apache-2.0 OR MIT",
-openssl-src: 300.5.5+3.5.5, "Apache-2.0 OR MIT",
+openssl-src: 300.6.0+3.6.2, "Apache-2.0 OR MIT",
openssl-sys: 0.9.112, "MIT",
opentelemetry: 0.31.0, "Apache-2.0",
opentelemetry-appender-tracing: 0.31.1, "Apache-2.0",
@@ -776,7 +780,7 @@ rayon: 1.11.0, "Apache-2.0 OR MIT",
rayon-core: 1.13.0, "Apache-2.0 OR MIT",
rcgen: 0.14.7, "Apache-2.0 OR MIT",
redox_syscall: 0.5.18, "MIT",
-redox_syscall: 0.7.3, "MIT",
+redox_syscall: 0.7.4, "MIT",
redox_users: 0.5.2, "MIT",
ref-cast: 1.0.25, "Apache-2.0 OR MIT",
ref-cast-impl: 1.0.25, "Apache-2.0 OR MIT",
@@ -846,7 +850,7 @@ secrecy: 0.10.3, "Apache-2.0 OR MIT",
security-framework: 3.7.0, "Apache-2.0 OR MIT",
security-framework-sys: 2.17.0, "Apache-2.0 OR MIT",
seize: 0.5.1, "MIT",
-semver: 1.0.27, "Apache-2.0 OR MIT",
+semver: 1.0.28, "Apache-2.0 OR MIT",
send_wrapper: 0.6.0, "Apache-2.0 OR MIT",
seq-macro: 0.3.6, "Apache-2.0 OR MIT",
serde: 1.0.228, "Apache-2.0 OR MIT",
@@ -965,7 +969,7 @@ tiny-skia-path: 0.11.4, "BSD-3-Clause",
tinystr: 0.8.3, "Unicode-3.0",
tinyvec: 1.11.0, "Apache-2.0 OR MIT OR Zlib",
tinyvec_macros: 0.1.1, "Apache-2.0 OR MIT OR Zlib",
-tokio: 1.51.0, "MIT",
+tokio: 1.51.1, "MIT",
tokio-macros: 2.7.0, "MIT",
tokio-rustls: 0.26.4, "Apache-2.0 OR MIT",
tokio-stream: 0.1.18, "MIT",
@@ -1153,6 +1157,7 @@ winnow: 0.5.40, "MIT",
winnow: 0.7.15, "MIT",
winnow: 1.0.1, "MIT",
winsafe: 0.0.19, "MIT",
+wiremock: 0.6.5, "Apache-2.0 OR MIT",
wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR
MIT",
wit-bindgen-rust: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR
MIT",
@@ -1183,7 +1188,7 @@ zerotrie: 0.2.4, "Unicode-3.0",
zerovec: 0.11.6, "Unicode-3.0",
zerovec-derive: 0.11.3, "Unicode-3.0",
zip: 0.6.6, "MIT",
-zip: 8.5.0, "MIT",
+zip: 8.5.1, "MIT",
zlib-rs: 0.6.3, "Zlib",
zmij: 1.0.21, "MIT",
zopfli: 0.8.3, "Apache-2.0",
diff --git a/core/common/src/error/iggy_error.rs
b/core/common/src/error/iggy_error.rs
index fca1ed1e8..79e1c8123 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -148,6 +148,8 @@ pub enum IggyError {
AccessTokenMissing = 77,
#[error("Invalid access token")]
InvalidAccessToken = 78,
+ #[error("Cannot fetch JWKS from URL: {0}")]
+ CannotFetchJwks(String) = 79,
#[error("Invalid size bytes")]
InvalidSizeBytes = 80,
#[error("Invalid UTF-8")]
diff --git
a/core/common/src/types/configuration/http_config/http_client_config.rs
b/core/common/src/types/configuration/http_config/http_client_config.rs
index 06ec4182f..f182e1a91 100644
--- a/core/common/src/types/configuration/http_config/http_client_config.rs
+++ b/core/common/src/types/configuration/http_config/http_client_config.rs
@@ -25,6 +25,8 @@ pub struct HttpClientConfig {
pub api_url: String,
/// The number of retries to perform on transient errors.
pub retries: u32,
+ /// The JWT for A2A authentication.
+ pub jwt: Option<String>,
}
impl Default for HttpClientConfig {
@@ -32,6 +34,7 @@ impl Default for HttpClientConfig {
HttpClientConfig {
api_url: "http://127.0.0.1:3000".to_string(),
retries: 3,
+ jwt: None,
}
}
}
@@ -41,6 +44,7 @@ impl From<ConnectionString<HttpConnectionStringOptions>> for
HttpClientConfig {
HttpClientConfig {
api_url: format!("http://{}", connection_string.server_address()),
retries: connection_string.options().retries().unwrap(),
+ jwt: None,
}
}
}
diff --git
a/core/common/src/types/configuration/http_config/http_client_config_builder.rs
b/core/common/src/types/configuration/http_config/http_client_config_builder.rs
index 097df0da5..029eaf6b8 100644
---
a/core/common/src/types/configuration/http_config/http_client_config_builder.rs
+++
b/core/common/src/types/configuration/http_config/http_client_config_builder.rs
@@ -45,6 +45,12 @@ impl HttpClientConfigBuilder {
self
}
+ /// Sets the JWT for A2A authentication.
+ pub fn with_jwt(mut self, token: String) -> Self {
+ self.config.jwt = Some(token);
+ self
+ }
+
/// Builds the `HttpClientConfig` instance.
pub fn build(self) -> HttpClientConfig {
self.config
diff --git a/core/configs/src/server_config/defaults.rs
b/core/configs/src/server_config/defaults.rs
index 27eae580d..7403196df 100644
--- a/core/configs/src/server_config/defaults.rs
+++ b/core/configs/src/server_config/defaults.rs
@@ -264,6 +264,7 @@ impl Default for HttpJwtConfig {
encoding_secret:
SERVER_CONFIG.http.jwt.encoding_secret.parse().unwrap(),
decoding_secret:
SERVER_CONFIG.http.jwt.decoding_secret.parse().unwrap(),
use_base64_secret: SERVER_CONFIG.http.jwt.use_base_64_secret,
+ trusted_issuers: None,
}
}
}
diff --git a/core/configs/src/server_config/http.rs
b/core/configs/src/server_config/http.rs
index b19b53cae..2e3f440a8 100644
--- a/core/configs/src/server_config/http.rs
+++ b/core/configs/src/server_config/http.rs
@@ -26,6 +26,15 @@ use serde::{Deserialize, Serialize};
use serde_with::DisplayFromStr;
use serde_with::serde_as;
+#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
+pub struct TrustedIssuerConfig {
+ pub issuer: String,
+ pub audience: String,
+ pub jwks_url: String,
+ #[serde(default)]
+ pub user_id: u32,
+}
+
#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
pub struct HttpConfig {
pub enabled: bool,
@@ -72,6 +81,8 @@ pub struct HttpJwtConfig {
#[config_env(secret)]
pub decoding_secret: String,
pub use_base64_secret: bool,
+ #[serde(default)]
+ pub trusted_issuers: Option<Vec<TrustedIssuerConfig>>,
}
#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs
index 2e4389faa..7d8e68f7d 100644
--- a/core/harness_derive/src/attrs.rs
+++ b/core/harness_derive/src/attrs.rs
@@ -57,6 +57,14 @@ pub struct IggyTestAttrs {
pub server: ServerAttrs,
pub seed_fn: Option<syn::Path>,
pub cluster_nodes: ClusterNodesValue,
+ pub jwks_server: Option<JwksAttrs>,
+}
+
+/// JWKS server attributes.
+#[derive(Debug, Default, Clone)]
+pub struct JwksAttrs {
+ pub enabled: bool,
+ pub store_path: Option<String>,
}
/// MCP configuration attributes.
@@ -82,6 +90,7 @@ impl IggyTestAttrs {
server: ServerAttrs::default(),
seed_fn: None,
cluster_nodes: ClusterNodesValue::None,
+ jwks_server: None,
}
}
}
@@ -173,6 +182,9 @@ pub struct ServerAttrs {
/// Dynamic config overrides using dot-notation paths.
pub config_overrides: Vec<ConfigOverride>,
+ /// Path to a TOML config file for the server.
+ pub config_path: Option<String>,
+
/// Special cases requiring custom codegen.
pub mcp: Option<McpAttrs>,
pub connectors_runtime: Option<ConnectorsRuntimeAttrs>,
@@ -248,6 +260,9 @@ impl Parse for IggyTestAttrs {
AttrItem::ClusterNodes(cluster) => {
attrs.cluster_nodes = cluster;
}
+ AttrItem::JwksServer(jwks) => {
+ attrs.jwks_server = Some(jwks);
+ }
}
}
@@ -264,6 +279,7 @@ enum AttrItem {
Server(Box<ServerAttrs>),
Seed(syn::Path),
ClusterNodes(ClusterNodesValue),
+ JwksServer(JwksAttrs),
}
impl Parse for AttrItem {
@@ -293,6 +309,12 @@ impl Parse for AttrItem {
let path: syn::Path = input.parse()?;
Ok(AttrItem::Seed(path))
}
+ "jwks_server" => {
+ let content;
+ parenthesized!(content in input);
+ let jwks = parse_jwks_attrs(&content)?;
+ Ok(AttrItem::JwksServer(jwks))
+ }
_ => Err(syn::Error::new(
ident.span(),
format!("unknown attribute: {ident_str}"),
@@ -413,6 +435,11 @@ fn parse_server_attrs(input: ParseStream) ->
syn::Result<ServerAttrs> {
input.parse::<Token![=]>()?;
server.websocket_tls = Some(parse_tls_value(input, span)?);
}
+ "config_path" => {
+ input.parse::<Token![=]>()?;
+ let lit: LitStr = input.parse()?;
+ server.config_path = Some(lit.value());
+ }
_ => {
input.parse::<Token![=]>()?;
let value = parse_config_value(input)?;
@@ -538,6 +565,34 @@ impl ArrayLiteral {
}
}
+fn parse_jwks_attrs(input: ParseStream) -> syn::Result<JwksAttrs> {
+ let mut attrs = JwksAttrs {
+ enabled: true,
+ ..Default::default()
+ };
+
+ let items: Punctuated<KeyValueAttrItem, Token![,]> =
Punctuated::parse_terminated(input)?;
+
+ for item in items {
+ match item.key.as_str() {
+ "enabled" => {
+ attrs.enabled = item.value.parse().map_err(|_| {
+ syn::Error::new(Span::call_site(), "enabled must be true
or false")
+ })?;
+ }
+ "store_path" => attrs.store_path = Some(item.value),
+ other => {
+ return Err(syn::Error::new(
+ Span::call_site(),
+ format!("unknown jwks_server attribute: {other}"),
+ ));
+ }
+ }
+ }
+
+ Ok(attrs)
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/core/harness_derive/src/codegen.rs
b/core/harness_derive/src/codegen.rs
index 2abd54368..7f6e23a5e 100644
--- a/core/harness_derive/src/codegen.rs
+++ b/core/harness_derive/src/codegen.rs
@@ -392,12 +392,12 @@ fn generate_harness_setup(
quote! {
let __config_overrides: ::std::collections::HashMap<String,
String> =
[#(#config_entries),*].into_iter().collect();
- let __extra_envs =
::integration::harness::resolve_config_paths(&__config_overrides)
+ let mut __extra_envs =
::integration::harness::resolve_config_paths(&__config_overrides)
.unwrap_or_else(|e| panic!("invalid config path in
#[iggy_harness]:\n{}", e));
}
} else {
quote! {
- let __extra_envs = ::std::collections::HashMap::<String,
String>::new();
+ let mut __extra_envs = ::std::collections::HashMap::<String,
String>::new();
}
};
@@ -424,6 +424,15 @@ fn generate_harness_setup(
// Always add extra_envs (may be empty)
server_builder_calls.push(quote!(.extra_envs(__extra_envs)));
+ // If a config_path is specified, inject IGGY_CONFIG_PATH into extra_envs
+ let config_path_setup = if let Some(ref config_path) =
attrs.server.config_path {
+ quote! {
+ __extra_envs.insert("IGGY_CONFIG_PATH".to_string(),
#config_path.to_string());
+ }
+ } else {
+ quote!()
+ };
+
let server_config = quote! {
::integration::harness::TestServerConfig::builder()
#(#server_builder_calls)*
@@ -491,14 +500,32 @@ fn generate_harness_setup(
quote!()
};
+ let jwks_builder_call = if let Some(ref jwks) = attrs.jwks_server {
+ let enabled = jwks.enabled;
+ let store_path_call = match &jwks.store_path {
+ Some(p) => quote!(.store_path(#p)),
+ None => quote!(),
+ };
+ quote! {
+ .jwks(::integration::harness::JwksConfig::builder()
+ .enabled(#enabled)
+ #store_path_call
+ .build())
+ }
+ } else {
+ quote!()
+ };
+
quote! {
#config_resolution
+ #config_path_setup
let mut __harness = ::integration::harness::TestHarness::builder()
.server(#server_config)
.primary_client(#client_config)
#mcp_builder_call
#connectors_runtime_builder_call
#cluster_builder_call
+ #jwks_builder_call
.build()
.unwrap_or_else(|e| panic!("failed to build test harness: {e}"));
let _ = ::integration::__macro_support::TransportProtocol::#transport;
@@ -795,6 +822,7 @@ mod tests {
},
seed_fn: None,
cluster_nodes: crate::attrs::ClusterNodesValue::None,
+ jwks_server: None,
};
let variants = generate_variants(&attrs);
// 2 transports * 2 segment sizes * 2 cache modes = 8 variants
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index c8830b19f..9b5a993df 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -45,6 +45,7 @@ iggy-cli = { workspace = true }
iggy_binary_protocol = { workspace = true }
iggy_common = { workspace = true }
iggy_connector_sdk = { workspace = true, features = ["api"] }
+jsonwebtoken = { workspace = true }
keyring = { workspace = true }
lazy_static = { workspace = true }
libc = { workspace = true }
@@ -79,4 +80,5 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
twox-hash = { workspace = true }
uuid = { workspace = true }
+wiremock = "0.6"
zip = { workspace = true }
diff --git a/core/server/src/http/jwt/mod.rs
b/core/integration/src/harness/config/jwks.rs
similarity index 73%
copy from core/server/src/http/jwt/mod.rs
copy to core/integration/src/harness/config/jwks.rs
index 5481f110c..663213277 100644
--- a/core/server/src/http/jwt/mod.rs
+++ b/core/integration/src/harness/config/jwks.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -16,9 +17,12 @@
* under the License.
*/
-pub mod json_web_token;
-pub mod jwt_manager;
-pub mod middleware;
-pub mod storage;
+use bon::Builder;
-pub const COMPONENT: &str = "HTTP_JWT";
+#[derive(Debug, Clone, Builder, Default)]
+pub struct JwksConfig {
+ pub enabled: bool,
+ pub issuer_url: Option<String>,
+ #[builder(into)]
+ pub store_path: Option<String>,
+}
diff --git a/core/integration/src/harness/config/mod.rs
b/core/integration/src/harness/config/mod.rs
index cc6c0ba6a..c12b41029 100644
--- a/core/integration/src/harness/config/mod.rs
+++ b/core/integration/src/harness/config/mod.rs
@@ -20,6 +20,7 @@
mod client;
mod common;
mod connectors_runtime;
+mod jwks;
mod mcp;
mod resolve;
mod server;
@@ -27,6 +28,7 @@ mod server;
pub use client::{AutoLoginConfig, ClientConfig};
pub use common::{EncryptionConfig, IpAddrKind, TlsConfig};
pub use connectors_runtime::ConnectorsRuntimeConfig;
+pub use jwks::JwksConfig;
pub use mcp::McpConfig;
pub use resolve::resolve_config_paths;
pub use server::TestServerConfig;
diff --git a/core/integration/src/harness/handle/server.rs
b/core/integration/src/harness/handle/server.rs
index 2c64ac5c4..e39612800 100644
--- a/core/integration/src/harness/handle/server.rs
+++ b/core/integration/src/harness/handle/server.rs
@@ -216,7 +216,7 @@ impl ServerHandle {
for (key, value) in std::env::vars() {
if key.starts_with("IGGY_") && !PROTECTED_PREFIXES.iter().any(|p|
key.starts_with(p)) {
- self.envs.insert(key, value);
+ self.envs.entry(key).or_insert(value);
}
}
diff --git a/core/integration/src/harness/mod.rs
b/core/integration/src/harness/mod.rs
index 13793ab05..81d846728 100644
--- a/core/integration/src/harness/mod.rs
+++ b/core/integration/src/harness/mod.rs
@@ -55,7 +55,7 @@ mod traits;
pub use config::{
AutoLoginConfig, ClientConfig, ConnectorsRuntimeConfig, EncryptionConfig,
IpAddrKind,
- McpConfig, TestServerConfig, TlsConfig, resolve_config_paths,
+ JwksConfig, McpConfig, TestServerConfig, TlsConfig, resolve_config_paths,
};
pub use context::{TestContext, get_test_directory};
diff --git a/core/integration/src/harness/orchestrator/builder.rs
b/core/integration/src/harness/orchestrator/builder.rs
index 46afddd3b..3126fde63 100644
--- a/core/integration/src/harness/orchestrator/builder.rs
+++ b/core/integration/src/harness/orchestrator/builder.rs
@@ -19,8 +19,9 @@
use super::harness::TestHarness;
use crate::harness::config::{
- ClientConfig, ConnectorsRuntimeConfig, IpAddrKind, McpConfig,
TestServerConfig,
+ ClientConfig, ConnectorsRuntimeConfig, IpAddrKind, JwksConfig, McpConfig,
TestServerConfig,
};
+
use crate::harness::context::TestContext;
use crate::harness::error::TestBinaryError;
use crate::harness::handle::ServerHandle;
@@ -36,6 +37,7 @@ pub struct TestHarnessBuilder {
server_config: Option<TestServerConfig>,
mcp_config: Option<McpConfig>,
connectors_runtime_config: Option<ConnectorsRuntimeConfig>,
+ jwks_config: Option<JwksConfig>,
primary_transport: Option<iggy_common::TransportProtocol>,
primary_client_config: Option<ClientConfig>,
clients: Vec<ClientConfig>,
@@ -49,6 +51,7 @@ impl Default for TestHarnessBuilder {
test_name: None,
server_config: None,
mcp_config: None,
+ jwks_config: None,
connectors_runtime_config: None,
primary_transport: None,
primary_client_config: None,
@@ -102,6 +105,12 @@ impl TestHarnessBuilder {
self
}
+ /// Configure the JWKS server.
+ pub fn jwks(mut self, config: JwksConfig) -> Self {
+ self.jwks_config = Some(config);
+ self
+ }
+
/// Add a TCP client.
pub fn tcp_client(mut self) -> Self {
self.clients.push(ClientConfig::tcp());
@@ -225,6 +234,8 @@ impl TestHarnessBuilder {
client_configs: self.clients,
primary_transport,
primary_client_config: self.primary_client_config,
+ jwks_config: self.jwks_config,
+ jwks_server: None,
started: false,
})
}
diff --git a/core/integration/src/harness/orchestrator/harness.rs
b/core/integration/src/harness/orchestrator/harness.rs
index 6897b31b4..3018cb39f 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -18,7 +18,7 @@
*/
use super::builder::TestHarnessBuilder;
-use crate::harness::config::ClientConfig;
+use crate::harness::config::{ClientConfig, JwksConfig};
use crate::harness::context::TestContext;
use crate::harness::error::TestBinaryError;
use crate::harness::handle::{
@@ -31,6 +31,8 @@ use iggy::prelude::{ClientWrapper, IggyClient};
use iggy_common::TransportProtocol;
use std::path::Path;
use std::sync::Arc;
+use wiremock::matchers::{method, path};
+use wiremock::{Mock, MockServer, ResponseTemplate};
/// Collected logs from all binaries in the harness.
#[derive(Debug)]
@@ -46,6 +48,8 @@ pub struct TestHarness {
pub(super) client_configs: Vec<ClientConfig>,
pub(super) primary_transport: Option<TransportProtocol>,
pub(super) primary_client_config: Option<ClientConfig>,
+ pub(super) jwks_config: Option<JwksConfig>,
+ pub(super) jwks_server: Option<MockServer>,
pub(super) started: bool,
}
@@ -106,6 +110,44 @@ impl TestHarness {
return Err(TestBinaryError::AlreadyStarted);
}
+ if let Some(jwks_config) = &self.jwks_config
+ && jwks_config.enabled
+ {
+ let mock_server = MockServer::start().await;
+
+ if let Some(store_path) = &jwks_config.store_path {
+ let content = std::fs::read_to_string(store_path).map_err(|e| {
+ TestBinaryError::InvalidState {
+ message: format!("Failed to read JWKS file at {}: {}",
store_path, e),
+ }
+ })?;
+
+ Mock::given(method("GET"))
+ .and(path("/.well-known/jwks.json"))
+
.respond_with(ResponseTemplate::new(200).set_body_string(content))
+ .mount(&mock_server)
+ .await;
+ }
+
+ let jwks_url = format!("{}/.well-known/jwks.json",
mock_server.uri());
+ let issuer = jwks_config
+ .issuer_url
+ .as_deref()
+ .unwrap_or("https://test-issuer.com");
+
+ for server in &mut self.servers {
+ server.add_env("IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_ISSUER",
issuer);
+ server.add_env(
+ "IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_JWKS_URL",
+ jwks_url.as_str(),
+ );
+ server.add_env("IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_AUDIENCE",
"iggy");
+ server.add_env("IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_USER_ID", "1");
+ }
+
+ self.jwks_server = Some(mock_server);
+ }
+
for server in &mut self.servers {
server.start()?;
}
diff --git a/core/integration/tests/server/a2a_jwt/config.toml
b/core/integration/tests/server/a2a_jwt/config.toml
new file mode 100644
index 000000000..5c08041ec
--- /dev/null
+++ b/core/integration/tests/server/a2a_jwt/config.toml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[http]
+enabled = true
+address = "127.0.0.1:0"
+
+[http.jwt]
+enabled = true
+issuer = "iggy"
+audience = "iggy"
+access_token_expiry = "1h"
+refresh_token_expiry = "30d"
+clock_skew = "5s"
+
+# Configure trusted A2A issuer
+[[http.jwt.trusted_issuers]]
+issuer = "https://test-issuer.com"
+audience = "iggy"
+jwks_url = "http://localhost:8080/.well-known/jwks.json"
+user_id = 1
+
+[system]
+path = "local_data"
+
+[system.database]
+type = "file"
+path = "local_data/db"
+
+[system.message_bus]
+type = "in_memory"
+
+[system.streams]
+path = "local_data/streams"
+
+[system.topics]
+path = "local_data/topics"
+
+[system.partitions]
+path = "local_data/partitions"
+
+[system.segments]
+path = "local_data/segments"
+
+[system.users]
+path = "local_data/users"
+
+[system.personal_access_tokens]
+path = "local_data/pats"
+
+[system.consumer_groups]
+path = "local_data/consumer_groups"
diff --git a/core/integration/tests/server/a2a_jwt/jwt_tests.rs
b/core/integration/tests/server/a2a_jwt/jwt_tests.rs
new file mode 100644
index 000000000..0e152e9a7
--- /dev/null
+++ b/core/integration/tests/server/a2a_jwt/jwt_tests.rs
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy::prelude::{GlobalPermissions, IggyClientBuilder, Permissions,
UserStatus};
+use iggy_common::{StreamClient, UserClient};
+use integration::iggy_harness;
+use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
+use serde::{Deserialize, Serialize};
+use server::http::jwt::json_web_token::Audience;
+
+const TEST_ISSUER: &str = "https://test-issuer.com";
+const TEST_AUDIENCE: &str = "iggy";
+const TEST_KEY_ID: &str = "iggy-jwt-key-1";
+const TEST_PRIVATE_KEY: &[u8] =
include_bytes!("../../../../certs/iggy_key.pem");
+
+/// Seed function to create the A2A user with proper permissions
+async fn seed_a2a_user(
+ client: &iggy::prelude::IggyClient,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+ // Create a user that will be used for A2A JWT authentication.
+ // The first user created after root will have user_id = 1.
+ // Grant read_streams permission so the user can call get_streams().
+ let permissions = Permissions {
+ global: GlobalPermissions {
+ read_streams: true,
+ ..GlobalPermissions::default()
+ },
+ streams: None,
+ };
+
+ match client
+ .create_user(
+ "a2a-test-user",
+ "a2a-test-password",
+ UserStatus::Active,
+ Some(permissions),
+ )
+ .await
+ {
+ Ok(user) => {
+ println!("A2A user created successfully with ID: {}", user.id);
+ }
+ Err(e) => {
+ println!(
+ "Note: Could not create A2A user (may already exist): {:?}",
+ e
+ );
+ }
+ }
+ Ok(())
+}
+
+/// Test claims structure for JWT tokens
+/// Supports both single string and array audience per RFC 7519
+#[derive(Debug, Serialize, Deserialize)]
+struct TestClaims {
+ jti: String,
+ iss: String,
+ aud: Audience,
+ sub: String,
+ exp: u64,
+ iat: u64,
+ nbf: u64,
+}
+
+/// Get current timestamp in seconds since Unix epoch
+fn now_timestamp() -> u64 {
+ std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs()
+}
+
+/// Creates a valid JWT token with specified expiration time
+fn create_valid_jwt(exp_seconds: u64) -> String {
+ let now = now_timestamp();
+ let claims = TestClaims {
+ jti: uuid::Uuid::now_v7().to_string(),
+ iss: TEST_ISSUER.to_string(),
+ aud: Audience::from(TEST_AUDIENCE),
+ sub: "external-a2a-user-123".to_string(),
+ exp: now + exp_seconds,
+ iat: now,
+ nbf: now,
+ };
+
+ let mut header = Header::new(Algorithm::RS256);
+ header.kid = Some(TEST_KEY_ID.to_string());
+ let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap();
+
+ encode(&header, &claims, &encoding_key).unwrap()
+}
+
+/// Creates a valid JWT token with audience as array
+fn create_valid_jwt_with_array_aud(exp_seconds: u64) -> String {
+ let now = now_timestamp();
+ let claims = TestClaims {
+ jti: uuid::Uuid::now_v7().to_string(),
+ iss: TEST_ISSUER.to_string(),
+ aud: Audience::from(vec![
+ "some-other-service".to_string(),
+ TEST_AUDIENCE.to_string(),
+ "another-service".to_string(),
+ ]),
+ sub: "external-a2a-user-123".to_string(),
+ exp: now + exp_seconds,
+ iat: now,
+ nbf: now,
+ };
+
+ let mut header = Header::new(Algorithm::RS256);
+ header.kid = Some(TEST_KEY_ID.to_string());
+ let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap();
+
+ encode(&header, &claims, &encoding_key).unwrap()
+}
+
+/// Creates an expired JWT token (expired 1 hour ago)
+fn create_expired_jwt() -> String {
+ let now = now_timestamp();
+ let claims = TestClaims {
+ jti: uuid::Uuid::now_v7().to_string(),
+ iss: TEST_ISSUER.to_string(),
+ aud: Audience::from(TEST_AUDIENCE),
+ sub: "external-a2a-user-123".to_string(),
+ exp: now.saturating_sub(3600),
+ iat: now.saturating_sub(7200),
+ nbf: now.saturating_sub(7200),
+ };
+
+ let mut header = Header::new(Algorithm::RS256);
+ header.kid = Some(TEST_KEY_ID.to_string());
+ let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap();
+
+ encode(&header, &claims, &encoding_key).unwrap()
+}
+
+/// Creates a JWT token with unknown issuer
+fn create_unknown_issuer_jwt() -> String {
+ let now = now_timestamp();
+ let claims = TestClaims {
+ jti: uuid::Uuid::now_v7().to_string(),
+ iss: "https://unknown-issuer.com".to_string(),
+ aud: Audience::from(TEST_AUDIENCE),
+ sub: "external-a2a-user-123".to_string(),
+ exp: now + 3600,
+ iat: now,
+ nbf: now,
+ };
+
+ let mut header = Header::new(Algorithm::RS256);
+ header.kid = Some(TEST_KEY_ID.to_string());
+ let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap();
+
+ encode(&header, &claims, &encoding_key).unwrap()
+}
+
+/// Create an IggyClient with the provided JWT token
+async fn create_client_with_jwt(http_addr: &str, token: String) ->
iggy::prelude::IggyClient {
+ IggyClientBuilder::new()
+ .with_http()
+ .with_api_url(format!("http://{}", http_addr))
+ .with_jwt(token)
+ .build()
+ .expect("failed to build client")
+}
+
+/// Test that valid A2A JWT token allows access to API
+#[iggy_harness(
+ server(config_path = "tests/server/a2a_jwt/config.toml"),
+ jwks_server(store_path =
"tests/server/a2a_jwt/wiremock/__files/jwks.json"),
+ seed = seed_a2a_user
+)]
+async fn test_a2a_jwt_valid_token(harness: &TestHarness) {
+ let server = harness
+ .all_servers()
+ .first()
+ .expect("server should be available");
+ let http_addr = server
+ .http_addr()
+ .expect("http address should be available");
+
+ let token = create_valid_jwt(3600);
+ let client = create_client_with_jwt(&http_addr.to_string(), token).await;
+
+ // get_streams() should succeed with valid JWT token
+ let result = client.get_streams().await;
+ assert!(result.is_ok(), "Expected Ok, got {:?}", result);
+}
+
+/// Test that valid A2A JWT token with array audience allows access to API
+#[iggy_harness(
+ server(config_path = "tests/server/a2a_jwt/config.toml"),
+ jwks_server(store_path =
"tests/server/a2a_jwt/wiremock/__files/jwks.json"),
+ seed = seed_a2a_user
+)]
+async fn test_a2a_jwt_array_audience(harness: &TestHarness) {
+ let server = harness
+ .all_servers()
+ .first()
+ .expect("server should be available");
+ let http_addr = server
+ .http_addr()
+ .expect("http address should be available");
+
+ let token = create_valid_jwt_with_array_aud(3600);
+ let client = create_client_with_jwt(&http_addr.to_string(), token).await;
+
+ // get_streams() should succeed with valid JWT token
+ let result = client.get_streams().await;
+ assert!(result.is_ok(), "Expected Ok, got {:?}", result);
+}
+
+/// Test that expired A2A JWT token is rejected
+#[iggy_harness(
+ server(config_path = "tests/server/a2a_jwt/config.toml"),
+ jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json")
+)]
+async fn test_a2a_jwt_expired_token(harness: &TestHarness) {
+ let server = harness
+ .all_servers()
+ .first()
+ .expect("server should be available");
+ let http_addr = server
+ .http_addr()
+ .expect("http address should be available");
+
+ let token = create_expired_jwt();
+ let client = create_client_with_jwt(&http_addr.to_string(), token).await;
+
+ // get_streams() should fail with Unauthenticated error
+ let result = client.get_streams().await;
+ assert!(
+ result.is_err(),
+ "Expected Unauthenticated error, got {:?}",
+ result
+ );
+ let err = result.unwrap_err();
+ assert_eq!(
+ err.as_code(),
+ iggy::prelude::IggyError::Unauthenticated.as_code(),
+ "Expected Unauthenticated error, got {:?}",
+ err
+ );
+}
+
+/// Test that JWT token with unknown issuer is rejected
+#[iggy_harness(
+ server(config_path = "tests/server/a2a_jwt/config.toml"),
+ jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json")
+)]
+async fn test_a2a_jwt_unknown_issuer(harness: &TestHarness) {
+ let server = harness
+ .all_servers()
+ .first()
+ .expect("server should be available");
+ let http_addr = server
+ .http_addr()
+ .expect("http address should be available");
+
+ let token = create_unknown_issuer_jwt();
+ let client = create_client_with_jwt(&http_addr.to_string(), token).await;
+
+ // get_streams() should fail with Unauthenticated error
+ let result = client.get_streams().await;
+ assert!(
+ result.is_err(),
+ "Expected Unauthenticated error, got {:?}",
+ result
+ );
+ let err = result.unwrap_err();
+ assert_eq!(
+ err.as_code(),
+ iggy::prelude::IggyError::Unauthenticated.as_code(),
+ "Expected Unauthenticated error, got {:?}",
+ err
+ );
+}
+
+/// Test that missing JWT token results in authentication failure
+#[iggy_harness(
+ server(config_path = "tests/server/a2a_jwt/config.toml"),
+ jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json")
+)]
+async fn test_a2a_jwt_missing_token(harness: &TestHarness) {
+ let server = harness
+ .all_servers()
+ .first()
+ .expect("server should be available");
+ let http_addr = server
+ .http_addr()
+ .expect("http address should be available");
+
+ // Create client without JWT token
+ let client = IggyClientBuilder::new()
+ .with_http()
+ .with_api_url(format!("http://{}", http_addr))
+ .build()
+ .expect("failed to build client");
+
+ // get_streams() should fail with Unauthenticated error
+ let result = client.get_streams().await;
+ assert!(
+ result.is_err(),
+ "Expected Unauthenticated error, got {:?}",
+ result
+ );
+ let err = result.unwrap_err();
+ assert_eq!(
+ err.as_code(),
+ iggy::prelude::IggyError::Unauthenticated.as_code(),
+ "Expected Unauthenticated error, got {:?}",
+ err
+ );
+}
diff --git a/core/server/src/http/jwt/mod.rs
b/core/integration/tests/server/a2a_jwt/mod.rs
similarity index 80%
copy from core/server/src/http/jwt/mod.rs
copy to core/integration/tests/server/a2a_jwt/mod.rs
index 5481f110c..46392e47e 100644
--- a/core/server/src/http/jwt/mod.rs
+++ b/core/integration/tests/server/a2a_jwt/mod.rs
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -16,9 +17,4 @@
* under the License.
*/
-pub mod json_web_token;
-pub mod jwt_manager;
-pub mod middleware;
-pub mod storage;
-
-pub const COMPONENT: &str = "HTTP_JWT";
+mod jwt_tests;
diff --git a/core/integration/tests/server/a2a_jwt/wiremock/__files/jwks.json
b/core/integration/tests/server/a2a_jwt/wiremock/__files/jwks.json
new file mode 100644
index 000000000..ede4eba57
--- /dev/null
+++ b/core/integration/tests/server/a2a_jwt/wiremock/__files/jwks.json
@@ -0,0 +1,10 @@
+{
+ "keys": [
+ {
+ "kty": "RSA",
+ "kid": "iggy-jwt-key-1",
+ "n":
"2cuC8EotFJdBYkwg78EzQNIejpKcI_5EB2yNMskZK37KWQmyXtYBUS0NeSg3G7IxraFOq1RqibWzN7SF5GBwaReWefHqE2zbfLLJD_fZIiHzVE8fREzGLtAYIj1U8sP5pEWJYMhEKK35ARth8iRLuTbGQiKkmyiqkEsnIUwzbPr4oRrMltTQCPHHDzWds5IVIZIwPt0hWCeh34bKkwEqHOWx6-fkxNwQnVU9YCfBIuwVR5nUytQE6LZuvCsPvJjv1gDtyBm_K1MDf1-7BrCmTsDw0ACB2CoSwPZM4A1jHZo4unWdfsuyph5FzzCK4Lf5jdsxQD6W8bqQbATibKBqsw",
+ "e": "AQAB"
+ }
+ ]
+}
diff --git a/core/integration/tests/server/a2a_jwt/wiremock/mappings/jwks.json
b/core/integration/tests/server/a2a_jwt/wiremock/mappings/jwks.json
new file mode 100644
index 000000000..c6009b9e3
--- /dev/null
+++ b/core/integration/tests/server/a2a_jwt/wiremock/mappings/jwks.json
@@ -0,0 +1,13 @@
+{
+ "request": {
+ "method": "GET",
+ "url": "/.well-known/jwks.json"
+ },
+ "response": {
+ "status": 200,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ "bodyFileName": "jwks.json"
+ }
+}
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index 6ea1338af..546f95f15 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+mod a2a_jwt;
mod cg;
mod concurrent_addition;
mod general;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index d7ad2c287..bb89dd70a 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.10.0"
+version = "0.10.1"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs
index 694351815..87257363c 100644
--- a/core/sdk/src/client_provider.rs
+++ b/core/sdk/src/client_provider.rs
@@ -127,6 +127,7 @@ impl ClientProviderConfig {
config.http = Some(Arc::new(HttpClientConfig {
api_url: args.http_api_url,
retries: args.http_retries,
+ jwt: None,
}));
}
TransportProtocol::Tcp => {
diff --git a/core/sdk/src/clients/client_builder.rs
b/core/sdk/src/clients/client_builder.rs
index 9600f6b38..4f07c75ab 100644
--- a/core/sdk/src/clients/client_builder.rs
+++ b/core/sdk/src/clients/client_builder.rs
@@ -308,6 +308,12 @@ impl HttpClientBuilder {
self
}
+ /// Sets the JWT for A2A (Agent-to-Agent) authentication.
+ pub fn with_jwt(mut self, token: String) -> Self {
+ self.config = self.config.with_jwt(token);
+ self
+ }
+
/// Builds the parent `IggyClient` with HTTP configuration.
pub fn build(self) -> Result<IggyClient, IggyError> {
let client = HttpClient::create(Arc::new(self.config.build()))?;
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 17d2c425b..2b9177a5e 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -569,6 +569,9 @@ impl IggyConsumer {
DiagnosticEvent::Connected => {
trace!("Connected to the server");
joined_consumer_group.store(false, ORDERING);
+ if !is_consumer_group {
+ can_poll.store(true, ORDERING);
+ }
if disconnected {
reconnected = true;
disconnected = false;
diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs
index a6ddcaa65..bdc093dcf 100644
--- a/core/sdk/src/http/http_client.rs
+++ b/core/sdk/src/http/http_client.rs
@@ -277,11 +277,13 @@ impl HttpClient {
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
+ let access_token = config.jwt.clone().unwrap_or_default();
+
Ok(Self {
api_url,
client,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
- access_token: IggyRwLock::new("".to_string()),
+ access_token: IggyRwLock::new(access_token),
events: broadcast(1000),
})
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 4254c5788..3125c43b3 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -88,6 +88,7 @@ rustls-pemfile = { workspace = true }
secrecy = { workspace = true }
send_wrapper = { workspace = true }
serde = { workspace = true }
+serde_json = { workspace = true }
slab = { workspace = true }
socket2 = { workspace = true }
strum = { workspace = true }
diff --git a/core/server/config.toml b/core/server/config.toml
index b3be68e58..367d5f995 100644
--- a/core/server/config.toml
+++ b/core/server/config.toml
@@ -126,6 +126,12 @@ decoding_secret = ""
# `false` means the secret is in plain text.
use_base64_secret = false
+# Trusted issuers for A2A (Application-to-Application) authentication
+[[http.jwt.trusted_issuers]]
+issuer = "test-issuer"
+jwks_url = "http://127.0.0.1:8081/.well-known/jwks.json"
+audience = "iggy.apache.org"
+
# Metrics configuration for HTTP.
[http.metrics]
# Enable or disable the metrics endpoint.
diff --git a/core/server/src/http/jwt/json_web_token.rs
b/core/server/src/http/jwt/json_web_token.rs
index 93862c77e..e0a64674e 100644
--- a/core/server/src/http/jwt/json_web_token.rs
+++ b/core/server/src/http/jwt/json_web_token.rs
@@ -17,8 +17,9 @@
*/
use iggy_common::UserId;
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::net::SocketAddr;
+use std::{fmt, fmt::Display};
#[derive(Debug, Clone)]
pub struct Identity {
@@ -28,12 +29,114 @@ pub struct Identity {
pub ip_address: SocketAddr,
}
+#[derive(Debug, Clone)]
+pub enum Audience {
+ Single(String),
+ Multiple(Vec<String>),
+}
+
+impl Audience {
+ pub fn contains(&self, audience: &str) -> bool {
+ match self {
+ Audience::Single(aud) => aud == audience,
+ Audience::Multiple(auds) => auds.iter().any(|a| a == audience),
+ }
+ }
+}
+
+impl Display for Audience {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Audience::Single(aud) => f.write_str(aud),
+ Audience::Multiple(auds) => f.write_str(&auds.join(",")),
+ }
+ }
+}
+
+impl From<String> for Audience {
+ fn from(aud: String) -> Self {
+ Audience::Single(aud)
+ }
+}
+
+impl From<&str> for Audience {
+ fn from(aud: &str) -> Self {
+ Audience::Single(aud.to_string())
+ }
+}
+
+impl From<Vec<String>> for Audience {
+ fn from(auds: Vec<String>) -> Self {
+ if auds.len() == 1 {
+ Audience::Single(auds.into_iter().next().unwrap())
+ } else {
+ Audience::Multiple(auds)
+ }
+ }
+}
+
+impl Serialize for Audience {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ match self {
+ Audience::Single(aud) => serializer.serialize_str(aud),
+ Audience::Multiple(auds) => auds.serialize(serializer),
+ }
+ }
+}
+
+impl<'de> Deserialize<'de> for Audience {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ struct AudienceVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for AudienceVisitor {
+ type Value = Audience;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) ->
std::fmt::Result {
+ formatter.write_str("a string or an array of strings")
+ }
+
+ fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(Audience::Single(value.to_string()))
+ }
+
+ fn visit_string<E>(self, value: String) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(Audience::Single(value))
+ }
+
+ fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+ where
+ A: serde::de::SeqAccess<'de>,
+ {
+ let mut auds = Vec::new();
+ while let Some(aud) = seq.next_element::<String>()? {
+ auds.push(aud);
+ }
+ Ok(Audience::Multiple(auds))
+ }
+ }
+
+ deserializer.deserialize_any(AudienceVisitor)
+ }
+}
+
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct JwtClaims {
pub jti: String,
pub iss: String,
- pub aud: String,
- pub sub: u32,
+ pub aud: Audience,
+ pub sub: String,
pub iat: u64,
pub exp: u64,
pub nbf: u64,
diff --git a/core/server/src/http/jwt/jwks.rs b/core/server/src/http/jwt/jwks.rs
new file mode 100644
index 000000000..db7a9a46a
--- /dev/null
+++ b/core/server/src/http/jwt/jwks.rs
@@ -0,0 +1,316 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dashmap::DashMap;
+use iggy_common::IggyError;
+use jsonwebtoken::DecodingKey;
+use serde::Deserialize;
+use std::hash::Hash;
+use std::sync::OnceLock;
+use strum::{Display, EnumString};
+
+static HTTP_CLIENT: OnceLock<cyper::Client> = OnceLock::new();
+
+fn get_http_client() -> &'static cyper::Client {
+ HTTP_CLIENT.get_or_init(cyper::Client::new)
+}
+
+/// JWK key type enumeration
+#[derive(Debug, Clone, Copy, Display, EnumString, Deserialize, PartialEq, Eq)]
+#[strum(serialize_all = "UPPERCASE")]
+#[serde(rename_all = "UPPERCASE")]
+enum JwkKeyType {
+ /// RSA key type
+ #[strum(serialize = "RSA")]
+ Rsa,
+ /// EC (Elliptic Curve) key type
+ #[strum(serialize = "EC")]
+ Ec,
+}
+
+/// EC curve type enumeration
+#[derive(Debug, Clone, Copy, Display, EnumString, Deserialize, PartialEq, Eq)]
+#[strum(serialize_all = "UPPERCASE")]
+#[serde(rename_all = "UPPERCASE")]
+enum EcCurve {
+ /// P-256 curve
+ #[strum(serialize = "P-256")]
+ P256,
+ /// P-384 curve
+ #[strum(serialize = "P-384")]
+ P384,
+ /// P-521 curve
+ #[strum(serialize = "P-521")]
+ P521,
+}
+
+#[derive(Debug, Deserialize)]
+struct Jwk {
+ kty: JwkKeyType,
+ kid: Option<String>,
+ n: Option<String>,
+ e: Option<String>,
+ x: Option<String>,
+ y: Option<String>,
+ crv: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+struct JwkSet {
+ keys: Vec<Jwk>,
+}
+
+#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+struct CacheKey {
+ issuer: String,
+ kid: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct JwksClient {
+ cache: DashMap<CacheKey, DecodingKey>,
+}
+
+impl Default for JwksClient {
+ fn default() -> Self {
+ Self {
+ cache: DashMap::new(),
+ }
+ }
+}
+
+impl JwksClient {
+ pub async fn get_key(&self, issuer: &str, jwks_url: &str, kid: &str) ->
Option<DecodingKey> {
+ let cache_key = CacheKey {
+ issuer: issuer.to_string(),
+ kid: kid.to_string(),
+ };
+
+ // try to get from cache first
+ if let Some(key) = self.cache.get(&cache_key) {
+ return Some(key.clone());
+ }
+
+ // fetch and cache if not found
+ if let Ok(key) = self.fetch_and_cache_key(issuer, jwks_url, kid).await
{
+ return Some(key);
+ }
+
+ None
+ }
+
+ async fn fetch_and_cache_key(
+ &self,
+ issuer: &str,
+ jwks_url: &str,
+ kid: &str,
+ ) -> Result<DecodingKey, IggyError> {
+ if let Err(e) = self.refresh_keys(issuer, jwks_url).await {
+ return Err(IggyError::CannotFetchJwks(format!(
+ "Failed to refresh keys: {}",
+ e
+ )));
+ }
+
+ let cache_key = CacheKey {
+ issuer: issuer.to_string(),
+ kid: kid.to_string(),
+ };
+
+ self.cache
+ .get(&cache_key)
+ .map(|entry| entry.clone())
+ .ok_or(IggyError::InvalidAccessToken)
+ }
+
+ async fn refresh_keys(&self, issuer: &str, jwks_url: &str) -> Result<(),
IggyError> {
+ let client = get_http_client();
+ let request = client
+ .get(jwks_url)
+ .map_err(|e| IggyError::CannotFetchJwks(format!("Failed to build
request: {}", e)))?
+ .build();
+ let response = client
+ .execute(request)
+ .await
+ .map_err(|e| IggyError::CannotFetchJwks(format!("HTTP request
failed: {}", e)))?;
+
+ let body = response.text().await.map_err(|e| {
+ IggyError::CannotFetchJwks(format!("Failed to read response body:
{}", e))
+ })?;
+
+ let jwks: JwkSet = serde_json::from_str(&body)
+ .map_err(|e| IggyError::CannotFetchJwks(format!("Failed to parse
JWKS: {}", e)))?;
+
+ // Collect all current kids from the JWKS response
+ let current_kids: std::collections::HashSet<String> =
+ jwks.keys.iter().filter_map(|key| key.kid.clone()).collect();
+
+ // Remove cached keys for this issuer that are no longer in the JWKS
response
+ // Security fix: Clean up revoked/rotated keys to prevent accepting
tokens signed with old keys
+ let keys_to_remove: Vec<CacheKey> = self
+ .cache
+ .iter()
+ .filter(|entry| {
+ entry.key().issuer == issuer &&
!current_kids.contains(&entry.key().kid)
+ })
+ .map(|entry| entry.key().clone())
+ .collect();
+
+ for key in keys_to_remove {
+ self.cache.remove(&key);
+ }
+
+ for key in jwks.keys {
+ if let Some(kid) = key.kid {
+ let decoding_key: DecodingKey = match key.kty {
+ JwkKeyType::Rsa => {
+ if let (Some(n), Some(e)) = (key.n.as_deref(),
key.e.as_deref()) {
+ DecodingKey::from_rsa_components(n, e).map_err(|e|
{
+ IggyError::CannotFetchJwks(format!("Invalid
RSA key: {}", e))
+ })?
+ } else {
+ continue;
+ }
+ }
+ JwkKeyType::Ec => {
+ if let (Some(x), Some(y), Some(crv_str)) =
+ (key.x.as_deref(), key.y.as_deref(),
key.crv.as_deref())
+ {
+ if let Ok(_curve) = crv_str.parse::<EcCurve>() {
+ DecodingKey::from_ec_components(x,
y).map_err(|e| {
+
IggyError::CannotFetchJwks(format!("Invalid EC key: {}", e))
+ })?
+ } else {
+ continue;
+ }
+ } else {
+ continue;
+ }
+ }
+ };
+
+ let cache_key = CacheKey {
+ issuer: issuer.to_string(),
+ kid,
+ };
+ self.cache.insert(cache_key, decoding_key);
+ }
+ }
+
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use jsonwebtoken::DecodingKey;
+
+ const TEST_ISSUER: &str = "https://test-issuer.com";
+ const TEST_KID: &str = "test-key";
+
+ fn create_test_decoding_key() -> DecodingKey {
+ // Use HMAC secret to create a simple test DecodingKey
+ // Note: This is only for testing cache logic, not a real RSA/EC key
+ DecodingKey::from_secret(b"test-secret-key-for-cache-testing-only")
+ }
+
+ #[test]
+ fn test_cache_key_equality() {
+ let key1 = CacheKey {
+ issuer: TEST_ISSUER.to_string(),
+ kid: TEST_KID.to_string(),
+ };
+ let key2 = CacheKey {
+ issuer: TEST_ISSUER.to_string(),
+ kid: TEST_KID.to_string(),
+ };
+ assert_eq!(key1, key2);
+ }
+
+ #[test]
+ fn test_cache_key_different_issuer() {
+ let key1 = CacheKey {
+ issuer: "issuer1".to_string(),
+ kid: TEST_KID.to_string(),
+ };
+ let key2 = CacheKey {
+ issuer: "issuer2".to_string(),
+ kid: TEST_KID.to_string(),
+ };
+ assert_ne!(key1, key2);
+ }
+
+ #[test]
+ fn test_cache_key_different_kid() {
+ let key1 = CacheKey {
+ issuer: TEST_ISSUER.to_string(),
+ kid: "kid1".to_string(),
+ };
+ let key2 = CacheKey {
+ issuer: TEST_ISSUER.to_string(),
+ kid: "kid2".to_string(),
+ };
+ assert_ne!(key1, key2);
+ }
+
+ #[test]
+ fn test_jwks_client_default() {
+ let client = JwksClient::default();
+ assert!(client.cache.is_empty());
+ }
+
+ #[test]
+ fn test_cache_insert_and_get() {
+ let client = JwksClient::default();
+ let cache_key = CacheKey {
+ issuer: TEST_ISSUER.to_string(),
+ kid: TEST_KID.to_string(),
+ };
+ let decoding_key = create_test_decoding_key();
+
+ client.cache.insert(cache_key.clone(), decoding_key.clone());
+
+ let cached = client.cache.get(&cache_key);
+ assert!(cached.is_some());
+ }
+
+ #[test]
+ fn test_cache_multiple_keys() {
+ let client = JwksClient::default();
+
+ let key1 = CacheKey {
+ issuer: "issuer1".to_string(),
+ kid: "kid1".to_string(),
+ };
+ let key2 = CacheKey {
+ issuer: "issuer2".to_string(),
+ kid: "kid2".to_string(),
+ };
+
+ let decoding_key1 = create_test_decoding_key();
+ let decoding_key2 = create_test_decoding_key();
+
+ client.cache.insert(key1.clone(), decoding_key1);
+ client.cache.insert(key2.clone(), decoding_key2);
+
+ assert_eq!(client.cache.len(), 2);
+ assert!(client.cache.get(&key1).is_some());
+ assert!(client.cache.get(&key2).is_some());
+ }
+}
diff --git a/core/server/src/http/jwt/jwt_manager.rs
b/core/server/src/http/jwt/jwt_manager.rs
index 8980a1389..77a181309 100644
--- a/core/server/src/http/jwt/jwt_manager.rs
+++ b/core/server/src/http/jwt/jwt_manager.rs
@@ -16,9 +16,10 @@
* under the License.
*/
-use crate::configs::http::HttpJwtConfig;
+use crate::configs::http::{HttpJwtConfig, TrustedIssuerConfig};
use crate::http::jwt::COMPONENT;
-use crate::http::jwt::json_web_token::{GeneratedToken, JwtClaims,
RevokedAccessToken};
+use crate::http::jwt::json_web_token::{Audience, GeneratedToken, JwtClaims,
RevokedAccessToken};
+use crate::http::jwt::jwks::JwksClient;
use crate::http::jwt::storage::TokenStorage;
use crate::streaming::persistence::persister::PersisterKind;
use ahash::AHashMap;
@@ -31,6 +32,7 @@ use iggy_common::UserId;
use iggy_common::locking::IggyRwLock;
use iggy_common::locking::IggyRwLockFn;
use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, TokenData,
Validation, encode};
+use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error, info};
@@ -56,6 +58,8 @@ pub struct JwtManager {
tokens_storage: TokenStorage,
revoked_tokens: IggyRwLock<AHashMap<String, u64>>,
validations: AHashMap<Algorithm, Validation>,
+ jwks_client: JwksClient,
+ trusted_issuer: HashMap<String, TrustedIssuerConfig>,
}
impl JwtManager {
@@ -78,6 +82,8 @@ impl JwtManager {
validator,
tokens_storage: TokenStorage::new(persister, path),
revoked_tokens: IggyRwLock::new(AHashMap::new()),
+ jwks_client: JwksClient::default(),
+ trusted_issuer: HashMap::new(),
})
}
@@ -105,7 +111,18 @@ impl JwtManager {
format!("{COMPONENT} (error: {e}) - failed to get decoding
key")
})?,
};
- JwtManager::new(persister, path, issuer, validator)
+ let mut manager = JwtManager::new(persister, path, issuer, validator)?;
+
+ if let Some(trusted_issuers) = config.trusted_issuers.as_ref() {
+ for issuer_config in trusted_issuers {
+ let normalized_issuer =
normalize_issuer_url(&issuer_config.issuer);
+ manager
+ .trusted_issuer
+ .insert(normalized_issuer, issuer_config.clone());
+ }
+ }
+
+ Ok(manager)
}
fn create_validation(
@@ -181,8 +198,8 @@ impl JwtManager {
let nbf = iat + self.issuer.not_before.as_secs() as u64;
let claims = JwtClaims {
jti: uuid::Uuid::now_v7().to_string(),
- sub: user_id,
- aud: self.issuer.audience.to_string(),
+ sub: user_id.to_string(),
+ aud: Audience::from(self.issuer.audience.clone()),
iss: self.issuer.issuer.to_string(),
iat,
exp,
@@ -210,7 +227,19 @@ impl JwtManager {
let token_header =
jsonwebtoken::decode_header(token).map_err(|_|
IggyError::InvalidAccessToken)?;
- let jwt_claims = self.decode(token, token_header.alg)?;
+ let jwt_claims = self.decode(token, token_header.alg).await?;
+
+ // Security fix: Reject A2A tokens from external trusted issuers
+ // A2A tokens should not be refreshable - they have their own lifecycle
+ let normalized_iss = normalize_issuer_url(&jwt_claims.claims.iss);
+ if self.trusted_issuer.contains_key(&normalized_iss) {
+ error!(
+ "Cannot refresh A2A token from external issuer: {}",
+ jwt_claims.claims.iss
+ );
+ return Err(IggyError::InvalidAccessToken);
+ }
+
let id = jwt_claims.claims.jti;
let expiry = jwt_claims.claims.exp;
if self
@@ -232,26 +261,95 @@ impl JwtManager {
.error(|e: &IggyError| {
format!("{COMPONENT} (error: {e}) - failed to save revoked
access token: {id}")
})?;
- self.generate(jwt_claims.claims.sub)
+ let user_id = jwt_claims
+ .claims
+ .sub
+ .parse::<u32>()
+ .map_err(|_| IggyError::InvalidAccessToken)?;
+ self.generate(user_id)
}
- pub fn decode(
+ pub async fn decode(
&self,
token: &str,
algorithm: Algorithm,
) -> Result<TokenData<JwtClaims>, IggyError> {
let validation = self.validations.get(&algorithm);
- if validation.is_none() {
- return Err(IggyError::InvalidJwtAlgorithm(
- Self::map_algorithm_to_string(algorithm),
- ));
- }
+ let kid = jsonwebtoken::decode_header(token).ok().and_then(|h| h.kid);
+
+ // try to decode using JWKS if it's a trusted issuer
+ let insecure = match
jsonwebtoken::dangerous::insecure_decode::<JwtClaims>(token) {
+ Ok(claims) => claims,
+ Err(_) => {
+ error!("Failed to decode JWT insecurely");
+ return self.decode_with_fallback(token, validation, algorithm);
+ }
+ };
+
+ let normalized_iss = normalize_issuer_url(&insecure.claims.iss);
+ let config = match self.trusted_issuer.get(&normalized_iss) {
+ Some(config) => config,
+ None => {
+ debug!("No trusted issuer found for: {}", insecure.claims.iss);
+ return self.decode_with_fallback(token, validation, algorithm);
+ }
+ };
- let validation = validation.unwrap();
- match jsonwebtoken::decode::<JwtClaims>(token, &self.validator.key,
validation) {
- Ok(claims) => Ok(claims),
- _ => Err(IggyError::Unauthenticated),
+ if config.user_id == 0 {
+ error!(
+ "A2A token cannot map to root user (user_id = 0) for issuer:
{}",
+ config.issuer
+ );
+ return Err(IggyError::Unauthenticated);
}
+
+ let kid_str = match kid.as_deref() {
+ Some(kid) => kid,
+ None => {
+ error!("No kid found in JWT header");
+ return self.decode_with_fallback(token, validation, algorithm);
+ }
+ };
+
+ let decoding_key = match self
+ .jwks_client
+ .get_key(&config.issuer, &config.jwks_url, kid_str)
+ .await
+ {
+ Some(key) => key,
+ None => {
+ error!("Failed to get decoding key from JWKS for kid: {}",
kid_str);
+ return self.decode_with_fallback(token, validation, algorithm);
+ }
+ };
+ let mut validation = Validation::new(algorithm);
+ validation.set_issuer(std::slice::from_ref(&config.issuer));
+ validation.set_audience(std::slice::from_ref(&config.audience));
+
+ let mut result = jsonwebtoken::decode::<JwtClaims>(token,
&decoding_key, &validation)
+ .map_err(|e| {
+ error!("Failed to decode JWT: {}", e);
+ IggyError::Unauthenticated
+ })?;
+
+ result.claims.sub = config.user_id.to_string();
+
+ Ok(result)
+ }
+
+ /// fallback to standard JWT validation if JWKS validation fails
+ fn decode_with_fallback(
+ &self,
+ token: &str,
+ validation: Option<&Validation>,
+ algorithm: Algorithm,
+ ) -> Result<TokenData<JwtClaims>, IggyError> {
+ let validation = validation.ok_or_else(|| {
+
IggyError::InvalidJwtAlgorithm(Self::map_algorithm_to_string(algorithm))
+ })?;
+
+ jsonwebtoken::decode::<JwtClaims>(token, &self.validator.key,
validation)
+ .map_err(|_| IggyError::Unauthenticated)
}
fn map_algorithm_to_string(algorithm: Algorithm) -> String {
@@ -290,3 +388,71 @@ impl JwtManager {
revoked_tokens.contains_key(token_id)
}
}
+
+/// Normalize issuer URL by lowercasing scheme and host, preserving path case
+///
+/// Example: "HTTPS://Example.COM/PATH" -> "https://example.com/PATH"
+fn normalize_issuer_url(url: &str) -> String {
+ match url.split_once("://") {
+ Some((scheme, rest)) => {
+ let scheme = scheme.to_lowercase();
+ // Find end of host (first '/' or end of string)
+ let (host, path) = match rest.find('/') {
+ Some(idx) => rest.split_at(idx),
+ None => (rest, ""),
+ };
+ format!("{}://{}{}", scheme, host.to_lowercase(), path)
+ }
+ None => url.trim_end_matches('/').to_lowercase(),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_normalize_issuer_url_basic() {
+ assert_eq!(
+ normalize_issuer_url("HTTPS://Example.COM/PATH"),
+ "https://example.com/PATH"
+ );
+ }
+
+ #[test]
+ fn test_normalize_issuer_url_no_path() {
+ assert_eq!(
+ normalize_issuer_url("HTTPS://Example.COM"),
+ "https://example.com"
+ );
+ }
+
+ #[test]
+ fn test_normalize_issuer_url_no_scheme() {
+ assert_eq!(normalize_issuer_url("Example.COM"), "example.com");
+ }
+
+ #[test]
+ fn test_normalize_issuer_url_trailing_slash() {
+ assert_eq!(
+ normalize_issuer_url("HTTPS://Example.COM/"),
+ "https://example.com/"
+ );
+ }
+
+ #[test]
+ fn test_normalize_issuer_url_preserves_path_case() {
+ assert_eq!(
+ normalize_issuer_url("https://EXAMPLE.com/MyPath/SubPath"),
+ "https://example.com/MyPath/SubPath"
+ );
+ }
+
+ #[test]
+ fn test_normalize_issuer_url_already_normalized() {
+ assert_eq!(
+ normalize_issuer_url("https://example.com/path"),
+ "https://example.com/path"
+ );
+ }
+}
diff --git a/core/server/src/http/jwt/middleware.rs
b/core/server/src/http/jwt/middleware.rs
index 910e02ccb..dfef00b03 100644
--- a/core/server/src/http/jwt/middleware.rs
+++ b/core/server/src/http/jwt/middleware.rs
@@ -26,7 +26,6 @@ use axum::{
response::Response,
};
use err_trail::ErrContext;
-use iggy_common::IggyError;
use std::sync::Arc;
const COMPONENT: &str = "JWT_MIDDLEWARE";
@@ -79,9 +78,7 @@ pub async fn jwt_auth(
let jwt_claims = state
.jwt_manager
.decode(jwt_token, token_header.alg)
- .error(|e: &IggyError| {
- format!("{COMPONENT} (error: {e}) - failed to decode JWT with
provided algorithm")
- })
+ .await
.map_err(|_| UNAUTHORIZED)?;
if state
.jwt_manager
@@ -92,10 +89,15 @@ pub async fn jwt_auth(
}
let request_details =
request.extensions().get::<RequestDetails>().unwrap();
+ let user_id = jwt_claims
+ .claims
+ .sub
+ .parse::<u32>()
+ .map_err(|_| UNAUTHORIZED)?;
let identity = Identity {
token_id: jwt_claims.claims.jti,
token_expiry: jwt_claims.claims.exp,
- user_id: jwt_claims.claims.sub,
+ user_id,
ip_address: request_details.ip_address,
};
request.extensions_mut().insert(identity);
diff --git a/core/server/src/http/jwt/mod.rs b/core/server/src/http/jwt/mod.rs
index 5481f110c..d5e413cb4 100644
--- a/core/server/src/http/jwt/mod.rs
+++ b/core/server/src/http/jwt/mod.rs
@@ -17,6 +17,7 @@
*/
pub mod json_web_token;
+pub mod jwks;
pub mod jwt_manager;
pub mod middleware;
pub mod storage;