from datetime import timedelta
from functools import wraps
from inspect import signature
from typing import (
Any,
Callable,
Dict,
List,
Optional,
)
import iso8601
from aw_core.models import Event
from aw_datastore import Datastore
from aw_transform import (
Rule,
categorize,
chunk_events_by_key,
concat,
filter_keyvals,
filter_keyvals_regex,
filter_period_intersect,
flood,
limit_events,
merge_events_by_keys,
period_union,
simplify_string,
sort_by_duration,
sort_by_timestamp,
split_url_events,
sum_durations,
tag,
union_no_overlap,
)
from .exceptions import QueryFunctionException
def _verify_bucket_exists(datastore, bucketname):
if bucketname in datastore.buckets():
return
else:
raise QueryFunctionException(f"There's no bucket named '{bucketname}'")
def _verify_variable_is_type(variable, t):
if not isinstance(variable, t):
raise QueryFunctionException(
f"Variable '{variable}' passed to function call is of invalid type. Expected {t} but was {type(variable)}"
)
# TODO: proper type checking (typecheck-decorator in pypi?)
TNamespace = Dict[str, Any]
TQueryFunction = Callable[..., Any]
"""
Declarations
"""
functions: Dict[str, TQueryFunction] = {}
def q2_function(transform_func=None):
"""
Decorator used to register query functions.
Automatically adds mock arguments for Datastore and TNamespace
if not in function signature.
"""
def h(f):
sig = signature(f)
# If function lacks docstring, use docstring from underlying function in aw_transform
if transform_func and transform_func.__doc__ and not f.__doc__:
f.__doc__ = f".. note:: Documentation automatically copied from underlying function `aw_transform.{transform_func.__name__}`\n\n{transform_func.__doc__}"
@wraps(f)
def g(datastore: Datastore, namespace: TNamespace, *args, **kwargs):
# Remove datastore and namespace argument for functions that don't need it
args = (datastore, namespace, *args)
if TNamespace not in (sig.parameters[p].annotation for p in sig.parameters):
args = (args[0], *args[2:])
if Datastore not in (sig.parameters[p].annotation for p in sig.parameters):
args = args[1:]
return f(*args, **kwargs)
fname = f.__name__
if fname[:3] == "q2_":
fname = fname[3:]
functions[fname] = g
return g
return h
def q2_typecheck(f):
"""Decorator that typechecks using `_verify_variable_is_type`"""
sig = signature(f)
@wraps(f)
def g(*args, **kwargs):
# FIXME: If the first argument passed to a query2 function is a straight [] then the second argument disappears from the argument list for unknown reasons, which breaks things
for i, p in enumerate(sig.parameters):
param = sig.parameters[p]
# print(f"Checking that param ({param}) was {param.annotation}, value: {args[i]}")
# FIXME: Won't check keyword arguments
if (
param.annotation in [list, str, int, float]
and param.default == param.empty
):
_verify_variable_is_type(args[i], param.annotation)
return f(*args, **kwargs)
return g
"""
Getting buckets
"""
[docs]@q2_function()
@q2_typecheck
def q2_find_bucket(
datastore: Datastore, filter_str: str, hostname: Optional[str] = None
):
"""Find bucket by using a filter_str (to avoid hardcoding bucket names)"""
for bucket in datastore.buckets():
if filter_str in bucket:
bucket_metadata = datastore[bucket].metadata()
if hostname:
if bucket_metadata["hostname"] == hostname:
return bucket
else:
return bucket
raise QueryFunctionException(
f"Unable to find bucket matching '{filter_str}' (hostname filter set to '{hostname}')"
)
"""
Data gathering functions
"""
[docs]@q2_function()
@q2_typecheck
def q2_query_bucket(
datastore: Datastore, namespace: TNamespace, bucketname: str
) -> List[Event]:
_verify_bucket_exists(datastore, bucketname)
try:
starttime = iso8601.parse_date(namespace["STARTTIME"])
endtime = iso8601.parse_date(namespace["ENDTIME"])
except iso8601.ParseError:
raise QueryFunctionException(
"Unable to parse starttime/endtime for query_bucket"
) from None
return datastore[bucketname].get(starttime=starttime, endtime=endtime)
[docs]@q2_function()
@q2_typecheck
def q2_query_bucket_eventcount(
datastore: Datastore, namespace: TNamespace, bucketname: str
) -> int:
_verify_bucket_exists(datastore, bucketname)
starttime = iso8601.parse_date(namespace["STARTTIME"])
endtime = iso8601.parse_date(namespace["ENDTIME"])
return datastore[bucketname].get_eventcount(starttime=starttime, endtime=endtime)
"""
Filtering functions
"""
[docs]@q2_function(filter_keyvals)
@q2_typecheck
def q2_filter_keyvals(events: list, key: str, vals: list) -> List[Event]:
return filter_keyvals(events, key, vals, False)
[docs]@q2_function(filter_keyvals)
@q2_typecheck
def q2_exclude_keyvals(events: list, key: str, vals: list) -> List[Event]:
return filter_keyvals(events, key, vals, True)
[docs]@q2_function(filter_keyvals_regex)
@q2_typecheck
def q2_filter_keyvals_regex(events: list, key: str, regex: str) -> List[Event]:
return filter_keyvals_regex(events, key, regex)
[docs]@q2_function(filter_period_intersect)
@q2_typecheck
def q2_filter_period_intersect(events: list, filterevents: list) -> List[Event]:
return filter_period_intersect(events, filterevents)
[docs]@q2_function(period_union)
@q2_typecheck
def q2_period_union(events1: list, events2: list) -> List[Event]:
return period_union(events1, events2)
[docs]@q2_function(limit_events)
@q2_typecheck
def q2_limit_events(events: list, count: int) -> List[Event]:
return limit_events(events, count)
"""
Merge functions
"""
[docs]@q2_function(merge_events_by_keys)
@q2_typecheck
def q2_merge_events_by_keys(events: list, keys: list) -> List[Event]:
return merge_events_by_keys(events, keys)
[docs]@q2_function(chunk_events_by_key)
@q2_typecheck
def q2_chunk_events_by_key(events: list, key: str) -> List[Event]:
return chunk_events_by_key(events, key)
"""
Sort functions
"""
[docs]@q2_function(sort_by_timestamp)
@q2_typecheck
def q2_sort_by_timestamp(events: list) -> List[Event]:
return sort_by_timestamp(events)
[docs]@q2_function(sort_by_duration)
@q2_typecheck
def q2_sort_by_duration(events: list) -> List[Event]:
return sort_by_duration(events)
"""
Summarizing functions
"""
[docs]@q2_function(sum_durations)
@q2_typecheck
def q2_sum_durations(events: list) -> timedelta:
return sum_durations(events)
[docs]@q2_function(concat)
@q2_typecheck
def q2_concat(events1: list, events2: list) -> List[Event]:
return concat(events1, events2)
[docs]@q2_function(union_no_overlap)
@q2_typecheck
def q2_union_no_overlap(events1: list, events2: list) -> List[Event]:
return union_no_overlap(events1, events2)
"""
Flood functions
"""
[docs]@q2_function(flood)
@q2_typecheck
def q2_flood(events: list) -> List[Event]:
return flood(events)
"""
Watcher specific functions
"""
[docs]@q2_function(split_url_events)
@q2_typecheck
def q2_split_url_events(events: list) -> List[Event]:
return split_url_events(events)
[docs]@q2_function(simplify_string)
@q2_typecheck
def q2_simplify_window_titles(events: list, key: str) -> List[Event]:
return simplify_string(events, key=key)
"""
Test functions
"""
[docs]@q2_function()
@q2_typecheck
def q2_nop():
"""No operation function for unittesting"""
return 1
"""
Classify
"""
[docs]@q2_function(categorize)
@q2_typecheck
def q2_categorize(events: list, classes: list):
classes = [(_cls, Rule(rule_dict)) for _cls, rule_dict in classes]
return categorize(events, classes)
[docs]@q2_function(tag)
@q2_typecheck
def q2_tag(events: list, classes: list):
classes = [(_cls, Rule(rule_dict)) for _cls, rule_dict in classes]
return tag(events, classes)