Source code for aw_server.api

import functools
import json
import logging
from datetime import datetime
from pathlib import Path
from socket import gethostname
from typing import (
    Any,
    Dict,
    List,
    Optional,
)
from uuid import uuid4

import iso8601
from aw_core.dirs import get_data_dir
from aw_core.log import get_log_file_path
from aw_core.models import Event
from aw_query import query2
from aw_transform import heartbeat_merge

from .__about__ import __version__
from .exceptions import NotFound

logger = logging.getLogger(__name__)


def get_device_id() -> str:
    path = Path(get_data_dir("aw-server")) / "device_id"
    if path.exists():
        with open(path) as f:
            return f.read()
    else:
        uuid = str(uuid4())
        with open(path, "w") as f:
            f.write(uuid)
        return uuid


def check_bucket_exists(f):
    @functools.wraps(f)
    def g(self, bucket_id, *args, **kwargs):
        if bucket_id not in self.db.buckets():
            raise NotFound("NoSuchBucket", f"There's no bucket named {bucket_id}")
        return f(self, bucket_id, *args, **kwargs)

    return g


[docs]class ServerAPI: def __init__(self, db, testing) -> None: self.db = db self.testing = testing self.last_event = {} # type: dict
[docs] def get_info(self) -> Dict[str, Any]: """Get server info""" payload = { "hostname": gethostname(), "version": __version__, "testing": self.testing, "device_id": get_device_id(), } return payload
[docs] def get_buckets(self) -> Dict[str, Dict]: """Get dict {bucket_name: Bucket} of all buckets""" logger.debug("Received get request for buckets") buckets = self.db.buckets() for b in buckets: # TODO: Move this code to aw-core? last_events = self.db[b].get(limit=1) if len(last_events) > 0: last_event = last_events[0] last_updated = last_event.timestamp + last_event.duration buckets[b]["last_updated"] = last_updated.isoformat() return buckets
[docs] @check_bucket_exists def get_bucket_metadata(self, bucket_id: str) -> Dict[str, Any]: """Get metadata about bucket.""" bucket = self.db[bucket_id] return bucket.metadata()
[docs] @check_bucket_exists def export_bucket(self, bucket_id: str) -> Dict[str, Any]: """Export a bucket to a dataformat consistent across versions, including all events in it.""" bucket = self.get_bucket_metadata(bucket_id) bucket["events"] = self.get_events(bucket_id, limit=-1) # Scrub event IDs for event in bucket["events"]: del event["id"] return bucket
[docs] def export_all(self) -> Dict[str, Any]: """Exports all buckets and their events to a format consistent across versions""" buckets = self.get_buckets() exported_buckets = {} for bid in buckets.keys(): exported_buckets[bid] = self.export_bucket(bid) return exported_buckets
[docs] def import_bucket(self, bucket_data: Any): bucket_id = bucket_data["id"] logger.info(f"Importing bucket {bucket_id}") # TODO: Check that bucket doesn't already exist self.db.create_bucket( bucket_id, type=bucket_data["type"], client=bucket_data["client"], hostname=bucket_data["hostname"], created=( bucket_data["created"] if isinstance(bucket_data["created"], datetime) else iso8601.parse_date(bucket_data["created"]) ), ) # scrub IDs from events # (otherwise causes weird bugs with no events seemingly imported when importing events exported from aw-server-rust, which contains IDs) for event in bucket_data["events"]: if "id" in event: del event["id"] self.create_events( bucket_id, [Event(**e) if isinstance(e, dict) else e for e in bucket_data["events"]], )
[docs] def import_all(self, buckets: Dict[str, Any]): for bid, bucket in buckets.items(): self.import_bucket(bucket)
[docs] def create_bucket( self, bucket_id: str, event_type: str, client: str, hostname: str, created: Optional[datetime] = None, data: Optional[Dict[str, Any]] = None, ) -> bool: """ Create a bucket. If hostname is "!local", the hostname and device_id will be set from the server info. This is useful for watchers which are known/assumed to run locally but might not know their hostname (like aw-watcher-web). Returns True if successful, otherwise false if a bucket with the given ID already existed. """ if created is None: created = datetime.now() if bucket_id in self.db.buckets(): return False if hostname == "!local": info = self.get_info() if data is None: data = {} hostname = info["hostname"] data["device_id"] = info["device_id"] self.db.create_bucket( bucket_id, type=event_type, client=client, hostname=hostname, created=created, data=data, ) return True
[docs] @check_bucket_exists def update_bucket( self, bucket_id: str, event_type: Optional[str] = None, client: Optional[str] = None, hostname: Optional[str] = None, data: Optional[Dict[str, Any]] = None, ) -> None: """Update bucket metadata""" self.db.update_bucket( bucket_id, type=event_type, client=client, hostname=hostname, data=data, ) return None
[docs] @check_bucket_exists def delete_bucket(self, bucket_id: str) -> None: """Delete a bucket""" self.db.delete_bucket(bucket_id) logger.debug(f"Deleted bucket '{bucket_id}'") return None
[docs] @check_bucket_exists def get_event( self, bucket_id: str, event_id: int, ) -> Optional[Event]: """Get a single event from a bucket""" logger.debug( f"Received get request for event {event_id} in bucket '{bucket_id}'" ) event = self.db[bucket_id].get_by_id(event_id) return event.to_json_dict() if event else None
[docs] @check_bucket_exists def get_events( self, bucket_id: str, limit: int = -1, start: Optional[datetime] = None, end: Optional[datetime] = None, ) -> List[Event]: """Get events from a bucket""" logger.debug(f"Received get request for events in bucket '{bucket_id}'") if limit is None: # Let limit = None also mean "no limit" limit = -1 events = [ event.to_json_dict() for event in self.db[bucket_id].get(limit, start, end) ] return events
[docs] @check_bucket_exists def create_events(self, bucket_id: str, events: List[Event]) -> Optional[Event]: """Create events for a bucket. Can handle both single events and multiple ones. Returns the inserted event when a single event was inserted, otherwise None.""" return self.db[bucket_id].insert(events)
[docs] @check_bucket_exists def get_eventcount( self, bucket_id: str, start: Optional[datetime] = None, end: Optional[datetime] = None, ) -> int: """Get eventcount from a bucket""" logger.debug(f"Received get request for eventcount in bucket '{bucket_id}'") return self.db[bucket_id].get_eventcount(start, end)
[docs] @check_bucket_exists def delete_event(self, bucket_id: str, event_id) -> bool: """Delete a single event from a bucket""" return self.db[bucket_id].delete(event_id)
[docs] @check_bucket_exists def heartbeat(self, bucket_id: str, heartbeat: Event, pulsetime: float) -> Event: """ Heartbeats are useful when implementing watchers that simply keep track of a state, how long it's in that state and when it changes. A single heartbeat always has a duration of zero. If the heartbeat was identical to the last (apart from timestamp), then the last event has its duration updated. If the heartbeat differed, then a new event is created. Such as: - Active application and window title - Example: aw-watcher-window - Currently open document/browser tab/playing song - Example: wakatime - Example: aw-watcher-web - Example: aw-watcher-spotify - Is the user active/inactive? Send an event on some interval indicating if the user is active or not. - Example: aw-watcher-afk Inspired by: https://wakatime.com/developers#heartbeats """ logger.debug( "Received heartbeat in bucket '{}'\n\ttimestamp: {}, duration: {}, pulsetime: {}\n\tdata: {}".format( bucket_id, heartbeat.timestamp, heartbeat.duration, pulsetime, heartbeat.data, ) ) # The endtime here is set such that in the event that the heartbeat is older than an # existing event we should try to merge it with the last event before the heartbeat instead. # FIXME: This (the endtime=heartbeat.timestamp) gets rid of the "heartbeat was older than last event" # warning and also causes a already existing "newer" event to be overwritten in the # replace_last call below. This is problematic. # Solution: This could be solved if we were able to replace arbitrary events. # That way we could double check that the event has been applied # and if it hasn't we simply replace it with the updated counterpart. last_event = None if bucket_id not in self.last_event: last_events = self.db[bucket_id].get(limit=1) if len(last_events) > 0: last_event = last_events[0] else: last_event = self.last_event[bucket_id] if last_event: if last_event.data == heartbeat.data: merged = heartbeat_merge(last_event, heartbeat, pulsetime) if merged is not None: # Heartbeat was merged into last_event logger.debug( "Received valid heartbeat, merging. (bucket: {})".format( bucket_id ) ) self.last_event[bucket_id] = merged self.db[bucket_id].replace_last(merged) return merged else: logger.info( "Received heartbeat after pulse window, inserting as new event. (bucket: {})".format( bucket_id ) ) else: logger.debug( "Received heartbeat with differing data, inserting as new event. (bucket: {})".format( bucket_id ) ) else: logger.info( "Received heartbeat, but bucket was previously empty, inserting as new event. (bucket: {})".format( bucket_id ) ) self.db[bucket_id].insert(heartbeat) self.last_event[bucket_id] = heartbeat return heartbeat
[docs] def query2(self, name, query, timeperiods, cache): result = [] for timeperiod in timeperiods: period = timeperiod.split("/")[ :2 ] # iso8601 timeperiods are separated by a slash starttime = iso8601.parse_date(period[0]) endtime = iso8601.parse_date(period[1]) query = "".join(query) result.append(query2.query(name, query, starttime, endtime, self.db)) return result
# TODO: Right now the log format on disk has to be JSON, this is hard to read by humans...
[docs] def get_log(self): """Get the server log in json format""" payload = [] with open(get_log_file_path()) as log_file: for line in log_file.readlines()[::-1]: payload.append(json.loads(line)) return payload, 200