More cleanup, yey!
[python_utils.git] / logical_search.py
index 86c6352972dec2f3330d7b2796fc07fc1e69b28f..b55e68901501ad2e68a7f45df034866072c3f61b 100644 (file)
@@ -1,27 +1,22 @@
 #!/usr/bin/env python3
 
+"""This is a module concerned with the creation of and searching of a
+corpus of documents.  The corpus is held in memory for fast
+searching."""
+
 from __future__ import annotations
 
-from collections import defaultdict
 import enum
 import sys
-from typing import (
-    Any,
-    Dict,
-    List,
-    NamedTuple,
-    Optional,
-    Set,
-    Sequence,
-    Tuple,
-    Union,
-)
+from collections import defaultdict
+from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
 
 
 class ParseError(Exception):
     """An error encountered while parsing a logical search expression."""
 
     def __init__(self, message: str):
+        super().__init__()
         self.message = message
 
 
@@ -30,9 +25,7 @@ class Document(NamedTuple):
 
     docid: str  # a unique idenfier for the document
     tags: Set[str]  # an optional set of tags
-    properties: List[
-        Tuple[str, str]
-    ]  # an optional set of key->value properties
+    properties: List[Tuple[str, str]]  # an optional set of key->value properties
     reference: Any  # an optional reference to something else
 
 
@@ -63,13 +56,46 @@ class Operation(enum.Enum):
 
 
 class Corpus(object):
-    """A collection of searchable documents."""
+    """A collection of searchable documents.
+
+    >>> c = Corpus()
+    >>> c.add_doc(Document(
+    ...                    docid=1,
+    ...                    tags=set(['urgent', 'important']),
+    ...                    properties=[
+    ...                                ('author', 'Scott'),
+    ...                                ('subject', 'your anniversary')
+    ...                    ],
+    ...                    reference=None,
+    ...                   )
+    ...          )
+    >>> c.add_doc(Document(
+    ...                    docid=2,
+    ...                    tags=set(['important']),
+    ...                    properties=[
+    ...                                ('author', 'Joe'),
+    ...                                ('subject', 'your performance at work')
+    ...                    ],
+    ...                    reference=None,
+    ...                   )
+    ...          )
+    >>> c.add_doc(Document(
+    ...                    docid=3,
+    ...                    tags=set(['urgent']),
+    ...                    properties=[
+    ...                                ('author', 'Scott'),
+    ...                                ('subject', 'car turning in front of you')
+    ...                    ],
+    ...                    reference=None,
+    ...                   )
+    ...          )
+    >>> c.query('author:Scott and important')
+    {1}
+    """
 
     def __init__(self) -> None:
         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
-        self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(
-            set
-        )
+        self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
         self.documents_by_docid: Dict[str, Document] = {}
 
@@ -133,27 +159,21 @@ class Corpus(object):
     def get_docids_with_property(self, key: str) -> Set[str]:
         """Return the set of docids that have a particular property no matter
         what that property's value.
-        """
 
+        """
         return self.docids_with_property[key]
 
     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
         """Return the set of docids that have a particular property with a
         particular value..
-        """
 
+        """
         return self.docids_by_property[(key, value)]
 
     def invert_docid_set(self, original: Set[str]) -> Set[str]:
         """Invert a set of docids."""
 
-        return set(
-            [
-                docid
-                for docid in self.documents_by_docid.keys()
-                if docid not in original
-            ]
-        )
+        return set([docid for docid in self.documents_by_docid.keys() if docid not in original])
 
     def get_doc(self, docid: str) -> Optional[Document]:
         """Given a docid, retrieve the previously added Document."""
@@ -205,7 +225,6 @@ class Corpus(object):
             return operator_precedence(token) is not None
 
         def lex(query: str):
-            query = query.lower()
             tokens = query.split()
             for token in tokens:
                 # Handle ( and ) operators stuck to the ends of tokens
@@ -234,9 +253,7 @@ class Corpus(object):
                     operation = Operation.from_token(token)
                     operand_count = operation.num_operands()
                     if len(node_stack) < operand_count:
-                        raise ParseError(
-                            f"Incorrect number of operations for {operation}"
-                        )
+                        raise ParseError(f"Incorrect number of operations for {operation}")
                     for _ in range(operation.num_operands()):
                         args.append(node_stack.pop())
                     node = Node(corpus, operation, args)
@@ -263,9 +280,7 @@ class Corpus(object):
                         ok = True
                         break
                 if not ok:
-                    raise ParseError(
-                        "Unbalanced parenthesis in query expression"
-                    )
+                    raise ParseError("Unbalanced parenthesis in query expression")
 
             # and, or, not
             else:
@@ -328,9 +343,7 @@ class Node(object):
                         try:
                             key, value = tag.split(":")
                         except ValueError as v:
-                            raise ParseError(
-                                f'Invalid key:value syntax at "{tag}"'
-                            ) from v
+                            raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
                         if value == "*":
                             r = self.corpus.get_docids_with_property(key)
                         else:
@@ -342,26 +355,26 @@ class Node(object):
                     raise ParseError(f"Unexpected query {tag}")
         elif self.op is Operation.DISJUNCTION:
             if len(evaled_operands) != 2:
-                raise ParseError(
-                    "Operation.DISJUNCTION (or) expects two operands."
-                )
+                raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
             retval.update(evaled_operands[0])
             retval.update(evaled_operands[1])
         elif self.op is Operation.CONJUNCTION:
             if len(evaled_operands) != 2:
-                raise ParseError(
-                    "Operation.CONJUNCTION (and) expects two operands."
-                )
+                raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
             retval.update(evaled_operands[0])
             retval = retval.intersection(evaled_operands[1])
         elif self.op is Operation.INVERSION:
             if len(evaled_operands) != 1:
-                raise ParseError(
-                    "Operation.INVERSION (not) expects one operand."
-                )
+                raise ParseError("Operation.INVERSION (not) expects one operand.")
             _ = evaled_operands[0]
             if isinstance(_, set):
                 retval.update(self.corpus.invert_docid_set(_))
             else:
                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
         return retval
+
+
+if __name__ == '__main__':
+    import doctest
+
+    doctest.testmod()