2 # pylint: disable=too-many-nested-blocks
4 # © Copyright 2021-2023, Scott Gasch
"""This is a module concerned with the creation of and searching of a
corpus of documents.  The corpus and index are held in memory.
The query language contains the lowercase keywords ``and``, ``or`` and
``not``, plus parentheses, to support flexible search semantics.
"""
from __future__ import annotations

import enum
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
class ParseError(Exception):
    """An error encountered while parsing a logical search expression.

    Attributes:
        message: human-readable description of what was wrong with the
            expression.
    """

    def __init__(self, message: str):
        # Hand the text to Exception explicitly so that str(err), err.args
        # and tracebacks all carry it, not just the .message attribute.
        super().__init__(message)
        self.message = message
@dataclass
class Document:
    """A class representing a searchable document."""

    # NOTE(review): the declaration of this field is not visible in the
    # reviewed text; the ``str`` type follows the docid-keyed indexes and the
    # empty-string default is an assumption chosen so that keyword-style
    # construction keeps working -- confirm against the real definition.
    docid: str = ""
    """A unique identifier for each document -- must be provided
    by the caller.  See :meth:`python_modules.id_generator.get` or
    :meth:`python_modules.string_utils.generate_uuid` for potential
    sources of unique identifiers.
    """

    tags: Set[str] = field(default_factory=set)
    """A set of tag strings for this document.  May be empty.  Tags
    are simply text labels that are associated with a document and
    may be used to search for it later.
    """

    properties: List[Tuple[str, str]] = field(default_factory=list)
    """A list of key->value strings for this document.  May be empty.
    Properties are more flexible tags that have both a label and a
    value.  e.g. "category:mystery" or "author:smith"."""

    reference: Optional[Any] = None
    """An optional reference to something else for convenience;
    interpreted only by caller code, ignored here.
    """
class Operation(enum.Enum):
    """A logical search query operation."""

    QUERY = 1
    CONJUNCTION = 2
    DISJUNCTION = 3
    INVERSION = 4

    @staticmethod
    def from_token(token: str) -> Optional[Operation]:
        """Translate a query-language keyword into its operation.

        Args:
            token: a single token from a query expression.

        Returns:
            The operation for the keywords "not", "and" and "or"
            (matched exactly, lowercase only) or None for any other
            token.
        """
        if token == "not":
            return Operation.INVERSION
        if token == "and":
            return Operation.CONJUNCTION
        if token == "or":
            return Operation.DISJUNCTION
        return None

    def num_operands(self) -> Optional[int]:
        """Return how many operands this operation consumes.

        Returns:
            1 for inversion, 2 for conjunction and disjunction, and
            None for operations that take no operator arguments
            (i.e. QUERY).
        """
        if self is Operation.INVERSION:
            return 1
        if self is Operation.CONJUNCTION or self is Operation.DISJUNCTION:
            return 2
        return None
