3 from __future__ import annotations
7 from collections import defaultdict
8 from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
11 class ParseError(Exception):
12 """An error encountered while parsing a logical search expression."""
14 def __init__(self, message: str):
15 self.message = message
18 class Document(NamedTuple):
19 """A tuple representing a searchable document."""
21 docid: str # a unique idenfier for the document
22 tags: Set[str] # an optional set of tags
23 properties: List[Tuple[str, str]] # an optional set of key->value properties
24 reference: Any # an optional reference to something else
27 class Operation(enum.Enum):
28 """A logical search query operation."""
36 def from_token(token: str):
38 "not": Operation.INVERSION,
39 "and": Operation.CONJUNCTION,
40 "or": Operation.DISJUNCTION,
42 return table.get(token, None)
44 def num_operands(self) -> Optional[int]:
46 Operation.INVERSION: 1,
47 Operation.CONJUNCTION: 2,
48 Operation.DISJUNCTION: 2,
50 return table.get(self, None)
54 """A collection of searchable documents.
57 >>> c.add_doc(Document(
59 ... tags=set(['urgent', 'important']),
61 ... ('author', 'Scott'),
62 ... ('subject', 'your anniversary')
67 >>> c.add_doc(Document(
69 ... tags=set(['important']),
71 ... ('author', 'Joe'),
72 ... ('subject', 'your performance at work')
77 >>> c.add_doc(Document(
79 ... tags=set(['urgent']),
81 ... ('author', 'Scott'),
82 ... ('subject', 'car turning in front of you')
87 >>> c.query('author:Scott and important')
91 def __init__(self) -> None:
92 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
93 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
94 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
95 self.documents_by_docid: Dict[str, Document] = {}
97 def add_doc(self, doc: Document) -> None:
98 """Add a new Document to the Corpus. Each Document must have a
99 distinct docid that will serve as its primary identifier. If
100 the same Document is added multiple times, only the most
101 recent addition is indexed. If two distinct documents with
102 the same docid are added, the latter klobbers the former in the
105 Each Document may have an optional set of tags which can be
106 used later in expressions to the query method.
108 Each Document may have an optional list of key->value tuples
109 which can be used later in expressions to the query method.
111 Document includes a user-defined "reference" field which is
112 never interpreted by this module. This is meant to allow easy
113 mapping between Documents in this corpus and external objects
117 if doc.docid in self.documents_by_docid:
118 # Handle collisions; assume that we are re-indexing the
119 # same document so remove it from the indexes before
120 # adding it back again.
121 colliding_doc = self.documents_by_docid[doc.docid]
122 assert colliding_doc.docid == doc.docid
123 del self.documents_by_docid[doc.docid]
124 for tag in colliding_doc.tags:
125 self.docids_by_tag[tag].remove(doc.docid)
126 for key, value in colliding_doc.properties:
127 self.docids_by_property[(key, value)].remove(doc.docid)
128 self.docids_with_property[key].remove(doc.docid)
130 # Index the new Document
131 assert doc.docid not in self.documents_by_docid
132 self.documents_by_docid[doc.docid] = doc
134 self.docids_by_tag[tag].add(doc.docid)
135 for key, value in doc.properties:
136 self.docids_by_property[(key, value)].add(doc.docid)
137 self.docids_with_property[key].add(doc.docid)
139 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
140 """Return the set of docids that have a particular tag."""
142 return self.docids_by_tag[tag]
144 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
145 """Return the set of docids with a tag that contains a str"""
148 for search_tag in self.docids_by_tag:
149 if tag in search_tag:
150 for docid in self.docids_by_tag[search_tag]:
154 def get_docids_with_property(self, key: str) -> Set[str]:
155 """Return the set of docids that have a particular property no matter
156 what that property's value.
159 return self.docids_with_property[key]
161 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
162 """Return the set of docids that have a particular property with a
166 return self.docids_by_property[(key, value)]
168 def invert_docid_set(self, original: Set[str]) -> Set[str]:
169 """Invert a set of docids."""
172 [docid for docid in self.documents_by_docid.keys() if docid not in original]
175 def get_doc(self, docid: str) -> Optional[Document]:
176 """Given a docid, retrieve the previously added Document."""
178 return self.documents_by_docid.get(docid, None)
180 def query(self, query: str) -> Optional[Set[str]]:
181 """Query the corpus for documents that match a logical expression.
182 Returns a (potentially empty) set of docids for the matching
183 (previously added) documents or None on error.
187 tag1 and tag2 and not tag3
189 (tag1 or tag2) and (tag3 or tag4)
191 (tag1 and key2:value2) or (tag2 and key1:value1)
199 root = self._parse_query(query)
200 except ParseError as e:
201 print(e.message, file=sys.stderr)
205 def _parse_query(self, query: str):
206 """Internal parse helper; prefer to use query instead."""
208 parens = set(["(", ")"])
209 and_or = set(["and", "or"])
211 def operator_precedence(token: str) -> Optional[int]:
219 return table.get(token, None)
221 def is_operator(token: str) -> bool:
222 return operator_precedence(token) is not None
225 tokens = query.split()
227 # Handle ( and ) operators stuck to the ends of tokens
228 # that split() doesn't understand.
242 def evaluate(corpus: Corpus, stack: List[str]):
243 node_stack: List[Node] = []
246 if not is_operator(token):
247 node = Node(corpus, Operation.QUERY, [token])
250 operation = Operation.from_token(token)
251 operand_count = operation.num_operands()
252 if len(node_stack) < operand_count:
254 f"Incorrect number of operations for {operation}"
256 for _ in range(operation.num_operands()):
257 args.append(node_stack.pop())
258 node = Node(corpus, operation, args)
259 node_stack.append(node)
264 for token in lex(query):
265 if not is_operator(token):
266 output_stack.append(token)
269 # token is an operator...
271 operator_stack.append(token)
274 while len(operator_stack) > 0:
275 pop_operator = operator_stack.pop()
276 if pop_operator != "(":
277 output_stack.append(pop_operator)
282 raise ParseError("Unbalanced parenthesis in query expression")
286 my_precedence = operator_precedence(token)
287 if my_precedence is None:
288 raise ParseError(f"Unknown operator: {token}")
289 while len(operator_stack) > 0:
290 peek_operator = operator_stack[-1]
291 if not is_operator(peek_operator) or peek_operator == "(":
293 peek_precedence = operator_precedence(peek_operator)
294 if peek_precedence is None:
295 raise ParseError("Internal error")
297 (peek_precedence < my_precedence)
298 or (peek_precedence == my_precedence)
299 and (peek_operator not in and_or)
302 output_stack.append(operator_stack.pop())
303 operator_stack.append(token)
304 while len(operator_stack) > 0:
305 token = operator_stack.pop()
307 raise ParseError("Unbalanced parenthesis in query expression")
308 output_stack.append(token)
309 return evaluate(self, output_stack)
313 """A query AST node."""
319 operands: Sequence[Union[Node, str]],
323 self.operands = operands
325 def eval(self) -> Set[str]:
326 """Evaluate this node."""
328 evaled_operands: List[Union[Set[str], str]] = []
329 for operand in self.operands:
330 if isinstance(operand, Node):
331 evaled_operands.append(operand.eval())
332 elif isinstance(operand, str):
333 evaled_operands.append(operand)
335 raise ParseError(f"Unexpected operand: {operand}")
338 if self.op is Operation.QUERY:
339 for tag in evaled_operands:
340 if isinstance(tag, str):
343 key, value = tag.split(":")
344 except ValueError as v:
346 f'Invalid key:value syntax at "{tag}"'
349 r = self.corpus.get_docids_with_property(key)
351 r = self.corpus.get_docids_by_property(key, value)
353 r = self.corpus.get_docids_by_exact_tag(tag)
356 raise ParseError(f"Unexpected query {tag}")
357 elif self.op is Operation.DISJUNCTION:
358 if len(evaled_operands) != 2:
359 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
360 retval.update(evaled_operands[0])
361 retval.update(evaled_operands[1])
362 elif self.op is Operation.CONJUNCTION:
363 if len(evaled_operands) != 2:
364 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
365 retval.update(evaled_operands[0])
366 retval = retval.intersection(evaled_operands[1])
367 elif self.op is Operation.INVERSION:
368 if len(evaled_operands) != 1:
369 raise ParseError("Operation.INVERSION (not) expects one operand.")
370 _ = evaled_operands[0]
371 if isinstance(_, set):
372 retval.update(self.corpus.invert_docid_set(_))
374 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
378 if __name__ == '__main__':