Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / logical_search.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents.  The corpus and index are held in memory.
7 """
8
9 from __future__ import annotations
10 import enum
11 import sys
12 from collections import defaultdict
13 from dataclasses import dataclass, field
14 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
15
16
17 class ParseError(Exception):
18     """An error encountered while parsing a logical search expression."""
19
20     def __init__(self, message: str):
21         super().__init__()
22         self.message = message
23
24
25 @dataclass
26 class Document:
27     """A class representing a searchable document."""
28
29     docid: str = ''
30     """A unique identifier for each document -- must be provided
31     by the caller.  See :meth:`python_modules.id_generator.get` or
32     :meth:`python_modules.string_utils.generate_uuid` for potential
33     sources."""
34
35     tags: Set[str] = field(default_factory=set)
36     """A set of tag strings for this document.  May be empty.  Tags
37     are simply text labels that are associated with a document and
38     may be used to search for it later.
39     """
40
41     properties: List[Tuple[str, str]] = field(default_factory=list)
42     """A list of key->value strings for this document.  May be empty.
43     Properties are more flexible tags that have both a label and a
44     value.  e.g. "category:mystery" or "author:smith"."""
45
46     reference: Optional[Any] = None
47     """An optional reference to something else for convenience;
48     interpreted only by caller code, ignored here.
49     """
50
51
52 class Operation(enum.Enum):
53     """A logical search query operation."""
54
55     QUERY = 1
56     CONJUNCTION = 2
57     DISJUNCTION = 3
58     INVERSION = 4
59
60     @staticmethod
61     def from_token(token: str):
62         table = {
63             "not": Operation.INVERSION,
64             "and": Operation.CONJUNCTION,
65             "or": Operation.DISJUNCTION,
66         }
67         return table.get(token, None)
68
69     def num_operands(self) -> Optional[int]:
70         table = {
71             Operation.INVERSION: 1,
72             Operation.CONJUNCTION: 2,
73             Operation.DISJUNCTION: 2,
74         }
75         return table.get(self, None)
76
77
78 class Corpus(object):
79     """A collection of searchable documents.  The caller can
80     add documents to it (or edit existing docs) via :meth:`add_doc`,
81     retrieve a document given its docid via :meth:`get_doc`, and
82     perform various lookups of documents.  The most interesting
83     lookup is implemented in :meth:`query`.
84
85     >>> c = Corpus()
86     >>> c.add_doc(Document(
87     ...                    docid=1,
88     ...                    tags=set(['urgent', 'important']),
89     ...                    properties=[
90     ...                                ('author', 'Scott'),
91     ...                                ('subject', 'your anniversary')
92     ...                    ],
93     ...                    reference=None,
94     ...                   )
95     ...          )
96     >>> c.add_doc(Document(
97     ...                    docid=2,
98     ...                    tags=set(['important']),
99     ...                    properties=[
100     ...                                ('author', 'Joe'),
101     ...                                ('subject', 'your performance at work')
102     ...                    ],
103     ...                    reference=None,
104     ...                   )
105     ...          )
106     >>> c.add_doc(Document(
107     ...                    docid=3,
108     ...                    tags=set(['urgent']),
109     ...                    properties=[
110     ...                                ('author', 'Scott'),
111     ...                                ('subject', 'car turning in front of you')
112     ...                    ],
113     ...                    reference=None,
114     ...                   )
115     ...          )
116     >>> c.query('author:Scott and important')
117     {1}
118     >>> c.query('*')
119     {1, 2, 3}
120     >>> c.query('*:*')
121     {1, 2, 3}
122     >>> c.query('*:Scott')
123     {1, 3}
124     """
125
126     def __init__(self) -> None:
127         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
128         self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
129         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
130         self.documents_by_docid: Dict[str, Document] = {}
131
132     def add_doc(self, doc: Document) -> None:
133         """Add a new Document to the Corpus.  Each Document must have a
134         distinct docid that will serve as its primary identifier.  If
135         the same Document is added multiple times, only the most
136         recent addition is indexed.  If two distinct documents with
137         the same docid are added, the latter klobbers the former in
138         the indexes.  See :meth:`python_modules.id_generator.get` or
139         :meth:`python_modules.string_utils.generate_uuid` for potential
140         sources of docids.
141
142         Each Document may have an optional set of tags which can be
143         used later in expressions to the query method.  These are simple
144         text labels.
145
146         Each Document may have an optional list of key->value tuples
147         which can be used later in expressions to the query method.
148
149         Document includes a user-defined "reference" field which is
150         never interpreted by this module.  This is meant to allow easy
151         mapping between Documents in this corpus and external objects
152         they may represent.
153
154         Args:
155             doc: the document to add or edit
156         """
157
158         if doc.docid in self.documents_by_docid:
159             # Handle collisions; assume that we are re-indexing the
160             # same document so remove it from the indexes before
161             # adding it back again.
162             colliding_doc = self.documents_by_docid[doc.docid]
163             assert colliding_doc.docid == doc.docid
164             del self.documents_by_docid[doc.docid]
165             for tag in colliding_doc.tags:
166                 self.docids_by_tag[tag].remove(doc.docid)
167             for key, value in colliding_doc.properties:
168                 self.docids_by_property[(key, value)].remove(doc.docid)
169                 self.docids_with_property[key].remove(doc.docid)
170
171         # Index the new Document
172         assert doc.docid not in self.documents_by_docid
173         self.documents_by_docid[doc.docid] = doc
174         for tag in doc.tags:
175             self.docids_by_tag[tag].add(doc.docid)
176         for key, value in doc.properties:
177             self.docids_by_property[(key, value)].add(doc.docid)
178             self.docids_with_property[key].add(doc.docid)
179
180     def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
181         """Return the set of docids that have a particular tag.
182
183         Args:
184             tag: the tag for which to search
185
186         Returns:
187             A set containing docids with the provided tag which
188             may be empty."""
189         return self.docids_by_tag[tag]
190
191     def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
192         """Return the set of docids with a tag that contains a str.
193
194         Args:
195             tag: the tag pattern for which to search
196
197         Returns:
198             A set containing docids with tags that match the pattern
199             provided.  e.g., if the arg was "foo" tags "football", "foobar",
200             and "food" all match.
201         """
202         ret = set()
203         for search_tag in self.docids_by_tag:
204             if tag in search_tag:
205                 for docid in self.docids_by_tag[search_tag]:
206                     ret.add(docid)
207         return ret
208
209     def get_docids_with_property(self, key: str) -> Set[str]:
210         """Return the set of docids that have a particular property no matter
211         what that property's value.
212
213         Args:
214             key: the key value to search for.
215
216         Returns:
217             A set of docids that contain the key (no matter what value)
218             which may be empty.
219         """
220         return self.docids_with_property[key]
221
222     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
223         """Return the set of docids that have a particular property with a
224         particular value.
225
226         Args:
227             key: the key to search for
228             value: the value that key must have in order to match a doc.
229
230         Returns:
231             A set of docids that contain key with value which may be empty.
232         """
233         return self.docids_by_property[(key, value)]
234
235     def invert_docid_set(self, original: Set[str]) -> Set[str]:
236         """Invert a set of docids."""
237         return {docid for docid in self.documents_by_docid if docid not in original}
238
239     def get_doc(self, docid: str) -> Optional[Document]:
240         """Given a docid, retrieve the previously added Document.
241
242         Args:
243             docid: the docid to retrieve
244
245         Returns:
246             The Document with docid or None to indicate no match.
247         """
248         return self.documents_by_docid.get(docid, None)
249
250     def query(self, query: str) -> Optional[Set[str]]:
251         """Query the corpus for documents that match a logical expression.
252
253         Args:
254             query: the logical query expressed using a simple language
255                 that understands conjunction (and operator), disjunction
256                 (or operator) and inversion (not operator) as well as
257                 parenthesis.  Here are some legal sample queries::
258
259                     tag1 and tag2 and not tag3
260
261                     (tag1 or tag2) and (tag3 or tag4)
262
263                     (tag1 and key2:value2) or (tag2 and key1:value1)
264
265                     key:*
266
267                     tag1 and key:*
268
269         Returns:
270             A (potentially empty) set of docids for the matching
271             (previously added) documents or None on error.
272         """
273
274         try:
275             root = self._parse_query(query)
276         except ParseError as e:
277             print(e.message, file=sys.stderr)
278             return None
279         return root.eval()
280
281     def _parse_query(self, query: str):
282         """Internal parse helper; prefer to use query instead."""
283
284         parens = set(["(", ")"])
285         and_or = set(["and", "or"])
286
287         def operator_precedence(token: str) -> Optional[int]:
288             table = {
289                 "(": 4,  # higher
290                 ")": 4,
291                 "not": 3,
292                 "and": 2,
293                 "or": 1,  # lower
294             }
295             return table.get(token, None)
296
297         def is_operator(token: str) -> bool:
298             return operator_precedence(token) is not None
299
300         def lex(query: str):
301             tokens = query.split()
302             for token in tokens:
303                 # Handle ( and ) operators stuck to the ends of tokens
304                 # that split() doesn't understand.
305                 if len(token) > 1:
306                     first = token[0]
307                     if first in parens:
308                         tail = token[1:]
309                         yield first
310                         token = tail
311                     last = token[-1]
312                     if last in parens:
313                         head = token[0:-1]
314                         yield head
315                         token = last
316                 yield token
317
318         def evaluate(corpus: Corpus, stack: List[str]):
319             node_stack: List[Node] = []
320             for token in stack:
321                 node = None
322                 if not is_operator(token):
323                     node = Node(corpus, Operation.QUERY, [token])
324                 else:
325                     args = []
326                     operation = Operation.from_token(token)
327                     operand_count = operation.num_operands()
328                     if len(node_stack) < operand_count:
329                         raise ParseError(f"Incorrect number of operations for {operation}")
330                     for _ in range(operation.num_operands()):
331                         args.append(node_stack.pop())
332                     node = Node(corpus, operation, args)
333                 node_stack.append(node)
334             return node_stack[0]
335
336         output_stack = []
337         operator_stack = []
338         for token in lex(query):
339             if not is_operator(token):
340                 output_stack.append(token)
341                 continue
342
343             # token is an operator...
344             if token == "(":
345                 operator_stack.append(token)
346             elif token == ")":
347                 ok = False
348                 while len(operator_stack) > 0:
349                     pop_operator = operator_stack.pop()
350                     if pop_operator != "(":
351                         output_stack.append(pop_operator)
352                     else:
353                         ok = True
354                         break
355                 if not ok:
356                     raise ParseError("Unbalanced parenthesis in query expression")
357
358             # and, or, not
359             else:
360                 my_precedence = operator_precedence(token)
361                 if my_precedence is None:
362                     raise ParseError(f"Unknown operator: {token}")
363                 while len(operator_stack) > 0:
364                     peek_operator = operator_stack[-1]
365                     if not is_operator(peek_operator) or peek_operator == "(":
366                         break
367                     peek_precedence = operator_precedence(peek_operator)
368                     if peek_precedence is None:
369                         raise ParseError("Internal error")
370                     if (
371                         (peek_precedence < my_precedence)
372                         or (peek_precedence == my_precedence)
373                         and (peek_operator not in and_or)
374                     ):
375                         break
376                     output_stack.append(operator_stack.pop())
377                 operator_stack.append(token)
378         while len(operator_stack) > 0:
379             token = operator_stack.pop()
380             if token in parens:
381                 raise ParseError("Unbalanced parenthesis in query expression")
382             output_stack.append(token)
383         return evaluate(self, output_stack)
384
385
386 class Node(object):
387     """A query AST node."""
388
389     def __init__(
390         self,
391         corpus: Corpus,
392         op: Operation,
393         operands: Sequence[Union[Node, str]],
394     ):
395         self.corpus = corpus
396         self.op = op
397         self.operands = operands
398
399     def eval(self) -> Set[str]:
400         """Evaluate this node."""
401
402         evaled_operands: List[Union[Set[str], str]] = []
403         for operand in self.operands:
404             if isinstance(operand, Node):
405                 evaled_operands.append(operand.eval())
406             elif isinstance(operand, str):
407                 evaled_operands.append(operand)
408             else:
409                 raise ParseError(f"Unexpected operand: {operand}")
410
411         retval = set()
412         if self.op is Operation.QUERY:
413             for tag in evaled_operands:
414                 if isinstance(tag, str):
415                     if ":" in tag:
416                         try:
417                             key, value = tag.split(":")
418                         except ValueError as v:
419                             raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
420
421                         if key == '*':
422                             r = set()
423                             for kv, s in self.corpus.docids_by_property.items():
424                                 if value in ('*', kv[1]):
425                                     r.update(s)
426                         else:
427                             if value == '*':
428                                 r = self.corpus.get_docids_with_property(key)
429                             else:
430                                 r = self.corpus.get_docids_by_property(key, value)
431                     else:
432                         if tag == '*':
433                             r = set()
434                             for s in self.corpus.docids_by_tag.values():
435                                 r.update(s)
436                         else:
437                             r = self.corpus.get_docids_by_exact_tag(tag)
438                     retval.update(r)
439                 else:
440                     raise ParseError(f"Unexpected query {tag}")
441         elif self.op is Operation.DISJUNCTION:
442             if len(evaled_operands) != 2:
443                 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
444             retval.update(evaled_operands[0])
445             retval.update(evaled_operands[1])
446         elif self.op is Operation.CONJUNCTION:
447             if len(evaled_operands) != 2:
448                 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
449             retval.update(evaled_operands[0])
450             retval = retval.intersection(evaled_operands[1])
451         elif self.op is Operation.INVERSION:
452             if len(evaled_operands) != 1:
453                 raise ParseError("Operation.INVERSION (not) expects one operand.")
454             _ = evaled_operands[0]
455             if isinstance(_, set):
456                 retval.update(self.corpus.invert_docid_set(_))
457             else:
458                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
459         return retval
460
461
462 if __name__ == '__main__':
463     import doctest
464
465     doctest.testmod()