More work to improve documentation generated by sphinx. Also fixes
[pyutils.git] / src / pyutils / search / logical_search.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents.  The corpus and index are held in memory.
7 The query language contains AND, OR, NOT, and parenthesis to support
8 flexible search semantics.
9 """
10
11 from __future__ import annotations
12
13 import enum
14 import sys
15 from collections import defaultdict
16 from dataclasses import dataclass, field
17 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
18
19
20 class ParseError(Exception):
21     """An error encountered while parsing a logical search expression."""
22
23     def __init__(self, message: str):
24         super().__init__()
25         self.message = message
26
27
28 @dataclass
29 class Document:
30     """A class representing a searchable document."""
31
32     docid: str = ''
33     """A unique identifier for each document -- must be provided
34     by the caller.  See :meth:`python_modules.id_generator.get` or
35     :meth:`python_modules.string_utils.generate_uuid` for potential
36     sources."""
37
38     tags: Set[str] = field(default_factory=set)
39     """A set of tag strings for this document.  May be empty.  Tags
40     are simply text labels that are associated with a document and
41     may be used to search for it later.
42     """
43
44     properties: List[Tuple[str, str]] = field(default_factory=list)
45     """A list of key->value strings for this document.  May be empty.
46     Properties are more flexible tags that have both a label and a
47     value.  e.g. "category:mystery" or "author:smith"."""
48
49     reference: Optional[Any] = None
50     """An optional reference to something else for convenience;
51     interpreted only by caller code, ignored here.
52     """
53
54
55 class Operation(enum.Enum):
56     """A logical search query operation."""
57
58     QUERY = 1
59     CONJUNCTION = 2
60     DISJUNCTION = 3
61     INVERSION = 4
62
63     @staticmethod
64     def from_token(token: str):
65         table = {
66             "not": Operation.INVERSION,
67             "and": Operation.CONJUNCTION,
68             "or": Operation.DISJUNCTION,
69         }
70         return table.get(token, None)
71
72     def num_operands(self) -> Optional[int]:
73         table = {
74             Operation.INVERSION: 1,
75             Operation.CONJUNCTION: 2,
76             Operation.DISJUNCTION: 2,
77         }
78         return table.get(self, None)
79
80
81 class Corpus(object):
82     """A collection of searchable documents.  The caller can
83     add documents to it (or edit existing docs) via :meth:`add_doc`,
84     retrieve a document given its docid via :meth:`get_doc`, and
85     perform various lookups of documents.  The most interesting
86     lookup is implemented in :meth:`query`.
87
88     >>> c = Corpus()
89     >>> c.add_doc(Document(
90     ...                    docid=1,
91     ...                    tags=set(['urgent', 'important']),
92     ...                    properties=[
93     ...                                ('author', 'Scott'),
94     ...                                ('subject', 'your anniversary')
95     ...                    ],
96     ...                    reference=None,
97     ...                   )
98     ...          )
99     >>> c.add_doc(Document(
100     ...                    docid=2,
101     ...                    tags=set(['important']),
102     ...                    properties=[
103     ...                                ('author', 'Joe'),
104     ...                                ('subject', 'your performance at work')
105     ...                    ],
106     ...                    reference=None,
107     ...                   )
108     ...          )
109     >>> c.add_doc(Document(
110     ...                    docid=3,
111     ...                    tags=set(['urgent']),
112     ...                    properties=[
113     ...                                ('author', 'Scott'),
114     ...                                ('subject', 'car turning in front of you')
115     ...                    ],
116     ...                    reference=None,
117     ...                   )
118     ...          )
119     >>> c.query('author:Scott and important')
120     {1}
121     >>> c.query('*')
122     {1, 2, 3}
123     >>> c.query('*:*')
124     {1, 2, 3}
125     >>> c.query('*:Scott')
126     {1, 3}
127     """
128
129     def __init__(self) -> None:
130         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
131         self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
132         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
133         self.documents_by_docid: Dict[str, Document] = {}
134
135     def add_doc(self, doc: Document) -> None:
136         """Add a new Document to the Corpus.  Each Document must have a
137         distinct docid that will serve as its primary identifier.  If
138         the same Document is added multiple times, only the most
139         recent addition is indexed.  If two distinct documents with
140         the same docid are added, the latter klobbers the former in
141         the indexes.  See :meth:`python_modules.id_generator.get` or
142         :meth:`python_modules.string_utils.generate_uuid` for potential
143         sources of docids.
144
145         Each Document may have an optional set of tags which can be
146         used later in expressions to the query method.  These are simple
147         text labels.
148
149         Each Document may have an optional list of key->value tuples
150         which can be used later in expressions to the query method.
151
152         Document includes a user-defined "reference" field which is
153         never interpreted by this module.  This is meant to allow easy
154         mapping between Documents in this corpus and external objects
155         they may represent.
156
157         Args:
158             doc: the document to add or edit
159         """
160
161         if doc.docid in self.documents_by_docid:
162             # Handle collisions; assume that we are re-indexing the
163             # same document so remove it from the indexes before
164             # adding it back again.
165             colliding_doc = self.documents_by_docid[doc.docid]
166             assert colliding_doc.docid == doc.docid
167             del self.documents_by_docid[doc.docid]
168             for tag in colliding_doc.tags:
169                 self.docids_by_tag[tag].remove(doc.docid)
170             for key, value in colliding_doc.properties:
171                 self.docids_by_property[(key, value)].remove(doc.docid)
172                 self.docids_with_property[key].remove(doc.docid)
173
174         # Index the new Document
175         assert doc.docid not in self.documents_by_docid
176         self.documents_by_docid[doc.docid] = doc
177         for tag in doc.tags:
178             self.docids_by_tag[tag].add(doc.docid)
179         for key, value in doc.properties:
180             self.docids_by_property[(key, value)].add(doc.docid)
181             self.docids_with_property[key].add(doc.docid)
182
183     def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
184         """Return the set of docids that have a particular tag.
185
186         Args:
187             tag: the tag for which to search
188
189         Returns:
190             A set containing docids with the provided tag which
191             may be empty."""
192         return self.docids_by_tag[tag]
193
194     def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
195         """Return the set of docids with a tag that contains a str.
196
197         Args:
198             tag: the tag pattern for which to search
199
200         Returns:
201             A set containing docids with tags that match the pattern
202             provided.  e.g., if the arg was "foo" tags "football", "foobar",
203             and "food" all match.
204         """
205         ret = set()
206         for search_tag in self.docids_by_tag:
207             if tag in search_tag:
208                 for docid in self.docids_by_tag[search_tag]:
209                     ret.add(docid)
210         return ret
211
212     def get_docids_with_property(self, key: str) -> Set[str]:
213         """Return the set of docids that have a particular property no matter
214         what that property's value.
215
216         Args:
217             key: the key value to search for.
218
219         Returns:
220             A set of docids that contain the key (no matter what value)
221             which may be empty.
222         """
223         return self.docids_with_property[key]
224
225     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
226         """Return the set of docids that have a particular property with a
227         particular value.
228
229         Args:
230             key: the key to search for
231             value: the value that key must have in order to match a doc.
232
233         Returns:
234             A set of docids that contain key with value which may be empty.
235         """
236         return self.docids_by_property[(key, value)]
237
238     def invert_docid_set(self, original: Set[str]) -> Set[str]:
239         """Invert a set of docids."""
240         return {docid for docid in self.documents_by_docid if docid not in original}
241
242     def get_doc(self, docid: str) -> Optional[Document]:
243         """Given a docid, retrieve the previously added Document.
244
245         Args:
246             docid: the docid to retrieve
247
248         Returns:
249             The Document with docid or None to indicate no match.
250         """
251         return self.documents_by_docid.get(docid, None)
252
253     def query(self, query: str) -> Optional[Set[str]]:
254         """Query the corpus for documents that match a logical expression.
255
256         Args:
257             query: the logical query expressed using a simple language
258                 that understands conjunction (and operator), disjunction
259                 (or operator) and inversion (not operator) as well as
260                 parenthesis.  Here are some legal sample queries::
261
262                     tag1 and tag2 and not tag3
263
264                     (tag1 or tag2) and (tag3 or tag4)
265
266                     (tag1 and key2:value2) or (tag2 and key1:value1)
267
268                     key:*
269
270                     tag1 and key:*
271
272         Returns:
273             A (potentially empty) set of docids for the matching
274             (previously added) documents or None on error.
275         """
276
277         try:
278             root = self._parse_query(query)
279         except ParseError as e:
280             print(e.message, file=sys.stderr)
281             return None
282         return root.eval()
283
284     def _parse_query(self, query: str):
285         """Internal parse helper; prefer to use query instead."""
286
287         parens = set(["(", ")"])
288         and_or = set(["and", "or"])
289
290         def operator_precedence(token: str) -> Optional[int]:
291             table = {
292                 "(": 4,  # higher
293                 ")": 4,
294                 "not": 3,
295                 "and": 2,
296                 "or": 1,  # lower
297             }
298             return table.get(token, None)
299
300         def is_operator(token: str) -> bool:
301             return operator_precedence(token) is not None
302
303         def lex(query: str):
304             tokens = query.split()
305             for token in tokens:
306                 # Handle ( and ) operators stuck to the ends of tokens
307                 # that split() doesn't understand.
308                 if len(token) > 1:
309                     first = token[0]
310                     if first in parens:
311                         tail = token[1:]
312                         yield first
313                         token = tail
314                     last = token[-1]
315                     if last in parens:
316                         head = token[0:-1]
317                         yield head
318                         token = last
319                 yield token
320
321         def evaluate(corpus: Corpus, stack: List[str]):
322             node_stack: List[Node] = []
323             for token in stack:
324                 node = None
325                 if not is_operator(token):
326                     node = Node(corpus, Operation.QUERY, [token])
327                 else:
328                     args = []
329                     operation = Operation.from_token(token)
330                     operand_count = operation.num_operands()
331                     if len(node_stack) < operand_count:
332                         raise ParseError(
333                             f"Incorrect number of operations for {operation}"
334                         )
335                     for _ in range(operation.num_operands()):
336                         args.append(node_stack.pop())
337                     node = Node(corpus, operation, args)
338                 node_stack.append(node)
339             return node_stack[0]
340
341         output_stack = []
342         operator_stack = []
343         for token in lex(query):
344             if not is_operator(token):
345                 output_stack.append(token)
346                 continue
347
348             # token is an operator...
349             if token == "(":
350                 operator_stack.append(token)
351             elif token == ")":
352                 ok = False
353                 while len(operator_stack) > 0:
354                     pop_operator = operator_stack.pop()
355                     if pop_operator != "(":
356                         output_stack.append(pop_operator)
357                     else:
358                         ok = True
359                         break
360                 if not ok:
361                     raise ParseError("Unbalanced parenthesis in query expression")
362
363             # and, or, not
364             else:
365                 my_precedence = operator_precedence(token)
366                 if my_precedence is None:
367                     raise ParseError(f"Unknown operator: {token}")
368                 while len(operator_stack) > 0:
369                     peek_operator = operator_stack[-1]
370                     if not is_operator(peek_operator) or peek_operator == "(":
371                         break
372                     peek_precedence = operator_precedence(peek_operator)
373                     if peek_precedence is None:
374                         raise ParseError("Internal error")
375                     if (
376                         (peek_precedence < my_precedence)
377                         or (peek_precedence == my_precedence)
378                         and (peek_operator not in and_or)
379                     ):
380                         break
381                     output_stack.append(operator_stack.pop())
382                 operator_stack.append(token)
383         while len(operator_stack) > 0:
384             token = operator_stack.pop()
385             if token in parens:
386                 raise ParseError("Unbalanced parenthesis in query expression")
387             output_stack.append(token)
388         return evaluate(self, output_stack)
389
390
391 class Node(object):
392     """A query AST node."""
393
394     def __init__(
395         self,
396         corpus: Corpus,
397         op: Operation,
398         operands: Sequence[Union[Node, str]],
399     ):
400         self.corpus = corpus
401         self.op = op
402         self.operands = operands
403
404     def eval(self) -> Set[str]:
405         """Evaluate this node."""
406
407         evaled_operands: List[Union[Set[str], str]] = []
408         for operand in self.operands:
409             if isinstance(operand, Node):
410                 evaled_operands.append(operand.eval())
411             elif isinstance(operand, str):
412                 evaled_operands.append(operand)
413             else:
414                 raise ParseError(f"Unexpected operand: {operand}")
415
416         retval = set()
417         if self.op is Operation.QUERY:
418             for tag in evaled_operands:
419                 if isinstance(tag, str):
420                     if ":" in tag:
421                         try:
422                             key, value = tag.split(":")
423                         except ValueError as v:
424                             raise ParseError(
425                                 f'Invalid key:value syntax at "{tag}"'
426                             ) from v
427
428                         if key == '*':
429                             r = set()
430                             for kv, s in self.corpus.docids_by_property.items():
431                                 if value in ('*', kv[1]):
432                                     r.update(s)
433                         else:
434                             if value == '*':
435                                 r = self.corpus.get_docids_with_property(key)
436                             else:
437                                 r = self.corpus.get_docids_by_property(key, value)
438                     else:
439                         if tag == '*':
440                             r = set()
441                             for s in self.corpus.docids_by_tag.values():
442                                 r.update(s)
443                         else:
444                             r = self.corpus.get_docids_by_exact_tag(tag)
445                     retval.update(r)
446                 else:
447                     raise ParseError(f"Unexpected query {tag}")
448         elif self.op is Operation.DISJUNCTION:
449             if len(evaled_operands) != 2:
450                 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
451             retval.update(evaled_operands[0])
452             retval.update(evaled_operands[1])
453         elif self.op is Operation.CONJUNCTION:
454             if len(evaled_operands) != 2:
455                 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
456             retval.update(evaled_operands[0])
457             retval = retval.intersection(evaled_operands[1])
458         elif self.op is Operation.INVERSION:
459             if len(evaled_operands) != 1:
460                 raise ParseError("Operation.INVERSION (not) expects one operand.")
461             _ = evaled_operands[0]
462             if isinstance(_, set):
463                 retval.update(self.corpus.invert_docid_set(_))
464             else:
465                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
466         return retval
467
468
469 if __name__ == '__main__':
470     import doctest
471
472     doctest.testmod()