More spring cleaning.
[pyutils.git] / src / pyutils / search / logical_search.py
1 #!/usr/bin/env python3
2 # pylint: disable=too-many-nested-blocks
3
4 # © Copyright 2021-2023, Scott Gasch
5
6 """This is a module concerned with the creation of and searching of a
7 corpus of documents.  The corpus and index are held in memory.
8 The query language contains AND, OR, NOT, and parenthesis to support
9 flexible search semantics.
10 """
11
12 from __future__ import annotations
13
14 import enum
15 import sys
16 from collections import defaultdict
17 from dataclasses import dataclass, field
18 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
19
20
21 class ParseError(Exception):
22     """An error encountered while parsing a logical search expression."""
23
24     def __init__(self, message: str):
25         super().__init__()
26         self.message = message
27
28
29 @dataclass
30 class Document:
31     """A class representing a searchable document."""
32
33     docid: str = ""
34     """A unique identifier for each document -- must be provided
35     by the caller.  See :meth:`python_modules.id_generator.get` or
36     :meth:`python_modules.string_utils.generate_uuid` for potential
37     sources."""
38
39     tags: Set[str] = field(default_factory=set)
40     """A set of tag strings for this document.  May be empty.  Tags
41     are simply text labels that are associated with a document and
42     may be used to search for it later.
43     """
44
45     properties: List[Tuple[str, str]] = field(default_factory=list)
46     """A list of key->value strings for this document.  May be empty.
47     Properties are more flexible tags that have both a label and a
48     value.  e.g. "category:mystery" or "author:smith"."""
49
50     reference: Optional[Any] = None
51     """An optional reference to something else for convenience;
52     interpreted only by caller code, ignored here.
53     """
54
55
56 class Operation(enum.Enum):
57     """A logical search query operation."""
58
59     QUERY = 1
60     CONJUNCTION = 2
61     DISJUNCTION = 3
62     INVERSION = 4
63
64     @staticmethod
65     def from_token(token: str):
66         table = {
67             "not": Operation.INVERSION,
68             "and": Operation.CONJUNCTION,
69             "or": Operation.DISJUNCTION,
70         }
71         return table.get(token, None)
72
73     def num_operands(self) -> Optional[int]:
74         table = {
75             Operation.INVERSION: 1,
76             Operation.CONJUNCTION: 2,
77             Operation.DISJUNCTION: 2,
78         }
79         return table.get(self, None)
80
81
82 class Corpus(object):
83     """A collection of searchable documents.  The caller can
84     add documents to it (or edit existing docs) via :meth:`add_doc`,
85     retrieve a document given its docid via :meth:`get_doc`, and
86     perform various lookups of documents.  The most interesting
87     lookup is implemented in :meth:`query`.
88
89     >>> c = Corpus()
90     >>> c.add_doc(Document(
91     ...                    docid=1,
92     ...                    tags=set(['urgent', 'important']),
93     ...                    properties=[
94     ...                                ('author', 'Scott'),
95     ...                                ('subject', 'your anniversary')
96     ...                    ],
97     ...                    reference=None,
98     ...                   )
99     ...          )
100     >>> c.add_doc(Document(
101     ...                    docid=2,
102     ...                    tags=set(['important']),
103     ...                    properties=[
104     ...                                ('author', 'Joe'),
105     ...                                ('subject', 'your performance at work')
106     ...                    ],
107     ...                    reference=None,
108     ...                   )
109     ...          )
110     >>> c.add_doc(Document(
111     ...                    docid=3,
112     ...                    tags=set(['urgent']),
113     ...                    properties=[
114     ...                                ('author', 'Scott'),
115     ...                                ('subject', 'car turning in front of you')
116     ...                    ],
117     ...                    reference=None,
118     ...                   )
119     ...          )
120     >>> c.query('author:Scott and important')
121     {1}
122     >>> c.query('*')
123     {1, 2, 3}
124     >>> c.query('*:*')
125     {1, 2, 3}
126     >>> c.query('*:Scott')
127     {1, 3}
128     """
129
130     def __init__(self) -> None:
131         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
132         self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
133         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
134         self.documents_by_docid: Dict[str, Document] = {}
135
136     def add_doc(self, doc: Document) -> None:
137         """Add a new Document to the Corpus.  Each Document must have a
138         distinct docid that will serve as its primary identifier.  If
139         the same Document is added multiple times, only the most
140         recent addition is indexed.  If two distinct documents with
141         the same docid are added, the latter klobbers the former in
142         the indexes.  See :meth:`python_modules.id_generator.get` or
143         :meth:`python_modules.string_utils.generate_uuid` for potential
144         sources of docids.
145
146         Each Document may have an optional set of tags which can be
147         used later in expressions to the query method.  These are simple
148         text labels.
149
150         Each Document may have an optional list of key->value tuples
151         which can be used later in expressions to the query method.
152
153         Document includes a user-defined "reference" field which is
154         never interpreted by this module.  This is meant to allow easy
155         mapping between Documents in this corpus and external objects
156         they may represent.
157
158         Args:
159             doc: the document to add or edit
160         """
161
162         if doc.docid in self.documents_by_docid:
163             # Handle collisions; assume that we are re-indexing the
164             # same document so remove it from the indexes before
165             # adding it back again.
166             colliding_doc = self.documents_by_docid[doc.docid]
167             assert colliding_doc.docid == doc.docid
168             del self.documents_by_docid[doc.docid]
169             for tag in colliding_doc.tags:
170                 self.docids_by_tag[tag].remove(doc.docid)
171             for key, value in colliding_doc.properties:
172                 self.docids_by_property[(key, value)].remove(doc.docid)
173                 self.docids_with_property[key].remove(doc.docid)
174
175         # Index the new Document
176         assert doc.docid not in self.documents_by_docid
177         self.documents_by_docid[doc.docid] = doc
178         for tag in doc.tags:
179             self.docids_by_tag[tag].add(doc.docid)
180         for key, value in doc.properties:
181             self.docids_by_property[(key, value)].add(doc.docid)
182             self.docids_with_property[key].add(doc.docid)
183
184     def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
185         """Return the set of docids that have a particular tag.
186
187         Args:
188             tag: the tag for which to search
189
190         Returns:
191             A set containing docids with the provided tag which
192             may be empty."""
193         return self.docids_by_tag[tag]
194
195     def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
196         """Return the set of docids with a tag that contains a str.
197
198         Args:
199             tag: the tag pattern for which to search
200
201         Returns:
202             A set containing docids with tags that match the pattern
203             provided.  e.g., if the arg was "foo" tags "football", "foobar",
204             and "food" all match.
205         """
206         ret = set()
207         for search_tag in self.docids_by_tag:
208             if tag in search_tag:
209                 for docid in self.docids_by_tag[search_tag]:
210                     ret.add(docid)
211         return ret
212
213     def get_docids_with_property(self, key: str) -> Set[str]:
214         """Return the set of docids that have a particular property no matter
215         what that property's value.
216
217         Args:
218             key: the key value to search for.
219
220         Returns:
221             A set of docids that contain the key (no matter what value)
222             which may be empty.
223         """
224         return self.docids_with_property[key]
225
226     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
227         """Return the set of docids that have a particular property with a
228         particular value.
229
230         Args:
231             key: the key to search for
232             value: the value that key must have in order to match a doc.
233
234         Returns:
235             A set of docids that contain key with value which may be empty.
236         """
237         return self.docids_by_property[(key, value)]
238
239     def invert_docid_set(self, original: Set[str]) -> Set[str]:
240         """Invert a set of docids."""
241         return {docid for docid in self.documents_by_docid if docid not in original}
242
243     def get_doc(self, docid: str) -> Optional[Document]:
244         """Given a docid, retrieve the previously added Document.
245
246         Args:
247             docid: the docid to retrieve
248
249         Returns:
250             The Document with docid or None to indicate no match.
251         """
252         return self.documents_by_docid.get(docid, None)
253
254     def query(self, query: str) -> Optional[Set[str]]:
255         """Query the corpus for documents that match a logical expression.
256
257         Args:
258             query: the logical query expressed using a simple language
259                 that understands conjunction (and operator), disjunction
260                 (or operator) and inversion (not operator) as well as
261                 parenthesis.  Here are some legal sample queries::
262
263                     tag1 and tag2 and not tag3
264
265                     (tag1 or tag2) and (tag3 or tag4)
266
267                     (tag1 and key2:value2) or (tag2 and key1:value1)
268
269                     key:*
270
271                     tag1 and key:*
272
273         Returns:
274             A (potentially empty) set of docids for the matching
275             (previously added) documents or None on error.
276         """
277
278         try:
279             root = self._parse_query(query)
280         except ParseError as e:
281             print(e.message, file=sys.stderr)
282             return None
283         return root.eval()
284
285     def _parse_query(self, query: str):
286         """Internal parse helper; prefer to use query instead."""
287
288         parens = set(["(", ")"])
289         and_or = set(["and", "or"])
290
291         def operator_precedence(token: str) -> Optional[int]:
292             table = {
293                 "(": 4,  # higher
294                 ")": 4,
295                 "not": 3,
296                 "and": 2,
297                 "or": 1,  # lower
298             }
299             return table.get(token, None)
300
301         def is_operator(token: str) -> bool:
302             return operator_precedence(token) is not None
303
304         def lex(query: str):
305             tokens = query.split()
306             for token in tokens:
307                 # Handle ( and ) operators stuck to the ends of tokens
308                 # that split() doesn't understand.
309                 if len(token) > 1:
310                     first = token[0]
311                     if first in parens:
312                         tail = token[1:]
313                         yield first
314                         token = tail
315                     last = token[-1]
316                     if last in parens:
317                         head = token[0:-1]
318                         yield head
319                         token = last
320                 yield token
321
322         def evaluate(corpus: Corpus, stack: List[str]):
323             node_stack: List[Node] = []
324             for token in stack:
325                 node = None
326                 if not is_operator(token):
327                     node = Node(corpus, Operation.QUERY, [token])
328                 else:
329                     args = []
330                     operation = Operation.from_token(token)
331                     operand_count = operation.num_operands()
332                     if len(node_stack) < operand_count:
333                         raise ParseError(
334                             f"Incorrect number of operations for {operation}"
335                         )
336                     for _ in range(operation.num_operands()):
337                         args.append(node_stack.pop())
338                     node = Node(corpus, operation, args)
339                 node_stack.append(node)
340             return node_stack[0]
341
342         output_stack = []
343         operator_stack = []
344         for token in lex(query):
345             if not is_operator(token):
346                 output_stack.append(token)
347                 continue
348
349             # token is an operator...
350             if token == "(":
351                 operator_stack.append(token)
352             elif token == ")":
353                 ok = False
354                 while len(operator_stack) > 0:
355                     pop_operator = operator_stack.pop()
356                     if pop_operator != "(":
357                         output_stack.append(pop_operator)
358                     else:
359                         ok = True
360                         break
361                 if not ok:
362                     raise ParseError("Unbalanced parenthesis in query expression")
363
364             # and, or, not
365             else:
366                 my_precedence = operator_precedence(token)
367                 if my_precedence is None:
368                     raise ParseError(f"Unknown operator: {token}")
369                 while len(operator_stack) > 0:
370                     peek_operator = operator_stack[-1]
371                     if not is_operator(peek_operator) or peek_operator == "(":
372                         break
373                     peek_precedence = operator_precedence(peek_operator)
374                     if peek_precedence is None:
375                         raise ParseError("Internal error")
376                     if (
377                         (peek_precedence < my_precedence)
378                         or (peek_precedence == my_precedence)
379                         and (peek_operator not in and_or)
380                     ):
381                         break
382                     output_stack.append(operator_stack.pop())
383                 operator_stack.append(token)
384         while len(operator_stack) > 0:
385             token = operator_stack.pop()
386             if token in parens:
387                 raise ParseError("Unbalanced parenthesis in query expression")
388             output_stack.append(token)
389         return evaluate(self, output_stack)
390
391
392 class Node(object):
393     """A query AST node."""
394
395     def __init__(
396         self,
397         corpus: Corpus,
398         op: Operation,
399         operands: Sequence[Union[Node, str]],
400     ):
401         self.corpus = corpus
402         self.op = op
403         self.operands = operands
404
405     def eval(self) -> Set[str]:
406         """Evaluate this node."""
407
408         evaled_operands: List[Union[Set[str], str]] = []
409         for operand in self.operands:
410             if isinstance(operand, Node):
411                 evaled_operands.append(operand.eval())
412             elif isinstance(operand, str):
413                 evaled_operands.append(operand)
414             else:
415                 raise ParseError(f"Unexpected operand: {operand}")
416
417         retval = set()
418         if self.op is Operation.QUERY:
419             for tag in evaled_operands:
420                 if isinstance(tag, str):
421                     if ":" in tag:
422                         try:
423                             key, value = tag.split(":")
424                         except ValueError as v:
425                             raise ParseError(
426                                 f'Invalid key:value syntax at "{tag}"'
427                             ) from v
428
429                         if key == "*":
430                             r = set()
431                             for kv, s in self.corpus.docids_by_property.items():
432                                 if value in ("*", kv[1]):
433                                     r.update(s)
434                         else:
435                             if value == "*":
436                                 r = self.corpus.get_docids_with_property(key)
437                             else:
438                                 r = self.corpus.get_docids_by_property(key, value)
439                     else:
440                         if tag == "*":
441                             r = set()
442                             for s in self.corpus.docids_by_tag.values():
443                                 r.update(s)
444                         else:
445                             r = self.corpus.get_docids_by_exact_tag(tag)
446                     retval.update(r)
447                 else:
448                     raise ParseError(f"Unexpected query {tag}")
449         elif self.op is Operation.DISJUNCTION:
450             if len(evaled_operands) != 2:
451                 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
452             retval.update(evaled_operands[0])
453             retval.update(evaled_operands[1])
454         elif self.op is Operation.CONJUNCTION:
455             if len(evaled_operands) != 2:
456                 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
457             retval.update(evaled_operands[0])
458             retval = retval.intersection(evaled_operands[1])
459         elif self.op is Operation.INVERSION:
460             if len(evaled_operands) != 1:
461                 raise ParseError("Operation.INVERSION (not) expects one operand.")
462             _ = evaled_operands[0]
463             if isinstance(_, set):
464                 retval.update(self.corpus.invert_docid_set(_))
465             else:
466                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
467         return retval
468
469
470 if __name__ == "__main__":
471     import doctest
472
473     doctest.testmod()