Source code for aw_transform.heartbeats

import logging
from datetime import timedelta
from typing import List, Optional

from aw_core.models import Event

logger = logging.getLogger(__name__)


[docs]def heartbeat_reduce(events: List[Event], pulsetime: float) -> List[Event]: """Merges consecutive events together according to the rules of `heartbeat_merge`.""" reduced = [] if events: reduced.append(events.pop(0)) for heartbeat in events: merged = heartbeat_merge(reduced[-1], heartbeat, pulsetime) if merged is not None: # Heartbeat was merged reduced[-1] = merged else: # Heartbeat was not merged reduced.append(heartbeat) return reduced
[docs]def heartbeat_merge( last_event: Event, heartbeat: Event, pulsetime: float ) -> Optional[Event]: """ Merges two events if they have identical data and the heartbeat timestamp is within the pulsetime window. """ if last_event.data == heartbeat.data: # Seconds between end of last_event and start of heartbeat pulseperiod_end = ( last_event.timestamp + last_event.duration + timedelta(seconds=pulsetime) ) within_pulsetime_window = ( last_event.timestamp <= heartbeat.timestamp <= pulseperiod_end ) if within_pulsetime_window: # Seconds between end of last_event and start of timestamp new_duration = ( heartbeat.timestamp - last_event.timestamp ) + heartbeat.duration if last_event.duration < timedelta(0): logger.warning( "Merging heartbeats would result in a negative duration, refusing to merge." ) else: # Taking the max of durations ensures heartbeats that end before the last event don't shorten it last_event.duration = max((last_event.duration, new_duration)) return last_event return None