Ahem. Still running black?
[python_utils.git] / directory_filter.py
1 #!/usr/bin/env python3
2
3 import hashlib
4 import logging
5 import os
6 from typing import Any, Optional
7
8 logger = logging.getLogger(__name__)
9
10
11 class DirectoryFileFilter(object):
12     """A predicate that will return False if / when a proposed file's
13     content to-be-written is identical to the contents of the file on
14     disk allowing calling code to safely skip the write.
15
16     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt'
17     >>> contents = b'This is a test'
18     >>> with open(testfile, 'wb') as wf:
19     ...     wf.write(contents)
20     14
21
22     >>> d = DirectoryFileFilter('/tmp')
23
24     >>> d.apply(contents, testfile)     # False if testfile already contains contents
25     False
26
27     >>> d.apply(b'That was a test', testfile)    # True otherwise
28     True
29
30     >>> os.remove(testfile)
31
32     """
33
34     def __init__(self, directory: str):
35         super().__init__()
36         import file_utils
37
38         if not file_utils.does_directory_exist(directory):
39             raise ValueError(directory)
40         self.directory = directory
41         self.md5_by_filename = {}
42         self.mtime_by_filename = {}
43         self._update()
44
45     def _update(self):
46         for direntry in os.scandir(self.directory):
47             if direntry.is_file(follow_symlinks=True):
48                 mtime = direntry.stat(follow_symlinks=True).st_mtime
49                 path = f'{self.directory}/{direntry.name}'
50                 self._update_file(path, mtime)
51
52     def _update_file(self, filename: str, mtime: Optional[float] = None):
53         import file_utils
54
55         assert file_utils.does_file_exist(filename)
56         if mtime is None:
57             mtime = file_utils.get_file_raw_mtime(filename)
58         if self.mtime_by_filename.get(filename, 0) != mtime:
59             md5 = file_utils.get_file_md5(filename)
60             logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
61             self.mtime_by_filename[filename] = mtime
62             self.md5_by_filename[filename] = md5
63
64     def apply(self, item: Any, filename: str) -> bool:
65         self._update_file(filename)
66         file_md5 = self.md5_by_filename.get(filename, 0)
67         logger.debug(f'{filename}\'s checksum is {file_md5}')
68         mem_hash = hashlib.md5()
69         mem_hash.update(item)
70         md5 = mem_hash.hexdigest()
71         logger.debug(f'Item\'s checksum is {md5}')
72         return md5 != file_md5
73
74
75 class DirectoryAllFilesFilter(DirectoryFileFilter):
76     """A predicate that will return False if a file to-be-written to a
77     particular directory is identical to any other file in that same
78     directory.
79
80     i.e. this is the same as the above except that its apply() method
81     will return true not only if the contents to be written are
82     identical to the contents of filename on the disk but also it
83     returns true if there exists some other file sitting in the same
84     directory which already contains those identical contents.
85
86     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
87
88     >>> contents = b'This is a test'
89     >>> with open(testfile, 'wb') as wf:
90     ...     wf.write(contents)
91     14
92
93     >>> d = DirectoryAllFilesFilter('/tmp')
94
95     >>> d.apply(contents)    # False is _any_ file in /tmp contains contents
96     False
97
98     >>> d.apply(b'That was a test')    # True otherwise
99     True
100
101     >>> os.remove(testfile)
102     """
103
104     def __init__(self, directory: str):
105         self.all_md5s = set()
106         super().__init__(directory)
107
108     def _update_file(self, filename: str, mtime: Optional[float] = None):
109         import file_utils
110
111         assert file_utils.does_file_exist(filename)
112         if mtime is None:
113             mtime = file_utils.get_file_raw_mtime(filename)
114         if self.mtime_by_filename.get(filename, 0) != mtime:
115             md5 = file_utils.get_file_md5(filename)
116             self.mtime_by_filename[filename] = mtime
117             self.md5_by_filename[filename] = md5
118             self.all_md5s.add(md5)
119
120     def apply(self, item: Any) -> bool:
121         self._update()
122         mem_hash = hashlib.md5()
123         mem_hash.update(item)
124         md5 = mem_hash.hexdigest()
125         return md5 not in self.all_md5s
126
127
128 if __name__ == '__main__':
129     import doctest
130
131     doctest.testmod()