aboutsummaryrefslogtreecommitdiff
path: root/scripts/authentication/group.py
blob: b89bc3eca2e67840c9a06535649c83131457068a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""A script for adding users to a specific group.

Example:

Assuming there are no groups and 'test@bonfacemunyoki.com' does not
exist in Redis:

.. code-block:: bash
   python group.py -g "editors" -m "test@bonfacemunyoki.com"

results in::

   Successfully created the group: 'editors'
   Data: '{"admins": [], "members": []}'

If 'me@bonfacemunyoki.com' exists in 'users' in Redis and we run:

.. code-block:: bash
   python group.py -g "editors" -m "me@bonfacemunyoki.com"

now results in::

   No new group was created.
   Updated Data: {'admins': [], 'members': ['me@bonfacemunyoki.com']}

"""

import argparse
import redis
import json

from typing import Dict, List, Optional, Set
from glom import glom  # type: ignore


def create_group_data(users: Dict, target_group: str,
                      members: Optional[str] = None,
                      admins: Optional[str] = None) -> Dict:
    """Return a dictionary that contains the following keys: "key",
    "field", and "value" that can be used in a redis hash as follows:
    HSET key field value

    Parameters:

    - `users`: a list of users for example:

    {'8ad942fe-490d-453e-bd37-56f252e41603':
    '{"email_address": "me@test.com",
      "full_name": "John Doe",
      "organization": "Genenetwork",
      "password": {"algorithm": "pbkdf2",
                   "hashfunc": "sha256",
                    "salt": "gJrd1HnPSSCmzB5veMPaVk2ozzDlS1Z7Ggcyl1+pciA=",
                    "iterations": 100000, "keylength": 32,
                    "created_timestamp": "2021-09-22T11:32:44.971912",
                    "password": "edcdaa60e84526c6"},
                    "user_id": "8ad942fe", "confirmed": 1,
                    "registration_info": {
                        "timestamp": "2021-09-22T11:32:45.028833",
                        "ip_address": "127.0.0.1",
                        "user_agent": "Mozilla/5.0"}}'}

    - `target_group`: the group name that will be stored inside the
      "groups" hash in Redis.

    - `members`: a comma-separated list of values that contain members
      of the `target_group` e.g. "me@test1.com, me@test2.com,
      me@test3.com"

    - `admins`: a comma-separated list of values that contain
      administrators of the `target_group` e.g. "me@test1.com,
      me@test2.com, me@test3.com"

    """
    _members = "".join(members.split()).split(",") if members else []
    _admins: List = "".join(admins.split()).split(",") if admins else []

    user_emails: Set = {glom(json.loads(user_details), "email_address")
                        for _, user_details in users.items()}

    return {"key": "groups",
            "field": target_group,
            "value": json.dumps({
                "admins": [admin for admin in _admins
                           if admin in user_emails],
                "members": [member for member in _members
                            if member in user_emails]
            })}


if __name__ == "__main__":
    # Initialising the parser CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("-g", "--group-name",
                        help="This is the name of the GROUP mask")
    parser.add_argument("-m", "--members",
                        help="Members of the GROUP mask")
    parser.add_argument("-a", "--admins",
                        help="Admins of the GROUP mask")
    args = parser.parse_args()

    if not args.group_name:
        exit("\nExiting. Please specify a group name to use!\n")

    members = args.members if args.members else None
    admins = args.admins if args.admins else None
    REDIS_CONN = redis.Redis()
    USERS = {key.decode(): val.decode()
             for key, val in REDIS_CONN.hgetall("users").items()}

    if not any([members, admins]):
        exit("\nExiting. Please provide a value for "
             "MEMBERS(-m) or ADMINS(-a)!\n")

    data = create_group_data(
        users=USERS,
        target_group=args.group_name,
        members=members,
        admins=admins)
    created_p = REDIS_CONN.hset(data.get("key", ""),
                                data.get("field", ""),
                                data.get("value", ""))

    groups = json.loads(REDIS_CONN.hget("groups",
                                        args.group_name))  # type: ignore
    if created_p:
        exit(f"\nSuccessfully created the group: '{args.group_name}'\n"
             f"Data: {groups}\n")
    exit("\nNo new group was created.\n"
         f"Updated Data: {groups}\n")