Skip to content

Python code details

cphttp

Implement a very simple reloading HTTP server for ComputePods

DebouncingTimer

A simple debouncing timer which ensures we wait until any high frequency events have stopped.

cancelTask(self)

Cancel the current timer.

Source code in cphttp/cphttp.py
def cancelTask(self) :
  """Cancel the pending debounce task, if there is one.

  Reads `self.taskFuture`; when it is falsy (nothing scheduled) this is a
  no-op, otherwise its `cancel()` method is called. Returns `None`."""

  pendingTask = self.taskFuture
  if not pendingTask :
    # Nothing has been scheduled, so there is nothing to cancel.
    return
  pendingTask.cancel()

doTask(self) async

Sleep for timeout seconds while waiting to be (potentially) cancelled. If we are not cancelled, send a reload message to the browser.

Source code in cphttp/cphttp.py
async def doTask(self) :
  """Wait out the debounce interval, then ask the browser to reload.

  Sleeps for `self.timeout` seconds. Cancelling the task during that sleep
  (see `cancelTask`) abandons this run. Once the sleep completes:

  * if any root path reported by `self.theWatcher.getRootPaths()` does not
    exist on disk, the timer is restarted and no reload is sent;
  * if the watcher's statistics show `numWatches + 2 < numUnWatches`, the
    timer is restarted as well;
  * otherwise the watch statistics are cleared and the string `"reload"`
    is put on the `heartBeatQueue`.

  Any `Exception` raised while deciding is logged (with its traceback)
  through `self.logger` and otherwise ignored, so a failed check never
  propagates out of the timer task. Returns `None`."""

  await asyncio.sleep(self.timeout)
  try :
    self.logger.debug("testing for reload")
    for aPath in self.theWatcher.getRootPaths() :
      if not os.path.exists(aPath) :
        # A watched root is missing; wait another interval rather than
        # reloading now.
        self.logger.debug("Root path [{}] does NOT exist".format(aPath))
        await self.reStart()
        return
    numWatches, numUnWatches = self.theWatcher.getWatchStats()
    self.logger.debug(
      "watches: {} unWatches: {}".format(numWatches, numUnWatches)
    )
    # NOTE(review): presumably "noticeably more unWatches than watches"
    # means a directory tree is still being removed/rebuilt -- confirm the
    # intent of the +2 slack with the author.
    if numWatches + 2 < numUnWatches :
      await self.reStart()
      return
    self.theWatcher.clearWatchStats()
    self.logger.debug("Sending RELOAD message to browser")
    await heartBeatQueue.put("reload")
  except Exception :
    # Best effort: keep the failure visible (including the traceback) but
    # do not let it escape from the background task.
    self.logger.exception("Failed while checking whether to send a reload")

reStart(self) async

Restart the timer (cancel the old one if it exists).

Source code in cphttp/cphttp.py
async def reStart(self) :
  """(Re)arm the debouncing timer.

  Any previously scheduled run is cancelled via `cancelTask`, then a new
  `doTask` coroutine is scheduled and remembered in `self.taskFuture`."""

  self.cancelTask()
  nextRun = self.doTask()
  self.taskFuture = asyncio.ensure_future(nextRun)

configureWebServer(cliArgs)

Configure the Hypercorn webserver.

Source code in cphttp/cphttp.py
def configureWebServer(cliArgs) :
  """ Build the Hypercorn configuration from the parsed command line.

  Reads `host`, `port`, `loglevel`, `accesslog` and `errorlog` from
  `cliArgs` and returns a `(logger, config)` tuple, where `logger` is the
  `hypercorn.access` logger and `config` is the populated `Config`. """

  serverConfig = Config()
  bindAddress = ':'.join((cliArgs.host, str(cliArgs.port)))
  serverConfig.bind      = [ bindAddress ]
  serverConfig.loglevel  = cliArgs.loglevel
  serverConfig.accesslog = cliArgs.accesslog
  serverConfig.errorlog  = cliArgs.errorlog

  # Touching the `log` attribute forces the config object to instantiate
  # its loggers before we look one of them up below.
  serverConfig.log

  accessLogger = logging.getLogger('hypercorn.access')
  return (accessLogger, serverConfig)

