blankensteiner commented on code in PR #104:
URL: https://github.com/apache/pulsar-dotpulsar/pull/104#discussion_r879466029

##########
tests/DotPulsar.Tests/ProducerTests.cs:
##########
@@ -152,6 +152,7 @@ private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()
         .Authentication(AuthenticationFactory.Token(ct => ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+        .KeepAliveInterval(TimeSpan.FromSeconds(5))

Review Comment:
   Why is this added?


##########
tests/DotPulsar.Tests/xunit.runner.json:
##########
@@ -1,4 +1,5 @@
 {
   "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
-  "diagnosticMessages": true
+  "diagnosticMessages": true,
+  "parallelizeTestCollections": false

Review Comment:
   This can be removed again if you go ahead with just one cluster for all integration tests.


##########
DotPulsar.sln:
##########
@@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", "samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer", "tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj", "{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}"

Review Comment:
   Let's get the new tests into DotPulsar.Tests instead of creating new test projects.


##########
tests/DotPulsar.Tests/IntegrationCollection.cs:
##########
@@ -18,3 +18,6 @@ namespace DotPulsar.Tests;

 [CollectionDefinition("Integration")]
 public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { }
+
+[CollectionDefinition("KeepAlive")]
+public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { }

Review Comment:
   Is a new (standalone) cluster needed? Seems we could solve this with just one cluster/integration fixture.

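For illustration, the single-fixture approach suggested above could look roughly like the sketch below. It is not code from the PR: `KeepAliveTests` and its test method are hypothetical names, and the `IntegrationFixture` shown here is an empty stand-in for the repository's real fixture, which is assumed to start the one shared Pulsar cluster. Only the xunit attributes and `ICollectionFixture<T>` are taken from the diff.

```csharp
using System;
using Xunit;

// Empty stand-in for the repository's IntegrationFixture (assumption: the real
// one starts and stops the single Pulsar cluster used by all integration tests).
public sealed class IntegrationFixture : IDisposable
{
    public void Dispose() { }
}

// The existing collection definition, unchanged.
[CollectionDefinition("Integration")]
public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { }

// Hypothetical keep-alive test class: it joins the existing "Integration"
// collection instead of introducing a separate "KeepAlive" collection and fixture.
[Collection("Integration")]
public class KeepAliveTests
{
    private readonly IntegrationFixture _fixture;

    // xunit injects the one fixture instance shared by the whole collection.
    public KeepAliveTests(IntegrationFixture fixture) => _fixture = fixture;

    [Fact]
    public void Fixture_IsShared() => Assert.NotNull(_fixture);
}
```

Since xunit runs the tests within one collection sequentially, a layout like this would presumably also make the `parallelizeTestCollections` override unnecessary, as the comment on xunit.runner.json suggests.
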

##########
src/DotPulsar/Abstractions/IPulsarClientBuilder.cs:
##########
@@ -53,6 +53,16 @@ public interface IPulsarClientBuilder
     /// </summary>
     IPulsarClientBuilder KeepAliveInterval(TimeSpan interval);

+    /// <summary>
+    /// The maximum amount of time to wait without receiving any message from the server at
+    /// which point the connection is assumed to be dead or the server is not responding.
+    /// As we are sending pings the server should respond to those at a minimum within this specified timeout period.
+    /// Once this happens the connection will be torn down and all consumers/producers will enter
+    /// the disconnected state and attempt to reconnect
+    /// The default is 60 seconds.
+    /// </summary>
+    IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval);

Review Comment:
   Is this also a configurable setting for other clients? If not, we could just hardcode it.


##########
src/DotPulsar/Internal/PingPongHandler.cs:
##########
@@ -25,29 +25,37 @@ public sealed class PingPongHandler : IAsyncDisposable
 {
     private readonly IConnection _connection;
     private readonly TimeSpan _keepAliveInterval;
+    private readonly TimeSpan _serverResponseTimeout;
     private readonly Timer _timer;
     private readonly CommandPing _ping;
     private readonly CommandPong _pong;
     private long _lastCommand;
+    private readonly TaskCompletionSource<object> _serverNotRespondingTcs;

Review Comment:
   The non-generic TaskCompletionSource is a better fit since the object is never used.


##########
src/DotPulsar/Internal/PingPongHandler.cs:
##########
@@ -61,13 +69,25 @@ private void Watch(object? state)
             var lastCommand = Interlocked.Read(ref _lastCommand);
             var now = Stopwatch.GetTimestamp();
             var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency);
-            if (elapsed >= _keepAliveInterval)
+
+            if (elapsed > _serverResponseTimeout)
             {
-                Task.Factory.StartNew(() => SendPing());
-                _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+                DotPulsarMeter.ServerTimedout();
+                _serverNotRespondingTcs.SetResult(new object());

Review Comment:
   You might as well just return here instead of wrapping the following code in an else.


##########
src/DotPulsar/Internal/Connection.cs:
##########
@@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken
     }

     public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
+    {
+        await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding);

Review Comment:
   This needs a ConfigureAwait(false)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org