03602d156a0929ba6b60ac094d264359e3426d6d
[python_utils.git] / directory_filter.py
1 #!/usr/bin/env python3
2
3 import hashlib
4 import logging
5 import os
6 from typing import Any, Optional
7
8 logger = logging.getLogger(__name__)
9
10
11 class DirectoryFileFilter(object):
12     """A predicate that will return False if / when a proposed file's
13     content to-be-written is identical to the contents of the file on
14     disk allowing calling code to safely skip the write.
15
16     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt'
17     >>> contents = b'This is a test'
18     >>> with open(testfile, 'wb') as wf:
19     ...     wf.write(contents)
20     14
21
22     >>> d = DirectoryFileFilter('/tmp')
23
24     >>> d.apply(contents, testfile)     # False if testfile already contains contents
25     False
26
27     >>> d.apply(b'That was a test', testfile)    # True otherwise
28     True
29
30     >>> os.remove(testfile)
31
32     """
33     def __init__(self, directory: str):
34         super().__init__()
35         import file_utils
36         if not file_utils.does_directory_exist(directory):
37             raise ValueError(directory)
38         self.directory = directory
39         self.md5_by_filename = {}
40         self.mtime_by_filename = {}
41         self._update()
42
43     def _update(self):
44         for direntry in os.scandir(self.directory):
45             if direntry.is_file(follow_symlinks=True):
46                 mtime = direntry.stat(follow_symlinks=True).st_mtime
47                 path = f'{self.directory}/{direntry.name}'
48                 self._update_file(path, mtime)
49
50     def _update_file(self, filename: str, mtime: Optional[float] = None):
51         import file_utils
52         assert file_utils.does_file_exist(filename)
53         if mtime is None:
54             mtime = file_utils.get_file_raw_mtime(filename)
55         if self.mtime_by_filename.get(filename, 0) != mtime:
56             md5 = file_utils.get_file_md5(filename)
57             logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
58             self.mtime_by_filename[filename] = mtime
59             self.md5_by_filename[filename] = md5
60
61     def apply(self, item: Any, filename: str) -> bool:
62         self._update_file(filename)
63         file_md5 = self.md5_by_filename.get(filename, 0)
64         logger.debug(f'{filename}\'s checksum is {file_md5}')
65         mem_hash = hashlib.md5()
66         mem_hash.update(item)
67         md5 = mem_hash.hexdigest()
68         logger.debug(f'Item\'s checksum is {md5}')
69         return md5 != file_md5
70
71
72 class DirectoryAllFilesFilter(DirectoryFileFilter):
73     """A predicate that will return False if a file to-be-written to a
74     particular directory is identical to any other file in that same
75     directory.
76
77     i.e. this is the same as the above except that its apply() method
78     will return true not only if the contents to be written are
79     identical to the contents of filename on the disk but also it
80     returns true if there exists some other file sitting in the same
81     directory which already contains those identical contents.
82
83     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
84
85     >>> contents = b'This is a test'
86     >>> with open(testfile, 'wb') as wf:
87     ...     wf.write(contents)
88     14
89
90     >>> d = DirectoryAllFilesFilter('/tmp')
91
92     >>> d.apply(contents)    # False is _any_ file in /tmp contains contents
93     False
94
95     >>> d.apply(b'That was a test')    # True otherwise
96     True
97
98     >>> os.remove(testfile)
99     """
100     def __init__(self, directory: str):
101         self.all_md5s = set()
102         super().__init__(directory)
103
104     def _update_file(self, filename: str, mtime: Optional[float] = None):
105         import file_utils
106         assert file_utils.does_file_exist(filename)
107         if mtime is None:
108             mtime = file_utils.get_file_raw_mtime(filename)
109         if self.mtime_by_filename.get(filename, 0) != mtime:
110             md5 = file_utils.get_file_md5(filename)
111             self.mtime_by_filename[filename] = mtime
112             self.md5_by_filename[filename] = md5
113             self.all_md5s.add(md5)
114
115     def apply(self, item: Any) -> bool:
116         self._update()
117         mem_hash = hashlib.md5()
118         mem_hash.update(item)
119         md5 = mem_hash.hexdigest()
120         return md5 not in self.all_md5s
121
122
123 if __name__ == '__main__':
124     import doctest
125     doctest.testmod()