amareshmad commented on issue #114:
URL: 
https://github.com/apache/pulsar-dotpulsar/issues/114#issuecomment-1274735759

   I tried the sample app in both .NET Framework and .NET Core; both sample apps
   work.
   
   The same code was tested in console apps and integrated into the product.
   using DotPulsar;
   using DotPulsar.Abstractions;
   using DotPulsar.Exceptions;
   using DotPulsar.Extensions;
   using System;
   using System.Buffers;
   using System.Collections.Generic;
   using System.Linq;
   using System.Text;
   using System.Threading;
   using System.Threading.Tasks;
   
   namespace ConsoleApp1
   {
       /// <summary>
       /// Console sample: connects to Pulsar through a <c>pulsar+ssl</c> service URL,
       /// subscribes to a zone-specific topic and prints every message it receives.
       /// </summary>
       internal class Program
       {
           /// <summary>
           /// Process entry point. It is empty in this sample, so
           /// <see cref="TestMethod"/> runs only when another caller invokes it.
           /// </summary>
           static void Main(string[] args)
           {
           }

           /// <summary>
           /// Builds a client and a consumer with a shared subscription, then writes each
           /// received message to the console (decoded as UTF-8) and acknowledges it.
           /// Any exception is caught and written to the console instead of propagating.
           /// </summary>
           /// <param name="zone">
           /// <c>"east"</c> selects the east topic; every other value selects the west topic.
           /// </param>
           /// <returns>A task that completes once the message loop ends or an exception was logged.</returns>
           public async Task TestMethod(string zone)
           {
               try
               {
                   string topic = zone == "east"
                       ? "persistent://east::xxxx.RequestV1.Queue"
                       : "persistent://west::xxxxx.RequestV1.Queue";

                   // Placeholder endpoint and credentials; real values do not belong in source.
                   Uri serviceUrl = new("pulsar+ssl://xxxxxxxx.com:6651");
                   string username = "xxxxxxx";
                   string password = "xxxxxxxx";

                   Console.WriteLine("building consumer client");
                   ExceptionHandler exceptionHandler = new ExceptionHandler();

                   // NOTE(review): both certificate checks are passed false, which presumably
                   // turns TLS certificate validation off - confirm, and keep it out of production.
                   await using var client = PulsarClient.Builder()
                       .Authentication(new AuthenticationBasic1(username, password))
                       .ServiceUrl(serviceUrl)
                       .VerifyCertificateName(false)
                       .VerifyCertificateAuthority(false)
                       .KeepAliveInterval(TimeSpan.FromSeconds(10))
                       .RetryInterval(TimeSpan.FromSeconds(2))
                       .ExceptionHandler(exceptionHandler.OnException)
                       .Build();

                   Console.WriteLine("creating Consumer.......");
                   await using var consumer = client.NewConsumer()
                       .SubscriptionName("xxxx.fdp.extraction.xxxxx-default")
                       .Topic(topic)
                       .SubscriptionType(SubscriptionType.Shared)
                       .StateChangedHandler(AuthenticationBasic1.Monitor)
                       .PriorityLevel(1)
                       .Create();

                   Console.WriteLine("Waiting for consumer message.......");
                   await foreach (var message in consumer.Messages())
                   {
                       Console.WriteLine($"Received consumer: {Encoding.UTF8.GetString(message.Data.ToArray())}");

                       await consumer.Acknowledge(message);
                   }
               }
               // StackTrace can be null. It is passed as-is because composite formatting
               // renders null as an empty string, while calling ToString() on it would throw.
               catch (PulsarClientClosedException ex)
               {
                   Console.WriteLine("PulsarClientClosedException message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
               }
               catch (PulsarClientDisposedException ex)
               {
                   Console.WriteLine("Exception PulsarClientDisposedException message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
               }
               catch (Exception ex)
               {
                   Console.WriteLine("Exception message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
               }
           }
       }
   
   
       /// <summary>
       /// <see cref="IAuthentication"/> implementation that supplies the user name and
       /// password, joined by a colon and UTF-8 encoded, under the method name "oms3.0".
       /// </summary>
       public class AuthenticationBasic1 : IAuthentication
       {
           private readonly string _username;
           private readonly string _password;

           /// <summary>Stores the credentials returned by <see cref="GetAuthenticationData"/>.</summary>
           /// <param name="uname">User name placed before the colon.</param>
           /// <param name="passwd">Password placed after the colon.</param>
           public AuthenticationBasic1(string uname, string passwd)
           {
               _username = uname;
               _password = passwd;
           }

           /// <summary>Authentication method name reported by this implementation.</summary>
           public string AuthenticationMethodName => "oms3.0";

           /// <summary>
           /// Returns the UTF-8 bytes of <c>username:password</c> as an already-completed task.
           /// </summary>
           /// <param name="cancellationToken">Not used; the result is produced synchronously.</param>
           /// <returns>The encoded credential bytes.</returns>
           public ValueTask<byte[]> GetAuthenticationData(CancellationToken cancellationToken)
           {
               return new ValueTask<byte[]>(Encoding.UTF8.GetBytes(_username + ":" + _password));
           }

           /// <summary>
           /// Writes the consumer's topic and its new state to the console.
           /// </summary>
           /// <param name="stateChanged">State-change notification carrying the consumer and its state.</param>
           public static void Monitor(ConsumerStateChanged stateChanged)
           {
               var topic = stateChanged.Consumer.Topic;
               var state = stateChanged.ConsumerState;
               Console.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
           }
       }
   
       /// <summary>
       /// Contract for a callback that receives the context of a thrown exception.
       /// </summary>
       /// <remarks>
       /// NOTE(review): DotPulsar.Abstractions (imported by this file) may already declare an
       /// interface with this name - confirm whether this local declaration is intended.
       /// </remarks>
       public interface IHandleException
       {
           /// <summary>
           /// Called after an action has thrown an Exception.
           /// </summary>
           /// <param name="exceptionContext">
           /// Context of the failure; the implementation in this file reads its <c>Exception</c> property.
           /// </param>
           /// <returns>A <see cref="ValueTask"/> the caller can await.</returns>
           ValueTask OnException(ExceptionContext exceptionContext);
       }
   
       /// <summary>
       /// <see cref="IHandleException"/> implementation that writes the reported exception
       /// to the console.
       /// </summary>
       public class ExceptionHandler : IHandleException
       {
           /// <summary>
           /// Writes a fixed message followed by the exception from
           /// <paramref name="exceptionContext"/> to the console.
           /// </summary>
           /// <param name="exceptionContext">Context whose <c>Exception</c> is printed.</param>
           /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
           public ValueTask OnException(ExceptionContext exceptionContext)
           {
               // The {0} placeholder is required: without it Console.WriteLine(string, object)
               // treats the exception as an unused format argument and never prints it.
               Console.WriteLine("An error occured while creating pulsar client: {0}", exceptionContext.Exception);
               return new ValueTask();
           }
       }
   }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to