amareshmad commented on issue #114:
URL:
https://github.com/apache/pulsar-dotpulsar/issues/114#issuecomment-1274735759
I tried the sample app in both .NET Framework and .NET Core, and both sample
apps are working.
The same code was tested in console apps and integrated into the product.
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// Sample console program that consumes messages from a Pulsar topic through DotPulsar.
/// </summary>
internal class Program
{
    /// <summary>
    /// Process entry point. Currently empty: <see cref="TestMethod"/> is not invoked from here.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
    }

    /// <summary>
    /// Builds a Pulsar client and a shared-subscription consumer for the topic selected by
    /// <paramref name="zone"/>, then prints and acknowledges each received message.
    /// Any exception is caught and written to the console; nothing is rethrown.
    /// </summary>
    /// <param name="zone">"east" selects the east topic; any other value selects the west topic.</param>
    /// <returns>A task that completes when the message stream ends or an exception was logged.</returns>
    public async Task TestMethod(string zone)
    {
        try
        {
            string myTopic;
            if (zone == "east")
            {
                myTopic = "persistent://east::xxxx.RequestV1.Queue";
            }
            else
            {
                myTopic = "persistent://west::xxxxx.RequestV1.Queue";
            }

            string SERVICE_URL = "pulsar+ssl://xxxxxxxx.com:6651";
            string username = "xxxxxxx";
            string password = "xxxxxxxx";
            Uri serviceUrl = new(SERVICE_URL);

            Console.WriteLine("building consumer client");
            var exceptionHandler = new ExceptionHandler();

            // NOTE(review): certificate name and authority verification are both switched off
            // below; confirm this is only meant for test environments.
            await using var client = PulsarClient.Builder()
                .Authentication(new AuthenticationBasic1(username, password))
                .ServiceUrl(serviceUrl)
                .VerifyCertificateName(false)
                .VerifyCertificateAuthority(false)
                .KeepAliveInterval(TimeSpan.FromSeconds(10))
                .RetryInterval(TimeSpan.FromSeconds(2))
                .ExceptionHandler(exceptionHandler.OnException)
                .Build();

            Console.WriteLine("creating Consumer.......");
            await using var consumer = client.NewConsumer()
                .SubscriptionName("xxxx.fdp.extraction.xxxxx-default")
                .Topic(myTopic)
                .SubscriptionType(SubscriptionType.Shared)
                .StateChangedHandler(AuthenticationBasic1.Monitor)
                .PriorityLevel(1)
                .Create();

            Console.WriteLine("Waiting for consumer message.......");
            await foreach (var message in consumer.Messages())
            {
                // The literal must stay on a single line: a raw newline inside a regular
                // interpolated string does not compile.
                Console.WriteLine($"Received consumer: {Encoding.UTF8.GetString(message.Data.ToArray())}");
                await consumer.Acknowledge(message);
            }
        }
        catch (PulsarClientClosedException ex)
        {
            // Message and StackTrace are passed directly: they are already strings, and
            // StackTrace may be null, which the formatter renders as empty instead of throwing.
            Console.WriteLine("PulsarClientClosedException message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
        }
        catch (PulsarClientDisposedException ex)
        {
            Console.WriteLine("Exception PulsarClientDisposedException message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception message:{0}, stack trace:{1}", ex.Message, ex.StackTrace);
        }
    }
}
/// <summary>
/// <see cref="IAuthentication"/> implementation that reports the method name "oms3.0" and
/// supplies the credentials as the UTF-8 bytes of "username:password".
/// </summary>
public class AuthenticationBasic1 : IAuthentication
{
    private readonly string _username;
    private readonly string _password;

    /// <summary>
    /// Stores the credentials used by <see cref="GetAuthenticationData"/>.
    /// </summary>
    /// <param name="uname">User name placed before the ':' separator.</param>
    /// <param name="passwd">Password placed after the ':' separator.</param>
    public AuthenticationBasic1(string uname, string passwd)
    {
        _username = uname;
        _password = passwd;
    }

    /// <summary>
    /// Name of the authentication method; always "oms3.0".
    /// </summary>
    public string AuthenticationMethodName
    {
        get { return "oms3.0"; }
    }

    /// <summary>
    /// Returns the UTF-8 encoding of "username:password" as an already-completed task.
    /// </summary>
    /// <param name="cancellationToken">Not used; the result is produced synchronously.</param>
    /// <returns>The encoded credential bytes.</returns>
    public ValueTask<byte[]> GetAuthenticationData(CancellationToken cancellationToken)
    {
        return new ValueTask<byte[]>(Encoding.UTF8.GetBytes(_username + ":" + _password));
    }

    /// <summary>
    /// Writes a line to the console naming the consumer's topic and its new state.
    /// </summary>
    /// <param name="stateChanged">State-change notification carrying the consumer and its new state.</param>
    public static void Monitor(ConsumerStateChanged stateChanged)
    {
        // The previous per-state description switch was computed but never printed, so it
        // has been dropped; the emitted text is unchanged.
        var topic = stateChanged.Consumer.Topic;
        var state = stateChanged.ConsumerState;
        Console.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
    }
}
/// <summary>
/// Contract for a callback that is handed the context of a thrown exception.
/// </summary>
/// <remarks>
/// NOTE(review): DotPulsar.Abstractions appears to declare an interface with this same name;
/// confirm that this local declaration is intended.
/// </remarks>
public interface IHandleException
{
    /// <summary>
    /// Called after an action has thrown an Exception.
    /// </summary>
    /// <param name="exceptionContext">Context describing the exception that was thrown.</param>
    /// <returns>A <see cref="ValueTask"/> representing the handling operation.</returns>
    ValueTask OnException(ExceptionContext exceptionContext);
}
/// <summary>
/// <see cref="IHandleException"/> implementation that logs the exception to the console.
/// </summary>
public class ExceptionHandler : IHandleException
{
    /// <summary>
    /// Writes the exception carried by <paramref name="exceptionContext"/> to the console.
    /// The context itself is not modified.
    /// </summary>
    /// <param name="exceptionContext">Context whose <c>Exception</c> is logged.</param>
    /// <returns>An already-completed <see cref="ValueTask"/>.</returns>
    public ValueTask OnException(ExceptionContext exceptionContext)
    {
        // The format string needs a {0} placeholder: without one, the
        // WriteLine(string, object) overload silently discards the exception argument.
        Console.WriteLine("An error occured while creating pulsar client: {0}", exceptionContext.Exception);
        return new ValueTask();
    }
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]