Easier and more self-documenting patterns for loading/saving Persistent
[python_utils.git] / tests / shared_dict_test.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """shared_dict unittest."""
6
7 import random
8 import unittest
9
10 import parallelize as p
11 import smart_future
12 import unittest_utils
13 from collect.shared_dict import SharedDict
14
15
class SharedDictTest(unittest.TestCase):
    """Exercise SharedDict from workers launched via ``p.parallelize``."""

    # Workload sizes.  Named so that the totals asserted by the tests are
    # derived from the same values the workers use, not repeated by hand.
    NUM_SHARDS = 100
    WRITES_PER_SHARD = 1000
    NUM_ADDERS = 10
    INCREMENTS_PER_ADDER = 1000

    @p.parallelize(method=p.Method.PROCESS)
    def doit(self, n: int, dict_name: str, parent_lock_id: int) -> int:
        """Repeatedly write shard *n*'s message and read it back.

        Args:
            n: shard number; also the key this worker writes.
            dict_name: name of the SharedDict created by the calling test.
            parent_lock_id: ``id(SharedDict.LOCK)`` as observed by the caller.

        Returns:
            ``n``, once every write/read round has passed its checks.
        """
        # A unittest assertion (not a bare ``assert``) so the check is not
        # stripped when running under ``python -O``.
        self.assertEqual(parent_lock_id, id(SharedDict.LOCK))
        d = SharedDict(dict_name, None)
        try:
            msg = f'Hello from shard {n}'
            for _ in range(self.WRITES_PER_SHARD):
                d[n] = msg
                self.assertIn(n, d)
                self.assertEqual(msg, d[n])
                # Also look up an arbitrary key that may not have been
                # written yet; the value is irrelevant, the lookup just has
                # to succeed while other workers are writing.
                d.get(random.randrange(0, 99), None)
            return n
        finally:
            d.close()

    def test_basic_operations(self) -> None:
        """Every shard writes its own key and all entries are readable."""
        dict_name = 'test_shared_dict'
        d = SharedDict(dict_name, 4096)
        try:
            self.assertEqual(dict_name, d.get_name())
            results = [
                self.doit(n, d.get_name(), id(SharedDict.LOCK))
                for n in range(self.NUM_SHARDS)
            ]
            smart_future.wait_all(results)
            for f in results:
                self.assertTrue(f.wrapped_future.done())
            for k in d:
                self.assertEqual(d[k], f'Hello from shard {k}')
            self.assertEqual(self.NUM_SHARDS, len(d))
        finally:
            d.close()
            d.cleanup()

    @p.parallelize(method=p.Method.PROCESS)
    def add_one(self, name: str, expected_lock_id: int) -> None:
        """Increment the shared ``"sum"`` entry under ``SharedDict.LOCK``.

        Args:
            name: name of the SharedDict created by the calling test.
            expected_lock_id: ``id(SharedDict.LOCK)`` as observed by the
                caller.
        """
        d = SharedDict(name)
        try:
            # Checked inside the ``try`` so that a mismatch still closes
            # ``d`` instead of leaking the open handle.
            self.assertEqual(id(SharedDict.LOCK), expected_lock_id)
            for _ in range(self.INCREMENTS_PER_ADDER):
                # ``+=`` is a separate read and write; hold the lock across
                # both so concurrent workers cannot lose an update.
                with SharedDict.LOCK:
                    d["sum"] += 1
        finally:
            d.close()

    def test_locking_works(self) -> None:
        """Concurrent locked increments add up to the exact expected total."""
        dict_name = 'test_shared_dict_lock'
        d = SharedDict(dict_name, 4096)
        try:
            d["sum"] = 0
            results = [
                self.add_one(d.get_name(), id(SharedDict.LOCK))
                for _ in range(self.NUM_ADDERS)
            ]
            smart_future.wait_all(results)
            expected_total = self.NUM_ADDERS * self.INCREMENTS_PER_ADDER
            self.assertEqual(expected_total, d["sum"])
        finally:
            d.close()
            d.cleanup()
76
77
if __name__ == "__main__":
    # Executed directly rather than imported: hand control to unittest's
    # command-line runner, which runs the TestCase classes in this module.
    unittest.main()