from __future__ import annotations

import enum
import sys
from collections import defaultdict
from typing import (
    Any,
    Dict,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)
class ParseError(Exception):
    """An error encountered while parsing a logical search expression."""

    def __init__(self, message: str):
        # Pass the message to Exception so that str(e) / e.args work
        # (and the exception pickles correctly); keep .message for
        # callers that read it directly.
        super().__init__(message)
        self.message = message
class Document(NamedTuple):
    """A tuple representing a searchable document."""

    docid: str  # a unique identifier for the document
    tags: Set[str]  # an optional set of tags
    properties: List[
        Tuple[str, str]
    ]  # an optional set of key->value properties
    reference: Any  # an optional reference to something else
class Operation(enum.Enum):
    """A logical search query operation."""

    QUERY = 1
    CONJUNCTION = 2
    DISJUNCTION = 3
    INVERSION = 4

    @staticmethod
    def from_token(token: str):
        """Map a lowercase operator token to its Operation, or None if
        the token is not an operator.
        """
        table = {
            "not": Operation.INVERSION,
            "and": Operation.CONJUNCTION,
            "or": Operation.DISJUNCTION,
        }
        return table.get(token, None)

    def num_operands(self) -> Optional[int]:
        """Return how many operands this operation consumes, or None
        for operations (e.g. QUERY) with no fixed operand count.
        """
        table = {
            Operation.INVERSION: 1,
            Operation.CONJUNCTION: 2,
            Operation.DISJUNCTION: 2,
        }
        return table.get(self, None)
