mypy clean!
[python_utils.git] / directory_filter.py
1 #!/usr/bin/env python3
2
3 import hashlib
4 import logging
5 import os
6 from typing import Any, Dict, Optional, Set
7
8 logger = logging.getLogger(__name__)
9
10
11 class DirectoryFileFilter(object):
12     """A predicate that will return False if / when a proposed file's
13     content to-be-written is identical to the contents of the file on
14     disk allowing calling code to safely skip the write.
15
16     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt'
17     >>> contents = b'This is a test'
18     >>> with open(testfile, 'wb') as wf:
19     ...     wf.write(contents)
20     14
21
22     >>> d = DirectoryFileFilter('/tmp')
23
24     >>> d.apply(contents, testfile)     # False if testfile already contains contents
25     False
26
27     >>> d.apply(b'That was a test', testfile)    # True otherwise
28     True
29
30     >>> os.remove(testfile)
31
32     """
33
34     def __init__(self, directory: str):
35         super().__init__()
36         import file_utils
37
38         if not file_utils.does_directory_exist(directory):
39             raise ValueError(directory)
40         self.directory = directory
41         self.md5_by_filename: Dict[str, str] = {}
42         self.mtime_by_filename: Dict[str, float] = {}
43         self._update()
44
45     def _update(self):
46         for direntry in os.scandir(self.directory):
47             if direntry.is_file(follow_symlinks=True):
48                 mtime = direntry.stat(follow_symlinks=True).st_mtime
49                 path = f'{self.directory}/{direntry.name}'
50                 self._update_file(path, mtime)
51
52     def _update_file(self, filename: str, mtime: Optional[float] = None):
53         import file_utils
54
55         assert file_utils.does_file_exist(filename)
56         if mtime is None:
57             mtime = file_utils.get_file_raw_mtime(filename)
58         assert mtime
59         if self.mtime_by_filename.get(filename, 0) != mtime:
60             md5 = file_utils.get_file_md5(filename)
61             logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
62             self.mtime_by_filename[filename] = mtime
63             self.md5_by_filename[filename] = md5
64
65     def apply(self, item: Any, filename: str) -> bool:
66         self._update_file(filename)
67         file_md5 = self.md5_by_filename.get(filename, 0)
68         logger.debug(f'{filename}\'s checksum is {file_md5}')
69         mem_hash = hashlib.md5()
70         mem_hash.update(item)
71         md5 = mem_hash.hexdigest()
72         logger.debug(f'Item\'s checksum is {md5}')
73         return md5 != file_md5
74
75
76 class DirectoryAllFilesFilter(DirectoryFileFilter):
77     """A predicate that will return False if a file to-be-written to a
78     particular directory is identical to any other file in that same
79     directory.
80
81     i.e. this is the same as the above except that its apply() method
82     will return true not only if the contents to be written are
83     identical to the contents of filename on the disk but also it
84     returns true if there exists some other file sitting in the same
85     directory which already contains those identical contents.
86
87     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
88
89     >>> contents = b'This is a test'
90     >>> with open(testfile, 'wb') as wf:
91     ...     wf.write(contents)
92     14
93
94     >>> d = DirectoryAllFilesFilter('/tmp')
95
96     >>> d.apply(contents)    # False is _any_ file in /tmp contains contents
97     False
98
99     >>> d.apply(b'That was a test')    # True otherwise
100     True
101
102     >>> os.remove(testfile)
103     """
104
105     def __init__(self, directory: str):
106         self.all_md5s: Set[str] = set()
107         super().__init__(directory)
108
109     def _update_file(self, filename: str, mtime: Optional[float] = None):
110         import file_utils
111
112         assert file_utils.does_file_exist(filename)
113         if mtime is None:
114             mtime = file_utils.get_file_raw_mtime(filename)
115         assert mtime
116         if self.mtime_by_filename.get(filename, 0) != mtime:
117             md5 = file_utils.get_file_md5(filename)
118             self.mtime_by_filename[filename] = mtime
119             self.md5_by_filename[filename] = md5
120             self.all_md5s.add(md5)
121
122     def apply(self, item: Any, ignored_filename: str = None) -> bool:
123         assert not ignored_filename
124         self._update()
125         mem_hash = hashlib.md5()
126         mem_hash.update(item)
127         md5 = mem_hash.hexdigest()
128         return md5 not in self.all_md5s
129
130
131 if __name__ == '__main__':
132     import doctest
133
134     doctest.testmod()