3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus is held in memory for fast
11 from __future__ import annotations
14 from collections import defaultdict
15 from dataclasses import dataclass, field
16 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
19 class ParseError(Exception):
20 """An error encountered while parsing a logical search expression."""
22 def __init__(self, message: str):
24 self.message = message
29 """A class representing a searchable document."""
31 # A unique identifier for each document.
34 # A set of tag strings for this document. May be empty.
35 tags: Set[str] = field(default_factory=set)
37 # A list of key->value strings for this document. May be empty.
38 properties: List[Tuple[str, str]] = field(default_factory=list)
40 # An optional reference to something else; interpreted only by
41 # caller code, ignored here.
42 reference: Optional[Any] = None
45 class Operation(enum.Enum):
46 """A logical search query operation."""
54 def from_token(token: str):
56 "not": Operation.INVERSION,
57 "and": Operation.CONJUNCTION,
58 "or": Operation.DISJUNCTION,
60 return table.get(token, None)
62 def num_operands(self) -> Optional[int]:
64 Operation.INVERSION: 1,
65 Operation.CONJUNCTION: 2,
66 Operation.DISJUNCTION: 2,
68 return table.get(self, None)
72 """A collection of searchable documents.
75 >>> c.add_doc(Document(
77 ... tags=set(['urgent', 'important']),
79 ... ('author', 'Scott'),
80 ... ('subject', 'your anniversary')
85 >>> c.add_doc(Document(
87 ... tags=set(['important']),
89 ... ('author', 'Joe'),
90 ... ('subject', 'your performance at work')
95 >>> c.add_doc(Document(
97 ... tags=set(['urgent']),
99 ... ('author', 'Scott'),
100 ... ('subject', 'car turning in front of you')
105 >>> c.query('author:Scott and important')
109 def __init__(self) -> None:
110 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
111 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
112 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
113 self.documents_by_docid: Dict[str, Document] = {}
115 def add_doc(self, doc: Document) -> None:
116 """Add a new Document to the Corpus. Each Document must have a
117 distinct docid that will serve as its primary identifier. If
118 the same Document is added multiple times, only the most
119 recent addition is indexed. If two distinct documents with
120 the same docid are added, the latter klobbers the former in the
123 Each Document may have an optional set of tags which can be
124 used later in expressions to the query method.
126 Each Document may have an optional list of key->value tuples
127 which can be used later in expressions to the query method.
129 Document includes a user-defined "reference" field which is
130 never interpreted by this module. This is meant to allow easy
131 mapping between Documents in this corpus and external objects
135 if doc.docid in self.documents_by_docid:
136 # Handle collisions; assume that we are re-indexing the
137 # same document so remove it from the indexes before
138 # adding it back again.
139 colliding_doc = self.documents_by_docid[doc.docid]
140 assert colliding_doc.docid == doc.docid
141 del self.documents_by_docid[doc.docid]
142 for tag in colliding_doc.tags:
143 self.docids_by_tag[tag].remove(doc.docid)
144 for key, value in colliding_doc.properties:
145 self.docids_by_property[(key, value)].remove(doc.docid)
146 self.docids_with_property[key].remove(doc.docid)
148 # Index the new Document
149 assert doc.docid not in self.documents_by_docid
150 self.documents_by_docid[doc.docid] = doc
152 self.docids_by_tag[tag].add(doc.docid)
153 for key, value in doc.properties:
154 self.docids_by_property[(key, value)].add(doc.docid)
155 self.docids_with_property[key].add(doc.docid)
157 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
158 """Return the set of docids that have a particular tag."""
160 return self.docids_by_tag[tag]
162 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
163 """Return the set of docids with a tag that contains a str"""
166 for search_tag in self.docids_by_tag:
167 if tag in search_tag:
168 for docid in self.docids_by_tag[search_tag]:
172 def get_docids_with_property(self, key: str) -> Set[str]:
173 """Return the set of docids that have a particular property no matter
174 what that property's value.
177 return self.docids_with_property[key]
179 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
180 """Return the set of docids that have a particular property with a
184 return self.docids_by_property[(key, value)]
186 def invert_docid_set(self, original: Set[str]) -> Set[str]:
187 """Invert a set of docids."""
189 return {docid for docid in self.documents_by_docid if docid not in original}
191 def get_doc(self, docid: str) -> Optional[Document]:
192 """Given a docid, retrieve the previously added Document."""
194 return self.documents_by_docid.get(docid, None)
196 def query(self, query: str) -> Optional[Set[str]]:
197 """Query the corpus for documents that match a logical expression.
198 Returns a (potentially empty) set of docids for the matching
199 (previously added) documents or None on error.
203 tag1 and tag2 and not tag3
205 (tag1 or tag2) and (tag3 or tag4)
207 (tag1 and key2:value2) or (tag2 and key1:value1)
215 root = self._parse_query(query)
216 except ParseError as e:
217 print(e.message, file=sys.stderr)
221 def _parse_query(self, query: str):
222 """Internal parse helper; prefer to use query instead."""
224 parens = set(["(", ")"])
225 and_or = set(["and", "or"])
227 def operator_precedence(token: str) -> Optional[int]:
235 return table.get(token, None)
237 def is_operator(token: str) -> bool:
238 return operator_precedence(token) is not None
241 tokens = query.split()
243 # Handle ( and ) operators stuck to the ends of tokens
244 # that split() doesn't understand.
258 def evaluate(corpus: Corpus, stack: List[str]):
259 node_stack: List[Node] = []
262 if not is_operator(token):
263 node = Node(corpus, Operation.QUERY, [token])
266 operation = Operation.from_token(token)
267 operand_count = operation.num_operands()
268 if len(node_stack) < operand_count:
269 raise ParseError(f"Incorrect number of operations for {operation}")
270 for _ in range(operation.num_operands()):
271 args.append(node_stack.pop())
272 node = Node(corpus, operation, args)
273 node_stack.append(node)
278 for token in lex(query):
279 if not is_operator(token):
280 output_stack.append(token)
283 # token is an operator...
285 operator_stack.append(token)
288 while len(operator_stack) > 0:
289 pop_operator = operator_stack.pop()
290 if pop_operator != "(":
291 output_stack.append(pop_operator)
296 raise ParseError("Unbalanced parenthesis in query expression")
300 my_precedence = operator_precedence(token)
301 if my_precedence is None:
302 raise ParseError(f"Unknown operator: {token}")
303 while len(operator_stack) > 0:
304 peek_operator = operator_stack[-1]
305 if not is_operator(peek_operator) or peek_operator == "(":
307 peek_precedence = operator_precedence(peek_operator)
308 if peek_precedence is None:
309 raise ParseError("Internal error")
311 (peek_precedence < my_precedence)
312 or (peek_precedence == my_precedence)
313 and (peek_operator not in and_or)
316 output_stack.append(operator_stack.pop())
317 operator_stack.append(token)
318 while len(operator_stack) > 0:
319 token = operator_stack.pop()
321 raise ParseError("Unbalanced parenthesis in query expression")
322 output_stack.append(token)
323 return evaluate(self, output_stack)
327 """A query AST node."""
333 operands: Sequence[Union[Node, str]],
337 self.operands = operands
339 def eval(self) -> Set[str]:
340 """Evaluate this node."""
342 evaled_operands: List[Union[Set[str], str]] = []
343 for operand in self.operands:
344 if isinstance(operand, Node):
345 evaled_operands.append(operand.eval())
346 elif isinstance(operand, str):
347 evaled_operands.append(operand)
349 raise ParseError(f"Unexpected operand: {operand}")
352 if self.op is Operation.QUERY:
353 for tag in evaled_operands:
354 if isinstance(tag, str):
357 key, value = tag.split(":")
358 except ValueError as v:
359 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
361 r = self.corpus.get_docids_with_property(key)
363 r = self.corpus.get_docids_by_property(key, value)
365 r = self.corpus.get_docids_by_exact_tag(tag)
368 raise ParseError(f"Unexpected query {tag}")
369 elif self.op is Operation.DISJUNCTION:
370 if len(evaled_operands) != 2:
371 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
372 retval.update(evaled_operands[0])
373 retval.update(evaled_operands[1])
374 elif self.op is Operation.CONJUNCTION:
375 if len(evaled_operands) != 2:
376 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
377 retval.update(evaled_operands[0])
378 retval = retval.intersection(evaled_operands[1])
379 elif self.op is Operation.INVERSION:
380 if len(evaled_operands) != 1:
381 raise ParseError("Operation.INVERSION (not) expects one operand.")
382 _ = evaled_operands[0]
383 if isinstance(_, set):
384 retval.update(self.corpus.invert_docid_set(_))
386 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
390 if __name__ == '__main__':