Re: [I] Using WithConsumeGoroutineNums in pushconsumer may not work as expected. [rocketmq-client-go]
cserwen commented on issue #1135: URL: https://github.com/apache/rocketmq-client-go/issues/1135#issuecomment-2033738710 How did you determine that 8 consumption goroutines were created? @bogerv -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Using WithConsumeGoroutineNums in pushconsumer may not work as expected. [rocketmq-client-go]
bogerv commented on issue #1135:
URL: https://github.com/apache/rocketmq-client-go/issues/1135#issuecomment-2033827068

` func OrderPersistListDealing(_accountType string, _b string, _q string) {
    err := rocketmq.RMQ().Consumer(rocketmq.ConsumerGroupPersist).Subscribe(
        rocketmq.GetTopicPersist(_b, _q),
        consumer.MessageSelector{Type: consumer.TAG, Expression: util.FormatSymbol(_b, _q)},
        func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
            wg := &sync.WaitGroup{}
            bulkInsertESOrder(orders, wg, &esErr)
        })
}`

Inside the bulkInsertESOrder function, a log line is printed after sleeping for 5 seconds. The result is that every 5 seconds, 8 log lines are printed at the same time.

The scenario is a single application process that starts multiple goroutines (OrderPersistListDealing is started in several goroutines with different _b and _q values). The goroutines share the same consumer, which therefore subscribes to multiple topics, and they use MessageSelector to distinguish the different data.
Re: [I] Using WithConsumeGoroutineNums in pushconsumer may not work as expected. [rocketmq-client-go]
cserwen commented on issue #1135:
URL: https://github.com/apache/rocketmq-client-go/issues/1135#issuecomment-2033836761

> ` func OrderPersistListDealing(_accountType string, _b string, _q string) {
>     err := rocketmq.RMQ().Consumer(rocketmq.ConsumerGroupPersist).Subscribe(
>         rocketmq.GetTopicPersist(_b, _q),
>         consumer.MessageSelector{Type: consumer.TAG, Expression: util.FormatSymbol(_b, _q)},
>         func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
>             wg := &sync.WaitGroup{}
>             bulkInsertESOrder(orders, wg, &esErr)
>         })
> }`
>
> Inside the bulkInsertESOrder function, a log line is printed after sleeping for 5 seconds. The result is that every 5 seconds, 8 log lines are printed at the same time.
>
> The scenario is a single application process that starts multiple goroutines (OrderPersistListDealing is started in several goroutines with different _b and _q values). The goroutines share the same consumer, which therefore subscribes to multiple topics, and they use MessageSelector to distinguish the different data.

Are you consuming orderly? Can you show the initialization code of the RocketMQ consumer?
Re: [I] Using WithConsumeGoroutineNums in pushconsumer may not work as expected. [rocketmq-client-go]
bogerv commented on issue #1135:
URL: https://github.com/apache/rocketmq-client-go/issues/1135#issuecomment-2033849433

> > ` func OrderPersistListDealing(_accountType string, _b string, _q string) {
> >     err := rocketmq.RMQ().Consumer(rocketmq.ConsumerGroupPersist).Subscribe(
> >         rocketmq.GetTopicPersist(_b, _q),
> >         consumer.MessageSelector{Type: consumer.TAG, Expression: util.FormatSymbol(_b, _q)},
> >         func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
> >             wg := &sync.WaitGroup{}
> >             bulkInsertESOrder(orders, wg, &esErr)
> >         })
> > }`
> >
> > Inside the bulkInsertESOrder function, a log line is printed after sleeping for 5 seconds. The result is that every 5 seconds, 8 log lines are printed at the same time.
> >
> > The scenario is a single application process that starts multiple goroutines (OrderPersistListDealing is started in several goroutines with different _b and _q values). The goroutines share the same consumer, which therefore subscribes to multiple topics, and they use MessageSelector to distinguish the different data.
>
> Are you consuming orderly? Can you show the initialization code of the RocketMQ consumer?

c, err = rocketmq.NewPushConsumer(
    consumer.WithNameServer(config.Config.NameServers),
    consumer.WithGroupName(group),
    consumer.WithConsumerOrder(true),
    consumer.WithConsumerModel(consumer.Clustering),
    consumer.WithConsumeMessageBatchMaxSize(config.ConsumeMaxSize),
)
Re: [I] Using WithConsumeGoroutineNums in pushconsumer may not work as expected. [rocketmq-client-go]
cserwen commented on issue #1135: URL: https://github.com/apache/rocketmq-client-go/issues/1135#issuecomment-2033860392 This parameter only takes effect for concurrent consumption. For ordered consumption, each queue is consumed by one and only one goroutine. @bogerv
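The distinction in the comment above can be pictured with a small standard-library model. This is only a sketch and does not use rocketmq-client-go: `peakParallelism`, its parameters, and the 20 ms sleep are hypothetical names and values chosen for illustration. In orderly mode one goroutine drains each queue, so the requested worker count has no effect; in concurrent mode a pool of `workers` goroutines shares all messages.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakParallelism calls a slow handler once per message and returns the
// highest number of handler calls that were in flight at the same time.
//
// queues[i] holds the messages of queue i. When orderly is true, each queue
// is drained by exactly one goroutine and workers is ignored. When orderly
// is false, workers goroutines (must be > 0) share all messages.
// This is an illustrative model, not the client library's implementation.
func peakParallelism(queues [][]string, workers int, orderly bool) int {
	var inFlight, peak int64
	handle := func(string) {
		n := atomic.AddInt64(&inFlight, 1)
		for { // record a new maximum if this call raised it
			p := atomic.LoadInt64(&peak)
			if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
				break
			}
		}
		time.Sleep(20 * time.Millisecond) // stand-in for slow work
		atomic.AddInt64(&inFlight, -1)
	}

	var wg sync.WaitGroup
	if orderly {
		// One goroutine per queue: messages of a queue are handled in order.
		for _, q := range queues {
			wg.Add(1)
			go func(msgs []string) {
				defer wg.Done()
				for _, m := range msgs {
					handle(m)
				}
			}(q)
		}
	} else {
		// Worker pool: any worker may take any message.
		jobs := make(chan string)
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for m := range jobs {
					handle(m)
				}
			}()
		}
		for _, q := range queues {
			for _, m := range q {
				jobs <- m
			}
		}
		close(jobs)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&peak))
}

func main() {
	queues := [][]string{{"a1", "a2", "a3"}, {"b1", "b2", "b3"}}
	// Orderly: never more than len(queues) handlers at once, whatever workers is.
	fmt.Println("orderly peak:", peakParallelism(queues, 8, true))
	// Concurrent: never more than workers handlers at once.
	fmt.Println("concurrent peak:", peakParallelism(queues, 8, false))
}
```

Under this model, the number of log lines that appear together in orderly mode is bounded by the number of queues assigned to the consumer, not by the configured goroutine count.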
Re: [I] Using WithConsumeGoroutineNums in pushconsumer may not work as expected. [rocketmq-client-go]
bogerv commented on issue #1135:
URL: https://github.com/apache/rocketmq-client-go/issues/1135#issuecomment-2033901185

> This param only takes effect for concurrent consumption. For order-consumption, each queue has one and only one coroutine to consume.

So does this mean that if we need to consume bulk data in order, each consumer will need to subscribe to a separate topic?
Re: [PR] Remove Synchronized Lock [rocketmq]
cserwen commented on PR #7912:
URL: https://github.com/apache/rocketmq/pull/7912#issuecomment-2033927528

> It has been fixed in `develop` branch. 4.9.x only provides security patches or critical fixes, so this pr should be closed.

Closed for this reason. @Tcytw
Re: [PR] Remove Synchronized Lock [rocketmq]
cserwen closed pull request #7912: Remove Synchronized Lock URL: https://github.com/apache/rocketmq/pull/7912
[GH] (rocketmq): Workflow run "Build and Run Tests by Bazel" failed!
The GitHub Actions job "Build and Run Tests by Bazel" on rocketmq.git has failed. Run started by GitHub user leizhiyuan (triggered by cserwen). Head commit for run: 23cede907dffcda98b3c7314e58fea94136fcb20 / leizhiyuan chore: revert some wrong code style Report URL: https://github.com/apache/rocketmq/actions/runs/8133527939 With regards, GitHub Actions via GitBox
[I] [Bug] Unable to consume messages in order while using RocketMQ 5 client [rocketmq]
CodingOX opened a new issue, #7999:
URL: https://github.com/apache/rocketmq/issues/7999

### Before Creating the Bug Report

- [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

Ubuntu 20.04

### RocketMQ version

RocketMQ 5.2

### JDK Version

JDK 21

### Describe the Bug

When attempting to consume messages in order using the RocketMQ 5 client against a cluster of 2 masters and no slaves, the consumer does not receive the messages in the order they were sent.

### Steps to Reproduce

Producer code, written in Kotlin:

```
import cn.hutool.core.lang.generator.SnowflakeGenerator
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicLong

fun main() {
    val log: Logger = LoggerFactory.getLogger("OCP")
    val provider = ClientServiceProvider.loadService()
    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()
    val topic = "FIFOTopic"
    val tag = "tag-2"
    val producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build()
    val atomicLong = AtomicLong()
    val snowflakeGenerator = SnowflakeGenerator()
    val messageGroup = "OrderedGroup-1"
    try {
        val ts = System.currentTimeMillis()
        repeat(30) {
            val id = snowflakeGenerator.next()
            val curIndex = atomicLong.incrementAndGet()
            val message = provider
                .newMessageBuilder()
                .setTag(tag)
                .setKeys(id.toString())
                .setTopic(topic)
                .setMessageGroup(messageGroup) // Set MessageGroup For FIFO
                .setBody(("$ts - $curIndex").toByteArray())
                .build()
            val receipt = producer.send(message)
            log.info("Send message: $curIndex - $id - ${receipt.messageId}")
        }
    } catch (ex: Exception) {
        log.error("Send message error", ex)
    }
}
```

- The producer side seems to be correctly setting the message group for ensuring ordered message delivery.
- Messages are being sent to the specified topic with message group `OrderedGroup-1`.

Consumer code:

```
import cn.hutool.core.thread.ThreadUtil
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.apache.rocketmq.client.apis.consumer.ConsumeResult
import org.apache.rocketmq.client.apis.consumer.FilterExpression
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit

fun main() {
    val log: Logger = LoggerFactory.getLogger("ORC")
    val provider = ClientServiceProvider.loadService()
    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()
    val topic = "FIFOTopic"
    val tag = "tag-2"
    val topicToTag = mapOf(topic to FilterExpression(tag, FilterExpressionType.TAG))
    val cq = ConcurrentLinkedQueue<String>()
    provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setSubscriptionExpressions(topicToTag)
        .setConsumerGroup("OrderConsumerGroupV3")
        .setMessageListener {
            val msg = StandardCharsets.UTF_8.decode(it!!.body)
            cq.offer(msg.toString())
            ConsumeResult.SUCCESS
        }.build()
    ThreadUtil.sleep(10, TimeUnit.SECONDS)
    log.info(cq.joinToString("\n"))
    ThreadUtil.sleep(1, TimeUnit.HOURS)
}
```

- The consumer side is using a `ConcurrentLinkedQueue` to store received messages temporarily for logging purposes.
- Messages are being consumed from the topic `FIFOTopic` with tag `tag-2`.
[I] add some info log in transactional messsage check [rocketmq]
wangjiao01 opened a new issue, #8000:
URL: https://github.com/apache/rocketmq/issues/8000

### Before Creating the Enhancement Request

- [X] I have confirmed that this should be classified as an enhancement rather than a bug/feature.

### Summary

In order to know explicitly what triggers the transactional message check method, I want to add some info-level logging to the org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check method.

### Motivation

In broker version 4.3.2, I encountered a problem where the checkLocalTransaction method is executed for a transactional message even though I return LocalTransactionState.COMMIT_MESSAGE from the executeLocalTransaction method, and there is no log output to show why.

### Describe the Solution You'd Like

Add info-level logging for the three boolean variables in the check method.

### Describe Alternatives You've Considered

Add info-level logging for the three boolean variables in the check method.

### Additional Context

_No response_
[I] add some info log in transactional messsage check [rocketmq]
wangjiao01 opened a new issue, #8001:
URL: https://github.com/apache/rocketmq/issues/8001

### Before Creating the Enhancement Request

- [X] I have confirmed that this should be classified as an enhancement rather than a bug/feature.

### Summary

In order to know explicitly what triggers the transactional message check method, I want to add some info-level logging to the org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check method.

### Motivation

In broker version 4.3.2, I encountered a problem where the checkLocalTransaction method is executed for a transactional message even though I return LocalTransactionState.COMMIT_MESSAGE from the executeLocalTransaction method, and there is no log output.

### Describe the Solution You'd Like

Add info-level logging for the three boolean variables in the check method.

### Describe Alternatives You've Considered

Add info-level logging for the three boolean variables in the check method.

### Additional Context

_No response_
[GH] (rocketmq): Workflow run "Build and Run Tests by Maven" is working again!
The GitHub Actions job "Build and Run Tests by Maven" on rocketmq.git has succeeded. Run started by GitHub user leizhiyuan (triggered by cserwen). Head commit for run: 9f316429b4189018fce0570e918735f82970af1e / leizhiyuan fix: use skip to fix the findAny always first Report URL: https://github.com/apache/rocketmq/actions/runs/8399725323 With regards, GitHub Actions via GitBox
Re: [I] [Bug] Unable to consume messages orderly for ordered message while using RocketMQ 5 client [rocketmq]
3424672656 commented on issue #7999: URL: https://github.com/apache/rocketmq/issues/7999#issuecomment-2034898876 RocketMQ currently only supports ordered consumption within a single queue.
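What "ordered only within a single queue" means can be sketched with a small standard-library model. Everything here is hypothetical and for illustration only: the `message` type, the `queueFor`, `route`, and `orderedWithinGroups` helpers, and the FNV-1a hash are assumptions, not RocketMQ's actual routing. The model assumes that messages sharing a message group land in the same queue, so their relative order is preserved there, while nothing is promised about order across different queues.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// message is a hypothetical stand-in for a FIFO message: a message group
// plus the sequence number assigned by the sender.
type message struct {
	group string
	seq   int
}

// queueFor maps a message group to a queue index. FNV-1a is an assumption
// made for this sketch; it is not the hash RocketMQ uses.
func queueFor(group string, queueCount int) int {
	h := fnv.New32a()
	h.Write([]byte(group))
	return int(h.Sum32() % uint32(queueCount))
}

// route appends each message to the queue chosen for its group, which
// preserves send order inside every queue.
func route(msgs []message, queueCount int) [][]message {
	queues := make([][]message, queueCount)
	for _, m := range msgs {
		i := queueFor(m.group, queueCount)
		queues[i] = append(queues[i], m)
	}
	return queues
}

// orderedWithinGroups reports whether, inside every queue, the sequence
// numbers of each group strictly increase.
func orderedWithinGroups(queues [][]message) bool {
	for _, q := range queues {
		last := map[string]int{}
		for _, m := range q {
			if prev, seen := last[m.group]; seen && m.seq <= prev {
				return false
			}
			last[m.group] = m.seq
		}
	}
	return true
}

func main() {
	msgs := []message{{"A", 1}, {"B", 1}, {"A", 2}, {"B", 2}, {"A", 3}}
	queues := route(msgs, 2)
	for i, q := range queues {
		fmt.Println("queue", i, q)
	}
	fmt.Println("ordered within each group:", orderedWithinGroups(queues))
}
```

A consumer that reads the queues in parallel still sees each group in send order, but may interleave groups from different queues in any way.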
Re: [I] The incorrect incremental epoch when autoswitch ha. [rocketmq]
github-actions[bot] commented on issue #6505: URL: https://github.com/apache/rocketmq/issues/6505#issuecomment-2035832917 This issue was closed because it has been inactive for 3 days since being marked as stale.
Re: [I] MappedFile#warmMappedFile there may be code that doesn't work [rocketmq]
github-actions[bot] closed issue #5571: MappedFile#warmMappedFile there may be code that doesn't work URL: https://github.com/apache/rocketmq/issues/5571
Re: [I] The incorrect incremental epoch when autoswitch ha. [rocketmq]
github-actions[bot] closed issue #6505: The incorrect incremental epoch when autoswitch ha. URL: https://github.com/apache/rocketmq/issues/6505
Re: [I] MappedFile#warmMappedFile there may be code that doesn't work [rocketmq]
github-actions[bot] commented on issue #5571: URL: https://github.com/apache/rocketmq/issues/5571#issuecomment-2035832969 This issue was closed because it has been inactive for 3 days since being marked as stale.
[I] [Feature Request] Add `imagePullSecrets` to service account in operator helm chart [rocketmq-operator]
DavidSonoda opened a new issue, #221:
URL: https://github.com/apache/rocketmq-operator/issues/221

**FEATURE REQUEST**

1. Please describe the feature you are requesting.

   Allow the operator helm chart to specify `imagePullSecrets` in the serviceAccount.

2. Provide any additional detail on your proposed use case for this feature.

   When the operator image is built against a different RocketMQ version and pushed to a private registry that requires authentication, `imagePullSecrets` are needed.

3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?

   - importance: should-have
   - workaround now: manually modify the `charts/rocketmq-operator/templates/serviceaccount.yaml` manifest to add `imagePullSecrets`
[PR] [ISSUE #221] Add `imagePullSecrets` to operator helm chart SA [rocketmq-operator]
DavidSonoda opened a new pull request, #222:
URL: https://github.com/apache/rocketmq-operator/pull/222

## What is the purpose of the change

Close #221.

*Why*: When the operator image is built against a different RocketMQ version and pushed to a private registry that requires authentication, imagePullSecrets are needed.

*How*: Added configurable `imagePullSecrets` to `values.yaml` and the corresponding template to `serviceaccount.yaml`.

## Brief changelog

## Verifying this change

**Please go through this checklist to help us incorporate your contribution quickly and easily.**

Notice: `It would be helpful if you could finish the following checklist (the last one is not necessary) before request the community to review your PR`.

- [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq-operator/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
- [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
- [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
- [ ] Check RBAC rights for Kubernetes roles.
- [ ] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist.
- [ ] Run `make docker-build` to build docker image for operator, try your changes from Pod inside your Kubernetes cluster, **not just locally**. Also provide screenshots to show that the RocketMQ cluster is healthy after the changes.
- [ ] Before committing your changes, remember to run `make manifests` to make sure the CRD files are updated.
- [ ] Update documentation if necessary.
- [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).