网上看到的生产者/消费者示例代码大部分都只执行有限次数。最近项目中用到了类似的业务模型,于是模仿着写了一个可以一直运行的版本。该代码中生产者每2秒生产一个值,消费者以1秒为超时等待取值,超时则打印提示并继续等待。
import asyncio
import sys


def _new_queue(loop):
    """Create an ``asyncio.Queue`` in a way that works across Python versions.

    ``asyncio.Queue`` stopped accepting ``loop=`` in Python 3.10 (passing it
    raises ``TypeError``). On older interpreters an explicitly supplied loop
    is still forwarded so the queue is bound to the caller's loop.
    """
    if loop is not None and sys.version_info < (3, 10):
        return asyncio.Queue(loop=loop)
    return asyncio.Queue()


def _all_tasks(loop):
    """Return the set of tasks known to *loop*.

    ``asyncio.Task.all_tasks`` was removed in Python 3.9; the module-level
    ``asyncio.all_tasks`` (3.7+) is used when it is available.
    """
    if hasattr(asyncio, "all_tasks"):
        return asyncio.all_tasks(loop)
    return asyncio.Task.all_tasks(loop)


class Test(object):
    """Endless producer/consumer pair communicating through an asyncio queue.

    The producer puts an increasing integer on the queue at a fixed interval;
    the consumer waits for values with a timeout and reports each timeout.
    """

    def __init__(self, loop=None):
        """Set up the shared queue.

        Args:
            loop: Optional event loop. Kept for backward compatibility; it is
                only forwarded to the queue on Python versions that still
                accept it.
        """
        self._queue = _new_queue(loop)
        # Not used by this class; retained so the attribute keeps existing.
        # It is only created when a loop is supplied, because building a
        # Future without any event loop fails on recent Python versions.
        self._future = loop.create_future() if loop is not None else None
        # Strong references to the background tasks: the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected.
        self._tasks = []

    async def _producer(self, interval):
        """Put 0, 1, 2, ... on the queue forever, one value every *interval* seconds."""
        v = 0
        while True:
            await asyncio.sleep(interval)
            print('add value to queue:', v)
            await self._queue.put(v)
            v += 1

    async def _consumer(self, timeout=1.0):
        """Take values from the queue forever, reporting each timeout.

        Args:
            timeout: Seconds to wait for a value before printing the timeout
                message and waiting again.

        Raises:
            asyncio.CancelledError: Re-raised when the task is cancelled, so
                the task is correctly marked as cancelled.
        """
        try:
            while True:
                try:
                    r = await asyncio.wait_for(self._queue.get(), timeout=timeout)
                except asyncio.TimeoutError:
                    print('get value timeout')
                    continue
                print('consumer value>>>>>>>>>>>>>>>>>>', r)
        except asyncio.CancelledError:
            # Cancellation must propagate; swallowing it hides the cancel
            # from whoever is awaiting this task.
            raise
        except Exception as exc:
            # Any other error still ends the consumer, but is reported
            # instead of being silently discarded.
            print('consumer stopped on error:', repr(exc))
        finally:
            print('quit')

    async def run(self, interval=2, timeout=1.0):
        """Start the producer and consumer as background tasks and return.

        Args:
            interval: Seconds between produced values.
            timeout: Seconds the consumer waits for a value before reporting
                a timeout.
        """
        self._tasks.append(asyncio.ensure_future(self._producer(interval)))
        self._tasks.append(asyncio.ensure_future(self._consumer(timeout)))


def main():
    """Run the demo until interrupted with Ctrl+C, then cancel all tasks."""
    # get_event_loop() without a running loop is deprecated; create the loop
    # explicitly instead.
    loop = asyncio.new_event_loop()
    t = Test(loop=loop)
    starter = loop.create_task(t.run())  # keep a reference until it has run
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pending = _all_tasks(loop)
        print(pending)
        for task in pending:
            print(task.cancel())
        if pending:
            # Let every cancelled task finish unwinding before closing the loop.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
    finally:
        del starter
        loop.close()


if __name__ == "__main__":
    main()
输出结果:
get value timeout
add value to queue: 0
consumer value>>>>>>>>>>>>>>>>>> 0
get value timeout
add value to queue: 1
get value timeout
consumer value>>>>>>>>>>>>>>>>>> 1
get value timeout