Source code for aw_query.query2

import logging
from abc import abstractmethod
from datetime import datetime
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    Type,
)

from aw_datastore import Datastore

from .exceptions import QueryInterpretException, QueryParseException
from .functions import functions

logger = logging.getLogger(__name__)


class QToken:
    @abstractmethod
    def interpret(self, datastore: Datastore, namespace: dict):
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def parse(string: str, namespace: dict):
        raise NotImplementedError

    @staticmethod
    @abstractmethod
    def check(string: str) -> Tuple[Optional[str], str]:
        raise NotImplementedError


class QInteger(QToken):
    def __init__(self, value) -> None:
        self.value = value

    def interpret(self, datastore: Datastore, namespace: dict):
        return self.value

    @staticmethod
    def parse(string: str, namespace: dict) -> QToken:
        return QInteger(int(string))

    @staticmethod
    def check(string: str):
        token = ""
        for char in string:
            if char.isdigit():
                token += char
            else:
                break
        return token, string[len(token) :]


class QVariable(QToken):
    def __init__(self, name, value) -> None:
        self.name = name
        self.value = value

    def interpret(self, datastore: Datastore, namespace: dict):
        if self.name not in namespace:
            raise QueryInterpretException(
                f"Tried to reference variable '{self.name}' which is not defined"
            )
        namespace[self.name] = self.value
        return self.value

    @staticmethod
    def parse(string: str, namespace: dict) -> QToken:
        val = None
        if string in namespace:
            val = namespace[string]
        return QVariable(string, val)

    @staticmethod
    def check(string: str):
        token = ""
        for i, char in enumerate(string):
            if char.isalpha() or char == "_":
                token += char
            elif i != 0 and char.isdigit():
                token += char
            else:
                break
        return token, string[len(token) :]


class QString(QToken):
    def __init__(self, value):
        self.value = value

    def interpret(self, datastore: Datastore, namespace: dict):
        return self.value

    @staticmethod
    def parse(string: str, namespace: dict) -> "QString":
        quotes_type = string[0]
        string = string.replace("\\" + quotes_type, quotes_type)
        string = string[1:-1]
        return QString(string)

    @staticmethod
    def check(string: str):
        token = ""
        quotes_type = string[0]
        if quotes_type != '"' and quotes_type != "'":
            return token, string
        token += quotes_type
        prev_char = None
        for char in string[1:]:
            token += char
            if (
                char == quotes_type and prev_char != "\\"
            ):  # escape quote_type with backslash
                break
            prev_char = char
        if token[-1] != quotes_type or len(token) < 2:
            # Unclosed string?
            raise QueryParseException("Failed to parse string")
        return token, string[len(token) :]


class QFunction(QToken):
    def __init__(self, name, args):
        self.name = name
        self.args = args

    def interpret(self, datastore: Datastore, namespace: dict):
        if self.name not in functions:
            raise QueryInterpretException(
                f"Tried to call function '{self.name}' which doesn't exist"
            )
        call_args = [datastore, namespace]
        for arg in self.args:
            call_args.append(arg.interpret(datastore, namespace))
        # logger.debug("Arguments for functioncall to {} is {}".format(self.name, call_args))
        try:
            result = functions[self.name](*call_args)  # type: ignore
        except TypeError:
            raise QueryInterpretException(
                f"Tried to call function {self.name} with invalid amount of arguments"
            ) from None
        return result

    @staticmethod
    def parse(string: str, namespace: dict) -> QToken:
        arg_start = 0
        arg_end = len(string) - 1
        # Find opening bracket
        for char in string:
            if char == "(":
                break
            arg_start = arg_start + 1
        # Parse name
        name = string[:arg_start]
        # Parse arguments
        args = []
        args_str = string[arg_start + 1 : arg_end]
        while args_str:
            (arg_t, arg), args_str = _parse_token(args_str, namespace)
            comma = args_str.find(",")
            if comma != -1:
                args_str = args_str[comma + 1 :]
            args.append(arg_t.parse(arg, namespace))
        return QFunction(name, args)

    @staticmethod
    def check(string: str):
        i = 0
        # Find opening bracket
        found = False
        for char in string:
            if char.isalpha() or char == "_":
                i = i + 1
            elif i != 0 and char.isdigit():
                i = i + 1
            elif char == "(":
                i = i + 1
                found = True
                break
            else:
                break
        if not found:
            return None, string
        to_consume = 1
        single_quote = False
        double_quote = False
        prev_char = None
        for char in string[i:]:
            i = i + 1
            if char == "'" and prev_char != "\\" and not double_quote:
                single_quote = not single_quote
            elif char == '"' and prev_char != "\\" and not single_quote:
                double_quote = not double_quote
            elif single_quote or double_quote:
                pass
            elif i != 0 and char.isdigit():
                pass
            elif char == "(":
                to_consume += 1
            elif char == ")":
                to_consume -= 1
            if to_consume == 0:
                break
            prev_char = char
        if to_consume != 0:
            return None, string
        return string[:i], string[i + 1 :]


