2 # pylint: disable=too-many-nested-blocks
4 # © Copyright 2021-2023, Scott Gasch
6 """This is a module concerned with the creation of and searching of a
7 corpus of documents. The corpus and index are held in memory.
8 The query language contains AND, OR, NOT, and parenthesis to support
9 flexible search semantics.
12 from __future__ import annotations
16 from collections import defaultdict
17 from dataclasses import dataclass, field
18 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
21 class ParseError(Exception):
22 """An error encountered while parsing a logical search expression."""
24 def __init__(self, message: str):
26 self.message = message
31 """A class representing a searchable document."""
34 """A unique identifier for each document -- must be provided
35 by the caller. See :meth:`python_modules.id_generator.get` or
36 :meth:`python_modules.string_utils.generate_uuid` for potential
39 tags: Set[str] = field(default_factory=set)
40 """A set of tag strings for this document. May be empty. Tags
41 are simply text labels that are associated with a document and
42 may be used to search for it later.
45 properties: List[Tuple[str, str]] = field(default_factory=list)
46 """A list of key->value strings for this document. May be empty.
47 Properties are more flexible tags that have both a label and a
48 value. e.g. "category:mystery" or "author:smith"."""
50 reference: Optional[Any] = None
51 """An optional reference to something else for convenience;
52 interpreted only by caller code, ignored here.
56 class Operation(enum.Enum):
57 """A logical search query operation."""
65 def from_token(token: str):
67 "not": Operation.INVERSION,
68 "and": Operation.CONJUNCTION,
69 "or": Operation.DISJUNCTION,
71 return table.get(token, None)
73 def num_operands(self) -> Optional[int]:
75 Operation.INVERSION: 1,
76 Operation.CONJUNCTION: 2,
77 Operation.DISJUNCTION: 2,
79 return table.get(self, None)
83 """A collection of searchable documents. The caller can
84 add documents to it (or edit existing docs) via :meth:`add_doc`,
85 retrieve a document given its docid via :meth:`get_doc`, and
86 perform various lookups of documents. The most interesting
87 lookup is implemented in :meth:`query`.
90 >>> c.add_doc(Document(
92 ... tags=set(['urgent', 'important']),
94 ... ('author', 'Scott'),
95 ... ('subject', 'your anniversary')
100 >>> c.add_doc(Document(
102 ... tags=set(['important']),
104 ... ('author', 'Joe'),
105 ... ('subject', 'your performance at work')
110 >>> c.add_doc(Document(
112 ... tags=set(['urgent']),
114 ... ('author', 'Scott'),
115 ... ('subject', 'car turning in front of you')
120 >>> c.query('author:Scott and important')
126 >>> c.query('*:Scott')
130 def __init__(self) -> None:
131 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
132 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
133 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
134 self.documents_by_docid: Dict[str, Document] = {}
136 def add_doc(self, doc: Document) -> None:
137 """Add a new Document to the Corpus. Each Document must have a
138 distinct docid that will serve as its primary identifier. If
139 the same Document is added multiple times, only the most
140 recent addition is indexed. If two distinct documents with
141 the same docid are added, the latter klobbers the former in
142 the indexes. See :meth:`python_modules.id_generator.get` or
143 :meth:`python_modules.string_utils.generate_uuid` for potential
146 Each Document may have an optional set of tags which can be
147 used later in expressions to the query method. These are simple
150 Each Document may have an optional list of key->value tuples
151 which can be used later in expressions to the query method.
153 Document includes a user-defined "reference" field which is
154 never interpreted by this module. This is meant to allow easy
155 mapping between Documents in this corpus and external objects
159 doc: the document to add or edit
162 if doc.docid in self.documents_by_docid:
163 # Handle collisions; assume that we are re-indexing the
164 # same document so remove it from the indexes before
165 # adding it back again.
166 colliding_doc = self.documents_by_docid[doc.docid]
167 assert colliding_doc.docid == doc.docid
168 del self.documents_by_docid[doc.docid]
169 for tag in colliding_doc.tags:
170 self.docids_by_tag[tag].remove(doc.docid)
171 for key, value in colliding_doc.properties:
172 self.docids_by_property[(key, value)].remove(doc.docid)
173 self.docids_with_property[key].remove(doc.docid)
175 # Index the new Document
176 assert doc.docid not in self.documents_by_docid
177 self.documents_by_docid[doc.docid] = doc
179 self.docids_by_tag[tag].add(doc.docid)
180 for key, value in doc.properties:
181 self.docids_by_property[(key, value)].add(doc.docid)
182 self.docids_with_property[key].add(doc.docid)
184 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
185 """Return the set of docids that have a particular tag.
188 tag: the tag for which to search
191 A set containing docids with the provided tag which
193 return self.docids_by_tag[tag]
195 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
196 """Return the set of docids with a tag that contains a str.
199 tag: the tag pattern for which to search
202 A set containing docids with tags that match the pattern
203 provided. e.g., if the arg was "foo" tags "football", "foobar",
204 and "food" all match.
207 for search_tag in self.docids_by_tag:
208 if tag in search_tag:
209 for docid in self.docids_by_tag[search_tag]:
213 def get_docids_with_property(self, key: str) -> Set[str]:
214 """Return the set of docids that have a particular property no matter
215 what that property's value.
218 key: the key value to search for.
221 A set of docids that contain the key (no matter what value)
224 return self.docids_with_property[key]
226 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
227 """Return the set of docids that have a particular property with a
231 key: the key to search for
232 value: the value that key must have in order to match a doc.
235 A set of docids that contain key with value which may be empty.
237 return self.docids_by_property[(key, value)]
239 def invert_docid_set(self, original: Set[str]) -> Set[str]:
240 """Invert a set of docids."""
241 return {docid for docid in self.documents_by_docid if docid not in original}
243 def get_doc(self, docid: str) -> Optional[Document]:
244 """Given a docid, retrieve the previously added Document.
247 docid: the docid to retrieve
250 The Document with docid or None to indicate no match.
252 return self.documents_by_docid.get(docid, None)
254 def query(self, query: str) -> Optional[Set[str]]:
255 """Query the corpus for documents that match a logical expression.
258 query: the logical query expressed using a simple language
259 that understands conjunction (and operator), disjunction
260 (or operator) and inversion (not operator) as well as
261 parenthesis. Here are some legal sample queries::
263 tag1 and tag2 and not tag3
265 (tag1 or tag2) and (tag3 or tag4)
267 (tag1 and key2:value2) or (tag2 and key1:value1)
274 A (potentially empty) set of docids for the matching
275 (previously added) documents or None on error.
279 root = self._parse_query(query)
280 except ParseError as e:
281 print(e.message, file=sys.stderr)
285 def _parse_query(self, query: str):
286 """Internal parse helper; prefer to use query instead."""
288 parens = set(["(", ")"])
289 and_or = set(["and", "or"])
291 def operator_precedence(token: str) -> Optional[int]:
299 return table.get(token, None)
301 def is_operator(token: str) -> bool:
302 return operator_precedence(token) is not None
305 tokens = query.split()
307 # Handle ( and ) operators stuck to the ends of tokens
308 # that split() doesn't understand.
322 def evaluate(corpus: Corpus, stack: List[str]):
325 ParseError: bad number of operations, unbalanced parenthesis,
326 unknown operators, internal errors.
328 node_stack: List[Node] = []
331 if not is_operator(token):
332 node = Node(corpus, Operation.QUERY, [token])
335 operation = Operation.from_token(token)
336 operand_count = operation.num_operands()
337 if len(node_stack) < operand_count:
339 f"Incorrect number of operations for {operation}"
341 for _ in range(operation.num_operands()):
342 args.append(node_stack.pop())
343 node = Node(corpus, operation, args)
344 node_stack.append(node)
349 for token in lex(query):
350 if not is_operator(token):
351 output_stack.append(token)
354 # token is an operator...
356 operator_stack.append(token)
359 while len(operator_stack) > 0:
360 pop_operator = operator_stack.pop()
361 if pop_operator != "(":
362 output_stack.append(pop_operator)
367 raise ParseError("Unbalanced parenthesis in query expression")
371 my_precedence = operator_precedence(token)
372 if my_precedence is None:
373 raise ParseError(f"Unknown operator: {token}")
374 while len(operator_stack) > 0:
375 peek_operator = operator_stack[-1]
376 if not is_operator(peek_operator) or peek_operator == "(":
378 peek_precedence = operator_precedence(peek_operator)
379 if peek_precedence is None:
380 raise ParseError("Internal error")
382 (peek_precedence < my_precedence)
383 or (peek_precedence == my_precedence)
384 and (peek_operator not in and_or)
387 output_stack.append(operator_stack.pop())
388 operator_stack.append(token)
389 while len(operator_stack) > 0:
390 token = operator_stack.pop()
392 raise ParseError("Unbalanced parenthesis in query expression")
393 output_stack.append(token)
394 return evaluate(self, output_stack)
398 """A query AST node."""
404 operands: Sequence[Union[Node, str]],
408 self.operands = operands
410 def eval(self) -> Set[str]:
411 """Evaluate this node.
414 ParseError: unexpected operands, invalid key:value syntax, wrong
415 number of operands for operation, other invalid queries.
418 evaled_operands: List[Union[Set[str], str]] = []
419 for operand in self.operands:
420 if isinstance(operand, Node):
421 evaled_operands.append(operand.eval())
422 elif isinstance(operand, str):
423 evaled_operands.append(operand)
425 raise ParseError(f"Unexpected operand: {operand}")
428 if self.op is Operation.QUERY:
429 for tag in evaled_operands:
430 if isinstance(tag, str):
433 key, value = tag.split(":")
434 except ValueError as v:
436 f'Invalid key:value syntax at "{tag}"'
441 for kv, s in self.corpus.docids_by_property.items():
442 if value in ("*", kv[1]):
446 r = self.corpus.get_docids_with_property(key)
448 r = self.corpus.get_docids_by_property(key, value)
452 for s in self.corpus.docids_by_tag.values():
455 r = self.corpus.get_docids_by_exact_tag(tag)
458 raise ParseError(f"Unexpected query {tag}")
459 elif self.op is Operation.DISJUNCTION:
460 if len(evaled_operands) != 2:
461 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
462 retval.update(evaled_operands[0])
463 retval.update(evaled_operands[1])
464 elif self.op is Operation.CONJUNCTION:
465 if len(evaled_operands) != 2:
466 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
467 retval.update(evaled_operands[0])
468 retval = retval.intersection(evaled_operands[1])
469 elif self.op is Operation.INVERSION:
470 if len(evaled_operands) != 1:
471 raise ParseError("Operation.INVERSION (not) expects one operand.")
472 _ = evaled_operands[0]
473 if isinstance(_, set):
474 retval.update(self.corpus.invert_docid_set(_))
476 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
480 if __name__ == "__main__":