Skip to content

Test Utilities


asyncTestOfProcess(asyncProcessFunc=None)

The asyncTestOfProcess decorator takes as its argument an async coroutine function that runs the long-running asynchronous process to be tested. It decorates another async coroutine function containing the tests and assertions used to test that process. Both the argument and the decorated function MUST be async coroutine functions; the process needs to put items into t.asyncTestQueue, AND the decorated function MUST await t.asyncTestQueue.get() so that the tested process is given time to run.

Source code in tests/testUtils.py
def asyncTestOfProcess(asyncProcessFunc=None) :
  """

  The asyncTestOfProcess **decorator** expects as an *argument* an async
  coroutine which runs the long running asynchronous process to be tested.
  The asyncTestOfProcess *decorates* another async coroutine containing
  the tests and assertions used to test the long running process itself.
  Both the argument and the decorated function MUST be async coroutines,
  AND the process needs to place "things" into the t.asyncTestQueue, AND
  the decorated function MUST await on t.asyncTestQueue.get() to allow the
  tested process time to function.

  Both coroutines are called with the test case `t` as their only
  argument. Each run of the decorated test stores a fresh `asyncio.Queue`
  on `t.asyncTestQueue` and drives everything with `asyncio.run`.

  If either function is not an async coroutine function, `t.fail` is
  called. Exceptions raised by the decorated test (including failed
  assertions) propagate to the caller. A cancelled test is treated as
  finished rather than as a failure. The process task, if any, is
  cancelled once the test ends.

  """
  def decorateTest(asyncTestFunc) :
    @functools.wraps(asyncTestFunc)
    def wrappedTest(t) :
      async def asyncRunner() :
        # Build the queue while the loop is running so that the queue, the
        # process and the test all share the same event loop.
        t.asyncTestQueue = asyncio.Queue()

        # Keep a strong reference to the process task for the whole test;
        # the event loop itself only holds tasks weakly.
        processRunner = None
        if asyncProcessFunc :
          if not asyncio.iscoroutinefunction(asyncProcessFunc) :
            t.fail("The process being tested MUST be a async coroutine!")
          processRunner = asyncio.create_task(asyncProcessFunc(t))

        try :
          if not asyncio.iscoroutinefunction(asyncTestFunc) :
            t.fail("The test being run on the process MUST be a async coroutine!")
          # Await the test directly: its result, exceptions and cancellation
          # all reach asyncio.run, so a cancelled test can no longer leave
          # this runner waiting on a future that is never resolved.
          await asyncTestFunc(t)
        finally :
          # The process is only needed while the test runs. Request its
          # cancellation here; asyncio.run waits for remaining tasks to
          # finish cancelling before it closes the loop.
          if processRunner is not None :
            processRunner.cancel()

      try :
        asyncio.run(asyncRunner())
      except asyncio.CancelledError :
        # Cancellation of the test is deliberately not reported as a failure.
        pass

    return wrappedTest

  return decorateTest