83 """A collection of searchable documents. The caller can
84 add documents to it (or edit existing docs) via :meth:`add_doc`,
85 retrieve a document given its docid via :meth:`get_doc`, and
86 perform various lookups of documents. The most interesting
87 lookup is implemented in :meth:`query`.
90 >>> c.add_doc(Document(
92 ... tags=set(['urgent', 'important']),
94 ... ('author', 'Scott'),
95 ... ('subject', 'your anniversary')
100 >>> c.add_doc(Document(
102 ... tags=set(['important']),
104 ... ('author', 'Joe'),
105 ... ('subject', 'your performance at work')
110 >>> c.add_doc(Document(
112 ... tags=set(['urgent']),
114 ... ('author', 'Scott'),
115 ... ('subject', 'car turning in front of you')
120 >>> c.query('author:Scott and important')
126 >>> c.query('*:Scott')
130 def __init__(self) -> None:
131 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
132 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
133 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
134 self.documents_by_docid: Dict[str, Document] = {}
def add_doc(self, doc: Document) -> None:
    """Add a new Document to the Corpus or re-index an existing one.

    Each Document must have a distinct docid that will serve as its
    primary identifier.  If the same Document is added multiple
    times, only the most recent addition is indexed.  If two distinct
    documents with the same docid are added, the latter clobbers the
    former in the indexes.

    Each Document may have an optional set of tags (simple text
    labels) and an optional list of key->value tuples (properties);
    both can be used later in expressions to the query method.

    Document also includes a user-defined "reference" field which is
    never interpreted by this module.

    Args:
        doc: the document to add or edit
    """
    docid = doc.docid

    # Handle collisions: assume that we are re-indexing the same document,
    # so take the stored one out of every index before adding it back.
    previous = self.documents_by_docid.pop(docid, None)
    if previous is not None:
        # discard() rather than remove(): a document may list several
        # properties with the same key (or the same pair twice), so the
        # same bucket can be visited more than once while un-indexing and
        # remove() would raise KeyError on the second visit.
        for tag in previous.tags:
            self.docids_by_tag[tag].discard(docid)
        for key, value in previous.properties:
            self.docids_by_property[(key, value)].discard(docid)
            self.docids_with_property[key].discard(docid)

    # Index the new Document.
    self.documents_by_docid[docid] = doc
    for tag in doc.tags:
        self.docids_by_tag[tag].add(docid)
    for key, value in doc.properties:
        self.docids_by_property[(key, value)].add(docid)
        self.docids_with_property[key].add(docid)
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
    """Return the set of docids that have a particular tag.

    Args:
        tag: the tag for which to search

    Returns:
        A new set containing the docids with the provided tag, which
        may be empty.  Mutating it does not affect the corpus.
    """
    # Use .get() instead of indexing: the index is a defaultdict, so
    # indexing with an unknown tag would insert an empty bucket on every
    # miss.  Copy the bucket so callers cannot alter the index by accident.
    return set(self.docids_by_tag.get(tag, ()))
def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
    """Return the set of docids with a tag that contains a str.

    Args:
        tag: the tag pattern for which to search

    Returns:
        A set containing docids with tags that match the pattern
        provided.  e.g., if the arg was "foo" tags "football", "foobar",
        and "food" all match.
    """
    matches: Set[str] = set()
    for indexed_tag, docids in self.docids_by_tag.items():
        # Plain substring containment, not a regular expression.
        if tag in indexed_tag:
            matches.update(docids)
    return matches
def get_docids_with_property(self, key: str) -> Set[str]:
    """Return the set of docids that have a particular property no matter
    what that property's value.

    Args:
        key: the key value to search for.

    Returns:
        A new set of docids that contain the key (no matter what
        value), which may be empty.  Mutating it does not affect the
        corpus.
    """
    # Use .get() instead of indexing: the index is a defaultdict, so
    # indexing with an unknown key would insert an empty bucket on every
    # miss.  Copy the bucket so callers cannot alter the index by accident.
    return set(self.docids_with_property.get(key, ()))
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
    """Return the set of docids that have a particular property with a
    particular value.

    Args:
        key: the key to search for
        value: the value that key must have in order to match a doc.

    Returns:
        A new set of docids that contain key with value, which may be
        empty.  Mutating it does not affect the corpus.
    """
    # Use .get() instead of indexing: the index is a defaultdict, so
    # indexing with an unknown pair would insert an empty bucket on every
    # miss.  Copy the bucket so callers cannot alter the index by accident.
    return set(self.docids_by_property.get((key, value), ()))
def invert_docid_set(self, original: Set[str]) -> Set[str]:
    """Invert a set of docids.

    Args:
        original: the docids to exclude.

    Returns:
        A new set holding every docid known to the corpus that is not
        in original.
    """
    every_docid = set(self.documents_by_docid)
    return every_docid.difference(original)
def get_doc(self, docid: str) -> Optional[Document]:
    """Given a docid, retrieve the previously added Document.

    Args:
        docid: the docid to retrieve

    Returns:
        The Document with docid or None to indicate no match.
    """
    # dict.get already yields None for an unknown key.
    return self.documents_by_docid.get(docid)
def query(self, query: str) -> Optional[Set[str]]:
    """Query the corpus for documents that match a logical expression.

    Args:
        query: the logical query expressed using a simple language
            that understands conjunction (and operator), disjunction
            (or operator) and inversion (not operator) as well as
            parentheses.  Here are some legal sample queries::

                tag1 and tag2 and not tag3

                (tag1 or tag2) and (tag3 or tag4)

                (tag1 and key2:value2) or (tag2 and key1:value1)

    Returns:
        A (potentially empty) set of docids for the matching
        (previously added) documents or None on error.  The error
        text is written to stderr.
    """
    try:
        # Evaluation stays inside the try as well: evaluating the parsed
        # tree can raise ParseError too (e.g. a malformed key:value term),
        # and the contract is to report every such error as None.
        return self._parse_query(query).eval()
    except ParseError as e:
        print(e.message, file=sys.stderr)
        return None
def _parse_query(self, query: str):
    """Internal parse helper; prefer to use query instead.

    Splits the expression into tokens, reorders them from infix to
    postfix using an operator stack (shunting-yard), then folds the
    postfix sequence into a tree of :class:`Node` objects.

    Args:
        query: the logical expression to parse.

    Returns:
        The root :class:`Node` of the parsed expression.

    Raises:
        ParseError: if parentheses are unbalanced, an operator lacks
            an operand, or the expression is empty or contains
            operands that no operator consumes.
    """
    parens = {"(", ")"}
    and_or = {"and", "or"}

    # Larger number binds tighter.  Parentheses are listed so that they
    # are recognised as operators; their exact rank is never compared.
    precedence = {
        "(": 4,
        ")": 4,
        "not": 3,
        "and": 2,
        "or": 1,
    }

    def operator_precedence(token: str) -> Optional[int]:
        return precedence.get(token)

    def is_operator(token: str) -> bool:
        return operator_precedence(token) is not None

    def lex(text: str):
        """Yield tokens, peeling off ( and ) stuck to the ends of words."""
        for word in text.split():
            # Handle ( and ) operators stuck to the ends of tokens
            # that split() doesn't understand.  Any number of them may be
            # attached, e.g. "((a" or "b))".
            start, end = 0, len(word)
            while start < end and word[start] in parens:
                yield word[start]
                start += 1
            trailing: List[str] = []
            while end > start and word[end - 1] in parens:
                end -= 1
                trailing.append(word[end])
            if start < end:
                yield word[start:end]
            # Collected right-to-left; emit in their written order.
            yield from reversed(trailing)

    def evaluate(corpus: Corpus, stack: List[str]):
        """Fold a postfix token list into a single Node tree."""
        node_stack: List[Node] = []
        for token in stack:
            if not is_operator(token):
                node_stack.append(Node(corpus, Operation.QUERY, [token]))
                continue
            operation = Operation.from_token(token)
            operand_count = operation.num_operands()
            if len(node_stack) < operand_count:
                raise ParseError(
                    f"Incorrect number of operations for {operation}"
                )
            args = [node_stack.pop() for _ in range(operand_count)]
            node_stack.append(Node(corpus, operation, args))
        # A well-formed expression reduces to exactly one node.  Nothing
        # left means the query was empty; more than one means operands
        # were written without an operator joining them.
        if len(node_stack) != 1:
            raise ParseError(f"Malformed query expression: {query!r}")
        return node_stack[0]

    output_stack: List[str] = []
    operator_stack: List[str] = []
    for token in lex(query):
        if not is_operator(token):
            output_stack.append(token)
            continue

        # token is an operator...
        if token == "(":
            operator_stack.append(token)
        elif token == ")":
            # Unwind to the matching "("; running out of operators first
            # means there was no matching open parenthesis.
            while operator_stack:
                pop_operator = operator_stack.pop()
                if pop_operator == "(":
                    break
                output_stack.append(pop_operator)
            else:
                raise ParseError("Unbalanced parenthesis in query expression")

        # and, or, not
        else:
            my_precedence = operator_precedence(token)
            if my_precedence is None:
                raise ParseError(f"Unknown operator: {token}")
            while operator_stack:
                peek_operator = operator_stack[-1]
                if not is_operator(peek_operator) or peek_operator == "(":
                    break
                peek_precedence = operator_precedence(peek_operator)
                if peek_precedence is None:
                    raise ParseError("Internal error")
                # Stop popping at a weaker operator, or at an equally
                # strong one that is not and/or (i.e. a stacked "not",
                # which must apply to what follows it).
                if peek_precedence < my_precedence or (
                    peek_precedence == my_precedence
                    and peek_operator not in and_or
                ):
                    break
                output_stack.append(operator_stack.pop())
            operator_stack.append(token)

    while operator_stack:
        token = operator_stack.pop()
        if token in parens:
            raise ParseError("Unbalanced parenthesis in query expression")
        output_stack.append(token)
    return evaluate(self, output_stack)
class Node:
    """A query AST node."""

    def __init__(
        self,
        corpus: Corpus,
        op: Operation,
        operands: Sequence[Union[Node, str]],
    ):
        """Create a node.

        Args:
            corpus: the corpus whose indexes are consulted on evaluation.
            op: the operation this node performs.
            operands: child nodes, or raw query terms (strings) for a
                QUERY node.
        """
        self.corpus = corpus
        self.op = op
        self.operands = operands

    def _docids_for_term(self, term: str) -> Set[str]:
        """Look up one raw query term: a tag, "key:value", or a "*" form."""
        if ":" not in term:
            if term != "*":
                return self.corpus.get_docids_by_exact_tag(term)
            # A bare "*" matches every document that carries any tag.
            tagged: Set[str] = set()
            for docids in self.corpus.docids_by_tag.values():
                tagged.update(docids)
            return tagged

        try:
            key, value = term.split(":")
        except ValueError as v:
            # Anything other than exactly one colon cannot be unpacked.
            raise ParseError(f'Invalid key:value syntax at "{term}"') from v

        if key == "*":
            # Wildcard key: match on the value alone ("*:*" matches every
            # document that has any property).
            found: Set[str] = set()
            for kv, docids in self.corpus.docids_by_property.items():
                if value in ("*", kv[1]):
                    found.update(docids)
            return found
        if value == "*":
            return self.corpus.get_docids_with_property(key)
        return self.corpus.get_docids_by_property(key, value)

    def eval(self) -> Set[str]:
        """Evaluate this node.

        Returns:
            The set of docids matching the subtree rooted at this node.

        Raises:
            ParseError: on an operand of an unexpected type, malformed
                key:value syntax, or a wrong operand count for the
                operation.
        """
        # Children are resolved first, so errors below this node surface
        # before this node's own operand-count checks.
        resolved: List[Union[Set[str], str]] = []
        for operand in self.operands:
            if isinstance(operand, Node):
                resolved.append(operand.eval())
            elif isinstance(operand, str):
                resolved.append(operand)
            else:
                raise ParseError(f"Unexpected operand: {operand}")

        if self.op is Operation.QUERY:
            matches: Set[str] = set()
            for term in resolved:
                if not isinstance(term, str):
                    raise ParseError(f"Unexpected query {term}")
                matches.update(self._docids_for_term(term))
            return matches

        if self.op is Operation.DISJUNCTION:
            if len(resolved) != 2:
                raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
            return set().union(resolved[0], resolved[1])

        if self.op is Operation.CONJUNCTION:
            if len(resolved) != 2:
                raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
            return set(resolved[0]).intersection(resolved[1])

        if self.op is Operation.INVERSION:
            if len(resolved) != 1:
                raise ParseError("Operation.INVERSION (not) expects one operand.")
            negated = resolved[0]
            if not isinstance(negated, set):
                raise ParseError(
                    f"Unexpected negation operand {negated} ({type(negated)})"
                )
            return set(self.corpus.invert_docid_set(negated))

        # Any other operation matches nothing.
        return set()
470 if __name__ == "__main__":