3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus and index are held in memory.
7 The query language contains AND, OR, NOT, and parenthesis to support
8 flexible search semantics.
11 from __future__ import annotations
15 from collections import defaultdict
16 from dataclasses import dataclass, field
17 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
20 class ParseError(Exception):
21 """An error encountered while parsing a logical search expression."""
23 def __init__(self, message: str):
25 self.message = message
30 """A class representing a searchable document."""
33 """A unique identifier for each document -- must be provided
34 by the caller. See :meth:`python_modules.id_generator.get` or
35 :meth:`python_modules.string_utils.generate_uuid` for potential
38 tags: Set[str] = field(default_factory=set)
39 """A set of tag strings for this document. May be empty. Tags
40 are simply text labels that are associated with a document and
41 may be used to search for it later.
44 properties: List[Tuple[str, str]] = field(default_factory=list)
45 """A list of key->value strings for this document. May be empty.
46 Properties are more flexible tags that have both a label and a
47 value. e.g. "category:mystery" or "author:smith"."""
49 reference: Optional[Any] = None
50 """An optional reference to something else for convenience;
51 interpreted only by caller code, ignored here.
55 class Operation(enum.Enum):
56 """A logical search query operation."""
64 def from_token(token: str):
66 "not": Operation.INVERSION,
67 "and": Operation.CONJUNCTION,
68 "or": Operation.DISJUNCTION,
70 return table.get(token, None)
72 def num_operands(self) -> Optional[int]:
74 Operation.INVERSION: 1,
75 Operation.CONJUNCTION: 2,
76 Operation.DISJUNCTION: 2,
78 return table.get(self, None)
82 """A collection of searchable documents. The caller can
83 add documents to it (or edit existing docs) via :meth:`add_doc`,
84 retrieve a document given its docid via :meth:`get_doc`, and
85 perform various lookups of documents. The most interesting
86 lookup is implemented in :meth:`query`.
89 >>> c.add_doc(Document(
91 ... tags=set(['urgent', 'important']),
93 ... ('author', 'Scott'),
94 ... ('subject', 'your anniversary')
99 >>> c.add_doc(Document(
101 ... tags=set(['important']),
103 ... ('author', 'Joe'),
104 ... ('subject', 'your performance at work')
109 >>> c.add_doc(Document(
111 ... tags=set(['urgent']),
113 ... ('author', 'Scott'),
114 ... ('subject', 'car turning in front of you')
119 >>> c.query('author:Scott and important')
125 >>> c.query('*:Scott')
129 def __init__(self) -> None:
130 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
131 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
132 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
133 self.documents_by_docid: Dict[str, Document] = {}
135 def add_doc(self, doc: Document) -> None:
136 """Add a new Document to the Corpus. Each Document must have a
137 distinct docid that will serve as its primary identifier. If
138 the same Document is added multiple times, only the most
139 recent addition is indexed. If two distinct documents with
140 the same docid are added, the latter klobbers the former in
141 the indexes. See :meth:`python_modules.id_generator.get` or
142 :meth:`python_modules.string_utils.generate_uuid` for potential
145 Each Document may have an optional set of tags which can be
146 used later in expressions to the query method. These are simple
149 Each Document may have an optional list of key->value tuples
150 which can be used later in expressions to the query method.
152 Document includes a user-defined "reference" field which is
153 never interpreted by this module. This is meant to allow easy
154 mapping between Documents in this corpus and external objects
158 doc: the document to add or edit
161 if doc.docid in self.documents_by_docid:
162 # Handle collisions; assume that we are re-indexing the
163 # same document so remove it from the indexes before
164 # adding it back again.
165 colliding_doc = self.documents_by_docid[doc.docid]
166 assert colliding_doc.docid == doc.docid
167 del self.documents_by_docid[doc.docid]
168 for tag in colliding_doc.tags:
169 self.docids_by_tag[tag].remove(doc.docid)
170 for key, value in colliding_doc.properties:
171 self.docids_by_property[(key, value)].remove(doc.docid)
172 self.docids_with_property[key].remove(doc.docid)
174 # Index the new Document
175 assert doc.docid not in self.documents_by_docid
176 self.documents_by_docid[doc.docid] = doc
178 self.docids_by_tag[tag].add(doc.docid)
179 for key, value in doc.properties:
180 self.docids_by_property[(key, value)].add(doc.docid)
181 self.docids_with_property[key].add(doc.docid)
183 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
184 """Return the set of docids that have a particular tag.
187 tag: the tag for which to search
190 A set containing docids with the provided tag which
192 return self.docids_by_tag[tag]
194 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
195 """Return the set of docids with a tag that contains a str.
198 tag: the tag pattern for which to search
201 A set containing docids with tags that match the pattern
202 provided. e.g., if the arg was "foo" tags "football", "foobar",
203 and "food" all match.
206 for search_tag in self.docids_by_tag:
207 if tag in search_tag:
208 for docid in self.docids_by_tag[search_tag]:
212 def get_docids_with_property(self, key: str) -> Set[str]:
213 """Return the set of docids that have a particular property no matter
214 what that property's value.
217 key: the key value to search for.
220 A set of docids that contain the key (no matter what value)
223 return self.docids_with_property[key]
225 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
226 """Return the set of docids that have a particular property with a
230 key: the key to search for
231 value: the value that key must have in order to match a doc.
234 A set of docids that contain key with value which may be empty.
236 return self.docids_by_property[(key, value)]
238 def invert_docid_set(self, original: Set[str]) -> Set[str]:
239 """Invert a set of docids."""
240 return {docid for docid in self.documents_by_docid if docid not in original}
242 def get_doc(self, docid: str) -> Optional[Document]:
243 """Given a docid, retrieve the previously added Document.
246 docid: the docid to retrieve
249 The Document with docid or None to indicate no match.
251 return self.documents_by_docid.get(docid, None)
253 def query(self, query: str) -> Optional[Set[str]]:
254 """Query the corpus for documents that match a logical expression.
257 query: the logical query expressed using a simple language
258 that understands conjunction (and operator), disjunction
259 (or operator) and inversion (not operator) as well as
260 parenthesis. Here are some legal sample queries::
262 tag1 and tag2 and not tag3
264 (tag1 or tag2) and (tag3 or tag4)
266 (tag1 and key2:value2) or (tag2 and key1:value1)
273 A (potentially empty) set of docids for the matching
274 (previously added) documents or None on error.
278 root = self._parse_query(query)
279 except ParseError as e:
280 print(e.message, file=sys.stderr)
284 def _parse_query(self, query: str):
285 """Internal parse helper; prefer to use query instead."""
287 parens = set(["(", ")"])
288 and_or = set(["and", "or"])
290 def operator_precedence(token: str) -> Optional[int]:
298 return table.get(token, None)
300 def is_operator(token: str) -> bool:
301 return operator_precedence(token) is not None
304 tokens = query.split()
306 # Handle ( and ) operators stuck to the ends of tokens
307 # that split() doesn't understand.
321 def evaluate(corpus: Corpus, stack: List[str]):
322 node_stack: List[Node] = []
325 if not is_operator(token):
326 node = Node(corpus, Operation.QUERY, [token])
329 operation = Operation.from_token(token)
330 operand_count = operation.num_operands()
331 if len(node_stack) < operand_count:
333 f"Incorrect number of operations for {operation}"
335 for _ in range(operation.num_operands()):
336 args.append(node_stack.pop())
337 node = Node(corpus, operation, args)
338 node_stack.append(node)
343 for token in lex(query):
344 if not is_operator(token):
345 output_stack.append(token)
348 # token is an operator...
350 operator_stack.append(token)
353 while len(operator_stack) > 0:
354 pop_operator = operator_stack.pop()
355 if pop_operator != "(":
356 output_stack.append(pop_operator)
361 raise ParseError("Unbalanced parenthesis in query expression")
365 my_precedence = operator_precedence(token)
366 if my_precedence is None:
367 raise ParseError(f"Unknown operator: {token}")
368 while len(operator_stack) > 0:
369 peek_operator = operator_stack[-1]
370 if not is_operator(peek_operator) or peek_operator == "(":
372 peek_precedence = operator_precedence(peek_operator)
373 if peek_precedence is None:
374 raise ParseError("Internal error")
376 (peek_precedence < my_precedence)
377 or (peek_precedence == my_precedence)
378 and (peek_operator not in and_or)
381 output_stack.append(operator_stack.pop())
382 operator_stack.append(token)
383 while len(operator_stack) > 0:
384 token = operator_stack.pop()
386 raise ParseError("Unbalanced parenthesis in query expression")
387 output_stack.append(token)
388 return evaluate(self, output_stack)
392 """A query AST node."""
398 operands: Sequence[Union[Node, str]],
402 self.operands = operands
404 def eval(self) -> Set[str]:
405 """Evaluate this node."""
407 evaled_operands: List[Union[Set[str], str]] = []
408 for operand in self.operands:
409 if isinstance(operand, Node):
410 evaled_operands.append(operand.eval())
411 elif isinstance(operand, str):
412 evaled_operands.append(operand)
414 raise ParseError(f"Unexpected operand: {operand}")
417 if self.op is Operation.QUERY:
418 for tag in evaled_operands:
419 if isinstance(tag, str):
422 key, value = tag.split(":")
423 except ValueError as v:
425 f'Invalid key:value syntax at "{tag}"'
430 for kv, s in self.corpus.docids_by_property.items():
431 if value in ('*', kv[1]):
435 r = self.corpus.get_docids_with_property(key)
437 r = self.corpus.get_docids_by_property(key, value)
441 for s in self.corpus.docids_by_tag.values():
444 r = self.corpus.get_docids_by_exact_tag(tag)
447 raise ParseError(f"Unexpected query {tag}")
448 elif self.op is Operation.DISJUNCTION:
449 if len(evaled_operands) != 2:
450 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
451 retval.update(evaled_operands[0])
452 retval.update(evaled_operands[1])
453 elif self.op is Operation.CONJUNCTION:
454 if len(evaled_operands) != 2:
455 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
456 retval.update(evaled_operands[0])
457 retval = retval.intersection(evaled_operands[1])
458 elif self.op is Operation.INVERSION:
459 if len(evaled_operands) != 1:
460 raise ParseError("Operation.INVERSION (not) expects one operand.")
461 _ = evaled_operands[0]
462 if isinstance(_, set):
463 retval.update(self.corpus.invert_docid_set(_))
465 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
469 if __name__ == '__main__':