On 4/19/21 10:26 PM, John Snow wrote:
On 4/15/21 5:52 AM, Stefan Hajnoczi wrote:
Yeah, it seems very nice for allowing multiple event listeners that
don't steal each other's events. I like it.
qmp.event_listener() could take a sequence of QMP event names to trigger
on. If the sequence is empty then all QMP events will be reported.
I made something like this:
# Example 1
with qmp.listener('STOP') as listener:
    await qmp.execute('stop')
    await listener.get()
# Example 2
with qmp.listener('JOB_STATUS_CHANGE') as listener:
    await qmp.execute('blockdev-create', ...)
    async for event in listener:
        if event['data']['status'] == 'concluded':
            break
    await qmp.execute('job-dismiss', ...)
# Example 3 - all events
with qmp.listener() as events:
    async for event in events:
        print(f"got '{event['event']}' event!")
# Example 4 - several events on one listener
job_events = (
    'JOB_STATUS_CHANGE', 'BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
    'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', 'BLOCK_JOB_PENDING'
)
with qmp.listener(job_events) as events:
    ...
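To illustrate why separate listeners do not steal each other's events, here is a minimal sketch in which every interested listener receives the event in its own queue. ToyListener and dispatch() are hypothetical names made up for this illustration, not the prototype's classes, and the toy takes a list of names where the examples above also accept a bare string.

```python
import asyncio
from typing import Any, Dict, Iterable, List

Event = Dict[str, Any]


class ToyListener:
    """Hypothetical stand-in for an event listener (illustration only)."""

    def __init__(self, names: Iterable[str] = ()):
        self.names = set(names)  # an empty set means "accept every event"
        self.queue: "asyncio.Queue[Event]" = asyncio.Queue()

    def accepts(self, event: Event) -> bool:
        return not self.names or event['event'] in self.names


def dispatch(listeners: List[ToyListener], event: Event) -> None:
    # Each interested listener gets the event in its own queue, so one
    # consumer reading an event does not hide it from another consumer.
    for listener in listeners:
        if listener.accepts(event):
            listener.queue.put_nowait(event)


async def demo() -> List[int]:
    stop_only = ToyListener(['STOP'])
    everything = ToyListener()  # no names given: hears all events
    for name in ('STOP', 'RESUME', 'STOP'):
        dispatch([stop_only, everything], {'event': name, 'data': {}})
    return [stop_only.queue.qsize(), everything.queue.qsize()]


counts = asyncio.run(demo())
```

Here the 'STOP' listener ends up with two queued events and the catch-all listener with three.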
There is a *post-filtering* syntax available to EventListener.get(): it
filters out events that do not match the keyword arguments given. The
syntax is very simplistic.
# Example 5 -- a short-hand form of Example 2.
with qmp.listener('JOB_STATUS_CHANGE') as job_events:
    await qmp.execute('blockdev-create', ...)
    await job_events.get(status='concluded')
    await qmp.execute('job-dismiss', ...)
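As a rough sketch of what such a post-filter could look like: the helper below assumes the keyword arguments are compared against fields of the event's 'data' member, which is inferred from Example 5 being a short-hand for Example 2. get_matching() is a hypothetical name and works on a plain asyncio.Queue rather than on the real listener object.

```python
import asyncio
from typing import Any, Dict

Event = Dict[str, Any]


async def get_matching(queue: "asyncio.Queue[Event]", **fields: Any) -> Event:
    """Return the next event whose 'data' has every requested field value.

    Events that do not match are consumed and dropped (assumed behaviour).
    """
    while True:
        event = await queue.get()
        data = event.get('data', {})
        if all(data.get(key) == value for key, value in fields.items()):
            return event


async def demo() -> Event:
    queue: "asyncio.Queue[Event]" = asyncio.Queue()
    for status in ('created', 'running', 'concluded'):
        queue.put_nowait({'event': 'JOB_STATUS_CHANGE',
                          'data': {'id': 'job0', 'status': status}})
    # Skips the 'created' and 'running' events, returns 'concluded'.
    return await get_matching(queue, status='concluded')


result = asyncio.run(demo())
```

Note that anything skipped this way is gone for good, which matters once several jobs share one listener.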
A shortcoming of this interface is that it's easy to create a listener
that hears multiple events, but it's not easy to create *several
listeners*. I am not sure what syntax will be the nicest for this, but I
tried allowing the manual creation of listeners:
# Example 6
listener1 = EventListener('JOB_STATUS_CHANGE')
listener2 = EventListener(job_events)
# Note the use of listen() instead of listener()
with qmp.listen(listener1, listener2) as (ev1, ev2):
    # listeners are now active.
    ...
# listeners are now inactive.
# The context manager clears any stale events in the listener(s).
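A minimal sketch of the activate / deactivate / clear behaviour described in the comments of Example 6. ToyClient, ToyListener and deliver() are invented for this illustration, and the toy listener takes names as separate positional arguments for brevity; nothing here is the prototype's actual code.

```python
import contextlib
from collections import deque
from typing import Any, Deque, Dict, Iterator, List, Tuple

Event = Dict[str, Any]


class ToyListener:
    """Hypothetical listener holding its pending events in a deque."""

    def __init__(self, *names: str):
        self.names = set(names)
        self.pending: Deque[Event] = deque()


class ToyClient:
    """Hypothetical client that only knows how to route events."""

    def __init__(self) -> None:
        self.active: List[ToyListener] = []

    @contextlib.contextmanager
    def listen(self, *listeners: ToyListener) -> Iterator[Tuple[ToyListener, ...]]:
        self.active.extend(listeners)          # listeners become active
        try:
            yield listeners
        finally:
            for listener in listeners:
                self.active.remove(listener)   # listeners become inactive
                listener.pending.clear()       # drop any stale events

    def deliver(self, event: Event) -> None:
        for listener in self.active:
            if not listener.names or event['event'] in listener.names:
                listener.pending.append(event)


client = ToyClient()
stops = ToyListener('STOP')
jobs = ToyListener('JOB_STATUS_CHANGE', 'BLOCK_JOB_READY')
with client.listen(stops, jobs) as (ev1, ev2):
    client.deliver({'event': 'STOP', 'data': {}})
    client.deliver({'event': 'BLOCK_JOB_READY', 'data': {}})
    seen_inside = (len(ev1.pending), len(ev2.pending))
client.deliver({'event': 'STOP', 'data': {}})  # nobody is listening now
seen_after = (len(stops.pending), len(jobs.pending))
```

Inside the block each listener has one pending event; after the block both are empty, and the late 'STOP' is not recorded anywhere.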
I thought this might be nicer than trying to extend the listener syntax:
with qmp.listeners(
    'JOB_STATUS_CHANGE',
    (job_events)
) as (
    listener1,
    listener2,
):
    ...
especially because it might get confusing when trying to separate "one
listener with multiple events" vs "several listeners with one event
each", and it makes things a little ambiguous:
with qmp.listeners('STOP') as (stop_events,):
    ...
And this isn't any prettier, and also likely to confuse:
with qmp.listeners('STOP', 'RESUME') as (stops, resumes):
    ...
because it's only so very subtly different from this:
with qmp.listeners(('STOP', 'RESUME')) as (runstate_events,):
    ...
This also doesn't begin to address one of the worst headaches of writing
iotests where transactions are involved: accidentally eating events
meant for other jobs.
I prototyped something where it's possible to create an EventListener
with an optional pre-filter, but it's a little bulky:
# Example 7
listener = EventListener('JOB_STATUS_CHANGE',
                         lambda e: e['data']['id'] == 'job0')
with qmp.listen(listener):
    await qmp.execute('blockdev-create', arguments={'job-id': 'job0'})
    await listener.get(status='created')
    ...
Some thoughts on this:
- Pre-filters are powerful, but involve a lot of boilerplate.
- Accepting two kinds of parameters, both name(s) and a filter, makes it
  even trickier to write concise context blocks, especially with multiple
  jobs.
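To make the pre-filter idea concrete, here is a small sketch in which an event that fails the filter is never queued for that listener at all. ToyListener and offer() are hypothetical names, and the EventFilter alias is an assumption about what such a type might look like.

```python
import asyncio
from typing import Any, Callable, Dict, Optional, Tuple

Event = Dict[str, Any]
EventFilter = Callable[[Event], bool]  # assumed shape of a pre-filter


class ToyListener:
    """Hypothetical listener with an optional pre-filter."""

    def __init__(self, name: str, event_filter: Optional[EventFilter] = None):
        self.name = name
        self.event_filter = event_filter
        self.queue: "asyncio.Queue[Event]" = asyncio.Queue()

    def offer(self, event: Event) -> bool:
        """Queue the event only if it passes the name check and pre-filter."""
        if event['event'] != self.name:
            return False
        if self.event_filter is not None and not self.event_filter(event):
            return False
        self.queue.put_nowait(event)
        return True


async def demo() -> Tuple[int, int]:
    job0 = ToyListener('JOB_STATUS_CHANGE', lambda e: e['data']['id'] == 'job0')
    job1 = ToyListener('JOB_STATUS_CHANGE', lambda e: e['data']['id'] == 'job1')
    stream = [('job1', 'created'), ('job0', 'created'), ('job1', 'concluded')]
    for job_id, status in stream:
        event = {'event': 'JOB_STATUS_CHANGE',
                 'data': {'id': job_id, 'status': status}}
        for listener in (job0, job1):
            listener.offer(event)
    return job0.queue.qsize(), job1.queue.qsize()


sizes = asyncio.run(demo())
```

Each listener only ever holds events for its own job, so a later post-filter on one of them cannot throw away the other job's events.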
Here's a final example of something you may very well want to do in
iotest code:
# Example 8
def job_filter(job_id: str) -> EventFilter:
    def filter(event: Message) -> bool:
        return event.get('data', {}).get('id') == job_id
    return filter
listener1 = EventListener('JOB_STATUS_CHANGE', job_filter('job0'))
listener2 = EventListener('JOB_STATUS_CHANGE', job_filter('job1'))
with qmp.listen(listener1, listener2) as (job0, job1):
    await asyncio.gather(
        qmp.execute('blockdev-create', arguments={'job-id': 'job0'}),
        qmp.execute('blockdev-create', arguments={'job-id': 'job1'}),
        job0.get(status='concluded'),
        job1.get(status='concluded')
    )
(Note: gather isn't required here. You could write the execute and get
statements individually and in whichever order you wanted, as long as
the execute statement for a given job appears prior to the corresponding
wait!)
The difficulty I have here is extending that backwards to the "create
listener on the fly" syntax: as noted above, it becomes ambiguous
whether we're creating one listener or two, etc. Trying to minimize
boilerplate while leaving the interfaces generic and powerful is tough.
I'm still playing around with different options and solutions, but your
feedback/input is welcome.
--js
Oh, though of course, the moment I sent this, I realized there is
actually a somewhat nicer way to do this in non-test code that doesn't
care about ordering. It still wouldn't work for QMP transactions, but
it's nice to look at:
# Example 9 -- Multiple jobs without a transaction:
async def blockdev_create(qmp, job_id: str, options: Dict[str, Any]):
    with qmp.listener('JOB_STATUS_CHANGE') as listener:
        await qmp.execute('blockdev-create', arguments={
            'job-id': job_id,
            'options': options,
        })
        await listener.get(id=job_id, status='concluded')
        await qmp.execute('job-dismiss', arguments={'id': job_id})
        await listener.get(id=job_id, status='null')
await asyncio.gather(
    blockdev_create(qmp, 'job2', {...}),
    blockdev_create(qmp, 'job3', {...}),
)
It won't work for transactions because we spawn multiple IDs with a
single command in a single context. You could remedy it by creating
multiple listeners and being very careful to always use exactly one per
job, but that's error-prone and hard to catch in review.
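A toy illustration of the failure mode, assuming a post-filter that drops non-matching events as it goes. take_matching() is a hypothetical, synchronous stand-in for get(); it returns None where a real listener would presumably just keep waiting.

```python
from collections import deque
from typing import Any, Deque, Dict, Optional

Event = Dict[str, Any]


def take_matching(pending: Deque[Event], **fields: Any) -> Optional[Event]:
    """Pop events until one matches; return None if the queue runs dry."""
    while pending:
        event = pending.popleft()
        if all(event['data'].get(k) == v for k, v in fields.items()):
            return event
    return None


def concluded(job_id: str) -> Event:
    return {'event': 'JOB_STATUS_CHANGE',
            'data': {'id': job_id, 'status': 'concluded'}}


# One listener shared by both jobs; job1 happens to finish first.
shared = deque([concluded('job1'), concluded('job0')])
first = take_matching(shared, id='job0', status='concluded')   # eats job1's event
second = take_matching(shared, id='job1', status='concluded')  # nothing left

# One listener per job: each one has its own copy of the event stream.
events = [concluded('job1'), concluded('job0')]
own0, own1 = deque(events), deque(events)
found0 = take_matching(own0, id='job0', status='concluded')
found1 = take_matching(own1, id='job1', status='concluded')
```

With the shared queue, waiting on job0 first silently discards job1's event; with one queue per job, both waits succeed regardless of order.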
--js