3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module concerned with the creation of and searching of a
6 corpus of documents. The corpus is held in memory for fast
11 from __future__ import annotations
14 from collections import defaultdict
15 from dataclasses import dataclass, field
16 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
class ParseError(Exception):
    """An error encountered while parsing a logical search expression.

    Attributes:
        message: human-readable description of what went wrong.
    """

    def __init__(self, message: str):
        """Create the error.

        Args:
            message: description of the parse failure.
        """
        # Hand the message to Exception as well, so that str(err), repr(err)
        # and err.args carry it in addition to the .message attribute.
        super().__init__(message)
        self.message = message
29 """A class representing a searchable document."""
31 # A unique identifier for each document.
34 # A set of tag strings for this document. May be empty.
35 tags: Set[str] = field(default_factory=set)
37 # A list of key->value strings for this document. May be empty.
38 properties: List[Tuple[str, str]] = field(default_factory=list)
40 # An optional reference to something else; interpreted only by
41 # caller code, ignored here.
42 reference: Optional[Any] = None
45 class Operation(enum.Enum):
46 """A logical search query operation."""
54 def from_token(token: str):
56 "not": Operation.INVERSION,
57 "and": Operation.CONJUNCTION,
58 "or": Operation.DISJUNCTION,
60 return table.get(token, None)
62 def num_operands(self) -> Optional[int]:
64 Operation.INVERSION: 1,
65 Operation.CONJUNCTION: 2,
66 Operation.DISJUNCTION: 2,
68 return table.get(self, None)
72 """A collection of searchable documents.
75 >>> c.add_doc(Document(
77 ... tags=set(['urgent', 'important']),
79 ... ('author', 'Scott'),
80 ... ('subject', 'your anniversary')
85 >>> c.add_doc(Document(
87 ... tags=set(['important']),
89 ... ('author', 'Joe'),
90 ... ('subject', 'your performance at work')
95 >>> c.add_doc(Document(
97 ... tags=set(['urgent']),
99 ... ('author', 'Scott'),
100 ... ('subject', 'car turning in front of you')
105 >>> c.query('author:Scott and important')
113 def __init__(self) -> None:
114 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
115 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
116 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
117 self.documents_by_docid: Dict[str, Document] = {}
def add_doc(self, doc: Document) -> None:
    """Add a Document to the Corpus and index its tags and properties.

    Each Document is identified by its docid.  If a Document with the
    same docid was added earlier, the earlier one is removed from every
    index before the new one is indexed, so the latter clobbers the
    former.

    Each Document may have an optional set of tags and an optional list
    of key->value tuples; both can be used later in expressions to the
    query method.  The Document's "reference" field is not read here.

    Args:
        doc: the Document to index.  Its docid, tags and properties are
            read; the Document itself is stored under its docid.
    """
    docid = doc.docid

    previous = self.documents_by_docid.pop(docid, None)
    if previous is not None:
        # Re-indexing: drop every index entry made for the old version.
        # discard() is used instead of remove() because properties is a
        # list, so several entries may share a key (or be exact
        # duplicates); the docid may already be gone from a set by the
        # time a later entry is processed.  .get() avoids creating
        # empty index entries while un-indexing.
        for tag in previous.tags:
            self.docids_by_tag.get(tag, set()).discard(docid)
        for key, value in previous.properties:
            self.docids_by_property.get((key, value), set()).discard(docid)
            self.docids_with_property.get(key, set()).discard(docid)

    # Index the new Document.
    self.documents_by_docid[docid] = doc
    for tag in doc.tags:
        self.docids_by_tag[tag].add(docid)
    for key, value in doc.properties:
        self.docids_by_property[(key, value)].add(docid)
        self.docids_with_property[key].add(docid)
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
    """Return the set of docids that have a particular tag.

    Args:
        tag: the exact tag string to look up.

    Returns:
        The index's own set of docids for the tag (treat it as
        read-only), or a new empty set when the tag is not indexed.
    """
    # docids_by_tag is a defaultdict, so indexing it with an unknown tag
    # would insert an empty entry on every miss.  .get() keeps this
    # lookup free of side effects.
    return self.docids_by_tag.get(tag, set())
165 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
166 """Return the set of docids with a tag that contains a str"""
169 for search_tag in self.docids_by_tag:
170 if tag in search_tag:
171 for docid in self.docids_by_tag[search_tag]:
def get_docids_with_property(self, key: str) -> Set[str]:
    """Return the set of docids that have a particular property no matter
    what that property's value.

    Args:
        key: the property key to look up.

    Returns:
        The index's own set of docids for the key (treat it as
        read-only), or a new empty set when the key is not indexed.
    """
    # docids_with_property is a defaultdict; use .get() so that a lookup
    # of an unknown key does not insert an empty entry into the index.
    return self.docids_with_property.get(key, set())
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
    """Return the set of docids indexed under an exact (key, value)
    property pair.

    Args:
        key: the property key.
        value: the property value that must accompany the key.

    Returns:
        The index's own set of docids for the pair (treat it as
        read-only), or a new empty set when the pair is not indexed.
    """
    # docids_by_property is a defaultdict; use .get() so that a lookup
    # of an unknown pair does not insert an empty entry into the index.
    return self.docids_by_property.get((key, value), set())
def invert_docid_set(self, original: Set[str]) -> Set[str]:
    """Return the docids known to this corpus that are absent from original.

    Args:
        original: the set of docids to invert.

    Returns:
        A new set holding every key of documents_by_docid that is not
        in original.
    """
    every_docid = set(self.documents_by_docid)
    return every_docid.difference(original)
def get_doc(self, docid: str) -> Optional[Document]:
    """Look up a previously added Document by its docid.

    Args:
        docid: the identifier the Document was stored under.

    Returns:
        The stored Document, or None when no Document with that docid
        is present.
    """
    found = self.documents_by_docid.get(docid)
    return found
199 def query(self, query: str) -> Optional[Set[str]]:
200 """Query the corpus for documents that match a logical expression.
201 Returns a (potentially empty) set of docids for the matching
202 (previously added) documents or None on error.
206 tag1 and tag2 and not tag3
208 (tag1 or tag2) and (tag3 or tag4)
210 (tag1 and key2:value2) or (tag2 and key1:value1)
218 root = self._parse_query(query)
219 except ParseError as e:
220 print(e.message, file=sys.stderr)
224 def _parse_query(self, query: str):
225 """Internal parse helper; prefer to use query instead."""
227 parens = set(["(", ")"])
228 and_or = set(["and", "or"])
230 def operator_precedence(token: str) -> Optional[int]:
238 return table.get(token, None)
240 def is_operator(token: str) -> bool:
241 return operator_precedence(token) is not None
244 tokens = query.split()
246 # Handle ( and ) operators stuck to the ends of tokens
247 # that split() doesn't understand.
261 def evaluate(corpus: Corpus, stack: List[str]):
262 node_stack: List[Node] = []
265 if not is_operator(token):
266 node = Node(corpus, Operation.QUERY, [token])
269 operation = Operation.from_token(token)
270 operand_count = operation.num_operands()
271 if len(node_stack) < operand_count:
272 raise ParseError(f"Incorrect number of operations for {operation}")
273 for _ in range(operation.num_operands()):
274 args.append(node_stack.pop())
275 node = Node(corpus, operation, args)
276 node_stack.append(node)
281 for token in lex(query):
282 if not is_operator(token):
283 output_stack.append(token)
286 # token is an operator...
288 operator_stack.append(token)
291 while len(operator_stack) > 0:
292 pop_operator = operator_stack.pop()
293 if pop_operator != "(":
294 output_stack.append(pop_operator)
299 raise ParseError("Unbalanced parenthesis in query expression")
303 my_precedence = operator_precedence(token)
304 if my_precedence is None:
305 raise ParseError(f"Unknown operator: {token}")
306 while len(operator_stack) > 0:
307 peek_operator = operator_stack[-1]
308 if not is_operator(peek_operator) or peek_operator == "(":
310 peek_precedence = operator_precedence(peek_operator)
311 if peek_precedence is None:
312 raise ParseError("Internal error")
314 (peek_precedence < my_precedence)
315 or (peek_precedence == my_precedence)
316 and (peek_operator not in and_or)
319 output_stack.append(operator_stack.pop())
320 operator_stack.append(token)
321 while len(operator_stack) > 0:
322 token = operator_stack.pop()
324 raise ParseError("Unbalanced parenthesis in query expression")
325 output_stack.append(token)
326 return evaluate(self, output_stack)
330 """A query AST node."""
336 operands: Sequence[Union[Node, str]],
340 self.operands = operands
342 def eval(self) -> Set[str]:
343 """Evaluate this node."""
345 evaled_operands: List[Union[Set[str], str]] = []
346 for operand in self.operands:
347 if isinstance(operand, Node):
348 evaled_operands.append(operand.eval())
349 elif isinstance(operand, str):
350 evaled_operands.append(operand)
352 raise ParseError(f"Unexpected operand: {operand}")
355 if self.op is Operation.QUERY:
356 for tag in evaled_operands:
357 if isinstance(tag, str):
360 key, value = tag.split(":")
361 except ValueError as v:
362 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
365 for s in self.corpus.docids_by_tag.values():
369 r = self.corpus.get_docids_with_property(key)
371 r = self.corpus.get_docids_by_property(key, value)
375 for s in self.corpus.docids_by_tag.values():
378 r = self.corpus.get_docids_by_exact_tag(tag)
381 raise ParseError(f"Unexpected query {tag}")
382 elif self.op is Operation.DISJUNCTION:
383 if len(evaled_operands) != 2:
384 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
385 retval.update(evaled_operands[0])
386 retval.update(evaled_operands[1])
387 elif self.op is Operation.CONJUNCTION:
388 if len(evaled_operands) != 2:
389 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
390 retval.update(evaled_operands[0])
391 retval = retval.intersection(evaled_operands[1])
392 elif self.op is Operation.INVERSION:
393 if len(evaled_operands) != 1:
394 raise ParseError("Operation.INVERSION (not) expects one operand.")
395 _ = evaled_operands[0]
396 if isinstance(_, set):
397 retval.update(self.corpus.invert_docid_set(_))
399 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
403 if __name__ == '__main__':