3 from __future__ import annotations
5 from collections import defaultdict
21 class ParseError(Exception):
22 """An error encountered while parsing a logical search expression."""
24 def __init__(self, message: str):
25 self.message = message
28 class Document(NamedTuple):
29 """A tuple representing a searchable document."""
31 docid: str # a unique idenfier for the document
32 tags: Set[str] # an optional set of tags
35 ] # an optional set of key->value properties
36 reference: Any # an optional reference to something else
39 class Operation(enum.Enum):
40 """A logical search query operation."""
48 def from_token(token: str):
50 "not": Operation.INVERSION,
51 "and": Operation.CONJUNCTION,
52 "or": Operation.DISJUNCTION,
54 return table.get(token, None)
56 def num_operands(self) -> Optional[int]:
58 Operation.INVERSION: 1,
59 Operation.CONJUNCTION: 2,
60 Operation.DISJUNCTION: 2,
62 return table.get(self, None)
66 """A collection of searchable documents.
69 >>> c.add_doc(Document(
71 ... tags=set(['urgent', 'important']),
73 ... ('author', 'Scott'),
74 ... ('subject', 'your anniversary')
79 >>> c.add_doc(Document(
81 ... tags=set(['important']),
83 ... ('author', 'Joe'),
84 ... ('subject', 'your performance at work')
89 >>> c.query('author:Scott and important')
93 def __init__(self) -> None:
94 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
95 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(
98 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
99 self.documents_by_docid: Dict[str, Document] = {}
101 def add_doc(self, doc: Document) -> None:
102 """Add a new Document to the Corpus. Each Document must have a
103 distinct docid that will serve as its primary identifier. If
104 the same Document is added multiple times, only the most
105 recent addition is indexed. If two distinct documents with
106 the same docid are added, the latter klobbers the former in the
109 Each Document may have an optional set of tags which can be
110 used later in expressions to the query method.
112 Each Document may have an optional list of key->value tuples
113 which can be used later in expressions to the query method.
115 Document includes a user-defined "reference" field which is
116 never interpreted by this module. This is meant to allow easy
117 mapping between Documents in this corpus and external objects
121 if doc.docid in self.documents_by_docid:
122 # Handle collisions; assume that we are re-indexing the
123 # same document so remove it from the indexes before
124 # adding it back again.
125 colliding_doc = self.documents_by_docid[doc.docid]
126 assert colliding_doc.docid == doc.docid
127 del self.documents_by_docid[doc.docid]
128 for tag in colliding_doc.tags:
129 self.docids_by_tag[tag].remove(doc.docid)
130 for key, value in colliding_doc.properties:
131 self.docids_by_property[(key, value)].remove(doc.docid)
132 self.docids_with_property[key].remove(doc.docid)
134 # Index the new Document
135 assert doc.docid not in self.documents_by_docid
136 self.documents_by_docid[doc.docid] = doc
138 self.docids_by_tag[tag].add(doc.docid)
139 for key, value in doc.properties:
140 self.docids_by_property[(key, value)].add(doc.docid)
141 self.docids_with_property[key].add(doc.docid)
143 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
144 """Return the set of docids that have a particular tag."""
146 return self.docids_by_tag[tag]
148 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
149 """Return the set of docids with a tag that contains a str"""
152 for search_tag in self.docids_by_tag:
153 if tag in search_tag:
154 for docid in self.docids_by_tag[search_tag]:
158 def get_docids_with_property(self, key: str) -> Set[str]:
159 """Return the set of docids that have a particular property no matter
160 what that property's value.
163 return self.docids_with_property[key]
165 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
166 """Return the set of docids that have a particular property with a
170 return self.docids_by_property[(key, value)]
172 def invert_docid_set(self, original: Set[str]) -> Set[str]:
173 """Invert a set of docids."""
178 for docid in self.documents_by_docid.keys()
179 if docid not in original
183 def get_doc(self, docid: str) -> Optional[Document]:
184 """Given a docid, retrieve the previously added Document."""
186 return self.documents_by_docid.get(docid, None)
188 def query(self, query: str) -> Optional[Set[str]]:
189 """Query the corpus for documents that match a logical expression.
190 Returns a (potentially empty) set of docids for the matching
191 (previously added) documents or None on error.
195 tag1 and tag2 and not tag3
197 (tag1 or tag2) and (tag3 or tag4)
199 (tag1 and key2:value2) or (tag2 and key1:value1)
207 root = self._parse_query(query)
208 except ParseError as e:
209 print(e.message, file=sys.stderr)
213 def _parse_query(self, query: str):
214 """Internal parse helper; prefer to use query instead."""
216 parens = set(["(", ")"])
217 and_or = set(["and", "or"])
219 def operator_precedence(token: str) -> Optional[int]:
227 return table.get(token, None)
229 def is_operator(token: str) -> bool:
230 return operator_precedence(token) is not None
233 tokens = query.split()
235 # Handle ( and ) operators stuck to the ends of tokens
236 # that split() doesn't understand.
250 def evaluate(corpus: Corpus, stack: List[str]):
251 node_stack: List[Node] = []
254 if not is_operator(token):
255 node = Node(corpus, Operation.QUERY, [token])
258 operation = Operation.from_token(token)
259 operand_count = operation.num_operands()
260 if len(node_stack) < operand_count:
262 f"Incorrect number of operations for {operation}"
264 for _ in range(operation.num_operands()):
265 args.append(node_stack.pop())
266 node = Node(corpus, operation, args)
267 node_stack.append(node)
272 for token in lex(query):
273 if not is_operator(token):
274 output_stack.append(token)
277 # token is an operator...
279 operator_stack.append(token)
282 while len(operator_stack) > 0:
283 pop_operator = operator_stack.pop()
284 if pop_operator != "(":
285 output_stack.append(pop_operator)
291 "Unbalanced parenthesis in query expression"
296 my_precedence = operator_precedence(token)
297 if my_precedence is None:
298 raise ParseError(f"Unknown operator: {token}")
299 while len(operator_stack) > 0:
300 peek_operator = operator_stack[-1]
301 if not is_operator(peek_operator) or peek_operator == "(":
303 peek_precedence = operator_precedence(peek_operator)
304 if peek_precedence is None:
305 raise ParseError("Internal error")
307 (peek_precedence < my_precedence)
308 or (peek_precedence == my_precedence)
309 and (peek_operator not in and_or)
312 output_stack.append(operator_stack.pop())
313 operator_stack.append(token)
314 while len(operator_stack) > 0:
315 token = operator_stack.pop()
317 raise ParseError("Unbalanced parenthesis in query expression")
318 output_stack.append(token)
319 return evaluate(self, output_stack)
323 """A query AST node."""
329 operands: Sequence[Union[Node, str]],
333 self.operands = operands
335 def eval(self) -> Set[str]:
336 """Evaluate this node."""
338 evaled_operands: List[Union[Set[str], str]] = []
339 for operand in self.operands:
340 if isinstance(operand, Node):
341 evaled_operands.append(operand.eval())
342 elif isinstance(operand, str):
343 evaled_operands.append(operand)
345 raise ParseError(f"Unexpected operand: {operand}")
348 if self.op is Operation.QUERY:
349 for tag in evaled_operands:
350 if isinstance(tag, str):
353 key, value = tag.split(":")
354 except ValueError as v:
356 f'Invalid key:value syntax at "{tag}"'
359 r = self.corpus.get_docids_with_property(key)
361 r = self.corpus.get_docids_by_property(key, value)
363 r = self.corpus.get_docids_by_exact_tag(tag)
366 raise ParseError(f"Unexpected query {tag}")
367 elif self.op is Operation.DISJUNCTION:
368 if len(evaled_operands) != 2:
370 "Operation.DISJUNCTION (or) expects two operands."
372 retval.update(evaled_operands[0])
373 retval.update(evaled_operands[1])
374 elif self.op is Operation.CONJUNCTION:
375 if len(evaled_operands) != 2:
377 "Operation.CONJUNCTION (and) expects two operands."
379 retval.update(evaled_operands[0])
380 retval = retval.intersection(evaled_operands[1])
381 elif self.op is Operation.INVERSION:
382 if len(evaled_operands) != 1:
384 "Operation.INVERSION (not) expects one operand."
386 _ = evaled_operands[0]
387 if isinstance(_, set):
388 retval.update(self.corpus.invert_docid_set(_))
390 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")
394 if __name__ == '__main__':