3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus is held in memory for fast
11 from __future__ import annotations
14 from collections import defaultdict
15 from dataclasses import dataclass, field
16 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
class ParseError(Exception):
    """An error encountered while parsing a logical search expression.

    Attributes:
        message: human-readable description of what went wrong.
    """

    def __init__(self, message: str):
        # Hand the message to Exception as well so that str(err),
        # repr(err) and tracebacks show it, not only the .message
        # attribute.
        super().__init__(message)
        self.message = message
29 """A class representing a searchable document."""
31 # A unique identifier for each document.
34 # A set of tag strings for this document. May be empty.
35 tags: Set[str] = field(default_factory=set)
37 # A list of key->value strings for this document. May be empty.
38 properties: List[Tuple[str, str]] = field(default_factory=list)
40 # An optional reference to something else; interpreted only by
41 # caller code, ignored here.
42 reference: Optional[Any] = None
45 class Operation(enum.Enum):
46 """A logical search query operation."""
54 def from_token(token: str):
56 "not": Operation.INVERSION,
57 "and": Operation.CONJUNCTION,
58 "or": Operation.DISJUNCTION,
60 return table.get(token, None)
62 def num_operands(self) -> Optional[int]:
64 Operation.INVERSION: 1,
65 Operation.CONJUNCTION: 2,
66 Operation.DISJUNCTION: 2,
68 return table.get(self, None)
72 """A collection of searchable documents.
75 >>> c.add_doc(Document(
77 ... tags=set(['urgent', 'important']),
79 ... ('author', 'Scott'),
80 ... ('subject', 'your anniversary')
85 >>> c.add_doc(Document(
87 ... tags=set(['important']),
89 ... ('author', 'Joe'),
90 ... ('subject', 'your performance at work')
95 >>> c.add_doc(Document(
97 ... tags=set(['urgent']),
99 ... ('author', 'Scott'),
100 ... ('subject', 'car turning in front of you')
105 >>> c.query('author:Scott and important')
111 >>> c.query('*:Scott')
def __init__(self) -> None:
    """Create an empty corpus: no documents stored, all indexes empty."""
    # Primary store: docid -> the Document added under that docid.
    self.documents_by_docid: Dict[str, Document] = {}

    # Lookup indexes, each mapping to a set of docids.  They are
    # defaultdicts so that indexing code can add to a key's set
    # without first checking whether the key exists.
    #   property key           -> docids having that key (any value)
    #   (property key, value)  -> docids having exactly that pair
    #   tag                    -> docids carrying that tag
    self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
    self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
121 def add_doc(self, doc: Document) -> None:
122 """Add a new Document to the Corpus. Each Document must have a
123 distinct docid that will serve as its primary identifier. If
124 the same Document is added multiple times, only the most
125 recent addition is indexed. If two distinct documents with
126 the same docid are added, the latter klobbers the former in the
129 Each Document may have an optional set of tags which can be
130 used later in expressions to the query method.
132 Each Document may have an optional list of key->value tuples
133 which can be used later in expressions to the query method.
135 Document includes a user-defined "reference" field which is
136 never interpreted by this module. This is meant to allow easy
137 mapping between Documents in this corpus and external objects
141 if doc.docid in self.documents_by_docid:
142 # Handle collisions; assume that we are re-indexing the
143 # same document so remove it from the indexes before
144 # adding it back again.
145 colliding_doc = self.documents_by_docid[doc.docid]
146 assert colliding_doc.docid == doc.docid
147 del self.documents_by_docid[doc.docid]
148 for tag in colliding_doc.tags:
149 self.docids_by_tag[tag].remove(doc.docid)
150 for key, value in colliding_doc.properties:
151 self.docids_by_property[(key, value)].remove(doc.docid)
152 self.docids_with_property[key].remove(doc.docid)
154 # Index the new Document
155 assert doc.docid not in self.documents_by_docid
156 self.documents_by_docid[doc.docid] = doc
158 self.docids_by_tag[tag].add(doc.docid)
159 for key, value in doc.properties:
160 self.docids_by_property[(key, value)].add(doc.docid)
161 self.docids_with_property[key].add(doc.docid)
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
    """Return the set of docids that have a particular tag.

    Args:
        tag: the exact tag string to look up.

    Returns:
        The docids indexed under tag, or an empty set when the tag is
        not in the index.  When the tag is indexed, the returned set
        is the index's own set, so treat it as read-only.
    """
    # docids_by_tag is a defaultdict: subscripting an unknown tag would
    # insert an empty entry as a side effect of a read.  Use .get() so
    # that lookups never grow the index.
    return self.docids_by_tag.get(tag, set())
167 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
168 """Return the set of docids with a tag that contains a str"""
171 for search_tag in self.docids_by_tag:
172 if tag in search_tag:
173 for docid in self.docids_by_tag[search_tag]:
def get_docids_with_property(self, key: str) -> Set[str]:
    """Return the set of docids that have a particular property no
    matter what that property's value.

    Args:
        key: the property key to look up.

    Returns:
        The docids indexed under key, or an empty set when the key is
        not in the index.  When the key is indexed, the returned set
        is the index's own set, so treat it as read-only.
    """
    # docids_with_property is a defaultdict: subscripting an unknown
    # key would insert an empty entry as a side effect of a read.  Use
    # .get() so that lookups never grow the index.
    return self.docids_with_property.get(key, set())
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
    """Return the set of docids that have a particular property with a
    particular value.

    Args:
        key: the property key to look up.
        value: the exact value that property must have.

    Returns:
        The docids indexed under the (key, value) pair, or an empty
        set when that pair is not in the index.  When the pair is
        indexed, the returned set is the index's own set, so treat it
        as read-only.
    """
    # docids_by_property is a defaultdict: subscripting an unknown pair
    # would insert an empty entry as a side effect of a read.  Use
    # .get() so that lookups never grow the index.
    return self.docids_by_property.get((key, value), set())
