This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit c8af7a4c41fd000a129b5bf4311f8cd297677c18
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Mar 6 11:30:14 2023 +0800

    Adjust settings sync frequency
---
 csharp/rocketmq-client-csharp/Client.cs  |  7 +------
 csharp/rocketmq-client-csharp/Session.cs | 13 ++++---------
 2 files changed, 5 insertions(+), 15 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index e78a8651..b3a38dc6 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -40,7 +40,7 @@ namespace Org.Apache.Rocketmq
         private readonly CancellationTokenSource _topicRouteUpdateCts;
 
         private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
-        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
         private readonly CancellationTokenSource _settingsSyncCts;
 
         private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60);
@@ -227,11 +227,6 @@ namespace Org.Apache.Rocketmq
                         Logger.Error(e, $"Failed to update topic route cache, topic={item}");
                     }
                 }
-
-                foreach (var topic in GetTopics())
-                {
-                    await FetchTopicRoute(topic);
-                }
             }
             catch (Exception e)
             {
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index c6e98418..86316b9d 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -17,7 +17,6 @@
 
 using System;
 using System.Threading;
-using System.Threading.Channels;
 using System.Threading.Tasks;
 using Grpc.Core;
 using grpc = Grpc.Core;
@@ -32,12 +31,12 @@ namespace Org.Apache.Rocketmq
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3);
+        private readonly ManualResetEventSlim _event = new(false);
 
         private readonly
             AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> _streamingCall;
 
         private readonly Client _client;
-        private readonly Channel<bool> _channel;
         private readonly Endpoints _endpoints;
         private readonly SemaphoreSlim _semaphore;
 
@@ -49,10 +48,6 @@ namespace Org.Apache.Rocketmq
             _semaphore = new SemaphoreSlim(1);
             _streamingCall = streamingCall;
             _client = client;
-            _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
-            {
-                FullMode = BoundedChannelFullMode.DropOldest
-            });
             Loop();
         }
 
@@ -66,7 +61,7 @@ namespace Org.Apache.Rocketmq
         public async Task SyncSettings(bool awaitResp)
         {
             // Add more buffer time.
-            await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
+            await _semaphore.WaitAsync();
             try
             {
                 var settings = _client.GetSettings();
@@ -78,7 +73,7 @@ namespace Org.Apache.Rocketmq
                 // await writer.CompleteAsync();
                 if (awaitResp)
                 {
-                    await _channel.Reader.ReadAsync();
+                    _event.Wait(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
                 }
             }
             finally
@@ -100,7 +95,7 @@ namespace Org.Apache.Rocketmq
                             Logger.Info(
                                 $"Receive setting from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}");
                             _client.OnSettingsCommand(_endpoints, response.Settings);
-                            await _channel.Writer.WriteAsync(true);
+                            _event.Set();
                             break;
                         }
                     case Proto.TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