cphttp()

Parse the command line arguments, configure hypercorn, setup the /heartBeat Server Sent Events handler using sse_starlette, and then run the server.

Source code in cphttp/cphttp.py
def cphttp() :
  """ Parse the command line arguments, configure hypercorn, setup the
  /heartBeat Server Sent Events handler using sse_starlette, and then run
  the server.

  Takes no parameters: the options are read by argparse from the process
  command line. The function installs handlers for SIGTERM, SIGHUP and
  SIGINT on the asyncio event loop, runs `runUntilShutdown` to completion
  and finally shuts the logging system down. Returns `None`. """

  argparser = argparse.ArgumentParser(
    description="A very simple reloading Http server using SSE Starlette and Hypercorn."
  )
  argparser.add_argument("-H", "--host", default="localhost",
    help="The host interface to listen on (default: localhost)"
  )
  # Validate the port while parsing so that a bad value is reported by
  # argparse instead of failing later when the server tries to bind.
  argparser.add_argument("-p", "--port", default=8008, type=int,
    help="The port to listen on (default: 8008)"
  )
  argparser.add_argument("-d", "--directory", default="html",
    help="The html directory to serve static files from (default: html)"
  )
  argparser.add_argument("-v", "--verbose", default=False,
    action=argparse.BooleanOptionalAction,
    help="provide more information about what is happening"
  )
  argparser.add_argument("-a", "--accesslog", default='-',
    help="specify a file for the access log (default: stdout)"
  )
  argparser.add_argument("-e", "--errorlog", default='-',
    help="specify a file for the error log (default: stdout)"
  )
  argparser.add_argument("-l", "--loglevel", default='INFO',
    help="specify the access/error logging level (default: INFO)"
  )
  argparser.add_argument("-w", "--watch", default=[], action='append',
    help="specify the directories/files to watch (can be used multiple times) (default: none)"
  )
  cliArgs = argparser.parse_args()
  logger, config = configureWebServer(cliArgs)

  # setup the asyncio loop
  #
  # NOTE(review): `asyncio.get_event_loop()` is deprecated when no loop is
  # running on recent Python versions. It is kept here because objects
  # created at module import time may already be bound to this loop --
  # confirm before switching to `asyncio.run`.

  loop = asyncio.get_event_loop()
  loop.set_debug(cliArgs.verbose)
  # Each handler receives the signal's *name* (a string) and the logger.
  loop.add_signal_handler(signal.SIGTERM, signalHandler, "SIGTERM", logger)
  loop.add_signal_handler(signal.SIGHUP,  signalHandler, "SIGHUP",  logger)
  loop.add_signal_handler(signal.SIGINT,  signalHandler, "SIGINT",  logger)
  loop.run_until_complete(runUntilShutdown(cliArgs, logger, config))

  logger.info("Finished serving")
  logging.shutdown()

heartBeatBeater()

The SSE generator of messages to be sent over the /heartBeat SSE.

Using an asyncio.Queue allows the counter and reload messages to be interleaved.

Source code in cphttp/cphttp.py
async def heartBeatBeater() :
  """ Async generator feeding the /heartBeat SSE stream.

  While `heartBeatContinueBeating` is truthy, each message taken from the
  shared `heartBeatQueue` is JSON encoded and yielded as a `{"data": ...}`
  dict. Because both the counter and the reload notifications are put on
  that one queue, they end up interleaved on the stream. The queue item is
  marked done once the consumer asks for the next message. """

  while heartBeatContinueBeating:
    nextMessage = await heartBeatQueue.get()
    ssePayload = {"data": json.dumps(nextMessage)}
    yield ssePayload
    heartBeatQueue.task_done()

heartBeatCounter() async

A (slow) counter to act as messages over the /heartBeat SSE which help keep the connection open when run through an HTTP-proxy.

