From 89c21772afcf5bcb8390dc965387129eead38762 Mon Sep 17 00:00:00 2001 From: Frederick Muriuki Muriithi Date: Mon, 30 Mar 2026 11:13:49 -0500 Subject: Add checks for privileges. * Make privileges a package rather than a module and rename previous module. * Add generic checks for most-common features of the system. --- gn_libs/privileges.py | 166 ----------------------------------------- gn_libs/privileges/__init__.py | 2 + gn_libs/privileges/authspec.py | 166 +++++++++++++++++++++++++++++++++++++++++ gn_libs/privileges/checks.py | 48 ++++++++++++ 4 files changed, 216 insertions(+), 166 deletions(-) delete mode 100644 gn_libs/privileges.py create mode 100644 gn_libs/privileges/__init__.py create mode 100644 gn_libs/privileges/authspec.py create mode 100644 gn_libs/privileges/checks.py diff --git a/gn_libs/privileges.py b/gn_libs/privileges.py deleted file mode 100644 index 32c943d..0000000 --- a/gn_libs/privileges.py +++ /dev/null @@ -1,166 +0,0 @@ -"""Utilities for handling privileges.""" -import logging -from functools import reduce -from typing import Union, Sequence, Iterator, TypeAlias, TypedDict - -logger = logging.getLogger(__name__) - -Operator: TypeAlias = str # Valid operators: "AND", "OR" -Privilege: TypeAlias = str -PrivilegesList: TypeAlias = Sequence[Privilege] -ParseTree = tuple[Operator, - # Leaves (`PrivilegesList` objects) on the left, - # trees (`ParseTree` objects) on the right - Union[PrivilegesList, tuple[PrivilegesList, 'ParseTree']]] - - -class SpecificationValueError(ValueError): - """Raised when there is an error in the specification string.""" - - -_OPERATORS_ = ("OR", "AND") -_EMPTY_SPEC_ERROR_ = SpecificationValueError( - "Empty specification. I do not know what to do.") - - -def __add_leaves__( - index: int, - tree: tuple[Operator], - leaves: dict -) -> Union[tuple[Operator], Union[ParseTree, tuple]]: - """Add leaves to the tree.""" - if leaves.get(index): - return tree + (leaves[index],) - return tree + (tuple()) - - -class ParsingState(TypedDict): - """Class to create a state object. Mostly used to silence MyPy""" - tokens: list[str] - trees: list[tuple[int, int, str, int, int]]#(name, parent, operator, start, end) - open_parens: int - current_tree: int - leaves: dict[int, tuple[str, ...]]#[parent-tree, [index, index, ...]] - - -def __build_tree__(tree_state: ParsingState) -> ParseTree: - """Given computed state, build the actual tree.""" - _built = [] - for idx, tree in enumerate(tree_state["trees"]): - _built.append(__add_leaves__(idx, (tree[2],), tree_state["leaves"])) - - logger.debug("Number of built trees: %s, %s", len(_built), _built) - _num_trees = len(_built) - for idx in range(0, _num_trees): - _last_tree = _built.pop() - logger.debug("LAST TREE: %s, %s", _last_tree, len(_last_tree)) - if len(_last_tree) <= 1:# Has no leaves or subtrees - _last_tree = None# type: ignore[assignment] - continue# more evil - _name = tree_state["trees"][_num_trees - 1 - idx][0] - _parent = tree_state["trees"][ - tree_state["trees"][_num_trees - 1 - idx][1]] - _op = tree_state["trees"][_num_trees - 1 - idx][2] - logger.debug("TREE => name: %s, operation: %s, parent: %s", - _name, _op, _parent) - if _name != _parent[0]:# not root tree - if _op == _parent[2]: - _built[_parent[0]] = ( - _built[_parent[0]][0],# Operator - _built[_parent[0]][1] + _last_tree[1]# merge leaves - ) + _last_tree[2:]#Add any trees left over - else: - _built[_parent[0]] += (_last_tree,) - - if _last_tree is None: - raise _EMPTY_SPEC_ERROR_ - return _last_tree - - -def __parse_tree__(tokens: Iterator[str]) -> ParseTree: - """Parse the tokens into a tree.""" - _state = ParsingState( - tokens=[], trees=[], open_parens=0, current_tree=0, leaves={}) - for _idx, _token in enumerate(tokens): - _state["tokens"].append(_token) - - if _idx==0: - if _token[1:].upper() not in _OPERATORS_: - raise SpecificationValueError(f"Invalid operator: {_token[1:]}") - _state["open_parens"] += 1 - _state["trees"].append((0, 0, _token[1:].upper(), _idx, -1)) - _state["current_tree"] = 0 - continue# this is bad! - - if _token == ")":# end a tree - logger.debug("ENDING A TREE: %s", _state) - _state["open_parens"] -= 1 - _state["trees"][_state["current_tree"]] = ( - _state["trees"][_state["current_tree"]][0:-1] + (_idx,)) - # We go back to the parent below. - _state["current_tree"] = _state["trees"][_state["current_tree"]][1] - continue# still really bad! - - if _token[1:].upper() in _OPERATORS_:# new child tree - _state["open_parens"] += 1 - _state["trees"].append((len(_state["trees"]), - _state["current_tree"], - _token[1:].upper(), - _idx, - -1)) - _state["current_tree"] = len(_state["trees"]) - 1 - continue# more evil still - - logger.debug("state: %s", _state) - # leaves - _state["leaves"][_state["current_tree"]] = _state["leaves"].get( - _state["current_tree"], tuple()) + (_token,) - - # Build parse-tree from state - if _state["open_parens"] != 0: - raise SpecificationValueError("Unbalanced parentheses.") - return __build_tree__(_state) - - -def __tokenise__(spec: str) -> Iterator[str]: - """Clean up and tokenise the string.""" - return (token.strip() - for token in spec.replace( - "(", " (" - ).replace( - ")", " ) " - ).replace( - "( ", "(" - ).split()) - - -def parse(spec: str) -> ParseTree: - """Parse a string specification for privileges and return a tree of data - objects of the form ( ())""" - if spec.strip() == "": - raise _EMPTY_SPEC_ERROR_ - - return __parse_tree__(__tokenise__(spec)) - - -def __make_checker__(check_fn): - def __checker__(privileges, *checks): - def __check__(acc, curr): - if curr[0] in _OPERATORS_: - return acc + (_OPERATOR_FUNCTION_[curr[0]]( - privileges, *curr[1:]),) - return acc + (check_fn((priv in privileges) for priv in curr),) - results = reduce(__check__, checks, tuple()) - return len(results) > 0 and check_fn(results) - - return __checker__ - - -_OPERATOR_FUNCTION_ = { - "OR": __make_checker__(any), - "AND": __make_checker__(all) -} -def check(spec: str, privileges: tuple[str, ...]) -> bool: - """Check that the sequence of `privileges` satisfies `spec`.""" - _spec = parse(spec) - return _OPERATOR_FUNCTION_[_spec[0]](privileges, *_spec[1:]) diff --git a/gn_libs/privileges/__init__.py b/gn_libs/privileges/__init__.py new file mode 100644 index 0000000..9b2af85 --- /dev/null +++ b/gn_libs/privileges/__init__.py @@ -0,0 +1,2 @@ +from .authspec import check, parse, SpecificationValueError +from .checks import can_view, can_edit, can_create, can_delete diff --git a/gn_libs/privileges/authspec.py b/gn_libs/privileges/authspec.py new file mode 100644 index 0000000..32c943d --- /dev/null +++ b/gn_libs/privileges/authspec.py @@ -0,0 +1,166 @@ +"""Utilities for handling privileges.""" +import logging +from functools import reduce +from typing import Union, Sequence, Iterator, TypeAlias, TypedDict + +logger = logging.getLogger(__name__) + +Operator: TypeAlias = str # Valid operators: "AND", "OR" +Privilege: TypeAlias = str +PrivilegesList: TypeAlias = Sequence[Privilege] +ParseTree = tuple[Operator, + # Leaves (`PrivilegesList` objects) on the left, + # trees (`ParseTree` objects) on the right + Union[PrivilegesList, tuple[PrivilegesList, 'ParseTree']]] + + +class SpecificationValueError(ValueError): + """Raised when there is an error in the specification string.""" + + +_OPERATORS_ = ("OR", "AND") +_EMPTY_SPEC_ERROR_ = SpecificationValueError( + "Empty specification. I do not know what to do.") + + +def __add_leaves__( + index: int, + tree: tuple[Operator], + leaves: dict +) -> Union[tuple[Operator], Union[ParseTree, tuple]]: + """Add leaves to the tree.""" + if leaves.get(index): + return tree + (leaves[index],) + return tree + (tuple()) + + +class ParsingState(TypedDict): + """Class to create a state object. Mostly used to silence MyPy""" + tokens: list[str] + trees: list[tuple[int, int, str, int, int]]#(name, parent, operator, start, end) + open_parens: int + current_tree: int + leaves: dict[int, tuple[str, ...]]#[parent-tree, [index, index, ...]] + + +def __build_tree__(tree_state: ParsingState) -> ParseTree: + """Given computed state, build the actual tree.""" + _built = [] + for idx, tree in enumerate(tree_state["trees"]): + _built.append(__add_leaves__(idx, (tree[2],), tree_state["leaves"])) + + logger.debug("Number of built trees: %s, %s", len(_built), _built) + _num_trees = len(_built) + for idx in range(0, _num_trees): + _last_tree = _built.pop() + logger.debug("LAST TREE: %s, %s", _last_tree, len(_last_tree)) + if len(_last_tree) <= 1:# Has no leaves or subtrees + _last_tree = None# type: ignore[assignment] + continue# more evil + _name = tree_state["trees"][_num_trees - 1 - idx][0] + _parent = tree_state["trees"][ + tree_state["trees"][_num_trees - 1 - idx][1]] + _op = tree_state["trees"][_num_trees - 1 - idx][2] + logger.debug("TREE => name: %s, operation: %s, parent: %s", + _name, _op, _parent) + if _name != _parent[0]:# not root tree + if _op == _parent[2]: + _built[_parent[0]] = ( + _built[_parent[0]][0],# Operator + _built[_parent[0]][1] + _last_tree[1]# merge leaves + ) + _last_tree[2:]#Add any trees left over + else: + _built[_parent[0]] += (_last_tree,) + + if _last_tree is None: + raise _EMPTY_SPEC_ERROR_ + return _last_tree + + +def __parse_tree__(tokens: Iterator[str]) -> ParseTree: + """Parse the tokens into a tree.""" + _state = ParsingState( + tokens=[], trees=[], open_parens=0, current_tree=0, leaves={}) + for _idx, _token in enumerate(tokens): + _state["tokens"].append(_token) + + if _idx==0: + if _token[1:].upper() not in _OPERATORS_: + raise SpecificationValueError(f"Invalid operator: {_token[1:]}") + _state["open_parens"] += 1 + _state["trees"].append((0, 0, _token[1:].upper(), _idx, -1)) + _state["current_tree"] = 0 + continue# this is bad! + + if _token == ")":# end a tree + logger.debug("ENDING A TREE: %s", _state) + _state["open_parens"] -= 1 + _state["trees"][_state["current_tree"]] = ( + _state["trees"][_state["current_tree"]][0:-1] + (_idx,)) + # We go back to the parent below. + _state["current_tree"] = _state["trees"][_state["current_tree"]][1] + continue# still really bad! + + if _token[1:].upper() in _OPERATORS_:# new child tree + _state["open_parens"] += 1 + _state["trees"].append((len(_state["trees"]), + _state["current_tree"], + _token[1:].upper(), + _idx, + -1)) + _state["current_tree"] = len(_state["trees"]) - 1 + continue# more evil still + + logger.debug("state: %s", _state) + # leaves + _state["leaves"][_state["current_tree"]] = _state["leaves"].get( + _state["current_tree"], tuple()) + (_token,) + + # Build parse-tree from state + if _state["open_parens"] != 0: + raise SpecificationValueError("Unbalanced parentheses.") + return __build_tree__(_state) + + +def __tokenise__(spec: str) -> Iterator[str]: + """Clean up and tokenise the string.""" + return (token.strip() + for token in spec.replace( + "(", " (" + ).replace( + ")", " ) " + ).replace( + "( ", "(" + ).split()) + + +def parse(spec: str) -> ParseTree: + """Parse a string specification for privileges and return a tree of data + objects of the form ( ())""" + if spec.strip() == "": + raise _EMPTY_SPEC_ERROR_ + + return __parse_tree__(__tokenise__(spec)) + + +def __make_checker__(check_fn): + def __checker__(privileges, *checks): + def __check__(acc, curr): + if curr[0] in _OPERATORS_: + return acc + (_OPERATOR_FUNCTION_[curr[0]]( + privileges, *curr[1:]),) + return acc + (check_fn((priv in privileges) for priv in curr),) + results = reduce(__check__, checks, tuple()) + return len(results) > 0 and check_fn(results) + + return __checker__ + + +_OPERATOR_FUNCTION_ = { + "OR": __make_checker__(any), + "AND": __make_checker__(all) +} +def check(spec: str, privileges: tuple[str, ...]) -> bool: + """Check that the sequence of `privileges` satisfies `spec`.""" + _spec = parse(spec) + return _OPERATOR_FUNCTION_[_spec[0]](privileges, *_spec[1:]) diff --git a/gn_libs/privileges/checks.py b/gn_libs/privileges/checks.py new file mode 100644 index 0000000..3b52d35 --- /dev/null +++ b/gn_libs/privileges/checks.py @@ -0,0 +1,48 @@ +import uuid +import logging +from functools import partial + +from gn_libs.sqlite3 import DbConnection + +from .authspec import check + + +logger = logging.getLogger(__name__) + + +class PrivilegeCheckError(Exception): + """Raise when there's an error when checking for privileges.""" + + +def privileges_fulfill_specs( + queried_privileges: tuple[str, ...], + resource_spec: str, + system_spec: str +) -> bool: + """Check whether a user's privileges fulfill the given specs.""" + return (check(resource_spec, queried_privileges) or + check(system_spec, queried_privileges)) + + +can_view = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:view-resource system:resource:view)", + system_spec="(OR system:system-wide:data:view system:resource:view)") + + +can_edit = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:edit-resource system:resource:edit)", + system_spec="(OR system:system-wide:data:edit system:resource:edit)") + + +can_create = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:create-resource)", + system_spec="(OR system:system-wide:data:create)") + + +can_delete = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:delete-resource system:resource:delete)", + system_spec="(OR system:system-wide:data:delete system:resource:delete)") -- cgit 1.4.1