amareshmad commented on issue #114: URL: https://github.com/apache/pulsar-dotpulsar/issues/114#issuecomment-1274735759
sample app tried in both .net framework and .net core, both sample apps are working. same code tested consoles and integrated to product.

using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Console sample that builds a Pulsar client, subscribes a consumer to a
    /// zone-specific topic and prints every message it receives.
    /// </summary>
    internal class Program
    {
        // Empty in this sample: TestMethod is not called from here.
        static void Main(string[] args)
        {
        }

        /// <summary>
        /// Builds a client and a shared-subscription consumer, then writes each
        /// received message to the console and acknowledges it.
        /// </summary>
        /// <param name="zone">
        /// "east" selects the east topic; any other value (including null)
        /// selects the west topic.
        /// </param>
        /// <returns>A task that completes when the message loop ends or an exception was logged.</returns>
        public async Task TestMethod(string zone)
        {
            try
            {
                string myTopic = zone == "east"
                    ? "persistent://east::xxxx.RequestV1.Queue"
                    : "persistent://west::xxxxx.RequestV1.Queue";

                string serviceUrl = "pulsar+ssl://xxxxxxxx.com:6651";
                string username = "xxxxxxx";
                string password = "xxxxxxxx";
                Uri serviceUri = new(serviceUrl);

                Console.WriteLine("building consumer client");
                var exceptionHandler = new ExceptionHandler();

                // NOTE(review): certificate name and certificate authority checks are
                // both switched off below - confirm this is acceptable outside of testing.
                await using var client = PulsarClient.Builder()
                    .Authentication(new AuthenticationBasic1(username, password))
                    .ServiceUrl(serviceUri)
                    .VerifyCertificateName(false)
                    .VerifyCertificateAuthority(false)
                    .KeepAliveInterval(TimeSpan.FromSeconds(10))
                    .RetryInterval(TimeSpan.FromSeconds(2))
                    .ExceptionHandler(exceptionHandler.OnException)
                    .Build();

                Console.WriteLine("creating Consumer.......");
                await using var consumer = client.NewConsumer()
                    .SubscriptionName("xxxx.fdp.extraction.xxxxx-default")
                    .Topic(myTopic)
                    .SubscriptionType(SubscriptionType.Shared)
                    .StateChangedHandler(AuthenticationBasic1.Monitor)
                    .PriorityLevel(1)
                    .Create();

                Console.WriteLine("Waiting for consumer message.......");
                await foreach (var message in consumer.Messages())
                {
                    Console.WriteLine($"Received consumer: {Encoding.UTF8.GetString(message.Data.ToArray())}");
                    await consumer.Acknowledge(message);
                }
            }
            // Message and StackTrace are passed straight through as format arguments:
            // StackTrace may be null, and calling a method on it would throw from
            // inside the handler. A null argument is simply rendered as empty text.
            catch (PulsarClientClosedException ex)
            {
                Console.WriteLine("PulsarClientClosedException message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
            }
            catch (PulsarClientDisposedException ex)
            {
                Console.WriteLine("Exception PulsarClientDisposedException message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
            }
        }
    }

    /// <summary>
    /// Authentication provider that sends "username:password" encoded as UTF-8
    /// under the method name "oms3.0".
    /// </summary>
    public class AuthenticationBasic1 : IAuthentication
    {
        private readonly string _username;
        private readonly string _password;

        // static private readonly ILog Console.WriteLine = LogManager.GetLogger(typeof(AuthenticationBasic1));

        /// <summary>Stores the credentials used to build the authentication data.</summary>
        /// <param name="uname">User name placed before the colon.</param>
        /// <param name="passwd">Password placed after the colon.</param>
        public AuthenticationBasic1(string uname, string passwd)
        {
            _username = uname;
            _password = passwd;
        }

        /// <summary>Name of the authentication method reported by this provider.</summary>
        public string AuthenticationMethodName => "oms3.0";

        /// <summary>Returns the UTF-8 bytes of "username:password".</summary>
        /// <param name="cancellationToken">Not used; the result is produced synchronously.</param>
        public ValueTask<byte[]> GetAuthenticationData(CancellationToken cancellationToken)
        {
            return new ValueTask<byte[]>(Encoding.UTF8.GetBytes($"{_username}:{_password}"));
        }

        /// <summary>
        /// Writes the consumer's topic and new state to the console whenever the
        /// consumer changes state.
        /// </summary>
        /// <param name="stateChanged">The state change reported for the consumer.</param>
        public static void Monitor(ConsumerStateChanged stateChanged)
        {
            var topic = stateChanged.Consumer.Topic;
            var state = stateChanged.ConsumerState;
            Console.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
        }
    }

    // NOTE(review): DotPulsar.Abstractions (imported above) appears to declare an
    // interface with this same name - verify whether this local copy is needed.
    public interface IHandleException
    {
        /// <summary>
        /// Called after an action has thrown an Exception.
        /// </summary>
        ValueTask OnException(ExceptionContext exceptionContext);
    }

    /// <summary>Exception callback that writes the reported exception to the console.</summary>
    public class ExceptionHandler : IHandleException
    {
        //static private readonly ILog Console.WriteLine = LogManager.GetLogger(typeof(ExceptionHandler));

        /// <summary>Logs the exception carried by the context.</summary>
        /// <param name="exceptionContext">Context whose Exception is written out.</param>
        /// <returns>An already-completed task.</returns>
        public ValueTask OnException(ExceptionContext exceptionContext)
        {
            // The {0} placeholder is required: without it the exception argument
            // is ignored by the formatter and never reaches the console.
            Console.WriteLine("An error occured while creating pulsar client: {0}", exceptionContext.Exception);
            return new ValueTask();
        }
    }
}

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org