Async generators too evil for Python?
Henrik Holst
Ethology researcher spending most of his time deep down in the comments section searching for intelligent life
Suppose we are getting chunks of data from an endpoint, and these chunks need to be handled by workers. The output of the workers is consumed by a final stage whose details don't matter much; in the image below it simply sums things up, but it could be anything.
The idea is that we want all of the stages to work concurrently, but not unbounded. If the chunk generator is very fast we don't want it to produce thousands of chunks, only enough to keep the chunk consumers occupied. In the same way, the chunk consumers should not produce more data than the sum consumer can handle.
This could be done with thread workers and queues, and it would be fairly straightforward to implement in Python.
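The article does not show that version, but for comparison, here is a minimal sketch of the thread-and-queue variant (my own code with made-up names, not the author's); the bounded queues are what provide the backpressure:

import queue
import threading

def gen_thread(out_q):
    # Produce ten chunks covering 0..99; put() blocks when the queue is full.
    for x in range(10):
        out_q.put(range(10 * x, 10 * (x + 1)))
    out_q.put(None)                      # sentinel: no more chunks

def flatten_thread(in_q, out_q):
    # Take chunks and emit their elements one by one.
    while True:
        chunk = in_q.get()
        if chunk is None:
            in_q.put(None)               # let the other worker see the sentinel too
            out_q.put(None)
            break
        for y in chunk:
            out_q.put(y)

def reduce_thread(in_q, n_workers):
    # Sum everything until every worker has signalled completion.
    s, done = 0, 0
    while done < n_workers:
        x = in_q.get()
        if x is None:
            done += 1
        else:
            s += x
    print("sum:", s)

chunks = queue.Queue(maxsize=2)          # small queues keep the producer in check
items = queue.Queue(maxsize=2)
threads = [threading.Thread(target=gen_thread, args=(chunks,)),
           threading.Thread(target=flatten_thread, args=(chunks, items)),
           threading.Thread(target=flatten_thread, args=(chunks, items)),
           threading.Thread(target=reduce_thread, args=(items, 2))]
for t in threads:
    t.start()
for t in threads:
    t.join()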
However, there is a more interesting way to do it with asyncio.
First the chunk generator. It produces 10 sequences of numbers spanning all integers from 0 to 99.
import asyncio

async def gen(nam="gen"):
    # Produce ten chunks covering the integers 0..99, one chunk per second.
    for x in range(10):
        r = range(10*x, 10*(x+1))
        print(f"{nam} produced range: [{10*x},{10*(x+1)})")
        yield r
        await asyncio.sleep(1)
Then the chunk consumer. It simply takes a chunk and emits the elements of the consumed range one by one. This is to make things more interesting and to avoid assuming a 1:1 correspondence between inputs and outputs.
from asyncio import CancelledError

async def flatten(nam, s):
    # Receive chunks via (yield) and forward their elements one by one to s.
    Inf = float('Inf')
    while True:
        try:
            x = (yield)
        except (GeneratorExit, CancelledError):
            break
        else:
            m0 = +Inf
            m1 = -Inf
            for y in x:
                m0 = min(m0, y)
                m1 = max(m1, y)
                await s.asend(y)
                await asyncio.sleep(0.2)
            print(f"{nam} consumed range: [{m0},{m1+1})")
And finally, the consumer that consumes the emitted output streams.
The important thing to note here is that the yield expression only resumes when execution is handed to the generator via its asend method.
async def reduce(nam="reduce"):
    s = 0
    while True:
        try:
            x = (yield)
            s = s + x
        except (GeneratorExit, CancelledError):
            break
    print(f"{nam} sum: {s}")
There is still one thing missing, the secret sauce, and the thing that makes this look kind of evil. We have two chunk consumers, but the final consumer consumes just one stream. To join the two streams we need the dealer. Note that the evil dealer is not a generator but a coroutine.
async def dealer(g, a, b):
    print("starting dealer")
    tasks = set()
    async for x in g:
        t = asyncio.create_task(a.asend(x))
        a, b = b, a
        tasks.add(t)
        if len(tasks) >= 2:
            _, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    await asyncio.wait(tasks)
    print("closing down dealer")
If we naively run the dealer with a plain await a.asend(x) it will not work: it never drives the two chunk consumers concurrently. The problem is the await itself. The dealer needs to keep two coroutines in flight (two pending calls to asend, wrapped in tasks) to feed both chunk consumers at the same time.
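To run the whole pipeline, the consumers have to be created and primed before the dealer starts. The full listing is not reproduced in this excerpt, so the main() sketch here reflects my own assumptions about the wiring, not the author's code:

import asyncio

async def main():
    r = reduce()
    await r.asend(None)            # prime the reducer up to its first (yield)

    f1 = flatten("flatten-1", r)   # two chunk consumers feeding the same reducer
    f2 = flatten("flatten-2", r)
    await f1.asend(None)           # prime both chunk consumers
    await f2.asend(None)

    await dealer(gen(), f1, f2)    # drive the whole pipeline to completion

    await f1.aclose()              # shut the chunk consumers down
    await f2.aclose()
    await r.aclose()               # closing the reducer prints the final sum

asyncio.run(main())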
Feel free to experiment with this and see whether you think it is useful, or whether it is just an evil way of working that does not belong in Good Python.
Ultimately my "review": a fun experiment, but threads and queues are just so much more obvious, and I don't see a performance benefit of using asyncio over threads and queues here.
Full source for the playground:
Author's comment: Some intelligent thoughts on the matter: https://berb.github.io/diploma-thesis/original/043_threadsevents.html Threads and queues are, to me, the perfect mix that gives the best control with the least complexity and avoids most of the "magic thinking" fallacy around complex systems in general. This is a well-understood pattern called the Actor model. The only architecture pattern better than sliced bread.