3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus and index are held in memory.
9 from __future__ import annotations
12 from collections import defaultdict
13 from dataclasses import dataclass, field
14 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
class ParseError(Exception):
    """An error encountered while parsing a logical search expression.

    The description of the problem is available both as ``str(error)``
    and through the :attr:`message` attribute.
    """

    def __init__(self, message: str):
        """Create the error.

        Args:
            message: human-readable description of what was wrong with
                the search expression.
        """
        # Hand the text to Exception too, so that str(), repr(), logging
        # and tracebacks show it instead of an empty string.
        super().__init__(message)
        self.message = message
27 """A class representing a searchable document."""
30 """A unique identifier for each document -- must be provided
31 by the caller. See :meth:`python_modules.id_generator.get` or
32 :meth:`python_modules.string_utils.generate_uuid` for potential
35 tags: Set[str] = field(default_factory=set)
36 """A set of tag strings for this document. May be empty. Tags
37 are simply text labels that are associated with a document and
38 may be used to search for it later.
41 properties: List[Tuple[str, str]] = field(default_factory=list)
42 """A list of key->value strings for this document. May be empty.
43 Properties are more flexible tags that have both a label and a
44 value. e.g. "category:mystery" or "author:smith"."""
46 reference: Optional[Any] = None
47 """An optional reference to something else for convenience;
48 interpreted only by caller code, ignored here.
52 class Operation(enum.Enum):
53 """A logical search query operation."""
def from_token(token: str):
    """Translate a query keyword into the matching :class:`Operation`.

    Args:
        token: a single token taken from a query expression.

    Returns:
        The operation for "not", "and" or "or" (compared exactly, so
        the match is case-sensitive), or None for any other token.
    """
    if token == "not":
        return Operation.INVERSION
    if token == "and":
        return Operation.CONJUNCTION
    if token == "or":
        return Operation.DISJUNCTION
    return None
def num_operands(self) -> Optional[int]:
    """Report how many operands this operation consumes.

    Returns:
        1 for INVERSION, 2 for CONJUNCTION and DISJUNCTION, and None
        for any other member.
    """
    if self is Operation.INVERSION:
        return 1
    if self is Operation.CONJUNCTION or self is Operation.DISJUNCTION:
        return 2
    return None
79 """A collection of searchable documents. The caller can
80 add documents to it (or edit existing docs) via :meth:`add_doc`,
81 retrieve a document given its docid via :meth:`get_doc`, and
82 perform various lookups of documents. The most interesting
83 lookup is implemented in :meth:`query`.
86 >>> c.add_doc(Document(
88 ... tags=set(['urgent', 'important']),
90 ... ('author', 'Scott'),
91 ... ('subject', 'your anniversary')
96 >>> c.add_doc(Document(
98 ... tags=set(['important']),
100 ... ('author', 'Joe'),
101 ... ('subject', 'your performance at work')
106 >>> c.add_doc(Document(
108 ... tags=set(['urgent']),
110 ... ('author', 'Scott'),
111 ... ('subject', 'car turning in front of you')
116 >>> c.query('author:Scott and important')
122 >>> c.query('*:Scott')
def __init__(self) -> None:
    """Create an empty corpus: no documents and empty indexes."""
    # Primary store: docid -> the Document most recently added under it.
    self.documents_by_docid: Dict[str, Document] = dict()

    # Inverted indexes from tag / property key / (key, value) pair to
    # the docids carrying them.  defaultdict(set) supplies an empty set
    # the first time a key is subscripted.
    self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
    self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
    self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
def add_doc(self, doc: Document) -> None:
    """Add a Document to the corpus, or re-index one already in it.

    Each Document must have a distinct docid that serves as its
    primary identifier.  If the same Document is added multiple
    times, only the most recent addition is indexed.  If two distinct
    Documents with the same docid are added, the latter clobbers the
    former in the indexes.

    Each Document may have an optional set of tags and an optional
    list of key->value property tuples; both can be used later in
    expressions passed to the query method.  The Document's
    "reference" field is never interpreted by this method.

    Re-adding is safe when a Document lists the same property key
    (or the same key/value pair) more than once, and when the very
    same Document object was edited in place before being re-added.

    Args:
        doc: the document to add or edit
    """
    docid = doc.docid

    # Handle collisions: assume we are re-indexing the same document,
    # so pull the stored one out of the indexes before adding it back.
    previous = self.documents_by_docid.pop(docid, None)
    if previous is not None:
        assert previous.docid == docid
        if previous is doc:
            # The stored object *is* the incoming one, so its tags and
            # properties may already have been edited and no longer
            # say what was indexed.  Sweep the docid out of every
            # posting set instead of trusting them.
            for index in (
                self.docids_by_tag,
                self.docids_by_property,
                self.docids_with_property,
            ):
                for postings in index.values():
                    postings.discard(docid)
        else:
            # discard(), not remove(): a key that occurs in several
            # properties is visited more than once here, and the
            # second remove() would raise KeyError.
            for tag in previous.tags:
                self.docids_by_tag[tag].discard(docid)
            for key, value in previous.properties:
                self.docids_by_property[(key, value)].discard(docid)
                self.docids_with_property[key].discard(docid)

    # Index the new Document.
    self.documents_by_docid[docid] = doc
    for tag in doc.tags:
        self.docids_by_tag[tag].add(docid)
    for key, value in doc.properties:
        self.docids_by_property[(key, value)].add(docid)
        self.docids_with_property[key].add(docid)
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
    """Return the set of docids that have a particular tag.

    Args:
        tag: the tag for which to search

    Returns:
        A set containing docids with the provided tag, which may be
        empty.  For a tag that is present in the index this is the
        index's own set rather than a copy, so treat it as read-only.
    """
    # Use .get() rather than subscripting: the index is a defaultdict,
    # and subscripting would insert an empty entry for every unknown
    # tag that is merely looked up.
    postings = self.docids_by_tag.get(tag)
    return postings if postings is not None else set()
def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
    """Return the set of docids with a tag that contains a str.

    Args:
        tag: the tag pattern for which to search

    Returns:
        A set containing docids with tags that match the pattern
        provided, i.e. tags that contain it as a substring.  For
        example, if the arg was "foo" then the tags "football",
        "foobar" and "food" all match.
    """
    matching_docids: Set[str] = set()
    for indexed_tag, postings in self.docids_by_tag.items():
        if tag in indexed_tag:
            matching_docids.update(postings)
    return matching_docids
def get_docids_with_property(self, key: str) -> Set[str]:
    """Return the set of docids that have a particular property, no
    matter what that property's value.

    Args:
        key: the property key to search for.

    Returns:
        A set of docids that contain the key (no matter what value),
        which may be empty.  For a key that is present in the index
        this is the index's own set rather than a copy, so treat it
        as read-only.
    """
    # .get() keeps this lookup read-only; subscripting the defaultdict
    # would add an empty entry for every unknown key that is queried.
    postings = self.docids_with_property.get(key)
    return postings if postings is not None else set()
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
    """Return the set of docids that have a particular property with
    a particular value.

    Args:
        key: the key to search for
        value: the value that key must have in order to match a doc.

    Returns:
        A set of docids that contain key with value, which may be
        empty.  For a pair that is present in the index this is the
        index's own set rather than a copy, so treat it as read-only.
    """
    # .get() keeps this lookup read-only; subscripting the defaultdict
    # would add an empty entry for every unknown pair that is queried.
    postings = self.docids_by_property.get((key, value))
    return postings if postings is not None else set()
def invert_docid_set(self, original: Set[str]) -> Set[str]:
    """Return the docids of every document in the corpus that is *not*
    a member of ``original``.

    Args:
        original: the docids to exclude.

    Returns:
        A new set holding the remaining docids of the corpus.
    """
    complement: Set[str] = set()
    for known_docid in self.documents_by_docid:
        if known_docid in original:
            continue
        complement.add(known_docid)
    return complement
def get_doc(self, docid: str) -> Optional[Document]:
    """Look up a previously added Document by its docid.

    Args:
        docid: the docid to retrieve

    Returns:
        The Document stored under docid, or None to indicate that no
        such document has been added.
    """
    known_documents = self.documents_by_docid
    if docid in known_documents:
        return known_documents[docid]
    return None
def query(self, query: str) -> Optional[Set[str]]:
    """Query the corpus for documents that match a logical expression.

    Args:
        query: the logical query expressed using a simple language
            that understands conjunction (and operator), disjunction
            (or operator) and inversion (not operator) as well as
            parentheses.  Here are some legal sample queries::

                tag1 and tag2 and not tag3

                (tag1 or tag2) and (tag3 or tag4)

                (tag1 and key2:value2) or (tag2 and key1:value1)

    Returns:
        A (potentially empty) set of docids for the matching
        (previously added) documents, or None on error.  When a
        ParseError occurs its message is printed to stderr.
    """
    try:
        # Evaluate inside the try as well: evaluating the parsed tree
        # can itself raise ParseError (for example on a malformed
        # key:value term), and that must also produce None rather
        # than escape to the caller.
        return self._parse_query(query).eval()
    except ParseError as e:
        print(e.message, file=sys.stderr)
        return None
281 def _parse_query(self, query: str):
282 """Internal parse helper; prefer to use query instead."""
284 parens = set(["(", ")"])
285 and_or = set(["and", "or"])
287 def operator_precedence(token: str) -> Optional[int]:
295 return table.get(token, None)
297 def is_operator(token: str) -> bool:
298 return operator_precedence(token) is not None
301 tokens = query.split()
303 # Handle ( and ) operators stuck to the ends of tokens
304 # that split() doesn't understand.
318 def evaluate(corpus: Corpus, stack: List[str]):
319 node_stack: List[Node] = []
322 if not is_operator(token):
323 node = Node(corpus, Operation.QUERY, [token])
326 operation = Operation.from_token(token)
327 operand_count = operation.num_operands()
328 if len(node_stack) < operand_count:
329 raise ParseError(f"Incorrect number of operations for {operation}")
330 for _ in range(operation.num_operands()):
331 args.append(node_stack.pop())
332 node = Node(corpus, operation, args)
333 node_stack.append(node)
338 for token in lex(query):
339 if not is_operator(token):
340 output_stack.append(token)
343 # token is an operator...
345 operator_stack.append(token)
348 while len(operator_stack) > 0:
349 pop_operator = operator_stack.pop()
350 if pop_operator != "(":
351 output_stack.append(pop_operator)
356 raise ParseError("Unbalanced parenthesis in query expression")
360 my_precedence = operator_precedence(token)
361 if my_precedence is None:
362 raise ParseError(f"Unknown operator: {token}")
363 while len(operator_stack) > 0:
364 peek_operator = operator_stack[-1]
365 if not is_operator(peek_operator) or peek_operator == "(":
367 peek_precedence = operator_precedence(peek_operator)
368 if peek_precedence is None:
369 raise ParseError("Internal error")
371 (peek_precedence < my_precedence)
372 or (peek_precedence == my_precedence)
373 and (peek_operator not in and_or)
376 output_stack.append(operator_stack.pop())
377 operator_stack.append(token)
378 while len(operator_stack) > 0:
379 token = operator_stack.pop()
381 raise ParseError("Unbalanced parenthesis in query expression")
382 output_stack.append(token)
383 return evaluate(self, output_stack)
387 """A query AST node."""
393 operands: Sequence[Union[Node, str]],
397 self.operands = operands
399 def eval(self) -> Set[str]:
400 """Evaluate this node."""
402 evaled_operands: List[Union[Set[str], str]] = []
403 for operand in self.operands:
404 if isinstance(operand, Node):
405 evaled_operands.append(operand.eval())
406 elif isinstance(operand, str):
407 evaled_operands.append(operand)
409 raise ParseError(f"Unexpected operand: {operand}")
412 if self.op is Operation.QUERY:
413 for tag in evaled_operands:
414 if isinstance(tag, str):
417 key, value = tag.split(":")
418 except ValueError as v:
419 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
423 for kv, s in self.corpus.docids_by_property.items():
424 if value in ('*', kv[1]):
428 r = self.corpus.get_docids_with_property(key)
430 r = self.corpus.get_docids_by_property(key, value)
434 for s in self.corpus.docids_by_tag.values():
437 r = self.corpus.get_docids_by_exact_tag(tag)
440 raise ParseError(f"Unexpected query {tag}")
441 elif self.op is Operation.DISJUNCTION:
442 if len(evaled_operands) != 2:
443 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
444 retval.update(evaled_operands[0])
445 retval.update(evaled_operands[1])
446 elif self.op is Operation.CONJUNCTION:
447 if len(evaled_operands) != 2:
448 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
449 retval.update(evaled_operands[0])
450 retval = retval.intersection(evaled_operands[1])
451 elif self.op is Operation.INVERSION:
452 if len(evaled_operands) != 1:
453 raise ParseError("Operation.INVERSION (not) expects one operand.")
454 _ = evaled_operands[0]
455 if isinstance(_, set):
456 retval.update(self.corpus.invert_docid_set(_))
458 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
462 if __name__ == '__main__':