python asyncio 生产者消费者模型实现

网上看到的生产者消费者的代码大部分都是执行有限次数的。最近项目中用到类型业务模型。也就模仿的写了一个。该代码生产者每2秒生产一个值,消费者每1秒消费一个值。

import asyncio


class Test(object):
    def __init__(self, loop=None):
        self._queue = asyncio.Queue(loop=loop)
        self._future = asyncio.Future(loop=loop)

    async def _producer(self, interval):
        v = 0
        while True:
            await asyncio.sleep(interval)
            print('add value to queue:',str(v))
            await self._queue.put(v)
            v = v + 1

    async def _consumer(self):
        while True:
            try:
                r =await  asyncio.wait_for(self._queue.get(), timeout=1.0)
                # r = await self._queue.get()
                print('consumer value>>>>>>>>>>>>>>>>>>', r)
            except asyncio.TimeoutError:
                print('get value timeout')
                continue
            except:
                break
        print('quit')

    async def run(self):
        asyncio.ensure_future(self._producer(2))
        asyncio.ensure_future(self._consumer())


loop = asyncio.get_event_loop()
t = Test(loop=loop)
asyncio.ensure_future(t.run())
try:
    loop.run_forever()
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

输出结果:

get value timeout
add value to queue: 0
consumer value>>>>>>>>>>>>>>>>>> 0
get value timeout
add value to queue: 1
get value timeout
consumer value>>>>>>>>>>>>>>>>>> 1
get value timeout

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部