Python code details
cphttp
Implement a very simple reloading HTTP server for ComputePods
DebouncingTimer
A simple debouncing timer which ensures we wait until any high frequency events have stopped.
cancelTask(self)
Cancel the current timer.
Source code in cphttp/cphttp.py
def cancelTask(self):
    """Cancel the pending timer task, if there is one.

    Does nothing when `self.taskFuture` is falsy; otherwise calls its
    `cancel()` method.
    """
    pendingTask = self.taskFuture
    if not pendingTask:
        return
    pendingTask.cancel()
doTask(self)
async
Sleep for timeout seconds while waiting to be (potentially)
cancelled. If we are not cancelled, send a reload
message to the
browser.
Source code in cphttp/cphttp.py
async def doTask(self):
    """Wait out the debounce interval, then ask the browser to reload.

    Sleeps for `self.timeout` seconds; the sleep sits outside the `try`, so
    cancelling the task during it abandons the reload. After the sleep the
    timer is re-armed through `reStart` (and nothing is sent) when either

    * one of the watcher's root paths does not exist, or
    * the watcher reports more than two unWatches in excess of its watches.

    Otherwise the watch statistics are cleared and a `reload` message is
    put on the heartBeat queue. Any `Exception` raised after the sleep is
    printed and swallowed.
    """
    await asyncio.sleep(self.timeout)
    try:
        print("testing for reload")
        watcher = self.theWatcher

        # Only the first missing root path matters: it is logged and the
        # timer is re-armed.
        missingRoot = next(
            (aRoot for aRoot in watcher.getRootPaths()
             if not os.path.exists(aRoot)),
            None,
        )
        if missingRoot is not None:
            self.logger.debug("Root path [{}] does NOT exist".format(missingRoot))
            await self.reStart()
            return

        numWatches, numUnWatches = watcher.getWatchStats()
        print("watches: {} unWatches: {}".format(numWatches, numUnWatches))
        if numUnWatches > numWatches + 2:
            await self.reStart()
            return

        watcher.clearWatchStats()
        self.logger.debug("Sending RELOAD message to browser")
        await heartBeatQueue.put("reload")
    except Exception as err:
        print(repr(err))
reStart(self)
async
Restart the timer (cancel the old one if it exists).
Source code in cphttp/cphttp.py
async def reStart(self):
    """(Re)arm the timer.

    Cancels the pending task (if any) through `cancelTask`, then schedules
    a fresh `doTask` and stores it in `self.taskFuture`.
    """
    self.cancelTask()
    freshTask = asyncio.ensure_future(self.doTask())
    self.taskFuture = freshTask
configureWebServer(cliArgs)
Configure the Hypercorn webserver.
Source code in cphttp/cphttp.py
def configureWebServer(cliArgs):
    """Build the Hypercorn configuration from the command line arguments.

    The bind address is `host:port`; the log level, access log and error
    log settings are copied from `cliArgs`.

    Returns a `(logger, config)` tuple: the `hypercorn.access` logger and
    the populated `Config` object.
    """
    config = Config()
    bindAddress = cliArgs.host + ':' + str(cliArgs.port)
    config.bind = [bindAddress]
    for anOption in ('loglevel', 'accesslog', 'errorlog'):
        setattr(config, anOption, getattr(cliArgs, anOption))

    # Reading `config.log` forces the config object to instantiate the loggers
    config.log

    accessLogger = logging.getLogger('hypercorn.access')
    return accessLogger, config
