More cleanup.
[python_utils.git] / logical_search.py
index 86c6352972dec2f3330d7b2796fc07fc1e69b28f..ef55a2bbdcbb5baafa5c29a53fc404fcfb4701bb 100644 (file)
@@ -1,39 +1,41 @@
 #!/usr/bin/env python3
 
-from __future__ import annotations
+"""This is a module concerned with the creation of and searching of a
+corpus of documents.  The corpus is held in memory for fast
+searching."""
 
-from collections import defaultdict
+from __future__ import annotations
 import enum
 import sys
-from typing import (
-    Any,
-    Dict,
-    List,
-    NamedTuple,
-    Optional,
-    Set,
-    Sequence,
-    Tuple,
-    Union,
-)
+from collections import defaultdict
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
 
 
 class ParseError(Exception):
     """An error encountered while parsing a logical search expression."""
 
     def __init__(self, message: str):
+        super().__init__()
         self.message = message
 
 
-class Document(NamedTuple):
-    """A tuple representing a searchable document."""
+@dataclass
+class Document:
+    """A class representing a searchable document."""
+
+    # A unique identifier for each document.
+    docid: str = ''
+
+    # A set of tag strings for this document.  May be empty.
+    tags: Set[str] = field(default_factory=set)
+
+    # A list of key->value strings for this document.  May be empty.
+    properties: List[Tuple[str, str]] = field(default_factory=list)
 
-    docid: str  # a unique idenfier for the document
-    tags: Set[str]  # an optional set of tags
-    properties: List[
-        Tuple[str, str]
-    ]  # an optional set of key->value properties
-    reference: Any  # an optional reference to something else
+    # An optional reference to something else; interpreted only by
+    # caller code, ignored here.
+    reference: Optional[Any] = None
 
 
 class Operation(enum.Enum):
@@ -63,13 +65,46 @@ class Operation(enum.Enum):
 
 
 class Corpus(object):
-    """A collection of searchable documents."""
+    """A collection of searchable documents.
+
+    >>> c = Corpus()
+    >>> c.add_doc(Document(
+    ...                    docid=1,
+    ...                    tags=set(['urgent', 'important']),
+    ...                    properties=[
+    ...                                ('author', 'Scott'),
+    ...                                ('subject', 'your anniversary')
+    ...                    ],
+    ...                    reference=None,
+    ...                   )
+    ...          )
+    >>> c.add_doc(Document(
+    ...                    docid=2,
+    ...                    tags=set(['important']),
+    ...                    properties=[
+    ...                                ('author', 'Joe'),
+    ...                                ('subject', 'your performance at work')
+    ...                    ],
+    ...                    reference=None,
+    ...                   )
+    ...          )
+    >>> c.add_doc(Document(
+    ...                    docid=3,
+    ...                    tags=set(['urgent']),
+    ...                    properties=[
+    ...                                ('author', 'Scott'),
+    ...                                ('subject', 'car turning in front of you')
+    ...                    ],
+    ...                    reference=None,
+    ...                   )
+    ...          )
+    >>> c.query('author:Scott and important')
+    {1}
+    """
 
     def __init__(self) -> None:
         self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
-        self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(
-            set
-        )
+        self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
         self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
         self.documents_by_docid: Dict[str, Document] = {}
 
@@ -133,27 +168,21 @@ class Corpus(object):
     def get_docids_with_property(self, key: str) -> Set[str]:
         """Return the set of docids that have a particular property no matter
         what that property's value.
-        """
 
+        """
         return self.docids_with_property[key]
 
     def get_docids_by_property(self, key: str, value: str) -> Set[str]:
         """Return the set of docids that have a particular property with a
         particular value..
-        """
 
+        """
         return self.docids_by_property[(key, value)]
 
     def invert_docid_set(self, original: Set[str]) -> Set[str]:
         """Invert a set of docids."""
 
-        return set(
-            [
-                docid
-                for docid in self.documents_by_docid.keys()
-                if docid not in original
-            ]
-        )
+        return {docid for docid in self.documents_by_docid if docid not in original}
 
     def get_doc(self, docid: str) -> Optional[Document]:
         """Given a docid, retrieve the previously added Document."""
@@ -205,7 +234,6 @@ class Corpus(object):
             return operator_precedence(token) is not None
 
         def lex(query: str):
-            query = query.lower()
             tokens = query.split()
             for token in tokens:
                 # Handle ( and ) operators stuck to the ends of tokens
@@ -234,9 +262,7 @@ class Corpus(object):
                     operation = Operation.from_token(token)
                     operand_count = operation.num_operands()
                     if len(node_stack) < operand_count:
-                        raise ParseError(
-                            f"Incorrect number of operations for {operation}"
-                        )
+                        raise ParseError(f"Incorrect number of operations for {operation}")
                     for _ in range(operation.num_operands()):
                         args.append(node_stack.pop())
                     node = Node(corpus, operation, args)
@@ -263,9 +289,7 @@ class Corpus(object):
                         ok = True
                         break
                 if not ok:
-                    raise ParseError(
-                        "Unbalanced parenthesis in query expression"
-                    )
+                    raise ParseError("Unbalanced parenthesis in query expression")
 
             # and, or, not
             else:
@@ -328,9 +352,7 @@ class Node(object):
                         try:
                             key, value = tag.split(":")
                         except ValueError as v:
-                            raise ParseError(
-                                f'Invalid key:value syntax at "{tag}"'
-                            ) from v
+                            raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
                         if value == "*":
                             r = self.corpus.get_docids_with_property(key)
                         else:
@@ -342,26 +364,26 @@ class Node(object):
                     raise ParseError(f"Unexpected query {tag}")
         elif self.op is Operation.DISJUNCTION:
             if len(evaled_operands) != 2:
-                raise ParseError(
-                    "Operation.DISJUNCTION (or) expects two operands."
-                )
+                raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
             retval.update(evaled_operands[0])
             retval.update(evaled_operands[1])
         elif self.op is Operation.CONJUNCTION:
             if len(evaled_operands) != 2:
-                raise ParseError(
-                    "Operation.CONJUNCTION (and) expects two operands."
-                )
+                raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
             retval.update(evaled_operands[0])
             retval = retval.intersection(evaled_operands[1])
         elif self.op is Operation.INVERSION:
             if len(evaled_operands) != 1:
-                raise ParseError(
-                    "Operation.INVERSION (not) expects one operand."
-                )
+                raise ParseError("Operation.INVERSION (not) expects one operand.")
             _ = evaled_operands[0]
             if isinstance(_, set):
                 retval.update(self.corpus.invert_docid_set(_))
             else:
                 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
         return retval
+
+
+if __name__ == '__main__':
+    import doctest
+
+    doctest.testmod()