66 """A collection of searchable documents."""
68 def __init__(self) -> None:
69 self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
70 self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(
73 self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
74 self.documents_by_docid: Dict[str, Document] = {}
76 def add_doc(self, doc: Document) -> None:
77 """Add a new Document to the Corpus. Each Document must have a
78 distinct docid that will serve as its primary identifier. If
79 the same Document is added multiple times, only the most
80 recent addition is indexed. If two distinct documents with
81 the same docid are added, the latter klobbers the former in the
84 Each Document may have an optional set of tags which can be
85 used later in expressions to the query method.
87 Each Document may have an optional list of key->value tuples
88 which can be used later in expressions to the query method.
90 Document includes a user-defined "reference" field which is
91 never interpreted by this module. This is meant to allow easy
92 mapping between Documents in this corpus and external objects
96 if doc.docid in self.documents_by_docid:
97 # Handle collisions; assume that we are re-indexing the
98 # same document so remove it from the indexes before
99 # adding it back again.
100 colliding_doc = self.documents_by_docid[doc.docid]
101 assert colliding_doc.docid == doc.docid
102 del self.documents_by_docid[doc.docid]
103 for tag in colliding_doc.tags:
104 self.docids_by_tag[tag].remove(doc.docid)
105 for key, value in colliding_doc.properties:
106 self.docids_by_property[(key, value)].remove(doc.docid)
107 self.docids_with_property[key].remove(doc.docid)
109 # Index the new Document
110 assert doc.docid not in self.documents_by_docid
111 self.documents_by_docid[doc.docid] = doc
113 self.docids_by_tag[tag].add(doc.docid)
114 for key, value in doc.properties:
115 self.docids_by_property[(key, value)].add(doc.docid)
116 self.docids_with_property[key].add(doc.docid)
118 def get_docids_by_exact_tag(self, tag: str) -> Set[str]:
119 """Return the set of docids that have a particular tag."""
121 return self.docids_by_tag[tag]
123 def get_docids_by_searching_tags(self, tag: str) -> Set[str]:
124 """Return the set of docids with a tag that contains a str"""
127 for search_tag in self.docids_by_tag:
128 if tag in search_tag:
129 for docid in self.docids_by_tag[search_tag]:
133 def get_docids_with_property(self, key: str) -> Set[str]:
134 """Return the set of docids that have a particular property no matter
135 what that property's value.
138 return self.docids_with_property[key]
140 def get_docids_by_property(self, key: str, value: str) -> Set[str]:
141 """Return the set of docids that have a particular property with a
145 return self.docids_by_property[(key, value)]
147 def invert_docid_set(self, original: Set[str]) -> Set[str]:
148 """Invert a set of docids."""
153 for docid in self.documents_by_docid.keys()
154 if docid not in original
158 def get_doc(self, docid: str) -> Optional[Document]:
159 """Given a docid, retrieve the previously added Document."""
161 return self.documents_by_docid.get(docid, None)
163 def query(self, query: str) -> Optional[Set[str]]:
164 """Query the corpus for documents that match a logical expression.
165 Returns a (potentially empty) set of docids for the matching
166 (previously added) documents or None on error.
170 tag1 and tag2 and not tag3
172 (tag1 or tag2) and (tag3 or tag4)
174 (tag1 and key2:value2) or (tag2 and key1:value1)
182 root = self._parse_query(query)
183 except ParseError as e:
184 print(e.message, file=sys.stderr)
188 def _parse_query(self, query: str):
189 """Internal parse helper; prefer to use query instead."""
191 parens = set(["(", ")"])
192 and_or = set(["and", "or"])
194 def operator_precedence(token: str) -> Optional[int]:
202 return table.get(token, None)
204 def is_operator(token: str) -> bool:
205 return operator_precedence(token) is not None
208 query = query.lower()
209 tokens = query.split()
211 # Handle ( and ) operators stuck to the ends of tokens
212 # that split() doesn't understand.
226 def evaluate(corpus: Corpus, stack: List[str]):
227 node_stack: List[Node] = []
230 if not is_operator(token):
231 node = Node(corpus, Operation.QUERY, [token])
234 operation = Operation.from_token(token)
235 operand_count = operation.num_operands()
236 if len(node_stack) < operand_count:
238 f"Incorrect number of operations for {operation}"
240 for _ in range(operation.num_operands()):
241 args.append(node_stack.pop())
242 node = Node(corpus, operation, args)
243 node_stack.append(node)
248 for token in lex(query):
249 if not is_operator(token):
250 output_stack.append(token)
253 # token is an operator...
255 operator_stack.append(token)
258 while len(operator_stack) > 0:
259 pop_operator = operator_stack.pop()
260 if pop_operator != "(":
261 output_stack.append(pop_operator)
267 "Unbalanced parenthesis in query expression"
272 my_precedence = operator_precedence(token)
273 if my_precedence is None:
274 raise ParseError(f"Unknown operator: {token}")
275 while len(operator_stack) > 0:
276 peek_operator = operator_stack[-1]
277 if not is_operator(peek_operator) or peek_operator == "(":
279 peek_precedence = operator_precedence(peek_operator)
280 if peek_precedence is None:
281 raise ParseError("Internal error")
283 (peek_precedence < my_precedence)
284 or (peek_precedence == my_precedence)
285 and (peek_operator not in and_or)
288 output_stack.append(operator_stack.pop())
289 operator_stack.append(token)
290 while len(operator_stack) > 0:
291 token = operator_stack.pop()
293 raise ParseError("Unbalanced parenthesis in query expression")
294 output_stack.append(token)
295 return evaluate(self, output_stack)
299 """A query AST node."""
305 operands: Sequence[Union[Node, str]],
309 self.operands = operands
311 def eval(self) -> Set[str]:
312 """Evaluate this node."""
314 evaled_operands: List[Union[Set[str], str]] = []
315 for operand in self.operands:
316 if isinstance(operand, Node):
317 evaled_operands.append(operand.eval())
318 elif isinstance(operand, str):
319 evaled_operands.append(operand)
321 raise ParseError(f"Unexpected operand: {operand}")
324 if self.op is Operation.QUERY:
325 for tag in evaled_operands:
326 if isinstance(tag, str):
329 key, value = tag.split(":")
330 except ValueError as v:
332 f'Invalid key:value syntax at "{tag}"'
335 r = self.corpus.get_docids_with_property(key)
337 r = self.corpus.get_docids_by_property(key, value)
339 r = self.corpus.get_docids_by_exact_tag(tag)
342 raise ParseError(f"Unexpected query {tag}")
343 elif self.op is Operation.DISJUNCTION:
344 if len(evaled_operands) != 2:
346 "Operation.DISJUNCTION (or) expects two operands."
348 retval.update(evaled_operands[0])
349 retval.update(evaled_operands[1])
350 elif self.op is Operation.CONJUNCTION:
351 if len(evaled_operands) != 2:
353 "Operation.CONJUNCTION (and) expects two operands."
355 retval.update(evaled_operands[0])
356 retval = retval.intersection(evaled_operands[1])
357 elif self.op is Operation.INVERSION:
358 if len(evaled_operands) != 1:
360 "Operation.INVERSION (not) expects one operand."
362 _ = evaled_operands[0]
363 if isinstance(_, set):
364 retval.update(self.corpus.invert_docid_set(_))
366 raise ParseError(f"Unexpected negation operand {_} ({type(_)})")