Test Utilities
asyncTestOfProcess(asyncProcessFunc=None)
The asyncTestOfProcess decorator expects as an argument an async coroutine which runs the long running asynchronous process to be tested. The asyncTestOfProcess decorates another async coroutine containing the tests and assertions used to test the long running process itself. Both the argument and the decorated function MUST be async coroutines, AND the process needs to place "things" into the t.asyncTestQueue, AND the decorated function MUST await on t.asyncTestQueue.get() to allow the tested process time to function.
Source code in tests/testUtils.py
def asyncTestOfProcess(asyncProcessFunc=None) :
"""
The asyncTestOfProcess **decorator** expects as an *argument* an async
coroutine which runs the long running asynchronous process to be tested.
The asyncTestOfProcess *decorates* another async coroutine containing
the tests and assertions used to test the long running process itself.
Both the argument and the decorated function MUST be async coroutines,
AND the process needs to place "things" into the t.asyncTestQueue, AND
the decorated function MUST await on t.asyncTestQueue.get() to allow the
tested process time to function.
"""
def decorateTest(asyncTestFunc) :
@functools.wraps(asyncTestFunc)
def wrappedTest(t) :
async def asyncRunner() :
t.asyncTestQueue = asyncio.Queue()
if asyncProcessFunc :
if not asyncio.iscoroutinefunction(asyncProcessFunc) :
t.fail("The process being tested MUST be a async coroutine!")
processRunner = asyncio.create_task(asyncProcessFunc(t))
if not asyncio.iscoroutinefunction(asyncTestFunc) :
t.fail("The test being run on the process MUST be a async coroutine!")
asyncTestFuture = asyncio.get_event_loop().create_future()
async def wrappedAsyncTestFunc() :
try :
await asyncTestFunc(t)
asyncTestFuture.set_result(None)
except asyncio.CancelledError :
pass
except Exception as err :
asyncTestFuture.set_exception(err)
testRunner = asyncio.create_task(wrappedAsyncTestFunc())
await asyncTestFuture
try :
asyncio.run(asyncRunner())
except asyncio.CancelledError :
pass
return wrappedTest
return decorateTest