diff options
| author | Frederick Muriuki Muriithi | 2026-03-30 11:13:49 -0500 |
|---|---|---|
| committer | Frederick Muriuki Muriithi | 2026-03-30 11:13:49 -0500 |
| commit | 89c21772afcf5bcb8390dc965387129eead38762 (patch) | |
| tree | 77e415e76a3e751f37b2bee2a8c199d8b86dfe03 /gn_libs/privileges | |
| parent | a9d5cc0ad0a9eeac2fb1276baa2ac8b294a18133 (diff) | |
| download | gn-libs-89c21772afcf5bcb8390dc965387129eead38762.tar.gz | |
Add checks for privileges.
* Make privileges a package rather than a module and rename previous module. * Add generic checks for most-common features of the system.
Diffstat (limited to 'gn_libs/privileges')
| -rw-r--r-- | gn_libs/privileges/__init__.py | 2 | ||||
| -rw-r--r-- | gn_libs/privileges/authspec.py | 166 | ||||
| -rw-r--r-- | gn_libs/privileges/checks.py | 48 |
3 files changed, 216 insertions, 0 deletions
diff --git a/gn_libs/privileges/__init__.py b/gn_libs/privileges/__init__.py new file mode 100644 index 0000000..9b2af85 --- /dev/null +++ b/gn_libs/privileges/__init__.py @@ -0,0 +1,2 @@ +from .authspec import check, parse, SpecificationValueError +from .checks import can_view, can_edit, can_create, can_delete diff --git a/gn_libs/privileges/authspec.py b/gn_libs/privileges/authspec.py new file mode 100644 index 0000000..32c943d --- /dev/null +++ b/gn_libs/privileges/authspec.py @@ -0,0 +1,166 @@ +"""Utilities for handling privileges.""" +import logging +from functools import reduce +from typing import Union, Sequence, Iterator, TypeAlias, TypedDict + +logger = logging.getLogger(__name__) + +Operator: TypeAlias = str # Valid operators: "AND", "OR" +Privilege: TypeAlias = str +PrivilegesList: TypeAlias = Sequence[Privilege] +ParseTree = tuple[Operator, + # Leaves (`PrivilegesList` objects) on the left, + # trees (`ParseTree` objects) on the right + Union[PrivilegesList, tuple[PrivilegesList, 'ParseTree']]] + + +class SpecificationValueError(ValueError): + """Raised when there is an error in the specification string.""" + + +_OPERATORS_ = ("OR", "AND") +_EMPTY_SPEC_ERROR_ = SpecificationValueError( + "Empty specification. I do not know what to do.") + + +def __add_leaves__( + index: int, + tree: tuple[Operator], + leaves: dict +) -> Union[tuple[Operator], Union[ParseTree, tuple]]: + """Add leaves to the tree.""" + if leaves.get(index): + return tree + (leaves[index],) + return tree + (tuple()) + + +class ParsingState(TypedDict): + """Class to create a state object. Mostly used to silence MyPy""" + tokens: list[str] + trees: list[tuple[int, int, str, int, int]]#(name, parent, operator, start, end) + open_parens: int + current_tree: int + leaves: dict[int, tuple[str, ...]]#[parent-tree, [index, index, ...]] + + +def __build_tree__(tree_state: ParsingState) -> ParseTree: + """Given computed state, build the actual tree.""" + _built = [] + for idx, tree in enumerate(tree_state["trees"]): + _built.append(__add_leaves__(idx, (tree[2],), tree_state["leaves"])) + + logger.debug("Number of built trees: %s, %s", len(_built), _built) + _num_trees = len(_built) + for idx in range(0, _num_trees): + _last_tree = _built.pop() + logger.debug("LAST TREE: %s, %s", _last_tree, len(_last_tree)) + if len(_last_tree) <= 1:# Has no leaves or subtrees + _last_tree = None# type: ignore[assignment] + continue# more evil + _name = tree_state["trees"][_num_trees - 1 - idx][0] + _parent = tree_state["trees"][ + tree_state["trees"][_num_trees - 1 - idx][1]] + _op = tree_state["trees"][_num_trees - 1 - idx][2] + logger.debug("TREE => name: %s, operation: %s, parent: %s", + _name, _op, _parent) + if _name != _parent[0]:# not root tree + if _op == _parent[2]: + _built[_parent[0]] = ( + _built[_parent[0]][0],# Operator + _built[_parent[0]][1] + _last_tree[1]# merge leaves + ) + _last_tree[2:]#Add any trees left over + else: + _built[_parent[0]] += (_last_tree,) + + if _last_tree is None: + raise _EMPTY_SPEC_ERROR_ + return _last_tree + + +def __parse_tree__(tokens: Iterator[str]) -> ParseTree: + """Parse the tokens into a tree.""" + _state = ParsingState( + tokens=[], trees=[], open_parens=0, current_tree=0, leaves={}) + for _idx, _token in enumerate(tokens): + _state["tokens"].append(_token) + + if _idx==0: + if _token[1:].upper() not in _OPERATORS_: + raise SpecificationValueError(f"Invalid operator: {_token[1:]}") + _state["open_parens"] += 1 + _state["trees"].append((0, 0, _token[1:].upper(), _idx, -1)) + _state["current_tree"] = 0 + continue# this is bad! + + if _token == ")":# end a tree + logger.debug("ENDING A TREE: %s", _state) + _state["open_parens"] -= 1 + _state["trees"][_state["current_tree"]] = ( + _state["trees"][_state["current_tree"]][0:-1] + (_idx,)) + # We go back to the parent below. + _state["current_tree"] = _state["trees"][_state["current_tree"]][1] + continue# still really bad! + + if _token[1:].upper() in _OPERATORS_:# new child tree + _state["open_parens"] += 1 + _state["trees"].append((len(_state["trees"]), + _state["current_tree"], + _token[1:].upper(), + _idx, + -1)) + _state["current_tree"] = len(_state["trees"]) - 1 + continue# more evil still + + logger.debug("state: %s", _state) + # leaves + _state["leaves"][_state["current_tree"]] = _state["leaves"].get( + _state["current_tree"], tuple()) + (_token,) + + # Build parse-tree from state + if _state["open_parens"] != 0: + raise SpecificationValueError("Unbalanced parentheses.") + return __build_tree__(_state) + + +def __tokenise__(spec: str) -> Iterator[str]: + """Clean up and tokenise the string.""" + return (token.strip() + for token in spec.replace( + "(", " (" + ).replace( + ")", " ) " + ).replace( + "( ", "(" + ).split()) + + +def parse(spec: str) -> ParseTree: + """Parse a string specification for privileges and return a tree of data + objects of the form (<operator> (<check>))""" + if spec.strip() == "": + raise _EMPTY_SPEC_ERROR_ + + return __parse_tree__(__tokenise__(spec)) + + +def __make_checker__(check_fn): + def __checker__(privileges, *checks): + def __check__(acc, curr): + if curr[0] in _OPERATORS_: + return acc + (_OPERATOR_FUNCTION_[curr[0]]( + privileges, *curr[1:]),) + return acc + (check_fn((priv in privileges) for priv in curr),) + results = reduce(__check__, checks, tuple()) + return len(results) > 0 and check_fn(results) + + return __checker__ + + +_OPERATOR_FUNCTION_ = { + "OR": __make_checker__(any), + "AND": __make_checker__(all) +} +def check(spec: str, privileges: tuple[str, ...]) -> bool: + """Check that the sequence of `privileges` satisfies `spec`.""" + _spec = parse(spec) + return _OPERATOR_FUNCTION_[_spec[0]](privileges, *_spec[1:]) diff --git a/gn_libs/privileges/checks.py b/gn_libs/privileges/checks.py new file mode 100644 index 0000000..3b52d35 --- /dev/null +++ b/gn_libs/privileges/checks.py @@ -0,0 +1,48 @@ +import uuid +import logging +from functools import partial + +from gn_libs.sqlite3 import DbConnection + +from .authspec import check + + +logger = logging.getLogger(__name__) + + +class PrivilegeCheckError(Exception): + """Raise when there's an error when checking for privileges.""" + + +def privileges_fulfill_specs( + queried_privileges: tuple[str, ...], + resource_spec: str, + system_spec: str +) -> bool: + """Check whether a user's privileges fulfill the given specs.""" + return (check(resource_spec, queried_privileges) or + check(system_spec, queried_privileges)) + + +can_view = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:view-resource system:resource:view)", + system_spec="(OR system:system-wide:data:view system:resource:view)") + + +can_edit = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:edit-resource system:resource:edit)", + system_spec="(OR system:system-wide:data:edit system:resource:edit)") + + +can_create = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:create-resource)", + system_spec="(OR system:system-wide:data:create)") + + +can_delete = partial( + privileges_fulfill_specs, + resource_spec="(OR group:resource:delete-resource system:resource:delete)", + system_spec="(OR system:system-wide:data:delete system:resource:delete)") |