cphttp()
Parse the command line arguments, configure hypercorn, setup the /heartBeat Server Sent Events handler using sse_starlette, and then run the server.
Source code in cphttp/cphttp.py
def cphttp():
    """Command line entry point for the reloading HTTP server.

    Parses the command line arguments, configures Hypercorn (via
    `configureWebServer`), installs SIGTERM/SIGHUP/SIGINT handlers on the
    asyncio event loop and then runs `runUntilShutdown` to completion
    before shutting down logging.

    The `--port` option is parsed as an integer, so a non-numeric port is
    rejected by argparse with a usage error instead of producing an invalid
    bind address later on.
    """
    argparser = argparse.ArgumentParser(
        description="A very simple reloading Http server using SSE Starlette and Hypercorn."
    )
    argparser.add_argument("-H", "--host", default="localhost",
        help="The host interface to listen on (default: localhost)"
    )
    argparser.add_argument("-p", "--port", default=8008, type=int,
        help="The port to listen on (default: 8008)"
    )
    argparser.add_argument("-d", "--directory", default="html",
        help="The html directory to serve static files from (default: html)"
    )
    argparser.add_argument("-v", "--verbose", default=False,
        action=argparse.BooleanOptionalAction,
        help="provide more information about what is happening"
    )
    argparser.add_argument("-a", "--accesslog", default='-',
        help="specify a file for the access log (default: stdout)"
    )
    argparser.add_argument("-e", "--errorlog", default='-',
        help="specify a file for the error log (default: stdout)"
    )
    argparser.add_argument("-l", "--loglevel", default='INFO',
        help="specify the access/error logging level (default: INFO)"
    )
    argparser.add_argument("-w", "--watch", default=[], action='append',
        help="specify the directories/files to watch (can be used multiple times) (default: none)"
    )
    cliArgs = argparser.parse_args()

    logger, config = configureWebServer(cliArgs)

    # setup the asyncio loop
    loop = asyncio.get_event_loop()
    loop.set_debug(cliArgs.verbose)
    # every handled signal is routed to signalHandler together with its
    # name and the logger
    for aSignalName in ("SIGTERM", "SIGHUP", "SIGINT"):
        loop.add_signal_handler(
            getattr(signal, aSignalName), signalHandler, aSignalName, logger
        )

    loop.run_until_complete(runUntilShutdown(cliArgs, logger, config))

    logger.info("Finished serving")
    logging.shutdown()
heartBeatBeater()
The SSE generator of messages to be sent over the /heartBeat SSE.
Using an asyncio.Queue allows the counter and reload messages to be interleaved.
Source code in cphttp/cphttp.py
async def heartBeatBeater():
    """Generate the messages sent over the /heartBeat SSE.

    While `heartBeatContinueBeating` is true, each message taken from the
    `heartBeatQueue` is JSON encoded and yielded as a `{"data": ...}`
    dict; the queue item is marked done once the consumer asks for the
    next message. Reading from one asyncio.Queue is what lets counter and
    reload messages be interleaved.
    """
    while heartBeatContinueBeating:
        nextMessage = await heartBeatQueue.get()
        payload = json.dumps(nextMessage)
        yield {"data": payload}
        heartBeatQueue.task_done()
heartBeatCounter()
async
A (slow) counter to act as messages over the /heartBeat SSE which help keep the connection open when run through an HTTP-proxy.
Source code in cphttp/cphttp.py
async def heartBeatCounter():
    """Put a slowly increasing counter on the heartBeat queue.

    While `heartBeatContinueBeating` is true, sleeps two seconds and then
    queues the current count (as a string). These messages help keep the
    /heartBeat SSE connection open when run through an HTTP-proxy.
    """
    tick = 0
    while heartBeatContinueBeating:
        await asyncio.sleep(2)
        await heartBeatQueue.put(str(tick))
        tick += 1
heartBeatSSE(request)
async
Implement the /heartBeat SSE end point.
Start the long running counter and pass it to the sse_starlette EventSourceResponse.
Source code in cphttp/cphttp.py
async def heartBeatSSE(request):
    """Serve the /heartBeat SSE end point.

    Starts the long running `heartBeatCounter` task and returns an
    sse_starlette `EventSourceResponse` fed by `heartBeatBeater`. The
    `request` argument is not used.
    """
    # NOTE(review): every request starts another counter task and the task
    # handle is not kept — confirm this is intended.
    asyncio.create_task(heartBeatCounter())
    return EventSourceResponse(heartBeatBeater())
runWebServer(cliArgs, logger, config)
async
Setup the Starlette Application and run it.
Source code in cphttp/cphttp.py
async def runWebServer(cliArgs, logger, config):
    """Build the Starlette application and serve it with Hypercorn.

    Registers the `/heartBeat` SSE route, mounts the static files found in
    `cliArgs.directory` at `/`, logs every mount point, and then serves
    the application until the `shutdownHypercorn` event is set.
    """
    app = Starlette(debug=cliArgs.verbose)
    app.add_route('/heartBeat', heartBeatSSE, name='heartBeat')

    staticFiles = StaticFiles(directory=cliArgs.directory, html=True)
    app.mount('/', staticFiles, name='home')

    logger.info("Serving static files from [{}]".format(cliArgs.directory))
    for mountedRoute in app.routes:
        logger.info("MountPoint: [{}]".format(mountedRoute.path))

    await serve(app, config, shutdown_trigger=shutdownHypercorn.wait)
