Source code for aw_transform.filter_period_intersect

import logging
from typing import List, Iterable, Tuple
from copy import deepcopy

from aw_core import Event
from timeslot import Timeslot

logger = logging.getLogger(__name__)


def _get_event_period(event: Event) -> Timeslot:
    """Return the Timeslot running from ``event.timestamp`` to ``event.timestamp + event.duration``."""
    begin = event.timestamp
    return Timeslot(begin, begin + event.duration)


def _replace_event_period(event: Event, period: Timeslot) -> Event:
    """Return a deep copy of ``event`` moved onto ``period``.

    The copy gets ``period.start`` as its timestamp and ``period.duration`` as
    its duration; the event passed in is not modified.
    """
    moved = deepcopy(event)
    moved.timestamp, moved.duration = period.start, period.duration
    return moved


def _intersecting_eventpairs(
    events1: List[Event], events2: List[Event]
) -> Iterable[Tuple[Event, Event, Timeslot]]:
    """
    A generator that yields each overlapping pair of events from two eventlists
    along with a Timeslot of the intersection.

    Both lists are walked in timestamp order with one cursor each. The lists
    passed in are not modified: sorting is done on local copies.

    :param events1: events to pair up (first element of each yielded tuple)
    :param events2: events to pair against (second element of each yielded tuple)
    :returns: an iterator of ``(event1, event2, intersection)`` tuples
    """
    # Sort copies rather than calling list.sort(), so that the caller's lists
    # are not silently reordered as a side effect of iterating this generator.
    events1 = sorted(events1, key=lambda e: e.timestamp)
    events2 = sorted(events2, key=lambda e: e.timestamp)
    e1_i = 0
    e2_i = 0
    while e1_i < len(events1) and e2_i < len(events2):
        e1 = events1[e1_i]
        e2 = events2[e2_i]
        e1_p = _get_event_period(e1)
        e2_p = _get_event_period(e2)

        ip = e1_p.intersection(e2_p)
        if ip:
            # If events intersected, yield events
            yield (e1, e2, ip)
            # Step past whichever period ends first (e1 on a tie); the other
            # event stays current so it can still be paired with what follows.
            if e1_p.end <= e2_p.end:
                e1_i += 1
            else:
                e2_i += 1
        else:
            # No intersection, check if event is before/after filterevent
            if e1_p.end <= e2_p.start:
                # Event ended before filter event started
                e1_i += 1
            elif e2_p.end <= e1_p.start:
                # Event started after filter event ended
                e2_i += 1
            else:
                # Neither period lies entirely before the other, yet no
                # intersection was reported: log it and step both cursors so
                # the loop always makes progress.
                logger.error("Should be unreachable, skipping period")
                e1_i += 1
                e2_i += 1


def filter_period_intersect(
    events: List[Event], filterevents: List[Event]
) -> List[Event]:
    """
    Filters away all events or time periods of events in which a
    filterevent does not have an intersecting time period.

    Useful for example when you want to filter away events or
    part of events during which a user was AFK.

    Usage:
      windowevents_notafk = filter_period_intersect(windowevents, notafkevents)

    Example:
      .. code-block:: none

        events1   |   =======        ========   |
        events2   | ------  ---  ---   ----     |
        result    |   ====  =          ====     |

    A JavaScript version used to exist in aw-webui but was removed in
    `this PR <https://github.com/ActivityWatch/aw-webui/pull/48>`_.
    """
    # sorted() hands the pairing helper fresh lists, leaving the arguments as-is.
    ordered_events = sorted(events)
    ordered_filters = sorted(filterevents)

    clipped = []
    for event, _filterevent, overlap in _intersecting_eventpairs(
        ordered_events, ordered_filters
    ):
        # Each overlap produces a copy of the event cut down to that overlap.
        clipped.append(_replace_event_period(event, overlap))
    return clipped
def period_union(events1: List[Event], events2: List[Event]) -> List[Event]:
    """
    Takes two lists of events and returns a new list of events covering
    the union of the timeperiods contained in the eventlists with no
    overlapping events.

    .. warning:: This function strips all data from the returned events as it
                 cannot keep it consistent. The events passed in are left
                 untouched; the result consists of copies.

    Example:
      .. code-block:: none

        events1   |   -------       ---------  |
        events2   | ------  ---  --    ----    |
        result    | -----------  -- ---------  |

    :param events1: first list of events
    :param events2: second list of events
    :returns: new events with empty ``data``, one per merged time period
    """
    merged_events: List[Event] = []
    for e in sorted(events1 + events2):
        if merged_events:
            last_event = merged_events[-1]
            e_p = _get_event_period(e)
            le_p = _get_event_period(last_event)
            if not e_p.gap(le_p):
                # No gap reported between the two periods: grow the last
                # merged event to span both. It is already a private copy,
                # so updating it in place cannot affect the caller's events.
                new_period = e_p.union(le_p)
                last_event.timestamp = new_period.start
                last_event.duration = new_period.duration
                continue
        # Start a new merged period from a copy, so that clearing the data
        # below does not wipe the data of the event the caller passed in.
        fresh = deepcopy(e)
        fresh.data = {}
        merged_events.append(fresh)
    return merged_events
def union(events1: List[Event], events2: List[Event]) -> List[Event]:
    """
    Concatenates and sorts union of 2 event lists and removes duplicates.

    An event that appears in both lists (compared with ``==``) is kept once.
    The result is ordered by timestamp, then by duration.

    Example:
      Merges events from a backup-bucket with events from a "living" bucket.

      .. code-block:: python

        events = union(events_backup, events_living)
    """

    def _order(e):
        return (e.timestamp, e.duration)

    left = sorted(events1, key=_order)
    right = sorted(events2, key=_order)

    merged: List[Event] = []
    i = j = 0
    while i < len(left) and j < len(right):
        a, b = left[i], right[j]
        if a == b:
            # Same event present in both lists: emit once, consume both.
            merged.append(a)
            i += 1
            j += 1
        elif _order(a) < _order(b):
            # Left event starts earlier, or starts together and is shorter.
            merged.append(a)
            i += 1
        else:
            merged.append(b)
            j += 1

    # At most one of the two lists still has events left over.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged