6985831fc3a6e3d8fe24550c831293e40383b9a6
[python_utils.git] / directory_filter.py
1 #!/usr/bin/env python3
2
3 """Two predicates that can help avoid unnecessary disk I/O by
4 detecting if a particular file is identical to the contents about to
5 be written or if a particular directory already contains a file that
6 is identical to the one to be written.  See class docs below for
7 examples."""
8
9 import hashlib
10 import logging
11 import os
12 from typing import Any, Dict, Optional, Set
13
14 logger = logging.getLogger(__name__)
15
16
17 class DirectoryFileFilter(object):
18     """A predicate that will return False if / when a proposed file's
19     content to-be-written is identical to the contents of the file on
20     disk allowing calling code to safely skip the write.
21
22     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt'
23     >>> contents = b'This is a test'
24     >>> with open(testfile, 'wb') as wf:
25     ...     wf.write(contents)
26     14
27
28     >>> d = DirectoryFileFilter('/tmp')
29
30     >>> d.apply(contents, testfile)     # False if testfile already contains contents
31     False
32
33     >>> d.apply(b'That was a test', testfile)    # True otherwise
34     True
35
36     >>> os.remove(testfile)
37
38     """
39
40     def __init__(self, directory: str):
41         super().__init__()
42         import file_utils
43
44         if not file_utils.does_directory_exist(directory):
45             raise ValueError(directory)
46         self.directory = directory
47         self.md5_by_filename: Dict[str, str] = {}
48         self.mtime_by_filename: Dict[str, float] = {}
49         self._update()
50
51     def _update(self):
52         for direntry in os.scandir(self.directory):
53             if direntry.is_file(follow_symlinks=True):
54                 mtime = direntry.stat(follow_symlinks=True).st_mtime
55                 path = f'{self.directory}/{direntry.name}'
56                 self._update_file(path, mtime)
57
58     def _update_file(self, filename: str, mtime: Optional[float] = None):
59         import file_utils
60
61         assert file_utils.does_file_exist(filename)
62         if mtime is None:
63             mtime = file_utils.get_file_raw_mtime(filename)
64         assert mtime is not None
65         if self.mtime_by_filename.get(filename, 0) != mtime:
66             md5 = file_utils.get_file_md5(filename)
67             logger.debug('Computed/stored %s\'s MD5 at ts=%.2f (%s)', filename, mtime, md5)
68             self.mtime_by_filename[filename] = mtime
69             self.md5_by_filename[filename] = md5
70
71     def apply(self, item: Any, filename: str) -> bool:
72         self._update_file(filename)
73         file_md5 = self.md5_by_filename.get(filename, 0)
74         logger.debug('%s\'s checksum is %s', filename, file_md5)
75         mem_hash = hashlib.md5()
76         mem_hash.update(item)
77         md5 = mem_hash.hexdigest()
78         logger.debug('Item\'s checksum is %s', md5)
79         return md5 != file_md5
80
81
82 class DirectoryAllFilesFilter(DirectoryFileFilter):
83     """A predicate that will return False if a file to-be-written to a
84     particular directory is identical to any other file in that same
85     directory.
86
87     i.e. this is the same as the above except that its apply() method
88     will return true not only if the contents to be written are
89     identical to the contents of filename on the disk but also it
90     returns true if there exists some other file sitting in the same
91     directory which already contains those identical contents.
92
93     >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
94
95     >>> contents = b'This is a test'
96     >>> with open(testfile, 'wb') as wf:
97     ...     wf.write(contents)
98     14
99
100     >>> d = DirectoryAllFilesFilter('/tmp')
101
102     >>> d.apply(contents)    # False is _any_ file in /tmp contains contents
103     False
104
105     >>> d.apply(b'That was a test')    # True otherwise
106     True
107
108     >>> os.remove(testfile)
109     """
110
111     def __init__(self, directory: str):
112         self.all_md5s: Set[str] = set()
113         super().__init__(directory)
114
115     def _update_file(self, filename: str, mtime: Optional[float] = None):
116         import file_utils
117
118         assert file_utils.does_file_exist(filename)
119         if mtime is None:
120             mtime = file_utils.get_file_raw_mtime(filename)
121         assert mtime is not None
122         if self.mtime_by_filename.get(filename, 0) != mtime:
123             md5 = file_utils.get_file_md5(filename)
124             self.mtime_by_filename[filename] = mtime
125             self.md5_by_filename[filename] = md5
126             self.all_md5s.add(md5)
127
128     def apply(self, item: Any, ignored_filename: str = None) -> bool:
129         assert ignored_filename is None
130         self._update()
131         mem_hash = hashlib.md5()
132         mem_hash.update(item)
133         md5 = mem_hash.hexdigest()
134         return md5 not in self.all_md5s
135
136
137 if __name__ == '__main__':
138     import doctest
139
140     doctest.testmod()