cpawd.taskRunner
The cpawd.taskRunner module runs all watch-do tasks.
The following description is illustrated in the interaction diagram below.
The top-level runTasks method starts an asyncio.Task running the
watchDo method for each watch-do task. The watchDo method reStarts
an asyncio.Task, taskRunner, via a call to asyncio.ensure_future, to
manage a (potentially long-running) OS process. The watchDo task
listens, in an FSWatcher.watchForFileSystemEvents loop, for file
system changes, reStarting the taskRunner task on any such change.
The taskRunner task starts by sleeping, during which time the
taskRunner task can be cancelled and reStarted (by the watchDo
task). This short timeout period of cancel-able sleep acts as a debouncing
timer. It allows the watchDo task to frequently reStart the
taskRunner task without actually running the external OS process until
any nearly simultaneous file system changes have stopped.
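
The debouncing behaviour described above can be sketched in isolation as follows. This is a simplified illustration, not the cpawd implementation: the Debouncer class, its restart method, and the demo function are hypothetical names, and the action is a plain callable rather than an OS process.

```python
import asyncio


class Debouncer:
    """Hypothetical, simplified stand-in for DebouncingTimer."""

    def __init__(self, timeout, action):
        self.timeout = timeout
        self.action = action      # plain callable run once the quiet period ends
        self.task = None

    def restart(self):
        # Cancel a pending sleep (if any), then start a fresh one.
        if self.task is not None and not self.task.done():
            self.task.cancel()
        self.task = asyncio.ensure_future(self._run())

    async def _run(self):
        try:
            await asyncio.sleep(self.timeout)   # cancel-able debounce window
        except asyncio.CancelledError:
            return                              # superseded by a later restart
        self.action()


async def demo():
    calls = []
    debouncer = Debouncer(0.2, lambda: calls.append("ran"))
    for _ in range(5):            # five near-simultaneous "file events"
        debouncer.restart()
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.5)      # let the quiet period elapse
    return calls
```

Five restarts in quick succession result in a single call to the action, because each restart cancels the sleep started by the previous one.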
If the taskRunner is not cancelled during the sleep, the taskRunner
starts the OS process, using a call to asyncio.create_subprocess_exec,
and then awaits two further coroutines, captureOutput and
captureRetCode, to collect the process's output and to wait for
the process's return code.
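
As a minimal sketch of this step, the following starts a child process with asyncio.create_subprocess_exec, reads its stdout line by line, and then waits for the return code. The function names run_and_capture and demo are assumptions made for this illustration, and the child is simply a second Python interpreter; the real taskRunner also writes to a log file and sets a working directory.

```python
import asyncio
import sys


async def run_and_capture(cmd):
    # stderr is folded into stdout, as in the taskRunner source.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    lines = []
    while not proc.stdout.at_eof():
        raw = await proc.stdout.readline()
        if raw:                               # readline returns b"" at EOF
            lines.append(raw.decode().rstrip())
    ret_code = await proc.wait()
    return lines, ret_code


def demo():
    child = [sys.executable, "-c", "print('hello'); print('world')"]
    return asyncio.run(run_and_capture(child))
```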
Once an external OS process has been started, any reStart request from
the watchDo task signals captureOutput to stop listening for
the process's stdout, and then sends a SIGHUP signal to the process
(which must respond by exiting gracefully). The watchDo task then
waits for the taskRunner task to finish before creating a new
taskRunner task (and potentially repeating this cycle).
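
The stop-by-signal step can be illustrated with a small POSIX-only sketch. The child script, the CHILD constant, and the function names are hypothetical; the child installs its own SIGHUP handler to stand in for a command that exits gracefully, which is the behaviour the text above requires of the real command.

```python
import asyncio
import signal
import sys

# Hypothetical child: exits cleanly when it receives SIGHUP.
CHILD = """
import signal, sys, time
signal.signal(signal.SIGHUP, lambda signum, frame: sys.exit(0))
print("ready", flush=True)
time.sleep(30)
"""


async def start_then_hangup():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdout=asyncio.subprocess.PIPE,
    )
    await proc.stdout.readline()          # wait until the handler is installed
    if proc.returncode is None:           # same check as procIsRunning
        proc.send_signal(signal.SIGHUP)
    # Bound the wait so a misbehaving child cannot hang the caller.
    return await asyncio.wait_for(proc.wait(), timeout=10)


def demo():
    return asyncio.run(start_then_hangup())
```

A command that ignores SIGHUP would keep running, which is why the wait is bounded here; the source shown on this page waits without a timeout.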
The main cpawd task can at any time request that the runTasks task
shut down. To shut down, the runTasks task first signals all of the
watchDo FSWatcher.watchForFileSystemEvents loops to stop watching for
file system events. It then signals all running
taskRunners to stop.
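
A shutdown request of this kind is commonly modelled with an asyncio.Event. The sketch below assumes that approach; the watcher coroutine and the stop_flags dictionary are hypothetical stand-ins for the watchDo loops and their stop signals.

```python
import asyncio


async def watcher(name, stop_flags, log):
    # Hypothetical stand-in for a watchDo loop: polls until told to stop.
    while not stop_flags[name]:
        await asyncio.sleep(0.01)
    log.append(f"{name} stopped")


async def demo():
    shutdown = asyncio.Event()
    stop_flags = {"docs": False, "tests": False}
    log = []
    tasks = [
        asyncio.create_task(watcher(name, stop_flags, log))
        for name in stop_flags
    ]
    # Simulate the main task requesting shutdown a little later.
    asyncio.get_running_loop().call_later(0.05, shutdown.set)
    await shutdown.wait()
    for name in stop_flags:               # signal every watcher to stop
        stop_flags[name] = True
    await asyncio.gather(*tasks)          # wait until they have all finished
    return sorted(log)
```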
In this interaction diagram, each asyncio.Task is represented by the
function which the task runs. The OSproc thread is an external OS
process, which is the ultimate "task" of a given watch-do task.
DebouncingTimer
The DebouncingTimer class implements a simple timer to ensure multiple file system events result in only one invocation of the task command.
__init__(self, timeout, taskName, taskDetails, taskLog, terminateSignal)
special
Create the timer with a specific timeout and task definition.
The taskDetails provides the command to run, the log file used to record command output, as well as the project directory in which to run the command.
Source code in cpawd/taskRunner.py
def __init__(self, timeout, taskName, taskDetails, taskLog, terminateSignal) :
    """ Create the timer with a specific timeout and task definition.
    The taskDetails provides the command to run, the log file used to
    record command output, as well as the project directory in which to
    run the command. """
    self.timeout = timeout
    self.taskName = taskName
    self.taskCmd = taskDetails['cmd']
    self.taskCmdStr = " ".join(taskDetails['cmd'])
    self.taskLog = taskLog
    self.taskDir = taskDetails['projectDir']
    self.termSignal = terminateSignal
    self.taskFuture = None
    self.proc = None
    self.pid = None
    self.retCode = None
    self.continueCapturingStdout = True
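
As a rough illustration of the shape of taskDetails, the keys read on this page ('cmd' and 'projectDir' here, 'logFilePath' and 'watch' in watchDo) could be supplied as below. Only the key names come from the source; every value is hypothetical.

```python
# Hypothetical task definition; only the key names come from the source.
task_details = {
    "cmd": ["make", "html"],                          # argv list for the command
    "projectDir": "/tmp/example-project",             # working directory
    "logFilePath": "/tmp/example-project/build.log",  # opened by watchDo
    "watch": ["/tmp/example-project/docs"],           # paths to watch
}

# DebouncingTimer.__init__ builds a printable command string the same way.
task_cmd_str = " ".join(task_details["cmd"])
```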
cancelTimer(self)
Cancel the Debouncing timer
Source code in cpawd/taskRunner.py
def cancelTimer(self) :
    """Cancel the Debouncing timer"""
    if self.taskFuture and not self.procIsRunning() :
        logger.debug("Cancelling timer for {}".format(self.taskName))
        self.taskFuture.cancel()
captureOutput(self)
async
Capture the (stdout) output from the external process
Source code in cpawd/taskRunner.py
async def captureOutput(self) :
    """Capture the (stdout) output from the external process"""
    logger.debug("CaptureOutput task running for {}".format(self.taskName))
    taskLog = self.taskLog
    if self.proc is not None :
        stdout = self.proc.stdout
        if stdout :
            await taskLog.write("\n============================================================================\n")
            await taskLog.write("{} ({}) stdout @ {}\n".format(
                self.taskName, self.proc.pid, time.strftime("%Y/%m/%d %H:%M:%S")
            ))
            await taskLog.write("{}\n".format(self.taskCmdStr))
            await taskLog.write("----------------------------------------------------------------------------\n")
            await taskLog.flush()
            while self.continueCapturingStdout and not stdout.at_eof() :
                logger.debug("Collecting {} stdout ({})".format(
                    self.taskName, self.proc.pid
                ))
                aLine = await stdout.readline()
                await taskLog.write(aLine.decode())
                await taskLog.flush()
            if self.continueCapturingStdout :
                logger.debug("Finished collecting {} stdout ({})".format(
                    self.taskName, self.proc.pid
                ))
            else :
                await taskLog.write("\n[Stopped collecting stdout]")
                logger.debug("Stopped collecting process stdout for {} ({})".format(
                    self.taskName, self.pid
                ))
            await taskLog.write("\n----------------------------------------------------------------------------\n")
            await taskLog.write("{} ({}) stdout @ {}\n".format(
                self.taskName, self.pid, time.strftime("%Y/%m/%d %H:%M:%S")
            ))
            await taskLog.flush()
        else :
            logger.debug("No stdout found for {}".format(self.taskName))
    else :
        logger.debug("No external process found so no stdout captured for {}".format(self.taskName))
    logger.debug("CaptureOutput task finished for {}".format(self.taskName))
captureRetCode(self)
async
Wait for and capture the return code of the external process
Source code in cpawd/taskRunner.py
async def captureRetCode(self) :
    """Wait for and capture the return code of the external process"""
    logger.debug("Capturing return code for {}".format(self.taskName))
    try :
        self.retCode = await self.proc.wait()
    except ProcessLookupError :
        logger.debug("No process found for {} (pid:{})".format(
            self.taskName, self.pid
        ))
    if self.retCode is not None :
        retCode = self.retCode
        pid = self.pid
        logger.debug("Return code for {} is {} (pid:{})".format(
            self.taskName, retCode, pid
        ))
        taskLog = self.taskLog
        await taskLog.write("{} task ({}) exited with {}\n".format(
            self.taskName, pid, retCode
        ))
        await taskLog.write("\n")
        await taskLog.flush()
        logger.debug("Finished {} ({}) command [{}] exited with {}".format(
            self.taskName, pid, self.taskCmdStr, retCode
        ))
    self.proc = None
    logger.debug("Captured return code for {}".format(self.taskName))
procIsRunning(self)
Determine if an external process is (still) running
Source code in cpawd/taskRunner.py
def procIsRunning(self) :
    """Determine if an external process is (still) running"""
    return self.proc is not None and self.proc.returncode is None
reStart(self)
async
(Re)Start the timer. If the timer is already started, it is restarted with a new timeout period.
Source code in cpawd/taskRunner.py
async def reStart(self) :
    """ (Re)Start the timer. If the timer is already started, it is
    restarted with a new timeout period. """
    await self.stopTaskProc()
    if self.taskFuture :
        self.cancelTimer()
        if not self.taskFuture.done() :
            logger.debug("Waiting for the previous taskRunner task for {} to finish".format(self.taskName))
            await asyncio.wait([self.taskFuture])
    logger.debug("Starting new taskRunner for {}".format(self.taskName))
    self.taskFuture = asyncio.ensure_future(self.taskRunner())
stopTaskProc(self)
async
Stop the external process
Source code in cpawd/taskRunner.py
async def stopTaskProc(self) :
    """Stop the external process"""
    logger.debug("Attempting to stop the task process for {}".format(self.taskName))
    self.continueCapturingStdout = False
    if self.proc is not None :
        pid = self.proc.pid
        logger.debug("Process found for {} ({})".format(self.taskName, pid))
        if self.procIsRunning() :
            logger.debug("Process still running for {}".format(self.taskName))
            try:
                logger.debug("Sending OS signal ({}) to {} (pid:{})".format(
                    self.termSignal, self.taskName, pid
                ))
                self.proc.send_signal(self.termSignal)
            except ProcessLookupError :
                logger.debug("No existing external process found for {} (pid:{})".format(
                    self.taskName, pid
                ))
            except Exception as err:
                # The third placeholder must be "{}"; a lone "}" makes str.format raise ValueError.
                logger.error("Could not send signal ({}) to proc for {} ({})".format(
                    self.termSignal, self.taskName, pid
                ))
                logger.error(repr(err))
                traceback.print_exc()
        else :
            self.retCode = self.proc.returncode
            logger.debug("Process finished with return code {} for {}".format(
                self.retCode, self.taskName
            ))
    else :
        logger.debug("No external process found for {}".format(self.taskName))
taskRunner(self)
async
Run the task's command, after sleeping for the timeout period,
using asyncio.create_subprocess_exec command.
Source code in cpawd/taskRunner.py
async def taskRunner(self) :
    """ Run the task's command, after sleeping for the timeout period,
    using `asyncio.create_subprocess_exec` command. """
    try:
        logger.debug("TaskRunner for {} sleeping for {}".format(
            self.taskName, self.timeout
        ))
        await asyncio.sleep(self.timeout)
        # Now we can run the new task...
        #
        logger.debug("Running {} command [{}]".format(
            self.taskName, self.taskCmdStr
        ))
        self.proc = await asyncio.create_subprocess_exec(
            *self.taskCmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            cwd=self.taskDir
        )
        self.pid = self.proc.pid
        self.retCode = None
        self.continueCapturingStdout = True
        print(f'Ran: {self.taskName}')
        #await asyncio.gather(
        await self.captureOutput(),
        await self.captureRetCode()
        #)
        if self.retCode is None or self.retCode != 0 :
            print(f"FAILED: {self.taskName} ({self.retCode})")
    except Exception as err :
        print("Caught exception while running {} task".format(self.taskName))
        print(repr(err))
        traceback.print_exc()
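
The taskRunner source awaits captureOutput and then captureRetCode one after the other, with an asyncio.gather wrapper commented out. The sketch below contrasts the two forms using hypothetical stand-in coroutines (capture_output and capture_ret_code only sleep and record events); it illustrates the difference in ordering and is not a statement about which form cpawd should use.

```python
import asyncio


async def capture_output(events):
    events.append("output: start")
    await asyncio.sleep(0.2)              # pretend to read stdout
    events.append("output: done")


async def capture_ret_code(events):
    events.append("retcode: start")
    await asyncio.sleep(0.05)             # pretend to wait for the process
    events.append("retcode: done")


async def sequential():
    events = []
    await capture_output(events)          # finishes before the next one starts
    await capture_ret_code(events)
    return events


async def concurrent():
    events = []
    # Both coroutines are scheduled together and interleave at each await.
    await asyncio.gather(capture_output(events), capture_ret_code(events))
    return events
```

In the sequential form the return code is only awaited once output capture has returned; with asyncio.gather both waits are in flight at the same time.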
runTasks(config)
async
Walk through the list of watch-do tasks and create an asyncio.Task
for each task, using an invocation of the watchDo method to wrap each
task. Since these tasks are not Python-CPU bound, they will essentially
"run" in parallel.
Source code in cpawd/taskRunner.py
async def runTasks(config) :
    """ Walk through the list of watch-do tasks and create an `asyncio.Task`
    for each task, using an invocation of the `watchDo` method to wrap each
    task. Since these tasks are not Python-CPU bound, they will essentially
    "run" in parallel. """
    for aTaskName, aTask in config['tasks'].items() :
        asyncio.create_task(watchDo(aTaskName, aTask))
    await waitForShutdown()
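
The asyncio documentation advises keeping a reference to each task returned by asyncio.create_task, because the event loop itself holds only weak references to tasks. A possible variant of the loop above that does so is sketched here; watch_do, run_tasks, and the background set are hypothetical names, and the stand-in coroutine merely records that it ran.

```python
import asyncio


async def watch_do(name, task, results):
    # Hypothetical stand-in for watchDo: just records that it ran.
    await asyncio.sleep(0)
    results.append((name, task["cmd"]))


async def run_tasks(config):
    results = []
    background = set()                    # strong references to running tasks
    for name, task in config["tasks"].items():
        t = asyncio.create_task(watch_do(name, task, results))
        background.add(t)
        t.add_done_callback(background.discard)   # drop the reference when done
    await asyncio.gather(*background)     # the source awaits waitForShutdown() here
    return sorted(results)


def demo():
    config = {"tasks": {"docs": {"cmd": ["make", "html"]},
                        "tests": {"cmd": ["pytest"]}}}
    return asyncio.run(run_tasks(config))
```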
stopTasks()
async
Stop all watch-do tasks
Source code in cpawd/taskRunner.py
async def stopTasks() :
    """Stop all watch-do tasks"""
    logger.info("Stopping all tasks")
    for aWatcher in watchers :
        aWatcher.stopWatchingFileSystem()
    for aTimer in debouncingTimers :
        await aTimer.stopTaskProc()
        aTimer.cancelTimer()
    logger.debug("All tasks stopped")
waitForShutdown()
async
Wait for the shutdown event and then stop all watch-do tasks
Source code in cpawd/taskRunner.py
async def waitForShutdown() :
    """Wait for the shutdown event and then stop all watch-do tasks"""
    logger.debug("waiting for eventual shutdown event")
    await shutdownTasks.wait()
    logger.debug("got shutdown")
    await stopTasks()
    logger.debug("shutdown")
watchDo(aTaskName, aTask)
async
Set up and manage the watches, and then run the task's command using the DebouncingTimer whenever a change is detected in a watched directory or file.
Source code in cpawd/taskRunner.py
async def watchDo(aTaskName, aTask) :
    """ Set up and manage the watches, and then run the task's command using
    the DebouncingTimer whenever a change is detected in a watched directory
    or file. """
    logger.debug("Starting watchDo for {}".format(aTaskName))
    aWatcher = FSWatcher(logger)
    watchers.append(aWatcher)
    taskLog = await aiofiles.open(aTask['logFilePath'], 'w')
    aTimer = DebouncingTimer(1, aTaskName, aTask, taskLog, signal.SIGHUP)
    debouncingTimers.append(aTimer)
    # add watches
    asyncio.create_task(aWatcher.managePathsToWatchQueue())
    for aWatch in aTask['watch'] :
        await aWatcher.watchARootPath(aWatch)
    # Ensure the task is run at least once
    logger.debug("First run of taskRunner for {}".format(aTaskName))
    await aTimer.reStart()
    # watch and run cmd
    if 'runOnce' not in aTask :
        async for event in aWatcher.watchForFileSystemEvents() :
            logger.debug("File system event mask {} for file [{}] for task {}".format(
                getMaskName(event.mask), event.name, aTaskName
            ))
            await aTimer.reStart()
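
The `async for` loop above consumes an asynchronous generator that can be told to stop. How FSWatcher implements this is not shown on this page; the sketch below is one possible shape, using a hypothetical FakeWatcher class backed by an asyncio.Queue with a None sentinel to end the iteration.

```python
import asyncio


class FakeWatcher:
    """Hypothetical stand-in for FSWatcher; not the real cpawd class."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.keep_watching = True

    def stop(self):
        self.keep_watching = False
        self.queue.put_nowait(None)       # wake the generator so it can exit

    async def events(self):
        while self.keep_watching:
            event = await self.queue.get()
            if event is None:             # sentinel pushed by stop()
                break
            yield event


async def demo():
    watcher = FakeWatcher()
    restarts = []
    for name in ("a.txt", "b.txt"):       # two queued "file system events"
        watcher.queue.put_nowait(name)
    asyncio.get_running_loop().call_later(0.05, watcher.stop)
    async for event in watcher.events():
        restarts.append(event)            # watchDo awaits aTimer.reStart() here
    return restarts
```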