From 2ced8b93ae2b43bf52570cd3e97f8de91e39ee75 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Sun, 19 Sep 2021 17:45:04 -0700 Subject: [PATCH 1/1] Persistent state --- persistent_state.py | 51 ++++++++++++++++++++++++++++++++++ tests/persistent_state_test.py | 33 ++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 persistent_state.py create mode 100755 tests/persistent_state_test.py diff --git a/persistent_state.py b/persistent_state.py new file mode 100644 index 0000000..1607b5e --- /dev/null +++ b/persistent_state.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 + +import pickle +from typing import Any, Mapping + +import file_utils + + +class PersistentState(object): + """ + An object that behaves like a persistent hash map; it can be + accessed to store key-value pairs but also saved to disk and read + back from disk easily. + + """ + def __init__(self, filename: str, *, max_age_sec: int = None): + self.filename = filename + if file_utils.does_file_exist(self.filename): + if max_age_sec is not None: + age = file_utils.get_file_mtime_age_seconds(self.filename) + if age < max_age_sec: + self.state = self.load() + else: + self.state = {} + else: + self.state = self.load() + else: + self.state = {} + + def __getitem__(self, key: str) -> Any: + return self.state.get(key, "") + + def __setitem__(self, key: str, value: Any): + self.state[key] = value + + def __delitem__(self, key: str): + del self.state[key] + + def __iter__(self): + return iter(self.state) + + def __len__(self): + return len(self.state) + + def save(self) -> None: + with open(self.filename, 'wb') as f: + pickle.dump(self.state, f, pickle.HIGHEST_PROTOCOL) + + def load(self) -> Mapping[str, Any]: + with open(self.filename, 'rb') as f: + return(pickle.load(f)) diff --git a/tests/persistent_state_test.py b/tests/persistent_state_test.py new file mode 100755 index 0000000..f9afdde --- /dev/null +++ b/tests/persistent_state_test.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +import os +import unittest + +import bootstrap +import persistent_state as ps + +import unittest_utils as uu + + +class TestPersistentState(unittest.TestCase): + def test_it_all_baby(self): + filename = '/tmp/.persistent_state_test.bin' + os.system(f'/bin/rm {filename}') + s = ps.PersistentState(filename) + self.assertEqual(0, len(s)) + s['test'] = 123 + self.assertEqual(123, s['test']) + self.assertEqual(1, len(s)) + s['ing'] = 234 + self.assertEqual(2, len(s)) + del s['ing'] + self.assertEqual(1, len(s)) + s.save() + t = ps.PersistentState(filename) + self.assertEqual(1, len(t)) + self.assertEqual(123, t['test']) + + +if __name__ == '__main__': + bootstrap.initialize(unittest.main)() + -- 2.45.2