import asyncio
import threading
import collections
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.base as s_base
[docs]
class AQueue(s_base.Base):
'''
An async queue with chunk optimized sync compatible consumer.
'''
async def __anit__(self):
await s_base.Base.__anit__(self)
self.fifo = []
self.event = asyncio.Event()
self.onfini(self.event.set)
[docs]
def put(self, item):
'''
Add an item to the queue.
'''
if self.isfini:
return False
self.fifo.append(item)
if len(self.fifo) == 1:
self.event.set()
return True
[docs]
async def slice(self):
# sync interface to the async queue
if len(self.fifo) == 0:
await self.event.wait()
retn = list(self.fifo)
self.fifo.clear()
self.event.clear()
return retn
[docs]
class Queue:
'''
An asyncio Queue with batch methods and graceful close.
'''
def __init__(self, maxsize=None):
self.q = asyncio.Queue(maxsize=maxsize)
self.closed = False
[docs]
async def close(self):
await self.q.put(s_common.novalu)
self.closed = True
[docs]
async def put(self, item):
if self.closed:
mesg = 'The Queue has been closed.'
raise s_exc.BadArg(mesg=mesg)
await self.q.put(item)
[docs]
async def size(self):
size = self.q.qsize()
if self.closed:
size -= 1
return size
[docs]
async def puts(self, items):
if self.closed:
mesg = 'The Queue has been closed.'
raise s_exc.BadArg(mesg=mesg)
for item in items:
await self.q.put(item)
[docs]
async def slice(self, size=1000):
if self.closed and self.q.qsize() == 0:
return None
items = []
item = await self.q.get()
if item is s_common.novalu:
return None
items.append(item)
size -= 1
for i in range(min(size, self.q.qsize())):
item = await self.q.get()
if item is s_common.novalu:
break
items.append(item)
return items
[docs]
async def slices(self, size=1000):
while True:
items = await self.slice(size=size)
if items is None:
return
yield items
[docs]
class Window(s_base.Base):
'''
A Queue like object which yields added items. If the queue ever reaches
its maxsize, it will be fini()d. On fini(), the Window will continue to
yield results until empty and then return.
'''
async def __anit__(self, maxsize=None):
await s_base.Base.__anit__(self)
self.maxsize = maxsize
self.event = asyncio.Event()
self.linklist = collections.deque()
async def fini():
self.event.set()
self.onfini(fini)
async def __aiter__(self):
while True:
if self.linklist:
yield self.linklist.popleft()
continue
if self.isfini:
return
self.event.clear()
await self.event.wait()
[docs]
async def put(self, item):
'''
Add a single item to the Window.
'''
if self.isfini:
return False
self.linklist.append(item)
self.event.set()
if self.maxsize is not None and len(self.linklist) >= self.maxsize:
await self.fini()
return True
[docs]
async def puts(self, items):
'''
Add multiple items to the window.
'''
if self.isfini:
return False
self.linklist.extend(items)
self.event.set()
if self.maxsize is not None and len(self.linklist) >= self.maxsize:
await self.fini()
return True