Source code in cphttp/cphttp.py
async def heartBeatCounter() :
  """ Slowly count upwards, publishing each number on the heartBeat queue.

  Every two seconds, while `heartBeatContinueBeating` is truthy, the
  current count is put on `heartBeatQueue` as a string. These periodic
  messages help keep the /heartBeat SSE connection open when the server is
  run behind an HTTP-proxy. """

  beatNumber = 0
  while heartBeatContinueBeating:
    await asyncio.sleep(2)
    await heartBeatQueue.put(str(beatNumber))
    beatNumber += 1

heartBeatSSE(request) async

Implement the /heartBeat SSE end point.

Start the long running counter and pass it to the sse_starlette EventSourceResponse.

Source code in cphttp/cphttp.py
async def heartBeatSSE(request) :
  """ Handle a request on the /heartBeat SSE end point.

  A new `heartBeatCounter` task is started in the background and the
  response streams whatever `heartBeatBeater` yields, wrapped in an
  sse_starlette `EventSourceResponse`. The `request` argument is not
  inspected. """

  # Start the long running counter first; the response then streams the
  # messages the generator pulls off the queue.
  asyncio.create_task(heartBeatCounter())

  return EventSourceResponse(heartBeatBeater())

runWebServer(cliArgs, logger, config) async

Setup the Starlette Application and run it.

Source code in cphttp/cphttp.py
async def runWebServer(cliArgs, logger, config) :
  """Create the Starlette application and serve it with Hypercorn.

  The application exposes the `/heartBeat` SSE route and serves static
  files (with HTML index support) from `cliArgs.directory` at `/`. The
  configured mount points are logged, and the coroutine then runs the
  server until the `shutdownHypercorn` event is set."""

  webApp = Starlette(debug=cliArgs.verbose)

  webApp.add_route('/heartBeat', heartBeatSSE, name='heartBeat')

  staticSite = StaticFiles(directory=cliArgs.directory, html=True)
  webApp.mount('/', staticSite, name='home')

  logger.info(f"Serving static files from [{cliArgs.directory}]")

  for mountedRoute in webApp.routes :
    logger.info(f"MountPoint: [{mountedRoute.path}]")

  await serve(webApp, config, shutdown_trigger=shutdownHypercorn.wait)

signalHandler(signum, logger)

Handle an OS system signal by stopping the heartBeat

Source code in cphttp/cphttp.py
def signalHandler(signum, logger) :
  """ React to an OS signal by stopping the heartBeat and the web server.

  `signum` is whatever identifier the caller registered for the signal; it
  is only used in the log message. `logger` receives that message. """

  # Emit an empty line first (e.g. to move past a `^C` echoed by the
  # terminal -- NOTE(review): presumed reason, confirm).
  print("")
  logger.info(f"SignalHandler: Caught signal {signum}")

  stopHeartBeat()
  shutdownHypercorn.set()

stopWebServer()

Tell the Hypercorn server to stop.

Source code in cphttp/cphttp.py
def stopWebServer() :
  """Ask the Hypercorn server to stop.

  Sets the `shutdownHypercorn` event, which `runWebServer` hands to
  Hypercorn as its shutdown trigger."""

  shutdownHypercorn.set()

watchFiles(cliArgs, logger) async

Setup the file system watcher.

Source code in cphttp/cphttp.py
async def watchFiles(cliArgs, logger) :
  """Create the file system watcher and restart the debouncing timer on
  every file system event.

  Each entry of `cliArgs.watch` is registered as a root path. The
  coroutine then loops over the watcher's event stream for as long as that
  stream produces events."""

  fsWatcher = FSWatcher(logger)
  # NOTE(review): the first argument is presumably the debounce timeout in
  # seconds -- confirm against DebouncingTimer's constructor.
  debouncer = DebouncingTimer(1, fsWatcher, logger)

  # The watcher processes its watch/unWatch requests in a task of its own.
  asyncio.create_task(fsWatcher.managePathsToWatchQueue())

  for rootPath in cliArgs.watch :
    await fsWatcher.watchARootPath(rootPath)

  async for _ in fsWatcher.watchForFileSystemEvents() :
    await debouncer.reStart()

