3 """This is a module concerned with the creation of and searching of a
4 corpus of documents. The corpus is held in memory for fast
7 from __future__ import annotations
10 from collections import defaultdict
11 from dataclasses import dataclass, field
12 from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
15 class ParseError(Exception):
16 """An error encountered while parsing a logical search expression."""
18 def __init__(self, message: str):
20 self.message = message
25 """A class representing a searchable document."""
27 # A unique identifier for each document.
30 # A set of tag strings for this document. May be empty.
31 tags: Set[str] = field(default_factory=set)
33 # A list of key->value strings for this document. May be empty.
34 properties: List[Tuple[str, str]] = field(default_factory=list)
36 # An optional reference to something else; interpreted only by
37 # caller code, ignored here.
38 reference: Optional[Any] = None
41 class Operation(enum.Enum):
42 """A logical search query operation."""
50 def from_token(token: str):
52 "not": Operation.INVERSION,
53 "and": Operation.CONJUNCTION,
54 "or": Operation.DISJUNCTION,
56 return table.get(token, None)
58 def num_operands(self) -> Optional[int]:
60 Operation.INVERSION: 1,
61 Operation.CONJUNCTION: 2,
62 Operation.DISJUNCTION: 2,
64 return table.get(self, None)
68 """A collection of searchable documents.
71 >>> c.add_doc(Document(
73 ... tags=set(['urgent', 'important']),
75 ... ('author', 'Scott'),
76 ... ('subject', 'your anniversary')
81 >>> c.add_doc(Document(
83 ... tags=set(['important']),
85 ... ('author', 'Joe'),
86 ... ('subject', 'your performance at work')
91 >>> c.add_doc(Document(
93 ... tags=set(['urgent']),
95 ... ('author', 'Scott'),
96 ... ('subject', 'car turning in front of you')
101 >>> c.query('author:Scott and important')
105 def __init__(self) -> None:
106 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
107 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
108 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
109 self.documents_by_docid: Dict[str, Document] = {}
111 def add_doc(self, doc: Document) -> None:
112 """Add a new Document to the Corpus. Each Document must have a
113 distinct docid that will serve as its primary identifier. If
114 the same Document is added multiple times, only the most
115 recent addition is indexed. If two distinct documents with
116 the same docid are added, the latter klobbers the former in the
119 Each Document may have an optional set of tags which can be
120 used later in expressions to the query method.
122 Each Document may have an optional list of key->value tuples
123 which can be used later in expressions to the query method.
125 Document includes a user-defined "reference" field which is
126 never interpreted by this module. This is meant to allow easy
127 mapping between Documents in this corpus and external objects
131 if doc.docid in self.documents_by_docid:
132 # Handle collisions; assume that we are re-indexing the
133 # same document so remove it from the indexes before
134 # adding it back again.
135 colliding_doc = self.documents_by_docid[doc.docid]
136 assert colliding_doc.docid == doc.docid
137 del self.documents_by_docid[doc.docid]
138 for tag in colliding_doc.tags:
139 self.docids_by_tag[tag].remove(doc.docid)
140 for key, value in colliding_doc.properties:
141 self.docids_by_property[(key, value)].remove(doc.docid)
142 self.docids_with_property[key].remove(doc.docid)
144 # Index the new Document
145 assert doc.docid not in self.documents_by_docid
146 self.documents_by_docid[doc.docid] = doc
148 self.docids_by_tag[tag].add(doc.docid)
149 for key, value in doc.properties:
150 self.docids_by_property[(key, value)].add(doc.docid)
151 self.docids_with_property[key].add(doc.docid)
153 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
154 """Return the set of docids that have a particular tag."""
156 return self.docids_by_tag[tag]
158 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
159 """Return the set of docids with a tag that contains a str"""
162 for search_tag in self.docids_by_tag:
163 if tag in search_tag:
164 for docid in self.docids_by_tag[search_tag]:
168 def get_docids_with_property(self, key: str) -> Set[str]:
169 """Return the set of docids that have a particular property no matter
170 what that property's value.
173 return self.docids_with_property[key]
175 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
176 """Return the set of docids that have a particular property with a
180 return self.docids_by_property[(key, value)]
182 def invert_docid_set(self, original: Set[str]) -> Set[str]:
183 """Invert a set of docids."""
185 return {docid for docid in self.documents_by_docid if docid not in original}
187 def get_doc(self, docid: str) -> Optional[Document]:
188 """Given a docid, retrieve the previously added Document."""
190 return self.documents_by_docid.get(docid, None)
192 def query(self, query: str) -> Optional[Set[str]]:
193 """Query the corpus for documents that match a logical expression.
194 Returns a (potentially empty) set of docids for the matching
195 (previously added) documents or None on error.
199 tag1 and tag2 and not tag3
201 (tag1 or tag2) and (tag3 or tag4)
203 (tag1 and key2:value2) or (tag2 and key1:value1)
211 root = self._parse_query(query)
212 except ParseError as e:
213 print(e.message, file=sys.stderr)
217 def _parse_query(self, query: str):
218 """Internal parse helper; prefer to use query instead."""
220 parens = set(["(", ")"])
221 and_or = set(["and", "or"])
223 def operator_precedence(token: str) -> Optional[int]:
231 return table.get(token, None)
233 def is_operator(token: str) -> bool:
234 return operator_precedence(token) is not None
237 tokens = query.split()
239 # Handle ( and ) operators stuck to the ends of tokens
240 # that split() doesn't understand.
254 def evaluate(corpus: Corpus, stack: List[str]):
255 node_stack: List[Node] = []
258 if not is_operator(token):
259 node = Node(corpus, Operation.QUERY, [token])
262 operation = Operation.from_token(token)
263 operand_count = operation.num_operands()
264 if len(node_stack) < operand_count:
265 raise ParseError(f"Incorrect number of operations for {operation}")
266 for _ in range(operation.num_operands()):
267 args.append(node_stack.pop())
268 node = Node(corpus, operation, args)
269 node_stack.append(node)
274 for token in lex(query):
275 if not is_operator(token):
276 output_stack.append(token)
279 # token is an operator...
281 operator_stack.append(token)
284 while len(operator_stack) > 0:
285 pop_operator = operator_stack.pop()
286 if pop_operator != "(":
287 output_stack.append(pop_operator)
292 raise ParseError("Unbalanced parenthesis in query expression")
296 my_precedence = operator_precedence(token)
297 if my_precedence is None:
298 raise ParseError(f"Unknown operator: {token}")
299 while len(operator_stack) > 0:
300 peek_operator = operator_stack[-1]
301 if not is_operator(peek_operator) or peek_operator == "(":
303 peek_precedence = operator_precedence(peek_operator)
304 if peek_precedence is None:
305 raise ParseError("Internal error")
307 (peek_precedence < my_precedence)
308 or (peek_precedence == my_precedence)
309 and (peek_operator not in and_or)
312 output_stack.append(operator_stack.pop())
313 operator_stack.append(token)
314 while len(operator_stack) > 0:
315 token = operator_stack.pop()
317 raise ParseError("Unbalanced parenthesis in query expression")
318 output_stack.append(token)
319 return evaluate(self, output_stack)
323 """A query AST node."""
329 operands: Sequence[Union[Node, str]],
333 self.operands = operands
335 def eval(self) -> Set[str]:
336 """Evaluate this node."""
338 evaled_operands: List[Union[Set[str], str]] = []
339 for operand in self.operands:
340 if isinstance(operand, Node):
341 evaled_operands.append(operand.eval())
342 elif isinstance(operand, str):
343 evaled_operands.append(operand)
345 raise ParseError(f"Unexpected operand: {operand}")
348 if self.op is Operation.QUERY:
349 for tag in evaled_operands:
350 if isinstance(tag, str):
353 key, value = tag.split(":")
354 except ValueError as v:
355 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
357 r = self.corpus.get_docids_with_property(key)
359 r = self.corpus.get_docids_by_property(key, value)
361 r = self.corpus.get_docids_by_exact_tag(tag)
364 raise ParseError(f"Unexpected query {tag}")
365 elif self.op is Operation.DISJUNCTION:
366 if len(evaled_operands) != 2:
367 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
368 retval.update(evaled_operands[0])
369 retval.update(evaled_operands[1])
370 elif self.op is Operation.CONJUNCTION:
371 if len(evaled_operands) != 2:
372 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
373 retval.update(evaled_operands[0])
374 retval = retval.intersection(evaled_operands[1])
375 elif self.op is Operation.INVERSION:
376 if len(evaled_operands) != 1:
377 raise ParseError("Operation.INVERSION (not) expects one operand.")
378 _ = evaled_operands[0]
379 if isinstance(_, set):
380 retval.update(self.corpus.invert_docid_set(_))
382 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
386 if __name__ == '__main__':