blankensteiner commented on a change in pull request #23:
URL: https://github.com/apache/pulsar-dotpulsar/pull/23#discussion_r449505264



##########
File path: src/DotPulsar/Internal/ProducerChannel.cs
##########
@@ -90,18 +90,16 @@ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<
 
                 if (autoAssignSequenceId)
                 {
-                    sendPackage.Command.SequenceId = _sequenceId.Current;
-                    sendPackage.Metadata.SequenceId = _sequenceId.Current;
+                    var newSequenceId = _sequenceId.FetchNext();
+                    sendPackage.Command.SequenceId = newSequenceId;
+                    sendPackage.Metadata.SequenceId = newSequenceId;
                 }
                 else
                     sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
 
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
-                if (autoAssignSequenceId)
-                    _sequenceId.Increment();

Review comment:
       The problem with removing this is that the sequenceId will be moved 
forward even if sending the package fails (either the broker rejects it or the 
connection is lost).
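
To illustrate the concern, here is a minimal sketch of how fetching the id before sending leaves a gap when the send fails. `SequenceGapDemo` and its `Send` method are hypothetical stand-ins for illustration, not DotPulsar types; only `Interlocked.Increment` mirrors what the PR uses.

```csharp
using System;
using System.Threading;

public sealed class SequenceGapDemo
{
    // Starts at -1 so that the first FetchNext() returns 0 (same trick as in the PR).
    private long _current = -1;

    private long FetchNext() => Interlocked.Increment(ref _current);

    // Hypothetical stand-in for _connection.Send; fails on demand.
    private static void Send(long sequenceId, bool fail)
    {
        if (fail)
            throw new InvalidOperationException($"send of {sequenceId} failed");
    }

    public long[] Run()
    {
        var first = FetchNext(); // 0
        try
        {
            Send(first, fail: true);
        }
        catch (InvalidOperationException)
        {
            // The id was already consumed, and nothing rolls it back.
        }

        var second = FetchNext(); // 1, so id 0 was never acknowledged
        Send(second, fail: false);
        return new[] { first, second };
    }
}
```

With the old code, `Increment()` only ran after a successful `SendReceipt`, so the retry in this sketch would have reused id 0.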

##########
File path: src/DotPulsar/DotPulsar.csproj
##########
@@ -23,7 +23,7 @@
   <ItemGroup>    
     <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.5" />    
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
-    <PackageReference Include="protobuf-net" Version="2.4.6" />
+    <PackageReference Include="protobuf-net" Version="2.3.*" />

Review comment:
       Hi Tim :-)
   Could you elaborate a bit on the need for this downgrade?
   I'm asking because version 3.0.13 is out now, and version 3.1 (which I hope 
will be coming soon) will bring support for [ReadOnly]Memory<T> and 
ReadOnlySequence<T>, which will give a significant performance boost.
   Supporting older versions of protobuf-net is therefore not something I aim 
for, but if it's a problem, we could look into including protobuf-net in 
DotPulsar the same way Newtonsoft.Json is included in Elasticsearch.Net (or 
something like that).

##########
File path: src/DotPulsar/Internal/SequenceId.cs
##########
@@ -12,21 +12,24 @@
  * limitations under the License.
  */
 
+using System.Threading;
+
 namespace DotPulsar.Internal
 {
     public sealed class SequenceId
     {
+        private long _current;
+
         public SequenceId(ulong initialSequenceId)
         {
-            Current = initialSequenceId;
-
-            if (initialSequenceId > 0)
-                Increment();
+            // Subtracting one because Interlocked.Increment will return the post-incremented value
+            // which is expected to be the initialSequenceId for the first call
+            _current = unchecked((long)initialSequenceId - 1);
         }
 
-        public ulong Current { get; private set; }
-
-        public void Increment()
-            => ++Current;
+        public ulong FetchNext()
+        {
+            return unchecked((ulong)Interlocked.Increment(ref _current));

Review comment:
       I wonder if it would be more performant to have _current be a ulong and 
just return Interlocked.Increment(ref _current) - 1?
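
A minimal sketch of that suggestion, under one loudly stated assumption: it relies on the `Interlocked.Increment(ref ulong)` overload, which as far as I know is only available from .NET 5 onward, so it would not compile against older targets. `UlongSequenceId` is a hypothetical name used for illustration; whether this is actually faster than the cast-based version has not been measured here.

```csharp
using System.Threading;

// Hypothetical variant of SequenceId that keeps the counter as a ulong.
public sealed class UlongSequenceId
{
    // Holds the next id to hand out, so no "minus one" is needed in the constructor.
    private ulong _current;

    public UlongSequenceId(ulong initialSequenceId)
        => _current = initialSequenceId;

    // Interlocked.Increment returns the post-incremented value,
    // so subtracting one yields the value before the increment.
    // unchecked keeps the wrap-around at ulong.MaxValue from throwing
    // if the project is compiled with overflow checking enabled.
    public ulong FetchNext()
        => unchecked(Interlocked.Increment(ref _current) - 1);
}
```

The first call on `new UlongSequenceId(5)` returns 5 and the second returns 6, matching the behavior of the `long`-based `FetchNext()` in the diff.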

##########
File path: src/DotPulsar/Internal/RequestResponseHandler.cs
##########
@@ -53,31 +54,40 @@ private void SetRequestId(BaseCommand cmd)
             switch (cmd.CommandType)
             {
                 case BaseCommand.Type.Seek:
-                    cmd.Seek.RequestId = _requestId++;
+                    cmd.Seek.RequestId = _requestId.FetchNext();
+                    _pastInitialRequestId = true;

Review comment:
       An alternative implementation would be to store the initial value on the 
SequenceId object and then have an IsPastInitialRequestId that compares it to 
the current number. Let me know what you think.
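
One possible reading of that alternative, as a rough sketch only. `TrackedSequenceId` and `IsPastInitial` are hypothetical names, and the class reuses the `long`-plus-cast approach from the diff rather than anything confirmed for the final implementation.

```csharp
using System.Threading;

// Hypothetical SequenceId variant that remembers where it started.
public sealed class TrackedSequenceId
{
    private readonly long _initial;
    private long _current;

    public TrackedSequenceId(ulong initialSequenceId)
    {
        // Same trick as in the diff: start one below the initial value,
        // because Interlocked.Increment returns the post-incremented value.
        _initial = unchecked((long) initialSequenceId - 1);
        _current = _initial;
    }

    public ulong FetchNext()
        => unchecked((ulong) Interlocked.Increment(ref _current));

    // True once FetchNext has been called at least once.
    // (Would read false again only after a full 64-bit wrap-around.)
    public bool IsPastInitial
        => Interlocked.Read(ref _current) != _initial;
}
```

This would remove the need for a separate `_pastInitialRequestId` flag in each `case` branch, at the cost of one extra field on the counter.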