fileResponsePatch

This fileResponsePatch module monkey patches the Starlette::FileResponse.__call__ to allow us to inject our reloaderScript JavaScript into any HTML responses.

newFileResponseCall(self, scope, receive, send) async

A monkey patched version of the Starlette::FileResponse.__call__ method.

This version detects if the media_type is HTML and if so, reads the file line by line looking for the </head> element. When found the </head> string is replaced by a <script>...</script></head> string which contains the reloaderScript from the cphttp.reloader

If the file is not an HTML file, the original Starlette::FileResponse.__call__ is called.

This code is based on the original Starlette::FileResponse.__call__ code. It is used under Starlette's BSD License (see the top of the fileResponsePatch.py file for details)

Source code in cphttp/fileResponsePatch.py
async def newFileResponseCall(self, scope: Scope, receive: Receive, send: Send) -> None:
  """ A monkey patched version of the `Starlette::FileResponse.__call__`
  method.

  This version detects if the media_type is HTML and if so, reads the file
  and replaces every `</head>` string by a `<script>...</script></head>`
  string which contains the `reloaderScript` from the `cphttp.reloader`.

  If the file is *not* an HTML file, the original
  `Starlette::FileResponse.__call__` is called.

  The (small) HTML document is read completely before anything is sent, so
  that the `content-length` header can be computed from the bytes which
  are actually transmitted. A length derived from the file size is wrong
  whenever the file has no `</head>`, has more than one, or the injected
  text is not pure ASCII.

  Raises `RuntimeError` if the file does not exist or is not a regular
  file (only checked when no `stat_result` was supplied to the response).

  This code is based on the original `Starlette::FileResponse.__call__`
  code. It is used under Starlette's BSD License (see the top of the
  fileResponsePatch.py file for details) """

  if not self.media_type.endswith("html") :
    return await oldFileResponseCall(self, scope, receive, send)

  logger.debug("Injecting reloader into [{}]".format(self.path))

  headReplacementStr = """
    <script>{}</script>
  </head>
""".format(reloaderScript)

  if self.stat_result is None:
    # No stat information was supplied with the response: validate the
    # path ourselves. Only the local `stat_result` is used here, because
    # `self.stat_result` is still None on this path.
    try:
      stat_result = await anyio.to_thread.run_sync(os.stat, self.path)
    except FileNotFoundError:
      raise RuntimeError(f"File at path {self.path} does not exist.")
    if not stat.S_ISREG(stat_result.st_mode):
      raise RuntimeError(f"File at path {self.path} is not a file.")
    self.set_stat_headers(stat_result)

  async with await anyio.open_file(self.path, mode="r") as file:
    pageText = await file.read()
  body = bytes(pageText.replace('</head>', headReplacementStr), 'utf-8')

  # Override the stat based length with the size of the patched document.
  self.headers['content-length'] = str(len(body))

  await send(
    {
      "type": "http.response.start",
      "status": self.status_code,
      "headers": self.raw_headers,
    }
  )
  if self.send_header_only:
    await send({"type": "http.response.body", "body": b"", "more_body": False})
  else:
    await send({"type": "http.response.body", "body": body, "more_body": False})
  if self.background is not None:
    await self.background()

fsWatcher

The fsWatcher module adapts the asyncinotify example to recursively watch directories or files either by a direct request, or as they are created inside watched directories.

FSWatcher

The FSWatcher class manages the Linux file system inotify watches for a given collection of directories or files. It provides a file change event stream via the iterable recursive_watch method.

To allow for asynchronous operation, the "watches" are added to an asyncio.Queue managed by the managePathsToWatchQueue method. When used, this managePathsToWatchQueue method should be run inside its own asyncio.Task.

get_directories_recursive(self, path)

Recursively list all directories under path, including path itself, if it's a directory.

The path itself is always yielded before its children are iterated, so you can pre-process a path (by watching it with inotify) before you get the directory listing.

