3 """This is a module concerned with the creation of and searching of a
4 corpus of documents. The corpus is held in memory for fast
7 from __future__ import annotations
11 from collections import defaultdict
12 from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
class ParseError(Exception):
    """An error encountered while parsing a logical search expression.

    The human-readable description is available as the ``message``
    attribute and also through the standard exception protocol
    (``str(err)`` and ``err.args``).
    """

    def __init__(self, message: str):
        # Hand the text to Exception as well; otherwise str(), repr(),
        # tracebacks and logging would all show an empty message.
        super().__init__(message)
        self.message = message
class Document(NamedTuple):
    """A named tuple describing a single searchable document."""

    # Unique identifier for the document.
    docid: str
    # Optional set of tags attached to the document.
    tags: Set[str]
    # Optional list of (key, value) property pairs.
    properties: List[Tuple[str, str]]
    # Optional reference to something else; opaque to this record.
    reference: Any
32 class Operation(enum.Enum):
33 """A logical search query operation."""
41 def from_token(token: str):
43 "not": Operation.INVERSION,
44 "and": Operation.CONJUNCTION,
45 "or": Operation.DISJUNCTION,
47 return table.get(token, None)
49 def num_operands(self) -> Optional[int]:
51 Operation.INVERSION: 1,
52 Operation.CONJUNCTION: 2,
53 Operation.DISJUNCTION: 2,
55 return table.get(self, None)
59 """A collection of searchable documents.
62 >>> c.add_doc(Document(
64 ... tags=set(['urgent', 'important']),
66 ... ('author', 'Scott'),
67 ... ('subject', 'your anniversary')
72 >>> c.add_doc(Document(
74 ... tags=set(['important']),
76 ... ('author', 'Joe'),
77 ... ('subject', 'your performance at work')
82 >>> c.add_doc(Document(
84 ... tags=set(['urgent']),
86 ... ('author', 'Scott'),
87 ... ('subject', 'car turning in front of you')
92 >>> c.query('author:Scott and important')
96 def __init__(self) -> None:
97 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
98 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
99 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
100 self.documents_by_docid: Dict[str, Document] = {}
def add_doc(self, doc: Document) -> None:
    """Add a Document to the corpus, replacing any earlier one with its docid.

    Each Document must have a distinct docid that serves as its primary
    identifier.  If a Document with the same docid was added before, the
    earlier one is removed from every index first, so only the most
    recent addition remains indexed (the latter clobbers the former).

    Each Document may have an optional set of tags and an optional list
    of (key, value) property tuples.  Both can be used later in
    expressions given to the query method.

    The Document's user-defined "reference" field is never interpreted
    here; it is stored untouched along with the rest of the Document.

    Args:
        doc: the Document to index.
    """
    docid = doc.docid

    if docid in self.documents_by_docid:
        # Collision: assume the same document is being re-indexed, so
        # withdraw the old entries before adding the new ones.
        stale = self.documents_by_docid.pop(docid)
        for tag in stale.tags:
            self.docids_by_tag[tag].discard(docid)
        for key, value in stale.properties:
            # discard(), not remove(): the properties list may repeat a
            # key (or a whole pair), and the second removal of the same
            # docid would otherwise raise KeyError.
            self.docids_by_property[(key, value)].discard(docid)
            self.docids_with_property[key].discard(docid)

    # Index the new Document.
    self.documents_by_docid[docid] = doc
    for tag in doc.tags:
        self.docids_by_tag[tag].add(docid)
    for key, value in doc.properties:
        self.docids_by_property[(key, value)].add(docid)
        self.docids_with_property[key].add(docid)
def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
    """Return the set of docids that have a particular tag.

    The result is a new set, so callers may modify it freely.  An
    unknown tag yields an empty set and leaves the tag index unchanged.
    """
    # Use .get() instead of subscripting: subscripting the defaultdict
    # would insert an empty entry for every unknown tag looked up.
    return set(self.docids_by_tag.get(tag, ()))
def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
    """Return the set of docids with a tag that contains a str.

    Every indexed tag is checked with a substring test, and the docids
    of all matching tags are merged into one new set.
    """
    matches: Set[str] = set()
    for candidate_tag, docids in self.docids_by_tag.items():
        if tag in candidate_tag:
            matches.update(docids)
    return matches
def get_docids_with_property(self, key: str) -> Set[str]:
    """Return the set of docids that have a particular property no matter
    what that property's value.

    The result is a new set, so callers may modify it freely.  An
    unknown key yields an empty set and leaves the index unchanged.
    """
    # Use .get() instead of subscripting: subscripting the defaultdict
    # would insert an empty entry for every unknown key looked up.
    return set(self.docids_with_property.get(key, ()))
def get_docids_by_property(self, key: str, value: str) -> Set[str]:
    """Return the set of docids that have a particular property with a
    particular value.

    The result is a new set, so callers may modify it freely.  An
    unknown (key, value) pair yields an empty set and leaves the index
    unchanged.
    """
    # Use .get() instead of subscripting: subscripting the defaultdict
    # would insert an empty entry for every unknown pair looked up.
    return set(self.docids_by_property.get((key, value), ()))
def invert_docid_set(self, original: Set[str]) -> Set[str]:
    """Return every docid stored in the corpus that is not in `original`."""
    complement = set()
    for known_docid in self.documents_by_docid:
        if known_docid in original:
            continue
        complement.add(known_docid)
    return complement
def get_doc(self, docid: str) -> Optional[Document]:
    """Look up a previously added Document by its docid.

    Returns None when no Document is stored under that docid.
    """
    try:
        return self.documents_by_docid[docid]
    except KeyError:
        return None
def query(self, query: str) -> Optional[Set[str]]:
    """Query the corpus for documents that match a logical expression.

    Returns a (potentially empty) set of docids for the matching
    (previously added) documents, or None on error.  An error is any
    ParseError raised while parsing or evaluating the expression; its
    message is printed to stderr.

    Example expressions:

        tag1 and tag2 and not tag3

        (tag1 or tag2) and (tag3 or tag4)

        (tag1 and key2:value2) or (tag2 and key1:value1)
    """
    try:
        root = self._parse_query(query)
        # Evaluate under the same handler: evaluation can raise
        # ParseError too, and the contract is "None on error".
        return root.eval()
    except ParseError as e:
        print(e.message, file=sys.stderr)
        return None
208 def _parse_query(self, query: str):
209 """Internal parse helper; prefer to use query instead."""
211 parens = set(["(", ")"])
212 and_or = set(["and", "or"])
214 def operator_precedence(token: str) -> Optional[int]:
222 return table.get(token, None)
224 def is_operator(token: str) -> bool:
225 return operator_precedence(token) is not None
228 tokens = query.split()
230 # Handle ( and ) operators stuck to the ends of tokens
231 # that split() doesn't understand.
245 def evaluate(corpus: Corpus, stack: List[str]):
246 node_stack: List[Node] = []
249 if not is_operator(token):
250 node = Node(corpus, Operation.QUERY, [token])
253 operation = Operation.from_token(token)
254 operand_count = operation.num_operands()
255 if len(node_stack) < operand_count:
256 raise ParseError(f"Incorrect number of operations for {operation}")
257 for _ in range(operation.num_operands()):
258 args.append(node_stack.pop())
259 node = Node(corpus, operation, args)
260 node_stack.append(node)
265 for token in lex(query):
266 if not is_operator(token):
267 output_stack.append(token)
270 # token is an operator...
272 operator_stack.append(token)
275 while len(operator_stack) > 0:
276 pop_operator = operator_stack.pop()
277 if pop_operator != "(":
278 output_stack.append(pop_operator)
283 raise ParseError("Unbalanced parenthesis in query expression")
287 my_precedence = operator_precedence(token)
288 if my_precedence is None:
289 raise ParseError(f"Unknown operator: {token}")
290 while len(operator_stack) > 0:
291 peek_operator = operator_stack[-1]
292 if not is_operator(peek_operator) or peek_operator == "(":
294 peek_precedence = operator_precedence(peek_operator)
295 if peek_precedence is None:
296 raise ParseError("Internal error")
298 (peek_precedence < my_precedence)
299 or (peek_precedence == my_precedence)
300 and (peek_operator not in and_or)
303 output_stack.append(operator_stack.pop())
304 operator_stack.append(token)
305 while len(operator_stack) > 0:
306 token = operator_stack.pop()
308 raise ParseError("Unbalanced parenthesis in query expression")
309 output_stack.append(token)
310 return evaluate(self, output_stack)
314 """A query AST node."""
320 operands: Sequence[Union[Node, str]],
324 self.operands = operands
326 def eval(self) -> Set[str]:
327 """Evaluate this node."""
329 evaled_operands: List[Union[Set[str], str]] = []
330 for operand in self.operands:
331 if isinstance(operand, Node):
332 evaled_operands.append(operand.eval())
333 elif isinstance(operand, str):
334 evaled_operands.append(operand)
336 raise ParseError(f"Unexpected operand: {operand}")
339 if self.op is Operation.QUERY:
340 for tag in evaled_operands:
341 if isinstance(tag, str):
344 key, value = tag.split(":")
345 except ValueError as v:
346 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
348 r = self.corpus.get_docids_with_property(key)
350 r = self.corpus.get_docids_by_property(key, value)
352 r = self.corpus.get_docids_by_exact_tag(tag)
355 raise ParseError(f"Unexpected query {tag}")
356 elif self.op is Operation.DISJUNCTION:
357 if len(evaled_operands) != 2:
358 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
359 retval.update(evaled_operands[0])
360 retval.update(evaled_operands[1])
361 elif self.op is Operation.CONJUNCTION:
362 if len(evaled_operands) != 2:
363 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
364 retval.update(evaled_operands[0])
365 retval = retval.intersection(evaled_operands[1])
366 elif self.op is Operation.INVERSION:
367 if len(evaled_operands) != 1:
368 raise ParseError("Operation.INVERSION (not) expects one operand.")
369 _ = evaled_operands[0]
370 if isinstance(_, set):
371 retval.update(self.corpus.invert_docid_set(_))
373 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
377 if __name__ == '__main__':