signalHandler(signum, logger)
Handle an OS system signal by stopping the heartBeat
Source code in cphttp/cphttp.py
def signalHandler(signum, logger):
    """Handle an OS signal: log it, stop the heartBeat and set the
    `shutdownHypercorn` event."""
    # an empty line is printed before the log message
    print("")
    caughtMessage = "SignalHandler: Caught signal {}".format(signum)
    logger.info(caughtMessage)
    stopHeartBeat()
    shutdownHypercorn.set()
stopWebServer()
Tell the Hypercorn server to stop.
Source code in cphttp/cphttp.py
def stopWebServer():
    """Ask the Hypercorn server to stop by setting the `shutdownHypercorn`
    event."""
    shutdownHypercorn.set()
watchFiles(cliArgs, logger)
async
Setup the file system watcher.
Source code in cphttp/cphttp.py
async def watchFiles(cliArgs, logger):
    """Set up the file system watcher and debounce its events.

    Creates an `FSWatcher` and a one second `DebouncingTimer`, starts the
    watcher's `managePathsToWatchQueue` in its own task, registers every
    path in `cliArgs.watch` as a root path, and then restarts the timer for
    every file system event the watcher yields.

    The queue manager task is cancelled when this coroutine finishes
    (normally, by exception, or by cancellation) so it is not left pending.
    """
    aWatcher = FSWatcher(logger)
    aTimer = DebouncingTimer(1, aWatcher, logger)

    # Keep a reference to the task: the asyncio documentation warns that a
    # task which is not referenced elsewhere may be garbage collected.
    queueManager = asyncio.create_task(aWatcher.managePathsToWatchQueue())
    try:
        for aWatch in cliArgs.watch:
            await aWatcher.watchARootPath(aWatch)

        # the events themselves are not inspected; each one just (re)arms
        # the debouncing timer
        async for _event in aWatcher.watchForFileSystemEvents():
            await aTimer.reStart()
    finally:
        queueManager.cancel()
fileResponsePatch
This fileResponsePatch module monkey patches the
Starlette::FileResponse.__call__
to allow us to inject our
reloaderScript
JavaScript into any HTML responses.
newFileResponseCall(self, scope, receive, send)
async
A monkey patched version of the Starlette::FileResponse.__call__
method.
This version detects if the media_type is HTML and if so, reads the file
line by line looking for the </head>
element. When found the </head>
string is replaced by a <script>...</script></head>
string which
contains the reloaderScript
from the cphttp.reloader module.
If the file is not an HTML file, the original
Starlette::FileResponse.__call__
is called.
This code is based on the original Starlette::FileResponse.__call__
code. It is used under Starlette's BSD License (see the top of the
fileResponsePatch.py file for details)
Source code in cphttp/fileResponsePatch.py
async def newFileResponseCall(self, scope: Scope, receive: Receive, send: Send) -> None:
    """ A monkey patched version of the `Starlette::FileResponse.__call__`
    method.

    If the file is *not* an HTML file (the media_type does not end with
    `html`), the original `Starlette::FileResponse.__call__` is called.

    For HTML files the file is read into memory and the first `</head>` is
    replaced by a `<script>...</script></head>` string which contains the
    `reloaderScript` from the `cphttp.reloader`. The result is sent as a
    single body message.

    The `content-length` header is computed from the bytes which are
    actually sent. It therefore stays correct when the file contains no
    `</head>` (or more than one), when the `reloaderScript` contains
    non-ASCII characters, and when no `stat_result` was supplied.

    Raises:
        RuntimeError: when `self.stat_result` is None and the file does not
            exist or is not a regular file.

    This code is based on the original `Starlette::FileResponse.__call__`
    code. It is used under Starlette's BSD License (see the top of the
    fileResponsePatch.py file for details) """
    if not self.media_type.endswith("html"):
        return await oldFileResponseCall(self, scope, receive, send)

    logger.debug("Injecting reloader into [{}]".format(self.path))

    headReplacementStr = """
<script>{}</script>
</head>
""".format(reloaderScript)

    # Use a local so that a freshly obtained stat result is never confused
    # with a (possibly None) self.stat_result.
    stat_result = self.stat_result
    if stat_result is None:
        try:
            stat_result = await anyio.to_thread.run_sync(os.stat, self.path)
        except FileNotFoundError as err:
            raise RuntimeError(f"File at path {self.path} does not exist.") from err
        self.set_stat_headers(stat_result)
        if not stat.S_ISREG(stat_result.st_mode):
            raise RuntimeError(f"File at path {self.path} is not a file.")

    # Work on the raw bytes so that the file's own encoding and line
    # endings are passed through untouched.
    async with await anyio.open_file(self.path, mode="rb") as file:
        body = await file.read()
    # Inject the script once only, at the first </head>.
    body = body.replace(b'</head>', headReplacementStr.encode('utf-8'), 1)

    # The header must describe exactly what is sent (also for header-only
    # responses, which advertise the size of the full response).
    self.headers['content-length'] = str(len(body))

    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
    )
    if self.send_header_only:
        await send({"type": "http.response.body", "body": b"", "more_body": False})
    else:
        await send({"type": "http.response.body", "body": body, "more_body": False})

    if self.background is not None:
        await self.background()
