3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus is held in memory for fast
11 from __future__ import annotations
14 from collections import defaultdict
15 from dataclasses import dataclass, field
16 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
19 class ParseError(Exception):
20 """An error encountered while parsing a logical search expression."""
22 def __init__(self, message: str):
24 self.message = message
29 """A class representing a searchable document."""
31 # A unique identifier for each document.
34 # A set of tag strings for this document. May be empty.
35 tags: Set[str] = field(default_factory=set)
37 # A list of key->value strings for this document. May be empty.
38 properties: List[Tuple[str, str]] = field(default_factory=list)
40 # An optional reference to something else; interpreted only by
41 # caller code, ignored here.
42 reference: Optional[Any] = None
45 class Operation(enum.Enum):
46 """A logical search query operation."""
54 def from_token(token: str):
56 "not": Operation.INVERSION,
57 "and": Operation.CONJUNCTION,
58 "or": Operation.DISJUNCTION,
60 return table.get(token, None)
62 def num_operands(self) -> Optional[int]:
64 Operation.INVERSION: 1,
65 Operation.CONJUNCTION: 2,
66 Operation.DISJUNCTION: 2,
68 return table.get(self, None)
72 """A collection of searchable documents.
75 >>> c.add_doc(Document(
77 ... tags=set(['urgent', 'important']),
79 ... ('author', 'Scott'),
80 ... ('subject', 'your anniversary')
85 >>> c.add_doc(Document(
87 ... tags=set(['important']),
89 ... ('author', 'Joe'),
90 ... ('subject', 'your performance at work')
95 >>> c.add_doc(Document(
97 ... tags=set(['urgent']),
99 ... ('author', 'Scott'),
100 ... ('subject', 'car turning in front of you')
105 >>> c.query('author:Scott and important')
111 def __init__(self) -> None:
112 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
113 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
114 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
115 self.documents_by_docid: Dict[str, Document] = {}
117 def add_doc(self, doc: Document) -> None:
118 """Add a new Document to the Corpus. Each Document must have a
119 distinct docid that will serve as its primary identifier. If
120 the same Document is added multiple times, only the most
121 recent addition is indexed. If two distinct documents with
122 the same docid are added, the latter klobbers the former in the
125 Each Document may have an optional set of tags which can be
126 used later in expressions to the query method.
128 Each Document may have an optional list of key->value tuples
129 which can be used later in expressions to the query method.
131 Document includes a user-defined "reference" field which is
132 never interpreted by this module. This is meant to allow easy
133 mapping between Documents in this corpus and external objects
137 if doc.docid in self.documents_by_docid:
138 # Handle collisions; assume that we are re-indexing the
139 # same document so remove it from the indexes before
140 # adding it back again.
141 colliding_doc = self.documents_by_docid[doc.docid]
142 assert colliding_doc.docid == doc.docid
143 del self.documents_by_docid[doc.docid]
144 for tag in colliding_doc.tags:
145 self.docids_by_tag[tag].remove(doc.docid)
146 for key, value in colliding_doc.properties:
147 self.docids_by_property[(key, value)].remove(doc.docid)
148 self.docids_with_property[key].remove(doc.docid)
150 # Index the new Document
151 assert doc.docid not in self.documents_by_docid
152 self.documents_by_docid[doc.docid] = doc
154 self.docids_by_tag[tag].add(doc.docid)
155 for key, value in doc.properties:
156 self.docids_by_property[(key, value)].add(doc.docid)
157 self.docids_with_property[key].add(doc.docid)
159 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
160 """Return the set of docids that have a particular tag."""
162 return self.docids_by_tag[tag]
164 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
165 """Return the set of docids with a tag that contains a str"""
168 for search_tag in self.docids_by_tag:
169 if tag in search_tag:
170 for docid in self.docids_by_tag[search_tag]:
174 def get_docids_with_property(self, key: str) -> Set[str]:
175 """Return the set of docids that have a particular property no matter
176 what that property's value.
179 return self.docids_with_property[key]
181 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
182 """Return the set of docids that have a particular property with a
186 return self.docids_by_property[(key, value)]
188 def invert_docid_set(self, original: Set[str]) -> Set[str]:
189 """Invert a set of docids."""
191 return {docid for docid in self.documents_by_docid if docid not in original}
193 def get_doc(self, docid: str) -> Optional[Document]:
194 """Given a docid, retrieve the previously added Document."""
196 return self.documents_by_docid.get(docid, None)
198 def query(self, query: str) -> Optional[Set[str]]:
199 """Query the corpus for documents that match a logical expression.
200 Returns a (potentially empty) set of docids for the matching
201 (previously added) documents or None on error.
205 tag1 and tag2 and not tag3
207 (tag1 or tag2) and (tag3 or tag4)
209 (tag1 and key2:value2) or (tag2 and key1:value1)
217 return set(self.documents_by_docid.keys())
219 root = self._parse_query(query)
220 except ParseError as e:
221 print(e.message, file=sys.stderr)
225 def _parse_query(self, query: str):
226 """Internal parse helper; prefer to use query instead."""
228 parens = set(["(", ")"])
229 and_or = set(["and", "or"])
231 def operator_precedence(token: str) -> Optional[int]:
239 return table.get(token, None)
241 def is_operator(token: str) -> bool:
242 return operator_precedence(token) is not None
245 tokens = query.split()
247 # Handle ( and ) operators stuck to the ends of tokens
248 # that split() doesn't understand.
262 def evaluate(corpus: Corpus, stack: List[str]):
263 node_stack: List[Node] = []
266 if not is_operator(token):
267 node = Node(corpus, Operation.QUERY, [token])
270 operation = Operation.from_token(token)
271 operand_count = operation.num_operands()
272 if len(node_stack) < operand_count:
273 raise ParseError(f"Incorrect number of operations for {operation}")
274 for _ in range(operation.num_operands()):
275 args.append(node_stack.pop())
276 node = Node(corpus, operation, args)
277 node_stack.append(node)
282 for token in lex(query):
283 if not is_operator(token):
284 output_stack.append(token)
287 # token is an operator...
289 operator_stack.append(token)
292 while len(operator_stack) > 0:
293 pop_operator = operator_stack.pop()
294 if pop_operator != "(":
295 output_stack.append(pop_operator)
300 raise ParseError("Unbalanced parenthesis in query expression")
304 my_precedence = operator_precedence(token)
305 if my_precedence is None:
306 raise ParseError(f"Unknown operator: {token}")
307 while len(operator_stack) > 0:
308 peek_operator = operator_stack[-1]
309 if not is_operator(peek_operator) or peek_operator == "(":
311 peek_precedence = operator_precedence(peek_operator)
312 if peek_precedence is None:
313 raise ParseError("Internal error")
315 (peek_precedence < my_precedence)
316 or (peek_precedence == my_precedence)
317 and (peek_operator not in and_or)
320 output_stack.append(operator_stack.pop())
321 operator_stack.append(token)
322 while len(operator_stack) > 0:
323 token = operator_stack.pop()
325 raise ParseError("Unbalanced parenthesis in query expression")
326 output_stack.append(token)
327 return evaluate(self, output_stack)
331 """A query AST node."""
337 operands: Sequence[Union[Node, str]],
341 self.operands = operands
343 def eval(self) -> Set[str]:
344 """Evaluate this node."""
346 evaled_operands: List[Union[Set[str], str]] = []
347 for operand in self.operands:
348 if isinstance(operand, Node):
349 evaled_operands.append(operand.eval())
350 elif isinstance(operand, str):
351 evaled_operands.append(operand)
353 raise ParseError(f"Unexpected operand: {operand}")
356 if self.op is Operation.QUERY:
357 for tag in evaled_operands:
358 if isinstance(tag, str):
361 key, value = tag.split(":")
362 except ValueError as v:
363 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
365 r = self.corpus.get_docids_with_property(key)
367 r = self.corpus.get_docids_by_property(key, value)
369 r = self.corpus.get_docids_by_exact_tag(tag)
372 raise ParseError(f"Unexpected query {tag}")
373 elif self.op is Operation.DISJUNCTION:
374 if len(evaled_operands) != 2:
375 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
376 retval.update(evaled_operands[0])
377 retval.update(evaled_operands[1])
378 elif self.op is Operation.CONJUNCTION:
379 if len(evaled_operands) != 2:
380 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
381 retval.update(evaled_operands[0])
382 retval = retval.intersection(evaled_operands[1])
383 elif self.op is Operation.INVERSION:
384 if len(evaled_operands) != 1:
385 raise ParseError("Operation.INVERSION (not) expects one operand.")
386 _ = evaled_operands[0]
387 if isinstance(_, set):
388 retval.update(self.corpus.invert_docid_set(_))
390 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
394 if __name__ == '__main__':