#!/usr/bin/env python3
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
-from collections import defaultdict
+"""This is a module concerned with the creation of and searching of a
+corpus of documents. The corpus is held in memory for fast
+searching.
+
+"""
+
+from __future__ import annotations
import enum
import sys
-from typing import (
- Any,
- Dict,
- List,
- NamedTuple,
- Optional,
- Set,
- Sequence,
- Tuple,
- Union,
-)
+from collections import defaultdict
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
class ParseError(Exception):
"""An error encountered while parsing a logical search expression."""
def __init__(self, message: str):
+ super().__init__()
self.message = message
-class Document(NamedTuple):
- """A tuple representing a searchable document."""
+@dataclass
+class Document:
+ """A class representing a searchable document."""
+
+ # A unique identifier for each document.
+ docid: str = ''
+
+ # A set of tag strings for this document. May be empty.
+ tags: Set[str] = field(default_factory=set)
- docid: str # a unique idenfier for the document
- tags: Set[str] # an optional set of tags
- properties: List[
- Tuple[str, str]
- ] # an optional set of key->value properties
- reference: Any # an optional reference to something else
+ # A list of key->value strings for this document. May be empty.
+ properties: List[Tuple[str, str]] = field(default_factory=list)
+
+ # An optional reference to something else; interpreted only by
+ # caller code, ignored here.
+ reference: Optional[Any] = None
class Operation(enum.Enum):
class Corpus(object):
- """A collection of searchable documents."""
+ """A collection of searchable documents.
+
+ >>> c = Corpus()
+ >>> c.add_doc(Document(
+ ... docid=1,
+ ... tags=set(['urgent', 'important']),
+ ... properties=[
+ ... ('author', 'Scott'),
+ ... ('subject', 'your anniversary')
+ ... ],
+ ... reference=None,
+ ... )
+ ... )
+ >>> c.add_doc(Document(
+ ... docid=2,
+ ... tags=set(['important']),
+ ... properties=[
+ ... ('author', 'Joe'),
+ ... ('subject', 'your performance at work')
+ ... ],
+ ... reference=None,
+ ... )
+ ... )
+ >>> c.add_doc(Document(
+ ... docid=3,
+ ... tags=set(['urgent']),
+ ... properties=[
+ ... ('author', 'Scott'),
+ ... ('subject', 'car turning in front of you')
+ ... ],
+ ... reference=None,
+ ... )
+ ... )
+ >>> c.query('author:Scott and important')
+ {1}
+ >>> c.query('*')
+ {1, 2, 3}
+ >>> c.query('*:*')
+ {1, 2, 3}
+ >>> c.query('*:Scott')
+ {1, 3}
+ """
def __init__(self) -> None:
self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
- self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(
- set
- )
+ self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
self.documents_by_docid: Dict[str, Document] = {}
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
"""Return the set of docids that have a particular tag."""
-
return self.docids_by_tag[tag]
def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
def get_docids_with_property(self, key: str) -> Set[str]:
"""Return the set of docids that have a particular property no matter
what that property's value.
- """
+ """
return self.docids_with_property[key]
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
"""Return the set of docids that have a particular property with a
particular value..
- """
+ """
return self.docids_by_property[(key, value)]
def invert_docid_set(self, original: Set[str]) -> Set[str]:
"""Invert a set of docids."""
- return set(
- [
- docid
- for docid in self.documents_by_docid.keys()
- if docid not in original
- ]
- )
+ return {docid for docid in self.documents_by_docid if docid not in original}
def get_doc(self, docid: str) -> Optional[Document]:
"""Given a docid, retrieve the previously added Document."""
return operator_precedence(token) is not None
def lex(query: str):
- query = query.lower()
tokens = query.split()
for token in tokens:
# Handle ( and ) operators stuck to the ends of tokens
operation = Operation.from_token(token)
operand_count = operation.num_operands()
if len(node_stack) < operand_count:
- raise ParseError(
- f"Incorrect number of operations for {operation}"
- )
+ raise ParseError(f"Incorrect number of operations for {operation}")
for _ in range(operation.num_operands()):
args.append(node_stack.pop())
node = Node(corpus, operation, args)
ok = True
break
if not ok:
- raise ParseError(
- "Unbalanced parenthesis in query expression"
- )
+ raise ParseError("Unbalanced parenthesis in query expression")
# and, or, not
else:
try:
key, value = tag.split(":")
except ValueError as v:
- raise ParseError(
- f'Invalid key:value syntax at "{tag}"'
- ) from v
- if value == "*":
- r = self.corpus.get_docids_with_property(key)
+ raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
+
+ if key == '*':
+ r = set()
+ for kv, s in self.corpus.docids_by_property.items():
+ if value in ('*', kv[1]):
+ r.update(s)
else:
- r = self.corpus.get_docids_by_property(key, value)
+ if value == '*':
+ r = self.corpus.get_docids_with_property(key)
+ else:
+ r = self.corpus.get_docids_by_property(key, value)
else:
- r = self.corpus.get_docids_by_exact_tag(tag)
+ if tag == '*':
+ r = set()
+ for s in self.corpus.docids_by_tag.values():
+ r.update(s)
+ else:
+ r = self.corpus.get_docids_by_exact_tag(tag)
retval.update(r)
else:
raise ParseError(f"Unexpected query {tag}")
elif self.op is Operation.DISJUNCTION:
if len(evaled_operands) != 2:
- raise ParseError(
- "Operation.DISJUNCTION (or) expects two operands."
- )
+ raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
retval.update(evaled_operands[0])
retval.update(evaled_operands[1])
elif self.op is Operation.CONJUNCTION:
if len(evaled_operands) != 2:
- raise ParseError(
- "Operation.CONJUNCTION (and) expects two operands."
- )
+ raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
retval.update(evaled_operands[0])
retval = retval.intersection(evaled_operands[1])
elif self.op is Operation.INVERSION:
if len(evaled_operands) != 1:
- raise ParseError(
- "Operation.INVERSION (not) expects one operand."
- )
+ raise ParseError("Operation.INVERSION (not) expects one operand.")
_ = evaled_operands[0]
if isinstance(_, set):
retval.update(self.corpus.invert_docid_set(_))
else:
raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
return retval
+
+
+if __name__ == '__main__':
+ import doctest
+
+ doctest.testmod()