fsWatcher
The fsWatcher module adapts the asyncinotify example to recursively watch directories or files either by a direct request, or as they are created inside watched directories.
FSWatcher
The FSWatcher
class manages the Linux file system inotify
watches for a given collection of directories or files. It provides a
file change event stream via the asynchronously iterable watchForFileSystemEvents
method.
To allow for asynchronous operation, the "watches" are added to an
asyncio.Queue
managed by the managePathsToWatchQueue
method. When
used, this managePathsToWatchQueue
method should be run inside its own
asyncio.Task
.
get_directories_recursive(self, path)
Recursively list all directories under path, including path itself, if it's a directory.
The path itself is always yielded before its children are iterated, so you can pre-process a path (by watching it with inotify) before you get the directory listing.
Source code in cphttp/fsWatcher.py
def get_directories_recursive(self, path):
    """Yield `path` followed by everything found beneath it.

    A directory is yielded first and then each of its children is walked
    recursively; a regular file is simply yielded. A path which is neither
    a directory nor a file yields nothing.

    Because a directory is always yielded before its children are listed,
    the caller can pre-process it (by watching it with inotify) before the
    directory listing is taken.
    """
    isDirectory = path.is_dir()
    if not isDirectory and not path.is_file():
        return

    yield path
    if isDirectory:
        for anEntry in path.iterdir():
            yield from self.get_directories_recursive(anEntry)
managePathsToWatchQueue(self)
async
Implement all (pending) requests to watch/unWatch a directory or
file which are in the pathsToWatchQueue
.
When watching, the paths contained in all directories are themselves
recursively added to the pathsToWatchQueue
.
Source code in cphttp/fsWatcher.py
async def managePathsToWatchQueue(self):
    """ Implement all (pending) requests to watch/unWatch a directory or
    file which are in the `pathsToWatchQueue`.

    Each queue item is an `(addPath, aPathToWatch, theWatch)` tuple.

    When watching (`addPath` is true), the path and everything
    `get_directories_recursive` finds beneath it is added to inotify. A
    path which can not be watched because of a `PermissionError` is
    skipped; for any other failure the traceback is printed and the path's
    parent is scheduled to be watched again.

    When unWatching, only the statistics are updated (inotify removes the
    watch itself), and a root path is scheduled to be watched again.

    The loop runs while `continueWatchingFS` is true, and every item taken
    from the queue is marked done even if processing it fails. """
    while self.continueWatchingFS:
        addPath, aPathToWatch, _theWatch = await self.pathsToWatchQueue.get()
        try:
            if addPath:
                for aPath in self.get_directories_recursive(Path(aPathToWatch)):
                    try:
                        self.numWatches = self.numWatches + 1
                        self.inotify.add_watch(aPath, self.wrMask)
                        self.logger.debug(f'INIT: watching {aPath}')
                    except PermissionError:
                        # best effort: paths we may not watch are skipped
                        self.logger.debug(f'INIT: no permission to watch {aPath}')
                    except Exception:
                        print(f"Exception while trying to watch: [{aPath}]")
                        # print_exc reports the exception being handled; its
                        # first parameter is a frame limit, not the exception
                        traceback.print_exc()
                        # we can't watch this path just yet...
                        # ... schedule its parent and try again...
                        await self.watchAPath(aPath.parent)
            else:
                # according to the documentation.... the corresponding
                # Mask.IGNORE event will automatically remove this watch,
                # so rm_watch is not called here.
                self.numUnWatches = self.numUnWatches + 1
                self.logger.debug(f'INIT: unWatching {aPathToWatch}')
                if aPathToWatch in self.rootPaths:
                    self.logger.debug(f'INIT: found root path... rewatching it {aPathToWatch}')
                    await self.watchAPath(aPathToWatch)
        finally:
            self.pathsToWatchQueue.task_done()
