Ran black code formatter on everything.
[python_utils.git] / directory_filter.py
1 #!/usr/bin/env python3
2
3 import hashlib
4 import logging
5 import os
6 from typing import Any, Optional
7
8 logger = logging.getLogger(__name__)
9
10
11 class DirectoryFileFilter(object):
12     """A predicate that will return False if / when a proposed file's
13     content to-be-written is identical to the contents of the file on
14     disk allowing calling code to safely skip the write.
15
16     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt'
17     >>> contents = b'This is a test'
18     >>> with open(testfile, 'wb') as wf:
19     ...     wf.write(contents)
20     14
21
22     >>> d = DirectoryFileFilter('/tmp')
23
24     >>> d.apply(contents, testfile)     # False if testfile already contains contents
25     False
26
27     >>> d.apply(b'That was a test', testfile)    # True otherwise
28     True
29
30     >>> os.remove(testfile)
31
32     """
33
34     def __init__(self, directory: str):
35         super().__init__()
36         import file_utils
37
38         if not file_utils.does_directory_exist(directory):
39             raise ValueError(directory)
40         self.directory = directory
41         self.md5_by_filename = {}
42         self.mtime_by_filename = {}
43         self._update()
44
45     def _update(self):
46         for direntry in os.scandir(self.directory):
47             if direntry.is_file(follow_symlinks=True):
48                 mtime = direntry.stat(follow_symlinks=True).st_mtime
49                 path = f'{self.directory}/{direntry.name}'
50                 self._update_file(path, mtime)
51
52     def _update_file(self, filename: str, mtime: Optional[float] = None):
53         import file_utils
54
55         assert file_utils.does_file_exist(filename)
56         if mtime is None:
57             mtime = file_utils.get_file_raw_mtime(filename)
58         if self.mtime_by_filename.get(filename, 0) != mtime:
59             md5 = file_utils.get_file_md5(filename)
60             logger.debug(
61                 f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})'
62             )
63             self.mtime_by_filename[filename] = mtime
64             self.md5_by_filename[filename] = md5
65
66     def apply(self, item: Any, filename: str) -> bool:
67         self._update_file(filename)
68         file_md5 = self.md5_by_filename.get(filename, 0)
69         logger.debug(f'{filename}\'s checksum is {file_md5}')
70         mem_hash = hashlib.md5()
71         mem_hash.update(item)
72         md5 = mem_hash.hexdigest()
73         logger.debug(f'Item\'s checksum is {md5}')
74         return md5 != file_md5
75
76
77 class DirectoryAllFilesFilter(DirectoryFileFilter):
78     """A predicate that will return False if a file to-be-written to a
79     particular directory is identical to any other file in that same
80     directory.
81
82     i.e. this is the same as the above except that its apply() method
83     will return true not only if the contents to be written are
84     identical to the contents of filename on the disk but also it
85     returns true if there exists some other file sitting in the same
86     directory which already contains those identical contents.
87
88     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
89
90     >>> contents = b'This is a test'
91     >>> with open(testfile, 'wb') as wf:
92     ...     wf.write(contents)
93     14
94
95     >>> d = DirectoryAllFilesFilter('/tmp')
96
97     >>> d.apply(contents)    # False is _any_ file in /tmp contains contents
98     False
99
100     >>> d.apply(b'That was a test')    # True otherwise
101     True
102
103     >>> os.remove(testfile)
104     """
105
106     def __init__(self, directory: str):
107         self.all_md5s = set()
108         super().__init__(directory)
109
110     def _update_file(self, filename: str, mtime: Optional[float] = None):
111         import file_utils
112
113         assert file_utils.does_file_exist(filename)
114         if mtime is None:
115             mtime = file_utils.get_file_raw_mtime(filename)
116         if self.mtime_by_filename.get(filename, 0) != mtime:
117             md5 = file_utils.get_file_md5(filename)
118             self.mtime_by_filename[filename] = mtime
119             self.md5_by_filename[filename] = md5
120             self.all_md5s.add(md5)
121
122     def apply(self, item: Any) -> bool:
123         self._update()
124         mem_hash = hashlib.md5()
125         mem_hash.update(item)
126         md5 = mem_hash.hexdigest()
127         return md5 not in self.all_md5s
128
129
130 if __name__ == '__main__':
131     import doctest
132
133     doctest.testmod()