Source code in cphttp/fsWatcher.py
def get_directories_recursive(self, path) :
  """ Recursively list all directories and files under path, including
  path itself.

  The path itself is always yielded before its children are listed, so
  you can pre-process a path (by watching it with inotify) before you
  get the directory listing.

  `path` is a `pathlib.Path`-like object. Paths which are neither a
  directory nor a regular file (for example, ones which do not exist)
  yield nothing. A directory which cannot be listed -- because it has
  vanished or is unreadable by the time its children are requested -- is
  still yielded itself, but its children are skipped instead of raising
  out of the generator. """

  if path.is_dir() :
    yield path
    # The listing is only taken once the consumer resumes the generator,
    # i.e. after it has had the chance to act on `path` itself.
    try :
      children = list(path.iterdir())
    except OSError as err :
      self.logger.debug("Could not list [%s]: %r", path, err)
      return
    for child in children :
      yield from self.get_directories_recursive(child)
  elif path.is_file() :
    yield path

managePathsToWatchQueue(self) async

Implement all (pending) requests to watch/unWatch a directory or file which are in the pathsToWatchQueue.

When watching, the paths contained in all directories are themselves recursively added to the pathsToWatchQueue.

Source code in cphttp/fsWatcher.py
async def managePathsToWatchQueue(self) :
  """ Implement all (pending) requests to watch/unWatch a directory or
  file which are in the `pathsToWatchQueue`.

  Each queue item is a `(addPath, aPathToWatch, theWatch)` tuple.

  When watching (`addPath` is truthy), the path and everything
  `get_directories_recursive` finds below it is added to the inotify
  instance. A path which cannot be watched because of a `PermissionError`
  is skipped; for any other failure the traceback is printed and the
  path's parent is queued to be watched again.

  When unWatching, only the statistics are updated (the watch itself is
  not removed here, see the comment below); a path which is one of the
  `rootPaths` is queued to be watched again.

  The loop runs for as long as `self.continueWatchingFS` is truthy, so
  this method should be run inside its own asyncio task. """

  while self.continueWatchingFS :
    addPath, aPathToWatch, theWatch = await self.pathsToWatchQueue.get()

    if addPath :
      for aPath in self.get_directories_recursive(Path(aPathToWatch)) :
        try :
          # Counted before the attempt, so failed attempts are included
          # in the statistics as well.
          self.numWatches = self.numWatches + 1
          self.inotify.add_watch(aPath, self.wrMask)
          self.logger.debug(f'INIT: watching {aPath}')
        except PermissionError :
          # We are not allowed to watch this path: skip it.
          self.logger.debug(f'INIT: no permission to watch {aPath}')
        except Exception :
          print(f"Exception while trying to watch: [{aPath}]")
          # print_exc() reports the exception currently being handled; its
          # first positional parameter is `limit`, not the exception.
          traceback.print_exc()
          # we can't watch this path just yet...
          # ... schedule its parent and try again...
          await self.watchAPath(aPath.parent)
    else :
      # according to the documentation.... the corresponding
      # Mask.IGNORE event will automatically remove this watch.
      #self.inotify.rm_watch(theWatch)
      self.numUnWatches = self.numUnWatches + 1
      self.logger.debug(f'INIT: unWatching {aPathToWatch}')
      if aPathToWatch in self.rootPaths :
        self.logger.debug(f'INIT: found root path... rewatching it {aPathToWatch}')
        await self.watchAPath(aPathToWatch)
    self.pathsToWatchQueue.task_done()

stopWatchingFileSystem(self)

(Gracefully) stop watching the file system

Source code in cphttp/fsWatcher.py
def stopWatchingFileSystem(self) :
  """Request a (graceful) stop of the file system watching.

  Only clears the `continueWatchingFS` flag; the watching loops check the
  flag themselves and finish when they next look at it."""

  self.continueWatchingFS = False

unWatchAPath(self, pathToWatch, aWatch) async

Add a single directory or file to be unWatched by this instance of FSWatcher to the pathsToWatchQueue.

