--- /dev/null
+#!/usr/bin/env python3
+
+import hashlib
+import os
+from typing import Any, Optional
+
+import predicate
+import file_utils
+
+
+class DirectoryFileFilter(predicate.Predicate):
+ """A predicate that will return False if when a proposed file's
+ content to-be-written is identical to the contents of the file;
+ skip the write.
+ """
+
+ def __init__(self, directory: str):
+ super().__init__()
+ if not file_utils.does_directory_exist(directory):
+ raise ValueError(directory)
+ self.directory = directory
+ self.md5_by_filename = {}
+ self.mtime_by_filename = {}
+ self._update()
+
+ def _update(self):
+ for direntry in os.scandir(self.directory):
+ if direntry.is_file(follow_symlinks=True):
+ mtime = direntry.stat(follow_symlinks=True).st_mtime
+ path = f'{self.directory}/{direntry.name}'
+ self._update_file(path, mtime)
+
+ def _update_file(self, filename: str, mtime: Optional[float] = None):
+ assert file_utils.does_file_exist(filename)
+ if mtime is None:
+ mtime = file_utils.get_file_raw_mtime(filename)
+ if self.mtime_by_filename.get(filename, 0) != mtime:
+ md5 = file_utils.get_file_md5(filename)
+ self.mtime_by_filename[filename] = mtime
+ self.md5_by_filename[filename] = md5
+
+ def apply(self, item: Any, filename: str) -> bool:
+ self._update_file(filename)
+ file_md5 = self.md5_by_filename.get(filename, 0)
+ mem_hash = hashlib.md5()
+ mem_hash.update(item)
+ md5 = mem_hash.hexdigest()
+ return md5 != file_md5
+
+
+class DirectoryAllFilesFilter(DirectoryFileFilter):
+ """A predicate that will return False if a file to-be-written to a
+ particular directory is identical to any other file in that same
+ directory.
+ """
+
+ def __init__(self, directory: str):
+ self.all_md5s = set()
+ super().__init__(directory)
+ print(self.all_md5s)
+
+ def _update_file(self, filename: str, mtime: Optional[float] = None):
+ assert file_utils.does_file_exist(filename)
+ if mtime is None:
+ mtime = file_utils.get_file_raw_mtime(filename)
+ if self.mtime_by_filename.get(filename, 0) != mtime:
+ md5 = file_utils.get_file_md5(filename)
+ self.mtime_by_filename[filename] = mtime
+ self.md5_by_filename[filename] = md5
+ self.all_md5s.add(md5)
+
+ def apply(self, item: Any) -> bool:
+ self._update()
+ mem_hash = hashlib.md5()
+ mem_hash.update(item)
+ md5 = mem_hash.hexdigest()
+ return md5 not in self.all_md5s
+