Source code for aw_transform.classify

from typing import Pattern, List, Iterable, Tuple, Dict, Optional, Any
from functools import reduce
import re

from aw_core import Event

Tag = str
Category = List[str]

[docs]class Rule: regex: Optional[Pattern] select_keys: Optional[List[str]] ignore_case: bool def __init__(self, rules: Dict[str, Any]): self.select_keys = rules.get("select_keys", None) self.ignore_case = rules.get("ignore_case", False) # NOTE: Also checks that the regex isn't an empty string (which would erroneously match everything) regex_str = rules.get("regex", None) self.regex = re.compile(regex_str, (re.IGNORECASE if self.ignore_case else 0) | re.UNICODE) if regex_str else None
[docs] def match(self, e: Event) -> bool: if self.select_keys: values = [, None) for key in self.select_keys] else: values = list( if self.regex: for val in values: if isinstance(val, str) and return True return False
[docs]def categorize(events: List[Event], classes: List[Tuple[Category, Rule]]): return [_categorize_one(e, classes) for e in events]
def _categorize_one(e: Event, classes: List[Tuple[Category, Rule]]) -> Event:["$category"] = _pick_category([_cls for _cls, rule in classes if rule.match(e)]) return e
[docs]def tag(events: List[Event], classes: List[Tuple[Tag, Rule]]): return [_tag_one(e, classes) for e in events]
def _tag_one(e: Event, classes: List[Tuple[Tag, Rule]]) -> Event:["$tags"] = [_cls for _cls, rule in classes if rule.match(e)] return e def _pick_category(tags: Iterable[Category]) -> Category: return reduce(_pick_deepest_cat, tags, ["Uncategorized"]) def _pick_deepest_cat(t1: Category, t2: Category) -> Category: # t1 will be the accumulator when used in reduce # Always bias against t1, since it could be "Uncategorized" return t2 if len(t2) >= len(t1) else t1