The fsWatcher module adapts the asyncinotify example to watch directories and files recursively, either on direct request or as they are created inside directories that are already watched.
The FSWatcher class manages the Linux inotify watches for a given collection of directories or files. It provides a stream of file change events via the asynchronous iterable watchForFileSystemEvents (named recursive_watch in the asyncinotify example).
To allow for asynchronous operation, the "watches" are added to an asyncio.Queue, the pathsToWatchQueue, managed by the managePathsToWatchQueue method. When used, this managePathsToWatchQueue method should be run inside its own asyncio task.
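The queue-and-task arrangement described above can be sketched with nothing but asyncio. This is a minimal illustration rather than the module's own code: ToyWatcher, manage_queue and watched are hypothetical stand-ins, and no inotify calls are made. Only the shape of the queue entries, (add?, path, watch), is taken from the methods documented on this page.

```python
import asyncio


class ToyWatcher:
    """Hypothetical stand-in for FSWatcher: requests are queued, then
    applied by a manager coroutine that runs in its own task."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.watched = set()

    async def watch(self, path):
        # Same tuple shape as the pathsToWatchQueue entries.
        await self.queue.put((True, path, None))

    async def unwatch(self, path):
        await self.queue.put((False, path, None))

    async def manage_queue(self):
        # Apply requests one at a time, in the order they were queued.
        while True:
            add, path, _watch = await self.queue.get()
            if add:
                self.watched.add(path)
            else:
                self.watched.discard(path)
            self.queue.task_done()


async def demo():
    watcher = ToyWatcher()
    manager = asyncio.create_task(watcher.manage_queue())
    await watcher.watch("/tmp/a")
    await watcher.watch("/tmp/b")
    await watcher.unwatch("/tmp/a")
    await watcher.queue.join()      # wait until every request is applied
    manager.cancel()                # then shut the manager task down
    try:
        await manager
    except asyncio.CancelledError:
        pass
    return watcher.watched
```

Running asyncio.run(demo()) returns the set of paths still "watched". Because the manager lives in its own task, callers only ever wait on the queue, never on the work of applying a request.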
get_directories_recursive(self, path)
Recursively list all directories under path, including path itself, if it's a directory. If path is a regular file, the file itself is yielded, so files nested in the tree are listed too.
The path itself is always yielded before its children are iterated, so you can pre-process a path (by watching it with inotify) before you get the directory listing.
Source code in cpawd/
def get_directories_recursive(self, path) :
  """ Recursively list all directories under path, including path
  itself, if it's a directory.

  The path itself is always yielded before its children are iterated, so
  you can pre-process a path (by watching it with inotify) before you
  get the directory listing. """
  if path.is_dir() :
    yield path
    for child in path.iterdir():
      yield from self.get_directories_recursive(child)
  elif path.is_file() :
    yield path
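The parent-before-children ordering is easiest to see on a small throwaway tree. The sketch below is a standalone variant of the method (no self), with the children sorted so the order is deterministic; directories_recursive and demo are names introduced here for illustration only.

```python
import tempfile
from pathlib import Path


def directories_recursive(path):
    """Standalone variant of get_directories_recursive (no `self`)."""
    if path.is_dir():
        yield path                                   # parent first ...
        for child in sorted(path.iterdir()):         # sorted only for a stable demo
            yield from directories_recursive(child)  # ... then its children
    elif path.is_file():
        yield path


def demo():
    # Build root/a/b/ and root/a/note.txt in a temporary directory.
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "a" / "b").mkdir(parents=True)
        (root / "a" / "note.txt").write_text("hi")
        return [p.relative_to(root).as_posix()
                for p in directories_recursive(root)]
```

Here demo() returns ['.', 'a', 'a/b', 'a/note.txt']: each directory appears before anything inside it, which is what lets a caller add the inotify watch before the listing is read.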
managePathsToWatchQueue(self)
Implement all (pending) requests to watch/unWatch a directory or file which are in the pathsToWatchQueue.
When watching a directory, every path it contains is watched too, recursively. A path that cannot be watched yet has its parent added to the pathsToWatchQueue for another attempt.
Source code in cpawd/
async def managePathsToWatchQueue(self) :
  """ Implement all (pending) requests to watch/unWatch a directory or
  file which are in the `pathsToWatchQueue`.

  When watching, the paths contained in all directories are themselves
  recursively added to the `pathsToWatchQueue`. """
  while self.continueWatchingFS :
    addPath, aPathToWatch, theWatch = await self.pathsToWatchQueue.get()
    if addPath :
      for aPath in self.get_directories_recursive(Path(aPathToWatch)) :
        try :
          self.numWatches = self.numWatches + 1
          self.inotify.add_watch(aPath, self.wrMask)
          self.logger.debug(f'INIT: watching {aPath}')
        except PermissionError as err :
          # no handler body survives in this listing; skipping paths we
          # are not permitted to watch is an assumption
          pass
        except Exception as err:
          print(f"Exception while trying to watch: [{aPath}]")
          # we can't watch this path just yet...
          # ... schedule its parent and try again...
          await self.watchAPath(aPath.parent)
    else :
      # according to the documentation.... the corresponding
      # Mask.IGNORE event will automatically remove this watch.
      self.numUnWatches = self.numUnWatches + 1
      self.logger.debug(f'INIT: unWatching {aPathToWatch}')
      if aPathToWatch in self.rootPaths :
        self.logger.debug(f'INIT: found root path... rewatching it {aPathToWatch}')
        await self.watchAPath(aPathToWatch)
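stopWatchingFileSystem(self)
(Gracefully) stop watching the file system. This only clears the continueWatchingFS flag, which the queue manager and the event loop check when their next item arrives, so they finish at that point rather than immediately.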
Source code in cpawd/
def stopWatchingFileSystem(self) :
  """(Gracefully) stop watching the file system"""
  self.continueWatchingFS = False
unWatchAPath(self, pathToWatch, aWatch)
Add a request to the pathsToWatchQueue for this instance of FSWatcher to unWatch a single directory or file.
Source code in cpawd/
async def unWatchAPath(self, pathToWatch, aWatch) :
  """ Add a single directory or file to be unWatched by this instance of
  `FSWatcher` to the `pathsToWatchQueue`. """
  self.logger.debug("Adding path to (un)watch queue {}".format(pathToWatch))
  await self.pathsToWatchQueue.put((False, pathToWatch, aWatch))
watchAPath(self, pathToWatch)
Add a request to the pathsToWatchQueue for this instance of FSWatcher to watch a single directory or file.
Source code in cpawd/
async def watchAPath(self, pathToWatch) :
  """ Add a single directory or file to be watched by this instance of
  `FSWatcher` to the `pathsToWatchQueue`. """
  self.logger.debug("Adding path to watch queue {}".format(pathToWatch))
  await self.pathsToWatchQueue.put((True, pathToWatch, None))
watchARootPath(self, pathToWatch)
Add a single directory or file to the list of "root" paths to watch as well as schedule it to be watched. When one of the root paths is deleted, it will be re-watched.
Source code in cpawd/
async def watchARootPath(self, pathToWatch) :
  """Add a single directory or file to the list of "root" paths to watch
  as well as schedule it to be watched. When one of the root paths is
  deleted, it will be re-watched."""
  self.logger.debug("Adding root path [{}]".format(pathToWatch))
  await self.watchAPath(pathToWatch)
watchForFileSystemEvents(self)
An asynchronously iterable method which yields file system change events.
Source code in cpawd/
async def watchForFileSystemEvents(self):
  """ An asynchronously iterable method which yields file system change
  events. """
  # Things that can throw this off:
  #
  # * Moving a watched directory out of the watch tree (will still
  #   generate events even when outside of directory tree)
  #
  # * Doing two changes on a directory or something before the program
  #   has time to handle it (this will also throw off a lot of inotify
  #   code, though)
  #
  # * Moving a watched directory within a watched directory will get the
  #   wrong path. This needs to use the cookie system to link events
  #   together and complete the move properly, which can still make some
  #   events get the wrong path if you get file events during the move or
  #   something silly like that, since MOVED_FROM and MOVED_TO aren't
  #   guaranteed to be contiguous. That exercise is left up to the
  #   reader.
  #
  # * Trying to watch a path that doesn't exist won't automatically
  #   create it or anything of the sort.
  #
  # * Deleting and recreating or moving the watched directory won't do
  #   anything special, but it probably should.
  async for event in self.inotify:
    if not self.continueWatchingFS :
      # the body of this check is missing from this listing; leaving
      # the loop is an assumption
      break
    # If this is a creation event, add a watch for the new path (and its
    # subdirectories if any)
    if Mask.CREATE in event.mask and event.path is not None :
      await self.watchAPath(event.path)
    if Mask.DELETE_SELF in event.mask and event.path is not None :
      # the second argument is cut off in this listing; event.watch is
      # an assumption
      await self.unWatchAPath(event.path, event.watch)
    # If there are some bits in the cpMask in the event.mask yield this
    # event
    if event.mask & self.cpMask:
      yield event
    else :
      # Note that these events are needed for cleanup purposes.
      # We'll always get IGNORED events so the watch can be removed
      # from the inotify. We don't need to do anything with the
      # events, but they do need to be generated for cleanup.
      # We don't need to pass IGNORED events up, because the end-user
      # doesn't have the inotify instance anyway, and IGNORED is just
      # used for management purposes.
      self.logger.debug(f'UNYIELDED EVENT: {event}')
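The three mask tests in the loop above (CREATE, DELETE_SELF, overlap with cpMask) can be tried out without inotify. The sketch below uses a hypothetical ToyMask flag in place of asyncinotify's Mask, with bit values taken from the Linux inotify header; classify and its action labels are names invented here, and cp_mask plays the role of self.cpMask.

```python
from enum import IntFlag


class ToyMask(IntFlag):
    """Stand-in for asyncinotify's Mask (values as in <sys/inotify.h>)."""
    MODIFY = 0x002
    CREATE = 0x100
    DELETE_SELF = 0x400
    IGNORED = 0x8000


def classify(event_mask, cp_mask):
    """List what the event loop would do for an event with this mask."""
    actions = []
    if ToyMask.CREATE in event_mask:
        actions.append("watch")       # new path: queue a watch request
    if ToyMask.DELETE_SELF in event_mask:
        actions.append("unwatch")     # watched path gone: queue an unWatch
    if event_mask & cp_mask:
        actions.append("yield")       # caller asked for this kind of event
    return actions


# The caller is interested in modifications and creations only.
cp_mask = ToyMask.MODIFY | ToyMask.CREATE
```

With this cp_mask, a CREATE event is both watched and yielded, a DELETE_SELF event is only unwatched, and an IGNORED event produces no action at all, which corresponds to the "UNYIELDED EVENT" branch.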
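getMaskName(aMask)
Translate a raw Mask number into a human-readable name. The result is the space-separated names of every entry in masksToListenFor whose bits are set in aMask.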
Source code in cpawd/
def getMaskName(aMask) :
  """ Translate a raw Mask number into a human readable name. """
  maskName = ""
  for aPotentialMask in masksToListenFor :
    if aMask & aPotentialMask[0] :
      maskName = maskName + ' ' + aPotentialMask[1]
  return maskName.lstrip()
getMasksToListenFor()
Combine the module's list of masksToListenFor into a single mask.
Source code in cpawd/
def getMasksToListenFor() :
  """ Combine the module's list of `masksToListenFor` into a single mask. """
  masks = 0
  for aMask in masksToListenFor :
    masks = masks | aMask[0]
  return masks
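Both helpers treat masksToListenFor as a sequence of (value, name) pairs. A compact sketch of the same two operations follows, using a hypothetical three-entry table (MASKS) because the module's real list is not shown on this page; combined_mask and mask_name are illustrative names, and the bit values are the standard inotify ones.

```python
from functools import reduce
from operator import or_

# Hypothetical (value, name) table standing in for masksToListenFor.
MASKS = [(0x002, "MODIFY"), (0x100, "CREATE"), (0x200, "DELETE")]


def combined_mask(table):
    """OR every value in the table into one mask."""
    return reduce(or_, (value for value, _name in table), 0)


def mask_name(mask, table):
    """Space-separated names of every table entry set in `mask`."""
    return " ".join(name for value, name in table if mask & value)
```

For this table combined_mask(MASKS) is 0x302, mask_name(0x102, MASKS) is "MODIFY CREATE", and a mask with none of the listed bits set gives an empty string.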