3 from __future__ import annotations
7 from collections import defaultdict
8 from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
11 class ParseError(Exception):
12 """An error encountered while parsing a logical search expression."""
14 def __init__(self, message: str):
15 self.message = message
18 class Document(NamedTuple):
19 """A tuple representing a searchable document."""
21 docid: str # a unique idenfier for the document
22 tags: Set[str] # an optional set of tags
23 properties: List[Tuple[str, str]] # an optional set of key->value properties
24 reference: Any # an optional reference to something else
27 class Operation(enum.Enum):
28 """A logical search query operation."""
36 def from_token(token: str):
38 "not": Operation.INVERSION,
39 "and": Operation.CONJUNCTION,
40 "or": Operation.DISJUNCTION,
42 return table.get(token, None)
44 def num_operands(self) -> Optional[int]:
46 Operation.INVERSION: 1,
47 Operation.CONJUNCTION: 2,
48 Operation.DISJUNCTION: 2,
50 return table.get(self, None)
54 """A collection of searchable documents.
57 >>> c.add_doc(Document(
59 ... tags=set(['urgent', 'important']),
61 ... ('author', 'Scott'),
62 ... ('subject', 'your anniversary')
67 >>> c.add_doc(Document(
69 ... tags=set(['important']),
71 ... ('author', 'Joe'),
72 ... ('subject', 'your performance at work')
77 >>> c.add_doc(Document(
79 ... tags=set(['urgent']),
81 ... ('author', 'Scott'),
82 ... ('subject', 'car turning in front of you')
87 >>> c.query('author:Scott and important')
91 def __init__(self) -> None:
92 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
93 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
94 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
95 self.documents_by_docid: Dict[str, Document] = {}
97 def add_doc(self, doc: Document) -> None:
98 """Add a new Document to the Corpus. Each Document must have a
99 distinct docid that will serve as its primary identifier. If
100 the same Document is added multiple times, only the most
101 recent addition is indexed. If two distinct documents with
102 the same docid are added, the latter klobbers the former in the
105 Each Document may have an optional set of tags which can be
106 used later in expressions to the query method.
108 Each Document may have an optional list of key->value tuples
109 which can be used later in expressions to the query method.
111 Document includes a user-defined "reference" field which is
112 never interpreted by this module. This is meant to allow easy
113 mapping between Documents in this corpus and external objects
117 if doc.docid in self.documents_by_docid:
118 # Handle collisions; assume that we are re-indexing the
119 # same document so remove it from the indexes before
120 # adding it back again.
121 colliding_doc = self.documents_by_docid[doc.docid]
122 assert colliding_doc.docid == doc.docid
123 del self.documents_by_docid[doc.docid]
124 for tag in colliding_doc.tags:
125 self.docids_by_tag[tag].remove(doc.docid)
126 for key, value in colliding_doc.properties:
127 self.docids_by_property[(key, value)].remove(doc.docid)
128 self.docids_with_property[key].remove(doc.docid)
130 # Index the new Document
131 assert doc.docid not in self.documents_by_docid
132 self.documents_by_docid[doc.docid] = doc
134 self.docids_by_tag[tag].add(doc.docid)
135 for key, value in doc.properties:
136 self.docids_by_property[(key, value)].add(doc.docid)
137 self.docids_with_property[key].add(doc.docid)
139 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
140 """Return the set of docids that have a particular tag."""
142 return self.docids_by_tag[tag]
144 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
145 """Return the set of docids with a tag that contains a str"""
148 for search_tag in self.docids_by_tag:
149 if tag in search_tag:
150 for docid in self.docids_by_tag[search_tag]:
154 def get_docids_with_property(self, key: str) -> Set[str]:
155 """Return the set of docids that have a particular property no matter
156 what that property's value.
159 return self.docids_with_property[key]
161 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
162 """Return the set of docids that have a particular property with a
166 return self.docids_by_property[(key, value)]
168 def invert_docid_set(self, original: Set[str]) -> Set[str]:
169 """Invert a set of docids."""
171 return set([docid for docid in self.documents_by_docid.keys() if docid not in original])
173 def get_doc(self, docid: str) -> Optional[Document]:
174 """Given a docid, retrieve the previously added Document."""
176 return self.documents_by_docid.get(docid, None)
178 def query(self, query: str) -> Optional[Set[str]]:
179 """Query the corpus for documents that match a logical expression.
180 Returns a (potentially empty) set of docids for the matching
181 (previously added) documents or None on error.
185 tag1 and tag2 and not tag3
187 (tag1 or tag2) and (tag3 or tag4)
189 (tag1 and key2:value2) or (tag2 and key1:value1)
197 root = self._parse_query(query)
198 except ParseError as e:
199 print(e.message, file=sys.stderr)
203 def _parse_query(self, query: str):
204 """Internal parse helper; prefer to use query instead."""
206 parens = set(["(", ")"])
207 and_or = set(["and", "or"])
209 def operator_precedence(token: str) -> Optional[int]:
217 return table.get(token, None)
219 def is_operator(token: str) -> bool:
220 return operator_precedence(token) is not None
223 tokens = query.split()
225 # Handle ( and ) operators stuck to the ends of tokens
226 # that split() doesn't understand.
240 def evaluate(corpus: Corpus, stack: List[str]):
241 node_stack: List[Node] = []
244 if not is_operator(token):
245 node = Node(corpus, Operation.QUERY, [token])
248 operation = Operation.from_token(token)
249 operand_count = operation.num_operands()
250 if len(node_stack) < operand_count:
251 raise ParseError(f"Incorrect number of operations for {operation}")
252 for _ in range(operation.num_operands()):
253 args.append(node_stack.pop())
254 node = Node(corpus, operation, args)
255 node_stack.append(node)
260 for token in lex(query):
261 if not is_operator(token):
262 output_stack.append(token)
265 # token is an operator...
267 operator_stack.append(token)
270 while len(operator_stack) > 0:
271 pop_operator = operator_stack.pop()
272 if pop_operator != "(":
273 output_stack.append(pop_operator)
278 raise ParseError("Unbalanced parenthesis in query expression")
282 my_precedence = operator_precedence(token)
283 if my_precedence is None:
284 raise ParseError(f"Unknown operator: {token}")
285 while len(operator_stack) > 0:
286 peek_operator = operator_stack[-1]
287 if not is_operator(peek_operator) or peek_operator == "(":
289 peek_precedence = operator_precedence(peek_operator)
290 if peek_precedence is None:
291 raise ParseError("Internal error")
293 (peek_precedence < my_precedence)
294 or (peek_precedence == my_precedence)
295 and (peek_operator not in and_or)
298 output_stack.append(operator_stack.pop())
299 operator_stack.append(token)
300 while len(operator_stack) > 0:
301 token = operator_stack.pop()
303 raise ParseError("Unbalanced parenthesis in query expression")
304 output_stack.append(token)
305 return evaluate(self, output_stack)
309 """A query AST node."""
315 operands: Sequence[Union[Node, str]],
319 self.operands = operands
321 def eval(self) -> Set[str]:
322 """Evaluate this node."""
324 evaled_operands: List[Union[Set[str], str]] = []
325 for operand in self.operands:
326 if isinstance(operand, Node):
327 evaled_operands.append(operand.eval())
328 elif isinstance(operand, str):
329 evaled_operands.append(operand)
331 raise ParseError(f"Unexpected operand: {operand}")
334 if self.op is Operation.QUERY:
335 for tag in evaled_operands:
336 if isinstance(tag, str):
339 key, value = tag.split(":")
340 except ValueError as v:
341 raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
343 r = self.corpus.get_docids_with_property(key)
345 r = self.corpus.get_docids_by_property(key, value)
347 r = self.corpus.get_docids_by_exact_tag(tag)
350 raise ParseError(f"Unexpected query {tag}")
351 elif self.op is Operation.DISJUNCTION:
352 if len(evaled_operands) != 2:
353 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
354 retval.update(evaled_operands[0])
355 retval.update(evaled_operands[1])
356 elif self.op is Operation.CONJUNCTION:
357 if len(evaled_operands) != 2:
358 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
359 retval.update(evaled_operands[0])
360 retval = retval.intersection(evaled_operands[1])
361 elif self.op is Operation.INVERSION:
362 if len(evaled_operands) != 1:
363 raise ParseError("Operation.INVERSION (not) expects one operand.")
364 _ = evaled_operands[0]
365 if isinstance(_, set):
366 retval.update(self.corpus.invert_docid_set(_))
368 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
372 if __name__ == '__main__':