stopWatchingFileSystem(self)
(Gracefully) stop watching the file system
Source code in cphttp/fsWatcher.py
def stopWatchingFileSystem(self):
    """Request a (graceful) stop of the file system watching by clearing
    the `continueWatchingFS` flag."""
    self.continueWatchingFS = False
unWatchAPath(self, pathToWatch, aWatch)
async
Add a single directory or file to be unWatched by this instance of
FSWatcher
to the pathsToWatchQueue
.
Source code in cphttp/fsWatcher.py
async def unWatchAPath(self, pathToWatch, aWatch):
    """Queue a request for this `FSWatcher` to unWatch a single directory
    or file.

    The request is put on the `pathsToWatchQueue` as a
    `(False, pathToWatch, aWatch)` tuple.
    """
    self.logger.debug("Adding path to (un)watch queue {}".format(pathToWatch))
    unWatchRequest = (False, pathToWatch, aWatch)
    await self.pathsToWatchQueue.put(unWatchRequest)
watchAPath(self, pathToWatch)
async
Add a single directory or file to be watched by this instance of
FSWatcher
to the pathsToWatchQueue
.
Source code in cphttp/fsWatcher.py
async def watchAPath(self, pathToWatch):
    """Queue a request for this `FSWatcher` to watch a single directory or
    file.

    The request is put on the `pathsToWatchQueue` as a
    `(True, pathToWatch, None)` tuple.
    """
    self.logger.debug("Adding path to watch queue {}".format(pathToWatch))
    watchRequest = (True, pathToWatch, None)
    await self.pathsToWatchQueue.put(watchRequest)
watchARootPath(self, pathToWatch)
async
Add a single directory or file to the list of "root" paths to watch as well as schedule it to be watched. When one of the root paths is deleted, it will be re-watched.
Source code in cphttp/fsWatcher.py
async def watchARootPath(self, pathToWatch):
    """Register a "root" path and schedule it to be watched.

    The path is appended to `self.rootPaths` and then queued through
    `watchAPath`. Root paths are the ones which get re-watched after they
    have been deleted.
    """
    self.logger.debug("Adding root path [{}]".format(pathToWatch))
    knownRoots = self.rootPaths
    knownRoots.append(pathToWatch)
    await self.watchAPath(pathToWatch)
watchForFileSystemEvents(self)
An asynchronously iterable method which yields file system change events.
Source code in cphttp/fsWatcher.py
async def watchForFileSystemEvents(self):
    """Asynchronously iterate over the file system change events.

    Every inotify event is inspected in turn:

    * a CREATE event with a path schedules that path to be watched,
    * a DELETE_SELF event with a path schedules that path to be unWatched,
    * only events whose mask shares bits with `self.cpMask` are yielded;
      all other events are just logged.

    The iteration ends as soon as an event arrives while
    `continueWatchingFS` is false.
    """
    # Things that can throw this off:
    #
    # * A watched directory which is moved out of the watch tree still
    #   generates events, even though it is outside of the tree.
    #
    # * Two changes to a directory made before the program has had time to
    #   handle the first (this throws off a lot of inotify code).
    #
    # * A watched directory moved within a watched directory gets the wrong
    #   path. Doing this properly needs the cookie system to link the
    #   MOVED_FROM and MOVED_TO events together; as those are not
    #   guaranteed to be contiguous, events arriving during the move could
    #   still get the wrong path. That exercise is left up to the reader.
    #
    # * Watching a path which does not exist will not create it.
    #
    # * Deleting and recreating, or moving, the watched directory does
    #   nothing special, but it probably should.
    #
    async for fsEvent in self.inotify:
        if not self.continueWatchingFS:
            return

        # A newly created path (and its subdirectories, if any) needs its
        # own watch.
        if Mask.CREATE in fsEvent.mask and fsEvent.path is not None:
            await self.watchAPath(fsEvent.path)

        if Mask.DELETE_SELF in fsEvent.mask and fsEvent.path is not None:
            await self.unWatchAPath(fsEvent.path, fsEvent.watch)

        if not (fsEvent.mask & self.cpMask):
            # These events are still needed for cleanup purposes: IGNORED
            # events always arrive so the watch can be removed from the
            # inotify. They are not passed up, as the end-user does not
            # have the inotify instance and IGNORED is only used for
            # management purposes.
            self.logger.debug(f'UNYIELDED EVENT: {fsEvent}')
            continue

        # Some bits of the cpMask are set in the event's mask.
        yield fsEvent
reloader
The fileResponsePatch module injects the contents of the reloaderScript
string, provided by this module, as a <script>...</script>
at the end of
any HTML file's <head>...</head>
section.