blankensteiner commented on a change in pull request #23: URL: https://github.com/apache/pulsar-dotpulsar/pull/23#discussion_r449505264
########## File path: src/DotPulsar/Internal/ProducerChannel.cs ########## @@ -90,18 +90,16 @@ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence< if (autoAssignSequenceId) { - sendPackage.Command.SequenceId = _sequenceId.Current; - sendPackage.Metadata.SequenceId = _sequenceId.Current; + var newSequenceId = _sequenceId.FetchNext(); + sendPackage.Command.SequenceId = newSequenceId; + sendPackage.Metadata.SequenceId = newSequenceId; } else sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId; var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false); response.Expect(BaseCommand.Type.SendReceipt); - if (autoAssignSequenceId) - _sequenceId.Increment(); Review comment: The problem with removing this is that the sequenceId will be moved forward even if sending the package fails (either the broker rejects it or the connection is lost). ########## File path: src/DotPulsar/DotPulsar.csproj ########## @@ -23,7 +23,7 @@ <ItemGroup> <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.5" /> <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" /> - <PackageReference Include="protobuf-net" Version="2.4.6" /> + <PackageReference Include="protobuf-net" Version="2.3.*" /> Review comment: Hi Tim :-) Could you elaborate a bit on the need for this downgrade? I'm asking because version 3.0.13 is out now and version 3.1 (which I hope will be coming soon) will bring support for [ReadOnly]Memory<T> and ReadOnlySequence<T>, which will give a significant performance boost. Supporting older version of protobuf-net is therefore not something I aim for, but if it's a problem, then we could look into including protobuf-net in DotPulsar the same way Newtonsoft.Json is included in Elasticsearch.Net (or something like that). ########## File path: src/DotPulsar/Internal/SequenceId.cs ########## @@ -12,21 +12,24 @@ * limitations under the License. */ +using System.Threading; + namespace DotPulsar.Internal { public sealed class SequenceId { + private long _current; + public SequenceId(ulong initialSequenceId) { - Current = initialSequenceId; - - if (initialSequenceId > 0) - Increment(); + // Subtracting one because Interlocked.Increment will return the post-incremented value + // which is expected to be the initialSequenceId for the first call + _current = unchecked((long)initialSequenceId - 1); } - public ulong Current { get; private set; } - - public void Increment() - => ++Current; + public ulong FetchNext() + { + return unchecked((ulong)Interlocked.Increment(ref _current)); Review comment: I wonder if it would be more performant to have _current be a ulong and just return Interlocked.Increment(ref _current) - 1? ########## File path: src/DotPulsar/Internal/RequestResponseHandler.cs ########## @@ -53,31 +54,40 @@ private void SetRequestId(BaseCommand cmd) switch (cmd.CommandType) { case BaseCommand.Type.Seek: - cmd.Seek.RequestId = _requestId++; + cmd.Seek.RequestId = _requestId.FetchNext(); + _pastInitialRequestId = true; Review comment: An alternative implementation would be to store the initial value on the SequenceId object and then have an IsPastInitialRequestId comparing that to the current number. Let me know what you think. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org