blankensteiner commented on a change in pull request #47: URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446158457
########## File path: samples/Consuming/Program.cs ########## @@ -14,91 +14,93 @@ namespace Consuming { - using DotPulsar; - using DotPulsar.Abstractions; - using DotPulsar.Extensions; - using System; - using System.Buffers; - using System.Text; - using System.Threading; - using System.Threading.Tasks; - - internal static class Program + using DotPulsar; + using DotPulsar.Abstractions; + using DotPulsar.Extensions; + using System; + using System.Buffers; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + + internal static class Program + { + private static async Task Main(string[] args) { - private static async Task Main(string[] args) - { - const string myTopic = "persistent://public/default/mytopic"; + const string myTopic = "persistent://public/default/mytopic"; - await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650 + await using var client = PulsarClient.Builder() + .ServiceUrl(new Uri("pulsar://host.docker.internal:6650")) Review comment: Correct me if I'm wrong, but all the changes to this file can be discarded as they are style changes and the one change here (the hostname) is not something needed for this feature to work. ########## File path: src/DotPulsar/Internal/Connection.cs ########## @@ -97,6 +97,9 @@ public Task Send(CommandAck command, CancellationToken cancellationToken) public Task Send(CommandFlow command, CancellationToken cancellationToken) => Send(command.AsBaseCommand(), cancellationToken); + public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken) + => Send(command.AsBaseCommand(), cancellationToken); Review comment: It looks like your IDE has a different style setting since it only indents with 2 spaces. 
########## File path: samples/Producing/Program.cs ########## @@ -14,88 +14,90 @@ namespace Producing { - using DotPulsar; - using DotPulsar.Abstractions; - using DotPulsar.Extensions; - using System; - using System.Text; - using System.Threading; - using System.Threading.Tasks; - - internal static class Program + using DotPulsar; + using DotPulsar.Abstractions; + using DotPulsar.Extensions; + using System; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + + internal static class Program + { + private static async Task Main(string[] args) { - private static async Task Main(string[] args) - { - const string myTopic = "persistent://public/default/mytopic"; + const string myTopic = "persistent://public/default/mytopic"; - await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650 + await using var client = PulsarClient.Builder() + .ServiceUrl(new Uri("pulsar://host.docker.internal:6650")) Review comment: Correct me if I'm wrong, but all the changes to this file can be discarded as they are style changes and the one change here (the hostname) is not something needed for this feature to work. ########## File path: src/DotPulsar/Abstractions/IConsumer.cs ########## @@ -106,5 +106,15 @@ public interface IConsumer : IAsyncDisposable /// Unsubscribe the consumer. /// </summary> ValueTask Unsubscribe(CancellationToken cancellationToken = default); + + /// <summary> + /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged. 
+ /// </summary> + ValueTask RedeliverUnacknowledgedMessages(List<MessageId> messageIds, CancellationToken cancellationToken); Review comment: Public APIs should receive/return IEnumerable<T> instead of concrete collection types like List<T> ########## File path: src/DotPulsar/Internal/Consumer.cs ########## @@ -104,6 +107,12 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken) => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false); + public async ValueTask RedeliverUnacknowledgedMessages(List<MessageId> messageIds, CancellationToken cancellationToken) + => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken); + + public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken) + => await RedeliverUnacknowledgedMessages(new List<MessageIdData>(), cancellationToken); Review comment: Use Enumerable.Empty<MessageIdData>() instead of creating a new list. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org