yang1hu commented on issue #161:
URL:
https://github.com/apache/rocketmq-client-python/issues/161#issuecomment-4233192041
> 能描述的具体一些吗,提供一下具体的场景和日志
def send_task_message(tag: str, business_type: str, payload: dict) -> str:
    """Push one task message to MQ and return its message id.

    Args:
        tag: Message tag; conf.MQ_TASK_SCAM_TAG or conf.MQ_TASK_DETAIL_TAG.
        business_type: Value stored under ``"businessType"`` in the body.
        payload: Dict that ``json.dumps`` can serialise; it is embedded in
            the body as a JSON string under ``"msgJson"``.

    Returns:
        The ``msg_id`` attribute of the result returned by the producer.

    Raises:
        RuntimeError: If the module-level ``mq_producer`` is falsy
            (not initialised yet).
    """
    # NOTE(review): the surrounding report says the service crashes with
    # exit code 139 when this path is hit concurrently. Whether the shared
    # producer may be used from several threads at once is not established
    # here -- confirm against the client library before relying on it.
    if not mq_producer:
        raise RuntimeError("MQ Producer 尚未初始化")

    msg = Message(conf.MQ_TOPIC)
    body = {
        "seq": str(uuid.uuid4()),
        "businessType": business_type,
        "localTxCheckBeanType": "",
        "businessKey": str(uuid.uuid4()),
        "msgJson": json.dumps(payload, ensure_ascii=False),
        "createTime": int(time.time() * 1000),  # epoch milliseconds
    }
    msg.set_body(json.dumps(body, ensure_ascii=False))
    msg.set_tags(tag)
    # The per-message "seq" UUID doubles as the message key.
    msg.set_keys(body["seq"])

    ret = mq_producer.send_message_in_transaction(
        msg, _local_transaction_execute, ""
    )
    logger.info(
        f"📤 [任务推送] businessType:{business_type} tag:{tag} msg_id:{ret.msg_id}"
    )
    return ret.msg_id
这是我的发送代码。应用场景如下:
```python
try:
for item in request.items:
message_id = send_task_message(
tag=conf.MQ_TASK_DETAIL_TAG,
business_type='DETAIL_QUERY',
payload={"url": item.url, "batch_id": item.batch_id},
)
message_ids.append(message_id)
logger.info("async detail task submitted, url:{} batch_id:{}
message_id:{}".format(item.url, item.batch_id, message_id))
return {"success": True, "message_ids": message_ids, "message": ""}
except Exception as e:
logger.exception(e)
message = {
"message": "批量提交detail任务到MQ失败: {}".format(e),
"loggerName": os.path.abspath(__file__).replace(os.sep, '.')
}
logger.error(get_warning_message(message,
BizKeyAndDesc.MQ_CONSUME_ERROR))
return {"success": False, "message_ids": message_ids, "message":
str(e)}
```
这段代码运行在一个 FastAPI 服务中;当这个接口被并发调用时,K8S 中会出现 139(内存崩溃的问题)。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]