lizhanhui commented on code in PR #682: URL: https://github.com/apache/rocketmq-clients/pull/682#discussion_r1497660393
##########
rust/src/producer.rs:
##########
@@ -93,23 +116,80 @@ impl Producer {
         };
         let logger = log::logger(option.logging_format());
         let settings = build_producer_settings(&option, &client_option);
-        let mut client = Client::new(&logger, client_option, settings)?;
-        client.set_transaction_checker(transaction_checker);
+        let client = Client::new(&logger, client_option, settings)?;
+        let option = Arc::new(RwLock::new(option));
         Ok(Producer {
             option,
             logger,
             client,
+            transaction_checker: Some(transaction_checker),
+            shutdown_tx: None,
         })
     }

+    async fn get_resource_namespace(&self) -> String {
+        let option_guard = self.option.read();
+        let resource_namespace = option_guard.await.namespace().to_string();
+        resource_namespace
+    }
+
     /// Start the producer
     pub async fn start(&mut self) -> Result<(), ClientError> {
-        self.client.start().await?;
-        if let Some(topics) = self.option.topics() {
+        let (telemetry_command_tx, mut telemetry_command_rx) = mpsc::channel(16);
+        let telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command> =
+            telemetry_command_tx;
+        self.client.start(telemetry_command_tx).await?;
+        let option_guard = self.option.read().await;
+        let topics = option_guard.topics();
+        if let Some(topics) = topics {
             for topic in topics {
                 self.client.topic_route(topic, true).await?;
             }
         }
+        drop(option_guard);
+        let transaction_checker = self.transaction_checker.take();
+        if transaction_checker.is_some() {
+            self.transaction_checker = Some(Box::new(|_, _| TransactionResolution::UNKNOWN));

Review Comment:
Why not simply take the transaction checker and leave None afterward?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org