Adds a __repr__ to graph.
[pyutils.git] / src / pyutils / search / logical_search.py
1 #!/usr/bin/env python3
2 # pylint: disable=too-many-nested-blocks
3
4 # © Copyright 2021-2023, Scott Gasch
5
6 """This is a module concerned with the creation of and searching of a
7 corpus of documents.  The corpus and index are held in memory.
8 The query language contains AND, OR, NOT, and parenthesis to support
9 flexible search semantics.
10 """
11
12 from __future__ import annotations
13
14 import enum
15 import sys
16 from collections import defaultdict
17 from dataclasses import dataclass, field
18 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
19
20
21 class ParseError(Exception):
22     """An error encountered while parsing a logical search expression."""
23
24     def __init__(self, message: str):
25         super().__init__()
26         self.message = message
27
28
29 @dataclass
30 class Document:
31     """A class representing a searchable document."""
32
33     docid: str = ""
34     """A unique identifier for each document -- must be provided
35     by the caller.  See :meth:`python_modules.id_generator.get` or
36     :meth:`python_modules.string_utils.generate_uuid` for potential
37     sources."""
38
39     tags: Set[str] = field(default_factory=set)
40     """A set of tag strings for this document.  May be empty.  Tags
41     are simply text labels that are associated with a document and
42     may be used to search for it later.
43     """
44
45     properties: List[Tuple[str, str]] = field(default_factory=list)
46     """A list of key->value strings for this document.  May be empty.
47     Properties are more flexible tags that have both a label and a
48     value.  e.g. "category:mystery" or "author:smith"."""
49
50     reference: Optional[Any] = None
51     """An optional reference to something else for convenience;
52     interpreted only by caller code, ignored here.
53     """
54
55
56 class Operation(enum.Enum):
57     """A logical search query operation."""
58
59     QUERY = 1
60     CONJUNCTION = 2
61     DISJUNCTION = 3
62     INVERSION = 4
63
64     @staticmethod
65     def from_token(token: str):
66         table = {
67             "not": Operation.INVERSION,
68             "and": Operation.CONJUNCTION,
69             "or": Operation.DISJUNCTION,
70         }
71         return table.get(token, None)
72
73     def num_operands(self) -> Optional[int]:
74         table = {
75             Operation.INVERSION: 1,
76             Operation.CONJUNCTION: 2,
77             Operation.DISJUNCTION: 2,
78         }
79         return table.get(self, None)
80
81
82 class Corpus(object):
83     """A collection of searchable documents.  The caller can
84     add documents to it (or edit existing docs) via :meth:`add_doc`,
85     retrieve a document given its docid via :meth:`get_doc`, and
86     perform various lookups of documents.  The most interesting
87     lookup is implemented in :meth:`query`.
88
89     >>> c = Corpus()
90     >>> c.add_doc(Document(
91     ...                    docid=1,
92     ...                    tags=set(['urgent', 'important']),
93     ...                    properties=[
94     ...                                ('author', 'Scott'),
95     ...                                ('subject', 'your anniversary')
96     ...                    ],
97     ...                    reference=None,
98     ...                   )
99     ...          )
100     >>> c.add_doc(Document(
101     ...                    docid=2,
102     ...                    tags=set(['important']),
103     ...                    properties=[
104     ...                                ('author', 'Joe'),
105     ...                                ('subject', 'your performance at work')
106     ...                    ],
107     ...                    reference=None,
108     ...                   )
109     ...          )
110     >>> c.add_doc(Document(
111     ...                    docid=3,
112     ...                    tags=set(['urgent']),
113     ...                    properties=[
114     ...                                ('author', 'Scott'),
115     ...                                ('subject', 'car turning in front of you')
116     ...                    ],
117     ...                    reference=None,
118     ...                   )
119     ...          )
120     >>> c.query('author:Scott and important')
121     {1}
122     >>> c.query('*')
123     {1, 2, 3}
124     >>> c.query('*:*')
125     {1, 2, 3}
126     >>> c.query('*:Scott')
127     {1, 3}
128     """
129
130     def __init__(self) -> None:
131         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
132         self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
133         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
134         self.documents_by_docid: Dict[str, Document] = {}
135
136     def add_doc(self, doc: Document) -> None:
137         """Add a new Document to the Corpus.  Each Document must have a
138         distinct docid that will serve as its primary identifier.  If
139         the same Document is added multiple times, only the most
140         recent addition is indexed.  If two distinct documents with
141         the same docid are added, the latter klobbers the former in
142         the indexes.  See :meth:`python_modules.id_generator.get` or
143         :meth:`python_modules.string_utils.generate_uuid` for potential
144         sources of docids.
145
146         Each Document may have an optional set of tags which can be
147         used later in expressions to the query method.  These are simple
148         text labels.
149
150         Each Document may have an optional list of key->value tuples
151         which can be used later in expressions to the query method.
152
153         Document includes a user-defined "reference" field which is
154         never interpreted by this module.  This is meant to allow easy
155         mapping between Documents in this corpus and external objects
156         they may represent.
157
158         Args:
159             doc: the document to add or edit
160         """
161
162         if doc.docid in self.documents_by_docid:
163             # Handle collisions; assume that we are re-indexing the
164             # same document so remove it from the indexes before
165             # adding it back again.
166             colliding_doc = self.documents_by_docid[doc.docid]
167             assert colliding_doc.docid == doc.docid
168             del self.documents_by_docid[doc.docid]
169             for tag in colliding_doc.tags:
170                 self.docids_by_tag[tag].remove(doc.docid)
171             for key, value in colliding_doc.properties:
172                 self.docids_by_property[(key, value)].remove(doc.docid)
173                 self.docids_with_property[key].remove(doc.docid)
174
175         # Index the new Document
176         assert doc.docid not in self.documents_by_docid
177         self.documents_by_docid[doc.docid] = doc
178         for tag in doc.tags:
179             self.docids_by_tag[tag].add(doc.docid)
180         for key, value in doc.properties:
181             self.docids_by_property[(key, value)].add(doc.docid)
182             self.docids_with_property[key].add(doc.docid)
183
184     def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
185         """Return the set of docids that have a particular tag.
186
187         Args:
188             tag: the tag for which to search
189
190         Returns:
191             A set containing docids with the provided tag which
192             may be empty."""
193         return self.docids_by_tag[tag]
194
195     def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
196         """Return the set of docids with a tag that contains a str.
197
198         Args:
199             tag: the tag pattern for which to search
200
201         Returns:
202             A set containing docids with tags that match the pattern
203             provided.  e.g., if the arg was "foo" tags "football", "foobar",
204             and "food" all match.
205         """
206         ret = set()
207         for search_tag in self.docids_by_tag:
208             if tag in search_tag:
209                 for docid in self.docids_by_tag[search_tag]:
210                     ret.add(docid)
211         return ret
212
213     def get_docids_with_property(self, key: str) -> Set[str]:
214         """Return the set of docids that have a particular property no matter
215         what that property's value.
216
217         Args:
218             key: the key value to search for.
219
220         Returns:
221             A set of docids that contain the key (no matter what value)
222             which may be empty.
223         """
224         return self.docids_with_property[key]
225
226     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
227         """Return the set of docids that have a particular property with a
228         particular value.
229
230         Args:
231             key: the key to search for
232             value: the value that key must have in order to match a doc.
233
234         Returns:
235             A set of docids that contain key with value which may be empty.
236         """
237         return self.docids_by_property[(key, value)]
238
239     def invert_docid_set(self, original: Set[str]) -> Set[str]:
240         """Invert a set of docids."""
241         return {docid for docid in self.documents_by_docid if docid not in original}
242
243     def get_doc(self, docid: str) -> Optional[Document]:
244         """Given a docid, retrieve the previously added Document.
245
246         Args:
247             docid: the docid to retrieve
248
249         Returns:
250             The Document with docid or None to indicate no match.
251         """
252         return self.documents_by_docid.get(docid, None)
253
254     def query(self, query: str) -> Optional[Set[str]]:
255         """Query the corpus for documents that match a logical expression.
256
257         Args:
258             query: the logical query expressed using a simple language
259                 that understands conjunction (and operator), disjunction
260                 (or operator) and inversion (not operator) as well as
261                 parenthesis.  Here are some legal sample queries::
262
263                     tag1 and tag2 and not tag3
264
265                     (tag1 or tag2) and (tag3 or tag4)
266
267                     (tag1 and key2:value2) or (tag2 and key1:value1)
268
269                     key:*
270
271                     tag1 and key:*
272
273         Returns:
274             A (potentially empty) set of docids for the matching
275             (previously added) documents or None on error.
276         """
277
278         try:
279             root = self._parse_query(query)
280         except ParseError as e:
281             print(e.message, file=sys.stderr)
282             return None
283         return root.eval()
284
285     def _parse_query(self, query: str):
286         """Internal parse helper; prefer to use query instead."""
287
288         parens = set(["(", ")"])
289         and_or = set(["and", "or"])
290
291         def operator_precedence(token: str) -> Optional[int]:
292             table = {
293                 "(": 4,  # higher
294                 ")": 4,
295                 "not": 3,
296                 "and": 2,
297                 "or": 1,  # lower
298             }
299             return table.get(token, None)
300
301         def is_operator(token: str) -> bool:
302             return operator_precedence(token) is not None
303
304         def lex(query: str):
305             tokens = query.split()
306             for token in tokens:
307                 # Handle ( and ) operators stuck to the ends of tokens
308                 # that split() doesn't understand.
309                 if len(token) > 1:
310                     first = token[0]
311                     if first in parens:
312                         tail = token[1:]
313                         yield first
314                         token = tail
315                     last = token[-1]
316                     if last in parens:
317                         head = token[0:-1]
318                         yield head
319                         token = last
320                 yield token
321
322         def evaluate(corpus: Corpus, stack: List[str]):
323             """
324             Raises:
325                 ParseError: bad number of operations, unbalanced parenthesis,
326                     unknown operators, internal errors.
327             """
328             node_stack: List[Node] = []
329             for token in stack:
330                 node = None
331                 if not is_operator(token):
332                     node = Node(corpus, Operation.QUERY, [token])
333                 else:
334                     args = []
335                     operation = Operation.from_token(token)
336                     operand_count = operation.num_operands()
337                     if len(node_stack) < operand_count:
338                         raise ParseError(
339                             f"Incorrect number of operations for {operation}"
340                         )
341                     for _ in range(operation.num_operands()):
342                         args.append(node_stack.pop())
343                     node = Node(corpus, operation, args)
344                 node_stack.append(node)
345             return node_stack[0]
346
347         output_stack = []
348         operator_stack = []
349         for token in lex(query):
350             if not is_operator(token):
351                 output_stack.append(token)
352                 continue
353
354             # token is an operator...
355             if token == "(":
356                 operator_stack.append(token)
357             elif token == ")":
358                 ok = False
359                 while len(operator_stack) > 0:
360                     pop_operator = operator_stack.pop()
361                     if pop_operator != "(":
362                         output_stack.append(pop_operator)
363                     else:
364                         ok = True
365                         break
366                 if not ok:
367                     raise ParseError("Unbalanced parenthesis in query expression")
368
369             # and, or, not
370             else:
371                 my_precedence = operator_precedence(token)
372                 if my_precedence is None:
373                     raise ParseError(f"Unknown operator: {token}")
374                 while len(operator_stack) > 0:
375                     peek_operator = operator_stack[-1]
376                     if not is_operator(peek_operator) or peek_operator == "(":
377                         break
378                     peek_precedence = operator_precedence(peek_operator)
379                     if peek_precedence is None:
380                         raise ParseError("Internal error")
381                     if (
382                         (peek_precedence < my_precedence)
383                         or (peek_precedence == my_precedence)
384                         and (peek_operator not in and_or)
385                     ):
386                         break
387                     output_stack.append(operator_stack.pop())
388                 operator_stack.append(token)
389         while len(operator_stack) > 0:
390             token = operator_stack.pop()
391             if token in parens:
392                 raise ParseError("Unbalanced parenthesis in query expression")
393             output_stack.append(token)
394         return evaluate(self, output_stack)
395
396
397 class Node(object):
398     """A query AST node."""
399
400     def __init__(
401         self,
402         corpus: Corpus,
403         op: Operation,
404         operands: Sequence[Union[Node, str]],
405     ):
406         self.corpus = corpus
407         self.op = op
408         self.operands = operands
409
410     def eval(self) -> Set[str]:
411         """Evaluate this node.
412
413         Raises:
414             ParseError: unexpected operands, invalid key:value syntax, wrong
415                 number of operands for operation, other invalid queries.
416         """
417
418         evaled_operands: List[Union[Set[str], str]] = []
419         for operand in self.operands:
420             if isinstance(operand, Node):
421                 evaled_operands.append(operand.eval())
422             elif isinstance(operand, str):
423                 evaled_operands.append(operand)
424             else:
425                 raise ParseError(f"Unexpected operand: {operand}")
426
427         retval = set()
428         if self.op is Operation.QUERY:
429             for tag in evaled_operands:
430                 if isinstance(tag, str):
431                     if ":" in tag:
432                         try:
433                             key, value = tag.split(":")
434                         except ValueError as v:
435                             raise ParseError(
436                                 f'Invalid key:value syntax at "{tag}"'
437                             ) from v
438
439                         if key == "*":
440                             r = set()
441                             for kv, s in self.corpus.docids_by_property.items():
442                                 if value in ("*", kv[1]):
443                                     r.update(s)
444                         else:
445                             if value == "*":
446                                 r = self.corpus.get_docids_with_property(key)
447                             else:
448                                 r = self.corpus.get_docids_by_property(key, value)
449                     else:
450                         if tag == "*":
451                             r = set()
452                             for s in self.corpus.docids_by_tag.values():
453                                 r.update(s)
454                         else:
455                             r = self.corpus.get_docids_by_exact_tag(tag)
456                     retval.update(r)
457                 else:
458                     raise ParseError(f"Unexpected query {tag}")
459         elif self.op is Operation.DISJUNCTION:
460             if len(evaled_operands) != 2:
461                 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
462             retval.update(evaled_operands[0])
463             retval.update(evaled_operands[1])
464         elif self.op is Operation.CONJUNCTION:
465             if len(evaled_operands) != 2:
466                 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
467             retval.update(evaled_operands[0])
468             retval = retval.intersection(evaled_operands[1])
469         elif self.op is Operation.INVERSION:
470             if len(evaled_operands) != 1:
471                 raise ParseError("Operation.INVERSION (not) expects one operand.")
472             _ = evaled_operands[0]
473             if isinstance(_, set):
474                 retval.update(self.corpus.invert_docid_set(_))
475             else:
476                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
477         return retval
478
479
480 if __name__ == "__main__":
481     import doctest
482
483     doctest.testmod()