3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus and index are held in memory.
9 from __future__ import annotations
13 from collections import defaultdict
14 from dataclasses import dataclass, field
15 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
18 class ParseError(Exception):
19 """An error encountered while parsing a logical search expression."""
21 def __init__(self, message: str):
23 self.message = message
28 """A class representing a searchable document."""
31 """A unique identifier for each document -- must be provided
32 by the caller. See :meth:`python_modules.id_generator.get` or
33 :meth:`python_modules.string_utils.generate_uuid` for potential
36 tags: Set[str] = field(default_factory=set)
37 """A set of tag strings for this document. May be empty. Tags
38 are simply text labels that are associated with a document and
39 may be used to search for it later.
42 properties: List[Tuple[str, str]] = field(default_factory=list)
43 """A list of key->value strings for this document. May be empty.
44 Properties are more flexible tags that have both a label and a
45 value. e.g. "category:mystery" or "author:smith"."""
47 reference: Optional[Any] = None
48 """An optional reference to something else for convenience;
49 interpreted only by caller code, ignored here.
53 class Operation(enum.Enum):
54 """A logical search query operation."""
62 def from_token(token: str):
64 "not": Operation.INVERSION,
65 "and": Operation.CONJUNCTION,
66 "or": Operation.DISJUNCTION,
68 return table.get(token, None)
70 def num_operands(self) -> Optional[int]:
72 Operation.INVERSION: 1,
73 Operation.CONJUNCTION: 2,
74 Operation.DISJUNCTION: 2,
76 return table.get(self, None)
80 """A collection of searchable documents. The caller can
81 add documents to it (or edit existing docs) via :meth:`add_doc`,
82 retrieve a document given its docid via :meth:`get_doc`, and
83 perform various lookups of documents. The most interesting
84 lookup is implemented in :meth:`query`.
87 >>> c.add_doc(Document(
89 ... tags=set(['urgent', 'important']),
91 ... ('author', 'Scott'),
92 ... ('subject', 'your anniversary')
97 >>> c.add_doc(Document(
99 ... tags=set(['important']),
101 ... ('author', 'Joe'),
102 ... ('subject', 'your performance at work')
107 >>> c.add_doc(Document(
109 ... tags=set(['urgent']),
111 ... ('author', 'Scott'),
112 ... ('subject', 'car turning in front of you')
117 >>> c.query('author:Scott and important')
123 >>> c.query('*:Scott')
127 def __init__(self) -> None:
128 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
129 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
130 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
131 self.documents_by_docid: Dict[str, Document] = {}
133 def add_doc(self, doc: Document) -> None:
134 """Add a new Document to the Corpus. Each Document must have a
135 distinct docid that will serve as its primary identifier. If
136 the same Document is added multiple times, only the most
137 recent addition is indexed. If two distinct documents with
138 the same docid are added, the latter klobbers the former in
139 the indexes. See :meth:`python_modules.id_generator.get` or
140 :meth:`python_modules.string_utils.generate_uuid` for potential
143 Each Document may have an optional set of tags which can be
144 used later in expressions to the query method. These are simple
147 Each Document may have an optional list of key->value tuples
148 which can be used later in expressions to the query method.
150 Document includes a user-defined "reference" field which is
151 never interpreted by this module. This is meant to allow easy
152 mapping between Documents in this corpus and external objects
156 doc: the document to add or edit
159 if doc.docid in self.documents_by_docid:
160 # Handle collisions; assume that we are re-indexing the
161 # same document so remove it from the indexes before
162 # adding it back again.
163 colliding_doc = self.documents_by_docid[doc.docid]
164 assert colliding_doc.docid == doc.docid
165 del self.documents_by_docid[doc.docid]
166 for tag in colliding_doc.tags:
167 self.docids_by_tag[tag].remove(doc.docid)
168 for key, value in colliding_doc.properties:
169 self.docids_by_property[(key, value)].remove(doc.docid)
170 self.docids_with_property[key].remove(doc.docid)
172 # Index the new Document
173 assert doc.docid not in self.documents_by_docid
174 self.documents_by_docid[doc.docid] = doc
176 self.docids_by_tag[tag].add(doc.docid)
177 for key, value in doc.properties:
178 self.docids_by_property[(key, value)].add(doc.docid)
179 self.docids_with_property[key].add(doc.docid)
181 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
182 """Return the set of docids that have a particular tag.
185 tag: the tag for which to search
188 A set containing docids with the provided tag which
190 return self.docids_by_tag[tag]
192 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
193 """Return the set of docids with a tag that contains a str.
196 tag: the tag pattern for which to search
199 A set containing docids with tags that match the pattern
200 provided. e.g., if the arg was "foo" tags "football", "foobar",
201 and "food" all match.
204 for search_tag in self.docids_by_tag:
205 if tag in search_tag:
206 for docid in self.docids_by_tag[search_tag]:
210 def get_docids_with_property(self, key: str) -> Set[str]:
211 """Return the set of docids that have a particular property no matter
212 what that property's value.
215 key: the key value to search for.
218 A set of docids that contain the key (no matter what value)
221 return self.docids_with_property[key]
223 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
224 """Return the set of docids that have a particular property with a
228 key: the key to search for
229 value: the value that key must have in order to match a doc.
232 A set of docids that contain key with value which may be empty.
234 return self.docids_by_property[(key, value)]
236 def invert_docid_set(self, original: Set[str]) -> Set[str]:
237 """Invert a set of docids."""
238 return {docid for docid in self.documents_by_docid if docid not in original}
240 def get_doc(self, docid: str) -> Optional[Document]:
241 """Given a docid, retrieve the previously added Document.
244 docid: the docid to retrieve
247 The Document with docid or None to indicate no match.
249 return self.documents_by_docid.get(docid, None)
251 def query(self, query: str) -> Optional[Set[str]]:
252 """Query the corpus for documents that match a logical expression.
255 query: the logical query expressed using a simple language
256 that understands conjunction (and operator), disjunction
257 (or operator) and inversion (not operator) as well as
258 parenthesis. Here are some legal sample queries::
260 tag1 and tag2 and not tag3
262 (tag1 or tag2) and (tag3 or tag4)
264 (tag1 and key2:value2) or (tag2 and key1:value1)
271 A (potentially empty) set of docids for the matching
272 (previously added) documents or None on error.
276 root = self._parse_query(query)
277 except ParseError as e:
278 print(e.message, file=sys.stderr)
282 def _parse_query(self, query: str):
283 """Internal parse helper; prefer to use query instead."""
285 parens = set(["(", ")"])
286 and_or = set(["and", "or"])
288 def operator_precedence(token: str) -> Optional[int]:
296 return table.get(token, None)
298 def is_operator(token: str) -> bool:
299 return operator_precedence(token) is not None
302 tokens = query.split()
304 # Handle ( and ) operators stuck to the ends of tokens
305 # that split() doesn't understand.
319 def evaluate(corpus: Corpus, stack: List[str]):
320 node_stack: List[Node] = []
323 if not is_operator(token):
324 node = Node(corpus, Operation.QUERY, [token])
327 operation = Operation.from_token(token)
328 operand_count = operation.num_operands()
329 if len(node_stack) < operand_count:
331 f"Incorrect number of operations for {operation}"
333 for _ in range(operation.num_operands()):
334 args.append(node_stack.pop())
335 node = Node(corpus, operation, args)
336 node_stack.append(node)
341 for token in lex(query):
342 if not is_operator(token):
343 output_stack.append(token)
346 # token is an operator...
348 operator_stack.append(token)
351 while len(operator_stack) > 0:
352 pop_operator = operator_stack.pop()
353 if pop_operator != "(":
354 output_stack.append(pop_operator)
359 raise ParseError("Unbalanced parenthesis in query expression")
363 my_precedence = operator_precedence(token)
364 if my_precedence is None:
365 raise ParseError(f"Unknown operator: {token}")
366 while len(operator_stack) > 0:
367 peek_operator = operator_stack[-1]
368 if not is_operator(peek_operator) or peek_operator == "(":
370 peek_precedence = operator_precedence(peek_operator)
371 if peek_precedence is None:
372 raise ParseError("Internal error")
374 (peek_precedence < my_precedence)
375 or (peek_precedence == my_precedence)
376 and (peek_operator not in and_or)
379 output_stack.append(operator_stack.pop())
380 operator_stack.append(token)
381 while len(operator_stack) > 0:
382 token = operator_stack.pop()
384 raise ParseError("Unbalanced parenthesis in query expression")
385 output_stack.append(token)
386 return evaluate(self, output_stack)
390 """A query AST node."""
396 operands: Sequence[Union[Node, str]],
400 self.operands = operands
402 def eval(self) -> Set[str]:
403 """Evaluate this node."""
405 evaled_operands: List[Union[Set[str], str]] = []
406 for operand in self.operands:
407 if isinstance(operand, Node):
408 evaled_operands.append(operand.eval())
409 elif isinstance(operand, str):
410 evaled_operands.append(operand)
412 raise ParseError(f"Unexpected operand: {operand}")
415 if self.op is Operation.QUERY:
416 for tag in evaled_operands:
417 if isinstance(tag, str):
420 key, value = tag.split(":")
421 except ValueError as v:
423 f'Invalid key:value syntax at "{tag}"'
428 for kv, s in self.corpus.docids_by_property.items():
429 if value in ('*', kv[1]):
433 r = self.corpus.get_docids_with_property(key)
435 r = self.corpus.get_docids_by_property(key, value)
439 for s in self.corpus.docids_by_tag.values():
442 r = self.corpus.get_docids_by_exact_tag(tag)
445 raise ParseError(f"Unexpected query {tag}")
446 elif self.op is Operation.DISJUNCTION:
447 if len(evaled_operands) != 2:
448 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
449 retval.update(evaled_operands[0])
450 retval.update(evaled_operands[1])
451 elif self.op is Operation.CONJUNCTION:
452 if len(evaled_operands) != 2:
453 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
454 retval.update(evaled_operands[0])
455 retval = retval.intersection(evaled_operands[1])
456 elif self.op is Operation.INVERSION:
457 if len(evaled_operands) != 1:
458 raise ParseError("Operation.INVERSION (not) expects one operand.")
459 _ = evaled_operands[0]
460 if isinstance(_, set):
461 retval.update(self.corpus.invert_docid_set(_))
463 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
467 if __name__ == '__main__':