class QDict(QToken):
    def __init__(self, value: dict) -> None:
        self.value = value

    def interpret(self, datastore: Datastore, namespace: dict):
        expanded_dict = {}
        for key, value in self.value.items():
            expanded_dict[key] = value.interpret(datastore, namespace)
        return expanded_dict

    @staticmethod
    def parse(string: str, namespace: dict) -> QToken:
        entries_str = string[1:-1]
        d: Dict[str, QToken] = {}
        while len(entries_str) > 0:
            entries_str = entries_str.strip()
            if len(d) > 0 and entries_str[0] == ",":
                entries_str = entries_str[1:]
            # parse key
            (key_t, key_str), entries_str = _parse_token(entries_str, namespace)
            if key_t != QString:
                raise QueryParseException("Key in dict is not a str")
            key = QString.parse(key_str, {}).value
            entries_str = entries_str.strip()
            # Remove :
            if entries_str[0] != ":":
                raise QueryParseException("Key in dict is not followed by a :")
            entries_str = entries_str[1:]
            # parse val
            (val_t, val_str), entries_str = _parse_token(entries_str, namespace)
            if not val_t:
                raise QueryParseException("Dict expected a value, got nothing")
            val = val_t.parse(val_str, namespace)
            # set
            d[key] = val
        return QDict(d)

    @staticmethod
    def check(string: str):
        if string[0] != "{":
            return None, string
        # Find closing bracket
        i = 1
        to_consume = 1
        single_quote = False
        double_quote = False
        prev_char = None
        for char in string[i:]:
            i += 1
            if char == "'" and prev_char != "\\" and not double_quote:
                single_quote = not single_quote
            elif char == '"' and prev_char != "\\" and not single_quote:
                double_quote = not double_quote
            elif single_quote or double_quote:
                pass
            elif char == "}":
                to_consume = to_consume - 1
            elif char == "{":
                to_consume = to_consume + 1
            if to_consume == 0:
                break
            prev_char = char
        return string[:i], string[i + 1 :]


class QList(QToken):
    def __init__(self, value: list) -> None:
        self.value = value

    def interpret(self, datastore: Datastore, namespace: dict):
        expanded_list = []
        for value in self.value:
            expanded_list.append(value.interpret(datastore, namespace))
        return expanded_list

    @staticmethod
    def parse(string: str, namespace: dict) -> QToken:
        entries_str = string[1:-1]
        ls: List[QToken] = []
        while len(entries_str) > 0:
            entries_str = entries_str.strip()
            if len(ls) > 0 and entries_str[0] == ",":
                entries_str = entries_str[1:]
            # parse
            (val_t, val_str), entries_str = _parse_token(entries_str, namespace)
            if not val_t:
                raise QueryParseException("List expected a value, got nothing")
            val = val_t.parse(val_str, namespace)
            # set
            ls.append(val)
        return QList(ls)

    @staticmethod
    def check(string: str):
        if string[0] != "[":
            return None, string
        # Find closing bracket
        i = 1
        to_consume = 1
        single_quote = False
        double_quote = False
        prev_char = None
        for char in string[i:]:
            i += 1
            if char == "'" and prev_char != "\\" and not double_quote:
                single_quote = not single_quote
            elif char == '"' and prev_char != "\\" and not single_quote:
                double_quote = not double_quote
            elif double_quote or single_quote:
                pass
            elif char == "]":
                to_consume = to_consume - 1
            elif char == "[":
                to_consume = to_consume + 1
            if to_consume == 0:
                break
            prev_char = char
        return string[:i], string[i + 1 :]


qtypes: Sequence[Type[QToken]] = [QString, QInteger, QFunction, QDict, QList, QVariable]


def _parse_token(string: str, namespace: dict) -> Tuple[Tuple[Any, str], str]:
    # TODO: The whole parsing thing is shoddily written, needs a rewrite from ground-up
    if not isinstance(string, str):
        raise QueryParseException(
            "Reached unreachable, cannot parse something that isn't a string"
        )
    if len(string) == 0:
        return (None, ""), string
    string = string.strip()
    token = None
    t = None  # Declare so we can return it
    for t in qtypes:
        token, string = t.check(string)
        if token:
            break
    if not token:
        raise QueryParseException(f"Syntax error: {string}")
    return (t, token), string


def create_namespace() -> dict:
    namespace = {
        "True": True,
        "False": False,
        "true": True,
        "false": False,
    }
    return namespace


def parse(line, namespace):
    separator_i = line.find("=")
    var_str = line[:separator_i]
    val_str = line[separator_i + 1 :]
    if not val_str:
        # TODO: Proper message
        raise QueryParseException("Nothing to assign")
    (var_t, var), var_str = _parse_token(var_str, namespace)
    var_str = var_str.strip()
    if var_str:  # Didn't consume whole var string
        raise QueryParseException("Invalid syntax for assignment variable")
    if var_t is not QVariable:
        raise QueryParseException("Cannot assign to a non-variable")
    (val_t, val), var_str = _parse_token(val_str, namespace)
    if var_str:  # Didn't consume whole val string
        raise QueryParseException("Invalid syntax for value to assign")
    # Parse token
    var = var_t.parse(var, namespace)
    val = val_t.parse(val, namespace)
    return var, val


def interpret(var, val, namespace, datastore):
    namespace[var.name] = val.interpret(datastore, namespace)
    # logger.debug("Set {} to {}".format(var.name, namespace[var.name]))


def get_return(namespace):
    if "RETURN" not in namespace:
        raise QueryParseException(
            "Query doesn't assign the RETURN variable, nothing to respond"
        )
    return namespace["RETURN"]


[docs]def query( name: str, query: str, starttime: datetime, endtime: datetime, datastore: Datastore ) -> Any: namespace = create_namespace() namespace["NAME"] = name namespace["STARTTIME"] = starttime.isoformat() namespace["ENDTIME"] = endtime.isoformat() query_stmts = query.split(";") for statement in query_stmts: statement = statement.strip() if statement: logger.debug("Parsing: " + statement) var, val = parse(statement, namespace) interpret(var, val, namespace, datastore) result = get_return(namespace) return result