[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-02-21 Thread Andrey Paramonov


Andrey Paramonov added the comment:

as_completed() might be considered a low-level API, but as of Python 3.7 there
are seemingly no ready alternatives to achieve the proposed behavior. All of
asyncio.gather(), asyncio.wait(), and asyncio.as_completed() expect an
awaitables list of limited size; doing something like
https://www.artificialworlds.net/blog/2017/06/12/making-100-million-requests-with-python-aiohttp
is not straightforward.
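
For context, the usual workaround today (a minimal sketch, not verbatim from
the linked post; fetch() and urls are hypothetical) caps concurrency with a
semaphore but still creates every task up front:

import asyncio

async def fetch(url):
    ...  # hypothetical I/O-bound worker

async def main(urls, limit=100):
    sem = asyncio.Semaphore(limit)

    async def bounded(url):
        async with sem:
            return await fetch(url)

    # concurrency is capped at `limit`, but one coroutine (and one Task)
    # per URL is still materialized up front -- unusable for 100M items
    return await asyncio.gather(*(bounded(u) for u in urls))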

A function that takes an iterator/async iterator of tasks and is itself a
generator/async generator is very much wanted, something in the spirit of the
following (but more efficient?):

import asyncio
from itertools import islice

async def igather(tasks, limit=None):
    pending = set()
    while True:
        # top up `pending` from the task iterator, up to `limit` at a time
        for task in islice(tasks, limit - len(pending) if limit else None):
            pending.add(task)
        if pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task
        else:
            break
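
Usage would then look roughly like this (fetch() and urls are hypothetical):

async def main():
    # a lazy generator of tasks; igather() pulls at most `limit` at a time
    tasks = (asyncio.ensure_future(fetch(url)) for url in urls)
    async for fut in igather(tasks, limit=100):
        print(fut.result())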


It is an open question whether such a function should yield results in task
submission order. Albeit useful, that is a bit harder to implement and (most
importantly) has terrible worst-case memory behavior.

See also:
https://bugs.python.org/issue33533
https://github.com/h2non/paco/issues/38

--
nosy: +aparamon

Python tracker <https://bugs.python.org/issue30782>



[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-02-25 Thread Andrey Paramonov


Andrey Paramonov added the comment:

> an implicit requirement that back pressure from the consumer should be 
> handled (i.e. if whoever's iterating through "async for fut in 
> as_completed(...)" is too slow, then the tasks should pause until it catches 
> up)

No, I don't think it is required or desired to be handled.

My initial sketch was imprecise: it's better to asynchronously yield task
results, not "completed task" futures. This way, no additional buffer is
needed, all error handling can be consolidated inside `igather()`, and the
result is more compatible with `asyncio.gather()`.

I.e., instead of

yield next_fut

use

yield await next_fut
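
For the caller, the difference looks like this (a sketch):

# yielding futures: the caller unwraps (and handles errors on) each one
async for fut in igather(tasks, limit=100):
    result = await fut

# yielding results: errors propagate out of the loop, as with asyncio.gather()
async for result in igather(tasks, limit=100):
    ...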


--

Python tracker <https://bugs.python.org/issue30782>



[issue30782] Allow limiting the number of concurrent tasks in asyncio.as_completed

2019-04-16 Thread Andrey Paramonov


Andrey Paramonov added the comment:

Hello!

Below is an updated implementation with more consistent error handling.
The main rough edges encountered:

1. asyncio.Queue alone proved insufficient for precise control of the limit, as
asyncio.create_task() schedules the created Task immediately, so it may start
executing before being added to the queue (thus leading to limit+1 tasks
running). An additional semaphore is used to tackle that.

2. On exception, the other running tasks have to be cancel()ed and then awaited
to ensure all tasks have finished by the time igather exits. Just cancel()ing
proved insufficient.

3. On exception, unscheduled coroutines have to be wrapped with
asyncio.create_task(coro).cancel() to avoid the RuntimeWarning "coroutine was
never awaited". But maybe there is a more elegant way to suppress this warning
for a coroutine? (See the sketch after this list.)
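
Regarding point 3, one lighter possibility (an aside, not part of the
implementation below): a coroutine object can simply be close()d, which marks
it as finished without ever creating a Task:

# instead of wrapping in a task only to cancel it:
#     asyncio.create_task(coro).cancel()
# closing the un-started coroutine suppresses the
# "coroutine ... was never awaited" RuntimeWarning:
coro.close()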

In my client code I have so far not encountered "an implicit requirement that
back pressure from the consumer should be handled", but it should be possible
to implement separately and quite straightforwardly with the help of
asyncio.Queue, as sketched below.
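
For illustration, such a back-pressure layer could look like this (a sketch on
top of the igather() below; handle() is hypothetical). A bounded asyncio.Queue
suspends the producer whenever the consumer falls behind:

queue = asyncio.Queue(maxsize=10)  # producer blocks once 10 results pile up

async def produce(coros):
    async for result in igather(coros, limit=100):
        await queue.put(result)  # suspends here while the queue is full
    await queue.put(None)        # sentinel: no more results

async def consume():
    while True:
        result = await queue.get()
        if result is None:
            break
        await handle(result)     # slow consumer; produce() pauses automatically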

import asyncio
import math

async def igather(coros, limit=None):
    coros = iter(coros)

    buf = asyncio.Queue()
    sem = asyncio.Semaphore(limit or math.inf)

    async def submit(coros, buf):
        # schedule coroutines as tasks, at most `limit` in flight at a time
        while True:
            await sem.acquire()
            try:
                # TODO: additionally support async iterators
                coro = next(coros)
            except StopIteration:
                break
            task = asyncio.create_task(coro)
            buf.put_nowait(task)
        await buf.put(None)  # sentinel: no more tasks

    async def consume(buf):
        while True:
            task = await buf.get()
            if task:
                v = await asyncio.wait_for(task, None)
                sem.release()
                yield v
            else:
                break

    submit_task = asyncio.create_task(submit(coros, buf))
    try:
        async for result in consume(buf):
            yield result
    except:
        submit_task.cancel()
        # cancel scheduled tasks
        while not buf.empty():
            task = buf.get_nowait()
            if task:
                task.cancel()
                try:
                    await task
                except:
                    pass
        # cancel coroutines not yet scheduled
        for coro in coros:
            asyncio.create_task(coro).cancel()
        raise
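
A usage sketch (fetch() and urls are hypothetical):

async def main():
    coros = (fetch(url) for url in urls)  # coroutines are created lazily
    async for result in igather(coros, limit=100):
        print(result)

asyncio.run(main())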

Shall I go ahead and prepare a PR with docs and tests?

--

Python tracker <https://bugs.python.org/issue30782>



[issue34905] Cannot assign memoryview values from array.array

2018-10-05 Thread Andrey Paramonov


New submission from Andrey Paramonov:

Currently, memoryview values can be assigned from all bytes-like objects
(https://docs.python.org/3/glossary.html#term-bytes-like-object) except
array.array:


import array

mview = memoryview(bytearray(b'hello'))

mview[:] = bytes(b'hello')  # success
mview[:] = bytearray(b'hello')  # success
mview[:] = memoryview(b'hello')  # success
mview[:] = array.array('b', b'hello')  # fail

The failing assignment raises:

ValueError: memoryview assignment: lvalue and rvalue have different structures
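
For reference, two workarounds that do succeed (an aside, not part of the
original report): convert the array to bytes first, or cast its memoryview to
the matching 'B' format:

mview[:] = array.array('b', b'hello').tobytes()              # copy via bytes
mview[:] = memoryview(array.array('b', b'hello')).cast('B')  # recast the view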


--
components: Library (Lib)
messages: 327133
nosy: aparamon
priority: normal
severity: normal
status: open
title: Cannot assign memoryview values from array.array
type: behavior
versions: Python 3.6

Python tracker <https://bugs.python.org/issue34905>