Migration from old pyutilz package name (which, in turn, came from
[pyutils.git] / src / pyutils / search / logical_search.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents.  The corpus and index are held in memory.
7 """
8
9 from __future__ import annotations
10
11 import enum
12 import sys
13 from collections import defaultdict
14 from dataclasses import dataclass, field
15 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
16
17
18 class ParseError(Exception):
19     """An error encountered while parsing a logical search expression."""
20
21     def __init__(self, message: str):
22         super().__init__()
23         self.message = message
24
25
26 @dataclass
27 class Document:
28     """A class representing a searchable document."""
29
30     docid: str = ''
31     """A unique identifier for each document -- must be provided
32     by the caller.  See :meth:`python_modules.id_generator.get` or
33     :meth:`python_modules.string_utils.generate_uuid` for potential
34     sources."""
35
36     tags: Set[str] = field(default_factory=set)
37     """A set of tag strings for this document.  May be empty.  Tags
38     are simply text labels that are associated with a document and
39     may be used to search for it later.
40     """
41
42     properties: List[Tuple[str, str]] = field(default_factory=list)
43     """A list of key->value strings for this document.  May be empty.
44     Properties are more flexible tags that have both a label and a
45     value.  e.g. "category:mystery" or "author:smith"."""
46
47     reference: Optional[Any] = None
48     """An optional reference to something else for convenience;
49     interpreted only by caller code, ignored here.
50     """
51
52
53 class Operation(enum.Enum):
54     """A logical search query operation."""
55
56     QUERY = 1
57     CONJUNCTION = 2
58     DISJUNCTION = 3
59     INVERSION = 4
60
61     @staticmethod
62     def from_token(token: str):
63         table = {
64             "not": Operation.INVERSION,
65             "and": Operation.CONJUNCTION,
66             "or": Operation.DISJUNCTION,
67         }
68         return table.get(token, None)
69
70     def num_operands(self) -> Optional[int]:
71         table = {
72             Operation.INVERSION: 1,
73             Operation.CONJUNCTION: 2,
74             Operation.DISJUNCTION: 2,
75         }
76         return table.get(self, None)
77
78
79 class Corpus(object):
80     """A collection of searchable documents.  The caller can
81     add documents to it (or edit existing docs) via :meth:`add_doc`,
82     retrieve a document given its docid via :meth:`get_doc`, and
83     perform various lookups of documents.  The most interesting
84     lookup is implemented in :meth:`query`.
85
86     >>> c = Corpus()
87     >>> c.add_doc(Document(
88     ...                    docid=1,
89     ...                    tags=set(['urgent', 'important']),
90     ...                    properties=[
91     ...                                ('author', 'Scott'),
92     ...                                ('subject', 'your anniversary')
93     ...                    ],
94     ...                    reference=None,
95     ...                   )
96     ...          )
97     >>> c.add_doc(Document(
98     ...                    docid=2,
99     ...                    tags=set(['important']),
100     ...                    properties=[
101     ...                                ('author', 'Joe'),
102     ...                                ('subject', 'your performance at work')
103     ...                    ],
104     ...                    reference=None,
105     ...                   )
106     ...          )
107     >>> c.add_doc(Document(
108     ...                    docid=3,
109     ...                    tags=set(['urgent']),
110     ...                    properties=[
111     ...                                ('author', 'Scott'),
112     ...                                ('subject', 'car turning in front of you')
113     ...                    ],
114     ...                    reference=None,
115     ...                   )
116     ...          )
117     >>> c.query('author:Scott and important')
118     {1}
119     >>> c.query('*')
120     {1, 2, 3}
121     >>> c.query('*:*')
122     {1, 2, 3}
123     >>> c.query('*:Scott')
124     {1, 3}
125     """
126
127     def __init__(self) -> None:
128         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
129         self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
130         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
131         self.documents_by_docid: Dict[str, Document] = {}
132
133     def add_doc(self, doc: Document) -> None:
134         """Add a new Document to the Corpus.  Each Document must have a
135         distinct docid that will serve as its primary identifier.  If
136         the same Document is added multiple times, only the most
137         recent addition is indexed.  If two distinct documents with
138         the same docid are added, the latter klobbers the former in
139         the indexes.  See :meth:`python_modules.id_generator.get` or
140         :meth:`python_modules.string_utils.generate_uuid` for potential
141         sources of docids.
142
143         Each Document may have an optional set of tags which can be
144         used later in expressions to the query method.  These are simple
145         text labels.
146
147         Each Document may have an optional list of key->value tuples
148         which can be used later in expressions to the query method.
149
150         Document includes a user-defined "reference" field which is
151         never interpreted by this module.  This is meant to allow easy
152         mapping between Documents in this corpus and external objects
153         they may represent.
154
155         Args:
156             doc: the document to add or edit
157         """
158
159         if doc.docid in self.documents_by_docid:
160             # Handle collisions; assume that we are re-indexing the
161             # same document so remove it from the indexes before
162             # adding it back again.
163             colliding_doc = self.documents_by_docid[doc.docid]
164             assert colliding_doc.docid == doc.docid
165             del self.documents_by_docid[doc.docid]
166             for tag in colliding_doc.tags:
167                 self.docids_by_tag[tag].remove(doc.docid)
168             for key, value in colliding_doc.properties:
169                 self.docids_by_property[(key, value)].remove(doc.docid)
170                 self.docids_with_property[key].remove(doc.docid)
171
172         # Index the new Document
173         assert doc.docid not in self.documents_by_docid
174         self.documents_by_docid[doc.docid] = doc
175         for tag in doc.tags:
176             self.docids_by_tag[tag].add(doc.docid)
177         for key, value in doc.properties:
178             self.docids_by_property[(key, value)].add(doc.docid)
179             self.docids_with_property[key].add(doc.docid)
180
181     def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
182         """Return the set of docids that have a particular tag.
183
184         Args:
185             tag: the tag for which to search
186
187         Returns:
188             A set containing docids with the provided tag which
189             may be empty."""
190         return self.docids_by_tag[tag]
191
192     def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
193         """Return the set of docids with a tag that contains a str.
194
195         Args:
196             tag: the tag pattern for which to search
197
198         Returns:
199             A set containing docids with tags that match the pattern
200             provided.  e.g., if the arg was "foo" tags "football", "foobar",
201             and "food" all match.
202         """
203         ret = set()
204         for search_tag in self.docids_by_tag:
205             if tag in search_tag:
206                 for docid in self.docids_by_tag[search_tag]:
207                     ret.add(docid)
208         return ret
209
210     def get_docids_with_property(self, key: str) -> Set[str]:
211         """Return the set of docids that have a particular property no matter
212         what that property's value.
213
214         Args:
215             key: the key value to search for.
216
217         Returns:
218             A set of docids that contain the key (no matter what value)
219             which may be empty.
220         """
221         return self.docids_with_property[key]
222
223     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
224         """Return the set of docids that have a particular property with a
225         particular value.
226
227         Args:
228             key: the key to search for
229             value: the value that key must have in order to match a doc.
230
231         Returns:
232             A set of docids that contain key with value which may be empty.
233         """
234         return self.docids_by_property[(key, value)]
235
236     def invert_docid_set(self, original: Set[str]) -> Set[str]:
237         """Invert a set of docids."""
238         return {docid for docid in self.documents_by_docid if docid not in original}
239
240     def get_doc(self, docid: str) -> Optional[Document]:
241         """Given a docid, retrieve the previously added Document.
242
243         Args:
244             docid: the docid to retrieve
245
246         Returns:
247             The Document with docid or None to indicate no match.
248         """
249         return self.documents_by_docid.get(docid, None)
250
251     def query(self, query: str) -> Optional[Set[str]]:
252         """Query the corpus for documents that match a logical expression.
253
254         Args:
255             query: the logical query expressed using a simple language
256                 that understands conjunction (and operator), disjunction
257                 (or operator) and inversion (not operator) as well as
258                 parenthesis.  Here are some legal sample queries::
259
260                     tag1 and tag2 and not tag3
261
262                     (tag1 or tag2) and (tag3 or tag4)
263
264                     (tag1 and key2:value2) or (tag2 and key1:value1)
265
266                     key:*
267
268                     tag1 and key:*
269
270         Returns:
271             A (potentially empty) set of docids for the matching
272             (previously added) documents or None on error.
273         """
274
275         try:
276             root = self._parse_query(query)
277         except ParseError as e:
278             print(e.message, file=sys.stderr)
279             return None
280         return root.eval()
281
282     def _parse_query(self, query: str):
283         """Internal parse helper; prefer to use query instead."""
284
285         parens = set(["(", ")"])
286         and_or = set(["and", "or"])
287
288         def operator_precedence(token: str) -> Optional[int]:
289             table = {
290                 "(": 4,  # higher
291                 ")": 4,
292                 "not": 3,
293                 "and": 2,
294                 "or": 1,  # lower
295             }
296             return table.get(token, None)
297
298         def is_operator(token: str) -> bool:
299             return operator_precedence(token) is not None
300
301         def lex(query: str):
302             tokens = query.split()
303             for token in tokens:
304                 # Handle ( and ) operators stuck to the ends of tokens
305                 # that split() doesn't understand.
306                 if len(token) > 1:
307                     first = token[0]
308                     if first in parens:
309                         tail = token[1:]
310                         yield first
311                         token = tail
312                     last = token[-1]
313                     if last in parens:
314                         head = token[0:-1]
315                         yield head
316                         token = last
317                 yield token
318
319         def evaluate(corpus: Corpus, stack: List[str]):
320             node_stack: List[Node] = []
321             for token in stack:
322                 node = None
323                 if not is_operator(token):
324                     node = Node(corpus, Operation.QUERY, [token])
325                 else:
326                     args = []
327                     operation = Operation.from_token(token)
328                     operand_count = operation.num_operands()
329                     if len(node_stack) < operand_count:
330                         raise ParseError(
331                             f"Incorrect number of operations for {operation}"
332                         )
333                     for _ in range(operation.num_operands()):
334                         args.append(node_stack.pop())
335                     node = Node(corpus, operation, args)
336                 node_stack.append(node)
337             return node_stack[0]
338
339         output_stack = []
340         operator_stack = []
341         for token in lex(query):
342             if not is_operator(token):
343                 output_stack.append(token)
344                 continue
345
346             # token is an operator...
347             if token == "(":
348                 operator_stack.append(token)
349             elif token == ")":
350                 ok = False
351                 while len(operator_stack) > 0:
352                     pop_operator = operator_stack.pop()
353                     if pop_operator != "(":
354                         output_stack.append(pop_operator)
355                     else:
356                         ok = True
357                         break
358                 if not ok:
359                     raise ParseError("Unbalanced parenthesis in query expression")
360
361             # and, or, not
362             else:
363                 my_precedence = operator_precedence(token)
364                 if my_precedence is None:
365                     raise ParseError(f"Unknown operator: {token}")
366                 while len(operator_stack) > 0:
367                     peek_operator = operator_stack[-1]
368                     if not is_operator(peek_operator) or peek_operator == "(":
369                         break
370                     peek_precedence = operator_precedence(peek_operator)
371                     if peek_precedence is None:
372                         raise ParseError("Internal error")
373                     if (
374                         (peek_precedence < my_precedence)
375                         or (peek_precedence == my_precedence)
376                         and (peek_operator not in and_or)
377                     ):
378                         break
379                     output_stack.append(operator_stack.pop())
380                 operator_stack.append(token)
381         while len(operator_stack) > 0:
382             token = operator_stack.pop()
383             if token in parens:
384                 raise ParseError("Unbalanced parenthesis in query expression")
385             output_stack.append(token)
386         return evaluate(self, output_stack)
387
388
389 class Node(object):
390     """A query AST node."""
391
392     def __init__(
393         self,
394         corpus: Corpus,
395         op: Operation,
396         operands: Sequence[Union[Node, str]],
397     ):
398         self.corpus = corpus
399         self.op = op
400         self.operands = operands
401
402     def eval(self) -> Set[str]:
403         """Evaluate this node."""
404
405         evaled_operands: List[Union[Set[str], str]] = []
406         for operand in self.operands:
407             if isinstance(operand, Node):
408                 evaled_operands.append(operand.eval())
409             elif isinstance(operand, str):
410                 evaled_operands.append(operand)
411             else:
412                 raise ParseError(f"Unexpected operand: {operand}")
413
414         retval = set()
415         if self.op is Operation.QUERY:
416             for tag in evaled_operands:
417                 if isinstance(tag, str):
418                     if ":" in tag:
419                         try:
420                             key, value = tag.split(":")
421                         except ValueError as v:
422                             raise ParseError(
423                                 f'Invalid key:value syntax at "{tag}"'
424                             ) from v
425
426                         if key == '*':
427                             r = set()
428                             for kv, s in self.corpus.docids_by_property.items():
429                                 if value in ('*', kv[1]):
430                                     r.update(s)
431                         else:
432                             if value == '*':
433                                 r = self.corpus.get_docids_with_property(key)
434                             else:
435                                 r = self.corpus.get_docids_by_property(key, value)
436                     else:
437                         if tag == '*':
438                             r = set()
439                             for s in self.corpus.docids_by_tag.values():
440                                 r.update(s)
441                         else:
442                             r = self.corpus.get_docids_by_exact_tag(tag)
443                     retval.update(r)
444                 else:
445                     raise ParseError(f"Unexpected query {tag}")
446         elif self.op is Operation.DISJUNCTION:
447             if len(evaled_operands) != 2:
448                 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
449             retval.update(evaled_operands[0])
450             retval.update(evaled_operands[1])
451         elif self.op is Operation.CONJUNCTION:
452             if len(evaled_operands) != 2:
453                 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
454             retval.update(evaled_operands[0])
455             retval = retval.intersection(evaled_operands[1])
456         elif self.op is Operation.INVERSION:
457             if len(evaled_operands) != 1:
458                 raise ParseError("Operation.INVERSION (not) expects one operand.")
459             _ = evaled_operands[0]
460             if isinstance(_, set):
461                 retval.update(self.corpus.invert_docid_set(_))
462             else:
463                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
464         return retval
465
466
467 if __name__ == '__main__':
468     import doctest
469
470     doctest.testmod()