blankensteiner commented on code in PR #104:
URL: https://github.com/apache/pulsar-dotpulsar/pull/104#discussion_r879466029


##########
tests/DotPulsar.Tests/ProducerTests.cs:
##########
@@ -152,6 +152,7 @@ private IPulsarClient CreateClient()
         => PulsarClient
         .Builder()
         .Authentication(AuthenticationFactory.Token(ct => ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+        .KeepAliveInterval(TimeSpan.FromSeconds(5))

Review Comment:
   Why is this added?



##########
tests/DotPulsar.Tests/xunit.runner.json:
##########
@@ -1,4 +1,5 @@
 {
   "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
-  "diagnosticMessages": true
+  "diagnosticMessages": true,
+  "parallelizeTestCollections": false

Review Comment:
   This can be removed again if you go ahead with just one cluster for all integration tests.



##########
DotPulsar.sln:
##########
@@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", "samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer", "tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj", "{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}"

Review Comment:
   Let's get the new tests into DotPulsar.Tests instead of creating new test projects.



##########
tests/DotPulsar.Tests/IntegrationCollection.cs:
##########
@@ -18,3 +18,6 @@ namespace DotPulsar.Tests;
 
 [CollectionDefinition("Integration")]
 public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { }
+
+[CollectionDefinition("KeepAlive")]
+public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { }

Review Comment:
   Is a new (standalone) cluster needed? 
   Seems we could solve this with just one cluster/integration fixture.



##########
src/DotPulsar/Abstractions/IPulsarClientBuilder.cs:
##########
@@ -53,6 +53,16 @@ public interface IPulsarClientBuilder
     /// </summary>
     IPulsarClientBuilder KeepAliveInterval(TimeSpan interval);
 
+    /// <summary>
+    /// The maximum amount of time to wait without receiving any message from the server at
+    /// which point the connection is assumed to be dead or the server is not responding.
+    /// As we are sending pings the server should respond to those at a minimum within this specified timeout period.
+    /// Once this happens the connection will be torn down and all consumers/producers will enter
+    /// the disconnected state and attempt to reconnect
+    /// The default is 60 seconds.
+    /// </summary>
+    IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval);

Review Comment:
   Is this also a configurable setting for other clients? If not, we could just hardcode it.



##########
src/DotPulsar/Internal/PingPongHandler.cs:
##########
@@ -25,29 +25,37 @@ public sealed class PingPongHandler : IAsyncDisposable
 {
     private readonly IConnection _connection;
     private readonly TimeSpan _keepAliveInterval;
+    private readonly TimeSpan _serverResponseTimeout;
     private readonly Timer _timer;
     private readonly CommandPing _ping;
     private readonly CommandPong _pong;
     private long _lastCommand;
+    private readonly TaskCompletionSource<object> _serverNotRespondingTcs;

Review Comment:
   The non-generic TaskCompletionSource is a better fit since the object is never used.
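
As a rough illustration of that suggestion, the sketch below signals the timeout through the non-generic `TaskCompletionSource`, which is only available from .NET 5 onward. The class name `ServerWatchdog` and the method `SignalTimeout` are hypothetical and not part of the PR; using `TrySetResult` instead of the `SetResult` call in the diff is also an assumption made here so that a repeated signal cannot throw.

```csharp
using System.Threading.Tasks;

// Hypothetical stand-in for the handler in the diff; only the signalling part is shown.
public sealed class ServerWatchdog
{
    // Non-generic TaskCompletionSource (.NET 5+): no dummy result object is required.
    private readonly TaskCompletionSource _serverNotRespondingTcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Completes once the server is considered unresponsive.
    public Task ServerNotResponding => _serverNotRespondingTcs.Task;

    // Returns true on the first signal and false on any later one.
    public bool SignalTimeout() => _serverNotRespondingTcs.TrySetResult();
}
```

If an older target framework has to be supported, the generic form would still be needed there, so this only applies where the non-generic type exists.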



##########
src/DotPulsar/Internal/PingPongHandler.cs:
##########
@@ -61,13 +69,25 @@ private void Watch(object? state)
             var lastCommand = Interlocked.Read(ref _lastCommand);
             var now = Stopwatch.GetTimestamp();
             var elapsed = TimeSpan.FromSeconds((now - lastCommand) / Stopwatch.Frequency);
-            if (elapsed >= _keepAliveInterval)
+
+            if (elapsed > _serverResponseTimeout)
             {
-                Task.Factory.StartNew(() => SendPing());
-                _timer.Change(_keepAliveInterval, TimeSpan.Zero);
+                DotPulsarMeter.ServerTimedout();
+                _serverNotRespondingTcs.SetResult(new object());

Review Comment:
   You might as well just return here instead of wrapping the following code in an else.
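
A minimal sketch of the early-return shape, assuming the branch order visible in the diff (timeout check first, then the keep-alive check). `WatchAction` and `WatchSketch.Decide` are hypothetical names used only to make the control flow testable in isolation; the real method works with the timer and the connection directly.

```csharp
using System;

// Hypothetical outcome of one timer tick.
public enum WatchAction { ReportTimeout, SendPing, Wait }

public static class WatchSketch
{
    public static WatchAction Decide(
        TimeSpan elapsed, TimeSpan keepAliveInterval, TimeSpan serverResponseTimeout)
    {
        // Returning here means the remaining checks need no enclosing else block.
        if (elapsed > serverResponseTimeout)
            return WatchAction.ReportTimeout;

        if (elapsed >= keepAliveInterval)
            return WatchAction.SendPing;

        return WatchAction.Wait;
    }
}
```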



##########
src/DotPulsar/Internal/Connection.cs:
##########
@@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken
     }
 
     public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
+    {
+        await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding).ConfigureAwait(false);

Review Comment:
   This needs a ConfigureAwait(false).
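
For illustration, the awaited `Task.WhenAny` call with `ConfigureAwait(false)` applied could look like the sketch below. `FrameLoopSketch` and its method are hypothetical; the two task parameters stand in for `ProcessIncommingFramesImpl(cancellationToken)` and `_pingPongHandler.ServerNotResponding` from the diff.

```csharp
using System.Threading.Tasks;

public static class FrameLoopSketch
{
    // Returns true when the "server not responding" task finished first.
    public static async Task<bool> RunUntilServerStopsResponding(
        Task processFrames, Task serverNotResponding)
    {
        // ConfigureAwait(false) avoids resuming on a captured synchronization context,
        // which library code generally has no need for.
        var finished = await Task.WhenAny(processFrames, serverNotResponding)
            .ConfigureAwait(false);

        return ReferenceEquals(finished, serverNotResponding);
    }
}
```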



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
