API Reference (Python)¶
Here’s an API reference for some of the most central components in aw_core
, aw_client
and aw_server
.
These are the most important packages in ActivityWatch.
A lot of it currently lacks proper docstrings, but it’s a start.
Contents
aw_core¶
- class aw_core.Event(id: Optional[Union[int, str]] = None, timestamp: Optional[Union[datetime.datetime, str]] = None, duration: Union[datetime.timedelta, int, float] = 0, data: Dict[str, Any] = {})[source]¶
Used to represents an event.
- property data: dict¶
- property duration: datetime.timedelta¶
- property id: Optional[Union[int, str]]¶
- property timestamp: datetime.datetime¶
aw_core.models¶
- class aw_core.models.Event(id: Optional[Union[int, str]] = None, timestamp: Optional[Union[datetime.datetime, str]] = None, duration: Union[datetime.timedelta, int, float] = 0, data: Dict[str, Any] = {})[source]¶
Used to represents an event.
- property data: dict¶
- property duration: datetime.timedelta¶
- property id: Optional[Union[int, str]]¶
- property timestamp: datetime.datetime¶
aw_core.log¶
- aw_core.log.get_latest_log_file(name, testing=False) Optional[str] [source]¶
Returns the filename of the last logfile with
name
. Useful when you want to read the logfile of another ActivityWatch service.
aw_core.dirs¶
aw_client¶
The aw_client package contains a programmer-friendly wrapper around the servers REST API.
- class aw_client.ActivityWatchClient(client_name: str = 'unknown', testing=False, host=None, port=None, protocol='http')[source]¶
-
- get_event(bucket_id: str, event_id: int) Optional[aw_core.models.Event] [source]¶
- get_eventcount(bucket_id: str, limit: int = - 1, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None) int [source]¶
- get_events(bucket_id: str, limit: int = - 1, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None) List[aw_core.models.Event] [source]¶
- heartbeat(bucket_id: str, event: aw_core.models.Event, pulsetime: float, queued: bool = False, commit_interval: Optional[float] = None) None [source]¶
- Args:
bucket_id: The bucket_id of the bucket to send the heartbeat to event: The actual heartbeat event pulsetime: The maximum amount of time in seconds since the last heartbeat to be merged with the previous heartbeat in aw-server queued: Use the aw-client queue feature to queue events if client loses connection with the server commit_interval: Override default pre-merge commit interval
- NOTE: This endpoint can use the failed requests retry queue.
This makes the request itself non-blocking and therefore the function will in that case always returns None.
- insert_event(bucket_id: str, event: aw_core.models.Event) None [source]¶
- insert_events(bucket_id: str, events: List[aw_core.models.Event]) None [source]¶
aw_transform¶
The aw_transform
package contains transforms used in the query language.
Note
Their function signatures and return types may deviate from how the transforms are actually implemented in the query language. For more details, see aw_query.functions
- class aw_transform.Rule(rules: Dict[str, Any])[source]¶
- ignore_case: bool¶
- match(e: aw_core.models.Event) bool [source]¶
- regex: Optional[Pattern]¶
- select_keys: Optional[List[str]]¶
- aw_transform.categorize(events: List[aw_core.models.Event], classes: List[Tuple[List[str], aw_transform.classify.Rule]]) List[aw_core.models.Event] [source]¶
- aw_transform.chunk_events_by_key(events: List[aw_core.models.Event], key: str, pulsetime: float = 5.0) List[aw_core.models.Event] [source]¶
“Chunks” adjacent events together which have the same value for a key, and stores the original events in the
subevents
key of the new event.
- aw_transform.concat(events1, events2) List[aw_core.models.Event] [source]¶
Concatenates two lists of events
- aw_transform.filter_keyvals(events: List[aw_core.models.Event], key: str, vals: List[str], exclude=False) List[aw_core.models.Event] [source]¶
- aw_transform.filter_keyvals_regex(events: List[aw_core.models.Event], key: str, regex: str) List[aw_core.models.Event] [source]¶
- aw_transform.filter_period_intersect(events: List[aw_core.models.Event], filterevents: List[aw_core.models.Event]) List[aw_core.models.Event] [source]¶
Filters away all events or time periods of events in which a filterevent does not have an intersecting time period.
Useful for example when you want to filter away events or part of events during which a user was AFK.
- Usage:
windowevents_notafk = filter_period_intersect(windowevents, notafkevents)
- Example:
events1 | ======= ======== | events2 | ------ --- --- ---- | result | ==== = ==== |
A JavaScript version used to exist in aw-webui but was removed in this PR.
- aw_transform.flood(events: List[aw_core.models.Event], pulsetime: float = 5) List[aw_core.models.Event] [source]¶
Takes a list of events and “floods” any empty space between events by extending one of the surrounding events to cover the empty space.
- For more details on flooding, see this issue:
- aw_transform.heartbeat_merge(last_event: aw_core.models.Event, heartbeat: aw_core.models.Event, pulsetime: float) Optional[aw_core.models.Event] [source]¶
Merges two events if they have identical data and the heartbeat timestamp is within the pulsetime window.
- aw_transform.heartbeat_reduce(events: List[aw_core.models.Event], pulsetime: float) List[aw_core.models.Event] [source]¶
Merges consecutive events together according to the rules of
heartbeat_merge
.
- aw_transform.limit_events(events, count) List[aw_core.models.Event] [source]¶
Returns the
count
first events in the list of events
- aw_transform.merge_events_by_keys(events, keys) List[aw_core.models.Event] [source]¶
Sums the duration of all events which share a value for a key and returns a new event for each value.
- aw_transform.period_union(events1: List[aw_core.models.Event], events2: List[aw_core.models.Event]) List[aw_core.models.Event] [source]¶
Takes a list of two events and returns a new list of events covering the union of the timeperiods contained in the eventlists with no overlapping events.
Warning
This function strips all data from events as it cannot keep it consistent.
- Example:
events1 | ------- --------- | events2 | ------ --- -- ---- | result | ----------- -- --------- |
- aw_transform.simplify_string(events: List[aw_core.models.Event], key: str = 'title') List[aw_core.models.Event] [source]¶
- aw_transform.sort_by_duration(events) List[aw_core.models.Event] [source]¶
Sorts a list of events by duration
- aw_transform.sort_by_timestamp(events) List[aw_core.models.Event] [source]¶
Sorts a list of events by timestamp
- aw_transform.split_url_events(events: List[aw_core.models.Event]) List[aw_core.models.Event] [source]¶
- aw_transform.sum_durations(events) datetime.timedelta [source]¶
Sums the durations for the given events
- aw_transform.tag(events: List[aw_core.models.Event], classes: List[Tuple[str, aw_transform.classify.Rule]]) List[aw_core.models.Event] [source]¶
- aw_transform.union(events1: List[aw_core.models.Event], events2: List[aw_core.models.Event]) List[aw_core.models.Event] [source]¶
Concatenates and sorts union of 2 event lists and removes duplicates.
- Example:
Merges events from a backup-bucket with events from a “living” bucket.
events = union(events_backup, events_living)
- aw_transform.union_no_overlap(events1: List[aw_core.models.Event], events2: List[aw_core.models.Event]) List[aw_core.models.Event] [source]¶
Merges two eventlists and removes overlap, the first eventlist will have precedence
- Example:
events1 | xxx xx xxx | events1 | —- —— – | result | xxx– xx —-xxx – |
aw_query¶
The aw_query
package contains the interpreter for the query language and registers the standard functions, usually based on Python implementations of them available in aw_transform
.
- aw_query.query(name: str, query: str, starttime: datetime.datetime, endtime: datetime.datetime, datastore: aw_datastore.datastore.Datastore) Any [source]¶
- aw_query.functions.q2_chunk_events_by_key(events: list, key: str) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.chunk_events_by_key
“Chunks” adjacent events together which have the same value for a key, and stores the original events in the
subevents
key of the new event.
- aw_query.functions.q2_concat(events1: list, events2: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.concat
Concatenates two lists of events
- aw_query.functions.q2_exclude_keyvals(events: list, key: str, vals: list) List[aw_core.models.Event] [source]¶
- aw_query.functions.q2_filter_keyvals(events: list, key: str, vals: list) List[aw_core.models.Event] [source]¶
- aw_query.functions.q2_filter_keyvals_regex(events: list, key: str, regex: str) List[aw_core.models.Event] [source]¶
- aw_query.functions.q2_filter_period_intersect(events: list, filterevents: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.filter_period_intersect
Filters away all events or time periods of events in which a filterevent does not have an intersecting time period.
Useful for example when you want to filter away events or part of events during which a user was AFK.
- Usage:
windowevents_notafk = filter_period_intersect(windowevents, notafkevents)
- Example:
events1 | ======= ======== | events2 | ------ --- --- ---- | result | ==== = ==== |
A JavaScript version used to exist in aw-webui but was removed in this PR.
- aw_query.functions.q2_find_bucket(datastore: aw_datastore.datastore.Datastore, filter_str: str, hostname: Optional[str] = None)[source]¶
Find bucket by using a filter_str (to avoid hardcoding bucket names)
- aw_query.functions.q2_flood(events: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.flood
Takes a list of events and “floods” any empty space between events by extending one of the surrounding events to cover the empty space.
- For more details on flooding, see this issue:
- aw_query.functions.q2_limit_events(events: list, count: int) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.limit_events
Returns the
count
first events in the list of events
- aw_query.functions.q2_merge_events_by_keys(events: list, keys: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.merge_events_by_keys
Sums the duration of all events which share a value for a key and returns a new event for each value.
- aw_query.functions.q2_period_union(events1: list, events2: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.period_union
Takes a list of two events and returns a new list of events covering the union of the timeperiods contained in the eventlists with no overlapping events.
Warning
This function strips all data from events as it cannot keep it consistent.
- Example:
events1 | ------- --------- | events2 | ------ --- -- ---- | result | ----------- -- --------- |
- aw_query.functions.q2_query_bucket(datastore: aw_datastore.datastore.Datastore, namespace: Dict[str, Any], bucketname: str) List[aw_core.models.Event] [source]¶
- aw_query.functions.q2_query_bucket_eventcount(datastore: aw_datastore.datastore.Datastore, namespace: Dict[str, Any], bucketname: str) int [source]¶
- aw_query.functions.q2_simplify_window_titles(events: list, key: str) List[aw_core.models.Event] [source]¶
- aw_query.functions.q2_sort_by_duration(events: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.sort_by_duration
Sorts a list of events by duration
- aw_query.functions.q2_sort_by_timestamp(events: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.sort_by_timestamp
Sorts a list of events by timestamp
- aw_query.functions.q2_split_url_events(events: list) List[aw_core.models.Event] [source]¶
- aw_query.functions.q2_sum_durations(events: list) datetime.timedelta [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.sum_durations
Sums the durations for the given events
- aw_query.functions.q2_union_no_overlap(events1: list, events2: list) List[aw_core.models.Event] [source]¶
Note
Documentation automatically copied from underlying function
aw_transform.union_no_overlap
Merges two eventlists and removes overlap, the first eventlist will have precedence
- Example:
events1 | xxx xx xxx | events1 | —- —— – | result | xxx– xx —-xxx – |
aw_server¶
aw_server.api¶
The ServerAPI
class contains the basic API methods, these methods are primarily called from RPC layers such as the one found in aw_server.rest
.
- class aw_server.api.ServerAPI(db, testing)[source]¶
- create_bucket(bucket_id: str, event_type: str, client: str, hostname: str, created: Optional[datetime.datetime] = None, data: Optional[Dict[str, Any]] = None) bool [source]¶
Create a bucket.
If hostname is “!local”, the hostname and device_id will be set from the server info. This is useful for watchers which are known/assumed to run locally but might not know their hostname (like aw-watcher-web).
Returns True if successful, otherwise false if a bucket with the given ID already existed.
- create_events(bucket_id: str, events: List[aw_core.models.Event]) Optional[aw_core.models.Event] [source]¶
Create events for a bucket. Can handle both single events and multiple ones.
Returns the inserted event when a single event was inserted, otherwise None.
- export_all() Dict[str, Any] [source]¶
Exports all buckets and their events to a format consistent across versions
- export_bucket(bucket_id: str) Dict[str, Any] [source]¶
Export a bucket to a dataformat consistent across versions, including all events in it.
- get_event(bucket_id: str, event_id: int) Optional[aw_core.models.Event] [source]¶
Get a single event from a bucket
- get_eventcount(bucket_id: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None) int [source]¶
Get eventcount from a bucket
- get_events(bucket_id: str, limit: int = - 1, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None) List[aw_core.models.Event] [source]¶
Get events from a bucket
- heartbeat(bucket_id: str, heartbeat: aw_core.models.Event, pulsetime: float) aw_core.models.Event [source]¶
Heartbeats are useful when implementing watchers that simply keep track of a state, how long it’s in that state and when it changes. A single heartbeat always has a duration of zero.
If the heartbeat was identical to the last (apart from timestamp), then the last event has its duration updated. If the heartbeat differed, then a new event is created.
- Such as:
Active application and window title - Example: aw-watcher-window
Currently open document/browser tab/playing song - Example: wakatime - Example: aw-watcher-web - Example: aw-watcher-spotify
Is the user active/inactive? Send an event on some interval indicating if the user is active or not. - Example: aw-watcher-afk
Inspired by: https://wakatime.com/developers#heartbeats