3 from __future__ import annotations
5 from collections import defaultdict
21 class ParseError(Exception):
22 """An error encountered while parsing a logical search expression."""
24 def __init__(self, message: str):
25 self.message = message
28 class Document(NamedTuple):
29 """A tuple representing a searchable document."""
31 docid: str # a unique idenfier for the document
32 tags: Set[str] # an optional set of tags
33 properties: List[Tuple[str, str]] # an optional set of key->value properties
34 reference: Any # an optional reference to something else
37 class Operation(enum.Enum):
38 """A logical search query operation."""
46 def from_token(token: str):
48 "not": Operation.INVERSION,
49 "and": Operation.CONJUNCTION,
50 "or": Operation.DISJUNCTION,
52 return table.get(token, None)
54 def num_operands(self) -> Optional[int]:
56 Operation.INVERSION: 1,
57 Operation.CONJUNCTION: 2,
58 Operation.DISJUNCTION: 2,
60 return table.get(self, None)
64 """A collection of searchable documents.
67 >>> c.add_doc(Document(
69 ... tags=set(['urgent', 'important']),
71 ... ('author', 'Scott'),
72 ... ('subject', 'your anniversary')
77 >>> c.add_doc(Document(
79 ... tags=set(['important']),
81 ... ('author', 'Joe'),
82 ... ('subject', 'your performance at work')
87 >>> c.add_doc(Document(
89 ... tags=set(['urgent']),
91 ... ('author', 'Scott'),
92 ... ('subject', 'car turning in front of you')
97 >>> c.query('author:Scott and important')
101 def __init__(self) -> None:
102 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
103 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
104 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
105 self.documents_by_docid: Dict[str, Document] = {}
107 def add_doc(self, doc: Document) -> None:
108 """Add a new Document to the Corpus. Each Document must have a
109 distinct docid that will serve as its primary identifier. If
110 the same Document is added multiple times, only the most
111 recent addition is indexed. If two distinct documents with
112 the same docid are added, the latter klobbers the former in the
115 Each Document may have an optional set of tags which can be
116 used later in expressions to the query method.
118 Each Document may have an optional list of key->value tuples
119 which can be used later in expressions to the query method.
121 Document includes a user-defined "reference" field which is
122 never interpreted by this module. This is meant to allow easy
123 mapping between Documents in this corpus and external objects
127 if doc.docid in self.documents_by_docid:
128 # Handle collisions; assume that we are re-indexing the
129 # same document so remove it from the indexes before
130 # adding it back again.
131 colliding_doc = self.documents_by_docid[doc.docid]
132 assert colliding_doc.docid == doc.docid
133 del self.documents_by_docid[doc.docid]
134 for tag in colliding_doc.tags:
135 self.docids_by_tag[tag].remove(doc.docid)
136 for key, value in colliding_doc.properties:
137 self.docids_by_property[(key, value)].remove(doc.docid)
138 self.docids_with_property[key].remove(doc.docid)
140 # Index the new Document
141 assert doc.docid not in self.documents_by_docid
142 self.documents_by_docid[doc.docid] = doc
144 self.docids_by_tag[tag].add(doc.docid)
145 for key, value in doc.properties:
146 self.docids_by_property[(key, value)].add(doc.docid)
147 self.docids_with_property[key].add(doc.docid)
149 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
150 """Return the set of docids that have a particular tag."""
152 return self.docids_by_tag[tag]
154 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
155 """Return the set of docids with a tag that contains a str"""
158 for search_tag in self.docids_by_tag:
159 if tag in search_tag:
160 for docid in self.docids_by_tag[search_tag]:
164 def get_docids_with_property(self, key: str) -> Set[str]:
165 """Return the set of docids that have a particular property no matter
166 what that property's value.
169 return self.docids_with_property[key]
171 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
172 """Return the set of docids that have a particular property with a
176 return self.docids_by_property[(key, value)]
178 def invert_docid_set(self, original: Set[str]) -> Set[str]:
179 """Invert a set of docids."""
182 [docid for docid in self.documents_by_docid.keys() if docid not in original]
185 def get_doc(self, docid: str) -> Optional[Document]:
186 """Given a docid, retrieve the previously added Document."""
188 return self.documents_by_docid.get(docid, None)
190 def query(self, query: str) -> Optional[Set[str]]:
191 """Query the corpus for documents that match a logical expression.
192 Returns a (potentially empty) set of docids for the matching
193 (previously added) documents or None on error.
197 tag1 and tag2 and not tag3
199 (tag1 or tag2) and (tag3 or tag4)
201 (tag1 and key2:value2) or (tag2 and key1:value1)
209 root = self._parse_query(query)
210 except ParseError as e:
211 print(e.message, file=sys.stderr)
215 def _parse_query(self, query: str):
216 """Internal parse helper; prefer to use query instead."""
218 parens = set(["(", ")"])
219 and_or = set(["and", "or"])
221 def operator_precedence(token: str) -> Optional[int]:
229 return table.get(token, None)
231 def is_operator(token: str) -> bool:
232 return operator_precedence(token) is not None
235 tokens = query.split()
237 # Handle ( and ) operators stuck to the ends of tokens
238 # that split() doesn't understand.
252 def evaluate(corpus: Corpus, stack: List[str]):
253 node_stack: List[Node] = []
256 if not is_operator(token):
257 node = Node(corpus, Operation.QUERY, [token])
260 operation = Operation.from_token(token)
261 operand_count = operation.num_operands()
262 if len(node_stack) < operand_count:
264 f"Incorrect number of operations for {operation}"
266 for _ in range(operation.num_operands()):
267 args.append(node_stack.pop())
268 node = Node(corpus, operation, args)
269 node_stack.append(node)
274 for token in lex(query):
275 if not is_operator(token):
276 output_stack.append(token)
279 # token is an operator...
281 operator_stack.append(token)
284 while len(operator_stack) > 0:
285 pop_operator = operator_stack.pop()
286 if pop_operator != "(":
287 output_stack.append(pop_operator)
292 raise ParseError("Unbalanced parenthesis in query expression")
296 my_precedence = operator_precedence(token)
297 if my_precedence is None:
298 raise ParseError(f"Unknown operator: {token}")
299 while len(operator_stack) > 0:
300 peek_operator = operator_stack[-1]
301 if not is_operator(peek_operator) or peek_operator == "(":
303 peek_precedence = operator_precedence(peek_operator)
304 if peek_precedence is None:
305 raise ParseError("Internal error")
307 (peek_precedence < my_precedence)
308 or (peek_precedence == my_precedence)
309 and (peek_operator not in and_or)
312 output_stack.append(operator_stack.pop())
313 operator_stack.append(token)
314 while len(operator_stack) > 0:
315 token = operator_stack.pop()
317 raise ParseError("Unbalanced parenthesis in query expression")
318 output_stack.append(token)
319 return evaluate(self, output_stack)
323 """A query AST node."""
329 operands: Sequence[Union[Node, str]],
333 self.operands = operands
335 def eval(self) -> Set[str]:
336 """Evaluate this node."""
338 evaled_operands: List[Union[Set[str], str]] = []
339 for operand in self.operands:
340 if isinstance(operand, Node):
341 evaled_operands.append(operand.eval())
342 elif isinstance(operand, str):
343 evaled_operands.append(operand)
345 raise ParseError(f"Unexpected operand: {operand}")
348 if self.op is Operation.QUERY:
349 for tag in evaled_operands:
350 if isinstance(tag, str):
353 key, value = tag.split(":")
354 except ValueError as v:
356 f'Invalid key:value syntax at "{tag}"'
359 r = self.corpus.get_docids_with_property(key)
361 r = self.corpus.get_docids_by_property(key, value)
363 r = self.corpus.get_docids_by_exact_tag(tag)
366 raise ParseError(f"Unexpected query {tag}")
367 elif self.op is Operation.DISJUNCTION:
368 if len(evaled_operands) != 2:
369 raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
370 retval.update(evaled_operands[0])
371 retval.update(evaled_operands[1])
372 elif self.op is Operation.CONJUNCTION:
373 if len(evaled_operands) != 2:
374 raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
375 retval.update(evaled_operands[0])
376 retval = retval.intersection(evaled_operands[1])
377 elif self.op is Operation.INVERSION:
378 if len(evaled_operands) != 1:
379 raise ParseError("Operation.INVERSION (not) expects one operand.")
380 _ = evaled_operands[0]
381 if isinstance(_, set):
382 retval.update(self.corpus.invert_docid_set(_))
384 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
388 if __name__ == '__main__':