git-hulk commented on issue #2900:
URL: https://github.com/apache/kvrocks/issues/2900#issuecomment-2837848286
@siren, thanks for the detailed information. It really helps us narrow down
this issue. We suspect it is caused by a data race between transaction mode
and blocking requests.
Could you help verify whether the issue disappears after disabling the
transaction in the pipeline with `r.pipeline(transaction=False)`?
The following script cannot reproduce the crash on my side, but it may help
confirm the suspicion:
```Python
import redis
import time
import threading


def consumer_worker(r, queue_name):
    """Consumer that uses blocking pop to wait for messages"""
    print(f"Consumer: Waiting for messages on {queue_name}...")
    while True:
        try:
            # Use BRPOP to block until a message is available
            # (with 5 second timeout)
            result = r.brpop(queue_name, timeout=5)
            if result:
                queue, message = result
                print(
                    f"Consumer: Received '{message.decode('utf-8')}' "
                    f"from {queue.decode('utf-8')}"
                )
            else:
                # No message received within timeout period
                print("Consumer: No messages, still waiting...")
        except redis.ConnectionError:
            # This will happen when we close the connection
            break
        except Exception as e:
            print(f"Consumer error: {e}")
            break


def main():
    # Create Redis clients
    producer = redis.Redis(host='localhost', port=6666, db=0)
    consumer = redis.Redis(host='localhost', port=6666, db=0)
    queue_name = "demo-queue"
    try:
        # Clear any existing data
        producer.delete(queue_name)
        # Start consumers in separate threads
        for i in range(1, 5):
            consumer_thread = threading.Thread(
                target=consumer_worker,
                args=(consumer, queue_name),
                daemon=True
            )
            consumer_thread.start()
        # Producer: push messages to the queue
        for i in range(1, 60):
            # Wait a bit between messages
            time.sleep(6)
            message = f"Message {i}"
            pipe = producer.pipeline()
            pipe.lpush(queue_name, message)
            results = pipe.execute()
            for msg in results:
                print(msg)
            print(f"Producer: Pushed '{message}'")
        print("Producer: Done pushing messages")
        # Wait to see all messages being consumed
        time.sleep(70)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Clean up (this will cause the consumer to exit)
        print("Disconnecting from Redis")
        producer.close()
        consumer.close()


if __name__ == "__main__":
    main()
```
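For the verification suggested above, the producer side could push through a non-transactional pipeline. This is only a minimal sketch: the helper name `push_without_transaction` is hypothetical, and `client` is assumed to be a redis-py `redis.Redis` instance (or any object exposing the same `pipeline()` method).

```python
def push_without_transaction(client, queue_name, message):
    """Push one message through a pipeline that is not wrapped in MULTI/EXEC.

    Hypothetical helper for illustration; `client` is assumed to be a
    redis-py client such as redis.Redis(host='localhost', port=6666, db=0).
    """
    # transaction=False asks redis-py to send the buffered commands as a
    # plain pipeline instead of wrapping them in MULTI ... EXEC.
    pipe = client.pipeline(transaction=False)
    pipe.lpush(queue_name, message)
    # execute() returns one reply per buffered command.
    return pipe.execute()
```

In the script above, `results = push_without_transaction(producer, queue_name, message)` would stand in for the three `pipe` lines inside the producer loop. If the crash no longer appears with this change, that would point towards the transaction path.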