Source code for aw_transform.heartbeats

import logging
from datetime import timedelta
from typing import List, Optional

from aw_core.models import Event

logger = logging.getLogger(__name__)


[docs]def heartbeat_reduce(events: List[Event], pulsetime: float) -> List[Event]: """Merges consecutive events together according to the rules of `heartbeat_merge`.""" reduced = [] if events: reduced.append(events.pop(0)) for heartbeat in events: merged = heartbeat_merge(reduced[-1], heartbeat, pulsetime) if merged is not None: # Heartbeat was merged reduced[-1] = merged else: # Heartbeat was not merged reduced.append(heartbeat) return reduced
[docs]def heartbeat_merge(last_event: Event, heartbeat: Event, pulsetime: float) -> Optional[Event]: """ Merges two events if they have identical data and the heartbeat timestamp is within the pulsetime window. """ if last_event.data == heartbeat.data: # Seconds between end of last_event and start of heartbeat pulseperiod_end = last_event.timestamp + last_event.duration + timedelta(seconds=pulsetime) within_pulsetime_window = last_event.timestamp <= heartbeat.timestamp <= pulseperiod_end if within_pulsetime_window: # Seconds between end of last_event and start of timestamp new_duration = (heartbeat.timestamp - last_event.timestamp) + heartbeat.duration if last_event.duration < timedelta(0): logger.warning("Merging heartbeats would result in a negative duration, refusing to merge.") else: last_event.duration = new_duration return last_event return None