from typing import Pattern, List, Iterable, Tuple, Dict, Optional, Any
from functools import reduce
import re
from aw_core import Event
Tag = str
Category = List[str]
[docs]class Rule:
regex: Optional[Pattern]
select_keys: Optional[List[str]]
ignore_case: bool
def __init__(self, rules: Dict[str, Any]) -> None:
self.select_keys = rules.get("select_keys", None)
self.ignore_case = rules.get("ignore_case", False)
# NOTE: Also checks that the regex isn't an empty string (which would erroneously match everything)
regex_str = rules.get("regex", None)
self.regex = (
re.compile(
regex_str, (re.IGNORECASE if self.ignore_case else 0) | re.UNICODE
)
if regex_str
else None
)
[docs] def match(self, e: Event) -> bool:
if self.select_keys:
values = [e.data.get(key, None) for key in self.select_keys]
else:
values = list(e.data.values())
if self.regex:
for val in values:
if isinstance(val, str) and self.regex.search(val):
return True
return False
[docs]def categorize(
events: List[Event], classes: List[Tuple[Category, Rule]]
) -> List[Event]:
return [_categorize_one(e, classes) for e in events]
def _categorize_one(e: Event, classes: List[Tuple[Category, Rule]]) -> Event:
e.data["$category"] = _pick_category(
[_cls for _cls, rule in classes if rule.match(e)]
)
return e
[docs]def tag(events: List[Event], classes: List[Tuple[Tag, Rule]]) -> List[Event]:
return [_tag_one(e, classes) for e in events]
def _tag_one(e: Event, classes: List[Tuple[Tag, Rule]]) -> Event:
e.data["$tags"] = [_cls for _cls, rule in classes if rule.match(e)]
return e
def _pick_category(tags: Iterable[Category]) -> Category:
return reduce(_pick_deepest_cat, tags, ["Uncategorized"])
def _pick_deepest_cat(t1: Category, t2: Category) -> Category:
# t1 will be the accumulator when used in reduce
# Always bias against t1, since it could be "Uncategorized"
return t2 if len(t2) >= len(t1) else t1