This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch headers_encryption in repository https://gitbox.apache.org/repos/asf/iggy.git
commit da14ba9252b202faf52448529df1792ff179bb3f Author: spetz <[email protected]> AuthorDate: Mon Mar 30 08:20:00 2026 +0200 Add more c# tests --- .../HeaderEncryptionIntegrationTests.cs | 221 +++++++++++++++++++++ foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs | 14 +- foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs | 9 + .../csharp/Iggy_SDK/Publishers/IggyPublisher.cs | 9 +- 4 files changed, 249 insertions(+), 4 deletions(-) diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs b/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs new file mode 100644 index 000000000..1a3377bd5 --- /dev/null +++ b/foreign/csharp/Iggy_SDK.Tests.Integration/HeaderEncryptionIntegrationTests.cs @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Text; +using Apache.Iggy.Consumers; +using Apache.Iggy.Encryption; +using Apache.Iggy.Enums; +using Apache.Iggy.Headers; +using Apache.Iggy.IggyClient; +using Apache.Iggy.Kinds; +using Apache.Iggy.Mappers; +using Apache.Iggy.Messages; +using Apache.Iggy.Publishers; +using Apache.Iggy.Tests.Integrations.Fixtures; +using Shouldly; +using Partitioning = Apache.Iggy.Kinds.Partitioning; + +namespace Apache.Iggy.Tests.Integrations; + +public class HeaderEncryptionIntegrationTests +{ + [ClassDataSource<IggyServerFixture>(Shared = SharedType.PerAssembly)] + public required IggyServerFixture Fixture { get; init; } + + [Test] + [MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))] + public async Task SendMessages_WithEncryptedHeaders_Should_NotBeReadableWithoutDecryptor(Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var encryptor = CreateEncryptor(); + var testStream = await CreateTestStream(client, protocol); + var streamId = Identifier.String(testStream.StreamId); + var topicId = Identifier.String(testStream.TopicId); + + // Send message with encrypted headers via publisher + var publisher = IggyPublisherBuilder + .Create(client, streamId, topicId) + .WithPartitioning(Partitioning.PartitionId(0)) + .WithEncryptor(encryptor) + .Build(); + + await publisher.InitAsync(); + + var headers = CreateTestHeaders(); + var messages = new List<Message> + { + new(Guid.NewGuid(), Encoding.UTF8.GetBytes("encrypted payload"), headers) + }; + + await publisher.SendMessagesAsync(messages); + await publisher.DisposeAsync(); + + // Poll with a normal client (no decryptor) + var polled = await client.PollMessagesAsync( + streamId, topicId, 0, + Consumer.New(0), + PollingStrategy.Next(), + 1, false); + + polled.Messages.Count.ShouldBe(1); + var msg = polled.Messages[0]; + + // Payload should be encrypted (not readable as plaintext) + Encoding.UTF8.GetString(msg.Payload).ShouldNotBe("encrypted payload"); + + // Extract encrypted header bytes: + // TCP path: RawUserHeaders is set, UserHeaders is null + // HTTP path: UserHeaders contains sentinel key with encrypted bytes + byte[]? encryptedHeaderBytes = msg.RawUserHeaders; + if (encryptedHeaderBytes is null + && msg.UserHeaders is { Count: 1 } + && msg.UserHeaders.TryGetValue(HeaderKey.EncryptedHeadersSentinel, out var sentinelValue)) + { + encryptedHeaderBytes = sentinelValue.Value; + } + + encryptedHeaderBytes.ShouldNotBeNull(); + encryptedHeaderBytes!.Length.ShouldBeGreaterThan(0); + + // Manually decrypt and verify payload + var decryptedPayload = encryptor.Decrypt(msg.Payload); + Encoding.UTF8.GetString(decryptedPayload).ShouldBe("encrypted payload"); + + // Manually decrypt and verify headers + var decryptedHeaderBytesResult = encryptor.Decrypt(encryptedHeaderBytes); + var decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytesResult); + decryptedHeaders.Count.ShouldBe(3); + + var typeHeader = decryptedHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }]; + Encoding.UTF8.GetString(typeHeader.Value).ShouldBe("test-message"); + } + + [Test] + [MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))] + public async Task ReceiveAsync_WithDecryptor_Should_DecryptHeadersCorrectly(Protocol protocol) + { + var client = protocol == Protocol.Tcp + ? await Fixture.CreateTcpClient() + : await Fixture.CreateHttpClient(); + + var encryptor = CreateEncryptor(); + var testStream = await CreateTestStream(client, protocol); + var streamId = Identifier.String(testStream.StreamId); + var topicId = Identifier.String(testStream.TopicId); + + // Send encrypted message via publisher + var publisher = IggyPublisherBuilder + .Create(client, streamId, topicId) + .WithPartitioning(Partitioning.PartitionId(0)) + .WithEncryptor(encryptor) + .Build(); + + await publisher.InitAsync(); + + var headers = CreateTestHeaders(); + var messages = new List<Message> + { + new(Guid.NewGuid(), Encoding.UTF8.GetBytes("consumer test payload"), headers) + }; + + await publisher.SendMessagesAsync(messages); + await publisher.DisposeAsync(); + + // Poll with IggyConsumer that has decryptor configured + var consumer = IggyConsumerBuilder + .Create(client, streamId, topicId, Consumer.New(1)) + .WithPollingStrategy(PollingStrategy.Next()) + .WithBatchSize(1) + .WithPartitionId(0) + .WithAutoCommitMode(AutoCommitMode.Disabled) + .WithDecryptor(encryptor) + .Build(); + + await consumer.InitAsync(); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + ReceivedMessage? received = null; + + await foreach (var msg in consumer.ReceiveAsync(cts.Token)) + { + received = msg; + break; + } + + await consumer.DisposeAsync(); + + // Message should be successfully decrypted + received.ShouldNotBeNull(); + received!.Status.ShouldBe(MessageStatus.Success); + + // Payload should be decrypted + Encoding.UTF8.GetString(received.Message.Payload).ShouldBe("consumer test payload"); + + // Headers should be decrypted and parsed + received.Message.UserHeaders.ShouldNotBeNull(); + received.Message.UserHeaders!.Count.ShouldBe(3); + + var batchHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "batch"u8.ToArray() }]; + BitConverter.ToUInt64(batchHeader.Value).ShouldBe(1UL); + + var typeHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }]; + Encoding.UTF8.GetString(typeHeader.Value).ShouldBe("test-message"); + + var encHeader = received.Message.UserHeaders[new HeaderKey { Kind = HeaderKind.String, Value = "encrypted"u8.ToArray() }]; + encHeader.Value[0].ShouldBe((byte)1); + } + + private static AesMessageEncryptor CreateEncryptor() + { + return new AesMessageEncryptor(AesMessageEncryptor.GenerateKey()); + } + + private static Dictionary<HeaderKey, HeaderValue> CreateTestHeaders() + { + return new Dictionary<HeaderKey, HeaderValue> + { + { + new HeaderKey { Kind = HeaderKind.String, Value = "batch"u8.ToArray() }, + new HeaderValue { Kind = HeaderKind.Uint64, Value = BitConverter.GetBytes(1UL) } + }, + { + new HeaderKey { Kind = HeaderKind.String, Value = "type"u8.ToArray() }, + new HeaderValue { Kind = HeaderKind.String, Value = "test-message"u8.ToArray() } + }, + { + new HeaderKey { Kind = HeaderKind.String, Value = "encrypted"u8.ToArray() }, + new HeaderValue { Kind = HeaderKind.Bool, Value = [1] } + }, + }; + } + + private async Task<TestStreamInfo> CreateTestStream(IIggyClient client, Protocol protocol) + { + var streamId = $"enc_stream_{Guid.NewGuid()}_{protocol.ToString().ToLowerInvariant()}"; + var topicId = "enc_topic"; + + await client.CreateStreamAsync(streamId); + await client.CreateTopicAsync(Identifier.String(streamId), topicId, 1); + + return new TestStreamInfo(streamId, topicId); + } + + private record TestStreamInfo(string StreamId, string TopicId); +} diff --git a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs index 74713deab..1f656356f 100644 --- a/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs +++ b/foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs @@ -383,10 +383,18 @@ public partial class IggyConsumer : IAsyncDisposable { var decryptedPayload = _config.MessageEncryptor.Decrypt(message.Payload); - Dictionary<HeaderKey, HeaderValue>? decryptedHeaders = message.UserHeaders; - if (decryptedHeaders is null && message.RawUserHeaders is { Length: > 0 }) + byte[]? encryptedHeaderBytes = message.RawUserHeaders; + if (encryptedHeaderBytes is null + && message.UserHeaders is { Count: 1 } + && message.UserHeaders.TryGetValue(HeaderKey.EncryptedHeadersSentinel, out var sentinelValue)) { - var decryptedHeaderBytes = _config.MessageEncryptor.Decrypt(message.RawUserHeaders); + encryptedHeaderBytes = sentinelValue.Value; + } + + Dictionary<HeaderKey, HeaderValue>? decryptedHeaders = null; + if (encryptedHeaderBytes is { Length: > 0 }) + { + var decryptedHeaderBytes = _config.MessageEncryptor.Decrypt(encryptedHeaderBytes); decryptedHeaders = BinaryMapper.MapHeaders(decryptedHeaderBytes); } diff --git a/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs b/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs index b6a19ad21..ddb2ecb2d 100644 --- a/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs +++ b/foreign/csharp/Iggy_SDK/Headers/HeaderKey.cs @@ -100,6 +100,15 @@ public readonly struct HeaderKey : IEquatable<HeaderKey> return hash.ToHashCode(); } + /// <summary> + /// Sentinel key used to wrap encrypted user headers as a single raw header entry. + /// </summary> + internal static readonly HeaderKey EncryptedHeadersSentinel = new() + { + Kind = HeaderKind.Raw, + Value = "__encrypted_headers"u8.ToArray() + }; + /// <summary> /// Determines whether two HeaderKey instances are equal. /// </summary> diff --git a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs index 68ea63e2e..81550e7fc 100644 --- a/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs +++ b/foreign/csharp/Iggy_SDK/Publishers/IggyPublisher.cs @@ -18,6 +18,7 @@ using Apache.Iggy.Contracts.Tcp; using Apache.Iggy.Enums; using Apache.Iggy.Exceptions; +using Apache.Iggy.Headers; using Apache.Iggy.IggyClient; using Apache.Iggy.Messages; using Microsoft.Extensions.Logging; @@ -331,7 +332,13 @@ public partial class IggyPublisher : IAsyncDisposable var headerBytes = TcpContracts.GetHeadersBytes(message.UserHeaders); var encryptedHeaderBytes = _config.MessageEncryptor.Encrypt(headerBytes); message.RawUserHeaders = encryptedHeaderBytes; - message.UserHeaders = null; + message.UserHeaders = new Dictionary<HeaderKey, HeaderValue> + { + { + HeaderKey.EncryptedHeadersSentinel, + new HeaderValue { Kind = HeaderKind.Raw, Value = encryptedHeaderBytes } + } + }; message.Header.UserHeadersLength = encryptedHeaderBytes.Length; } }