def invert_docid_set(self, original: Set[str]) -> Set[str]:
    """Return the complement of a docid set within this corpus.

    Args:
        original: the docids to exclude.

    Returns:
        A new set holding every docid stored in the corpus that is
        not a member of original.
    """
    complement: Set[str] = set()
    for known_docid in self.documents_by_docid:
        if known_docid in original:
            continue
        complement.add(known_docid)
    return complement
def get_doc(self, docid: str) -> Optional[Document]:
    """Look up a previously added Document by its docid.

    Args:
        docid: the identifier the Document was stored under.

    Returns:
        The stored Document, or None when no Document is stored under
        that docid.
    """
    try:
        found = self.documents_by_docid[docid]
    except KeyError:
        return None
    return found
201 def query(self, query: str) -> Optional[Set[str]]:
202 """Query the corpus for documents that match a logical expression.
203 Returns a (potentially empty) set of docids for the matching
204 (previously added) documents or None on error.
208 tag1 and tag2 and not tag3
210 (tag1 or tag2) and (tag3 or tag4)
212 (tag1 and key2:value2) or (tag2 and key1:value1)
220 root = self._parse_query(query)
221 except ParseError as e:
222 print(e.message, file=sys.stderr)
226 def _parse_query(self, query: str):
227 """Internal parse helper; prefer to use query instead."""
229 parens = set(["(", ")"])
230 and_or = set(["and", "or"])
232 def operator_precedence(token: str) -> Optional[int]:
240 return table.get(token, None)
242 def is_operator(token: str) -> bool:
243 return operator_precedence(token) is not None
246 tokens = query.split()
248 # Handle ( and ) operators stuck to the ends of tokens
249 # that split() doesn't understand.
263 def evaluate(corpus: Corpus, stack: List[str]):
264 node_stack: List[Node] = []
267 if not is_operator(token):
268 node = Node(corpus, Operation.QUERY, [token])
271 operation = Operation.from_token(token)
272 operand_count = operation.num_operands()
273 if len(node_stack) < operand_count:
274 raise ParseError(f"Incorrect number of operations for {operation}")
275 for _ in range(operation.num_operands()):
276 args.append(node_stack.pop())
277 node = Node(corpus, operation, args)
278 node_stack.append(node)
283 for token in lex(query):
284 if not is_operator(token):
285 output_stack.append(token)
288 # token is an operator...
290 operator_stack.append(token)
293 while len(operator_stack) > 0:
294 pop_operator = operator_stack.pop()
295 if pop_operator != "(":
296 output_stack.append(pop_operator)
301 raise ParseError("Unbalanced parenthesis in query expression")
305 my_precedence = operator_precedence(token)
306 if my_precedence is None:
307 raise ParseError(f"Unknown operator: {token}")
308 while len(operator_stack) > 0:
309 peek_operator = operator_stack[-1]
310 if not is_operator(peek_operator) or peek_operator == "(":
312 peek_precedence = operator_precedence(peek_operator)
313 if peek_precedence is None:
314 raise ParseError("Internal error")
316 (peek_precedence < my_precedence)
317 or (peek_precedence == my_precedence)
318 and (peek_operator not in and_or)
321 output_stack.append(operator_stack.pop())
322 operator_stack.append(token)
323 while len(operator_stack) > 0:
324 token = operator_stack.pop()
326 raise ParseError("Unbalanced parenthesis in query expression")
327 output_stack.append(token)
328 return evaluate(self, output_stack)
332 """A query AST node."""
338 operands: Sequence[Union[Node, str]],
342 self.operands = operands
344 def eval(self) -> Set[str]:
345 """Evaluate this node."""
347 evaled_operands: List[Union[Set[str], str]] = []
348 for operand in self.operands:
349 if isinstance(operand, Node):
350 evaled_operands.append(operand.eval())
351 elif isinstance(operand, str):
352 evaled_operands.append(operand)
354 raise ParseError(f"Unexpected operand: {operand}")
357 if self.op is Operation.QUERY:
358 for tag in evaled_operands:
359 if isinstance(tag, str):
362 key, value = tag.split(":")
363 except ValueError as v:
364 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
368 for kv, s in self.corpus.docids_by_property.items():
369 if value in ('*', kv[1]):
373 r = self.corpus.get_docids_with_property(key)
375 r = self.corpus.get_docids_by_property(key, value)
379 for s in self.corpus.docids_by_tag.values():
382 r = self.corpus.get_docids_by_exact_tag(tag)
385 raise ParseError(f"Unexpected query {tag}")
386 elif self.op is Operation.DISJUNCTION:
387 if len(evaled_operands) != 2:
388 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
389 retval.update(evaled_operands[0])
390 retval.update(evaled_operands[1])
391 elif self.op is Operation.CONJUNCTION:
392 if len(evaled_operands) != 2:
393 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
394 retval.update(evaled_operands[0])
395 retval = retval.intersection(evaled_operands[1])
396 elif self.op is Operation.INVERSION:
397 if len(evaled_operands) != 1:
398 raise ParseError("Operation.INVERSION (not) expects one operand.")
399 _ = evaled_operands[0]
400 if isinstance(_, set):
401 retval.update(self.corpus.invert_docid_set(_))
403 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
407 if __name__ == '__main__':