Source code in cphttp/fsWatcher.py
async def unWatchAPath(self, pathToWatch, aWatch) :
  """ Queue a request for this `FSWatcher` to stop watching a single
  directory or file.

  The request is put on the `pathsToWatchQueue` as the tuple
  `(False, pathToWatch, aWatch)`. """

  self.logger.debug(f"Adding path to (un)watch queue {pathToWatch}")
  unWatchRequest = (False, pathToWatch, aWatch)
  await self.pathsToWatchQueue.put(unWatchRequest)

watchAPath(self, pathToWatch) async

Add a single directory or file to be watched by this instance of FSWatcher to the pathsToWatchQueue.

Source code in cphttp/fsWatcher.py
async def watchAPath(self, pathToWatch) :
  """ Queue a request for this `FSWatcher` to watch a single directory or
  file.

  The request is put on the `pathsToWatchQueue` as the tuple
  `(True, pathToWatch, None)`. """

  self.logger.debug(f"Adding path to watch queue {pathToWatch}")
  watchRequest = (True, pathToWatch, None)
  await self.pathsToWatchQueue.put(watchRequest)

watchARootPath(self, pathToWatch) async

Add a single directory or file to the list of "root" paths to watch as well as schedule it to be watched. When one of the root paths is deleted, it will be re-watched.

Source code in cphttp/fsWatcher.py
async def watchARootPath(self, pathToWatch) :
  """Register a directory or file as one of the "root" paths and queue it
  to be watched.

  The path is appended to `self.rootPaths`; root paths are the ones which
  get re-watched after they have been deleted."""

  self.logger.debug(f"Adding root path [{pathToWatch}]")
  self.rootPaths.append(pathToWatch)
  await self.watchAPath(pathToWatch)

watchForFileSystemEvents(self)

An asynchronously iterable method which yields file system change events.

Source code in cphttp/fsWatcher.py
async def watchForFileSystemEvents(self):
  """ Asynchronously iterate over the file system change events.

  Every event read from the inotify instance is inspected: newly created
  paths are queued to be watched, paths which report their own deletion
  are queued to be unWatched, and only events sharing at least one bit
  with `self.cpMask` are yielded to the caller. Iteration ends once
  `self.continueWatchingFS` is found to be falsy. """

  # Known limitations:
  #
  # * A watched directory which is moved out of the watch tree keeps on
  #   generating events.
  #
  # * Two changes to a directory in quick succession, before the program
  #   has had time to handle the first, can confuse things (as they do
  #   for a lot of inotify code).
  #
  # * A watched directory moved within a watched directory is reported
  #   with the wrong path. Doing this properly requires linking the
  #   MOVED_FROM and MOVED_TO events via their cookie, and even then
  #   events arriving during the move may carry the wrong path because
  #   the two events are not guaranteed to be contiguous.
  #
  # * Watching a path which does not exist does not create it.
  #
  # * Deleting and recreating, or moving, the watched directory gets no
  #   special treatment (but probably should).
  #
  async for fsEvent in self.inotify:

    if not self.continueWatchingFS :
      return

    # A creation event: watch the new path (and any subdirectories).
    if Mask.CREATE in fsEvent.mask and fsEvent.path is not None :
      await self.watchAPath(fsEvent.path)

    # The watched path itself has been deleted: schedule its unWatch.
    if Mask.DELETE_SELF in fsEvent.mask and fsEvent.path is not None :
      await self.unWatchAPath(fsEvent.path, fsEvent.watch)

    if not (fsEvent.mask & self.cpMask) :
      # None of the bits we report on are set. Such events (IGNORED, for
      # example) are still needed so that inotify can clean up its
      # watches, but they are pure management traffic and of no use to
      # the caller, who has no access to the inotify instance anyway.
      self.logger.debug(f'UNYIELDED EVENT: {fsEvent}')
      continue

    yield fsEvent

reloader

The fileResponsePatch module injects the contents of the reloaderScript string, provided by this module, as a <script>...</script> at the end of any HTML file's <head>...</head> section.