-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathaccess_manager_extension.py
More file actions
executable file
·338 lines (277 loc) · 11.7 KB
/
access_manager_extension.py
File metadata and controls
executable file
·338 lines (277 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
#!/usr/bin/env python3
"""
The access manager is intended to be run as a regular cronjob which takes care of updating the user
role bindings in the database. This is required to reflect changes in the origin of the respective
role bindings, e.g. a change of the GitHub organisation and/or team membership.
"""
import asyncio
import atexit
import collections
import collections.abc
import enum
import logging
import urllib.parse
import dacite
import github3.github
import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sqlasync
import sqlalchemy.orm
import ci.log
import ctx_util
import deliverydb
import deliverydb.model as dm
import k8s.logging
import lookups
import odg.extensions_cfg
import odg.util
import secret_mgmt.oauth_cfg
import util
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Logging is configured as a side effect of importing/running this module, before any of the
# functions below emit log records.
ci.log.configure_default_logging()
k8s.logging.configure_kubernetes_logging()

# Type aliases which only serve to make the nested mapping annotations below self-describing.
# NOTE: several functions below bind a local variable which is also called `github_host`; inside
# those functions the local (an actual hostname string) shadows this alias.
github_host = str
# Name of a GitHub organisation as specified in a role binding subject.
github_org_name = str
# Name of a GitHub team as specified in a role binding subject, in the form `<org>/<team>`.
github_team_name = str
# Set of GitHub logins of the members of an organisation or team.
github_members = set[str]
def resolve_github_orgs_and_teams(
    oauth_cfgs: collections.abc.Iterable[secret_mgmt.oauth_cfg.OAuthCfg],
    github_api_lookup: collections.abc.Callable[[str], github3.github.GitHub | None],
) -> tuple[
    dict[github_host, dict[github_org_name, github_members]],
    dict[github_host, dict[github_team_name, github_members]],
]:
    """
    Resolves all members of the GitHub organisations and/or teams specified in the role bindings of
    the specified `oauth_cfgs`.

    Every organisation and team is resolved at most once per hostname, even if it is referenced by
    multiple role bindings, to avoid redundant (paginated) requests against the GitHub API.

    Returns a tuple of two mappings:
    - hostname -> organisation name -> logins of the organisation's members
    - hostname -> team subject name (`<org>/<team>`) -> logins of the team's members

    Raises a `ValueError` if `github_api_lookup` yields no API client for an organisation, or if the
    name of a team subject is not of the form `<org>/<team>`.
    """
    subject_types = secret_mgmt.oauth_cfg.SubjectType
    github_orgs_by_hostname = collections.defaultdict(dict)
    github_teams_by_hostname = collections.defaultdict(dict)

    def lookup_organisation(hostname: str, org_name: str):
        # the api lookup is keyed by the organisation's url (`<hostname>/<org>`)
        org_url = util.urljoin(hostname, org_name)
        github_api = github_api_lookup(org_url)
        if github_api is None:
            # fail with a meaningful message instead of an AttributeError on `None`
            raise ValueError(f'did not find a GitHub api client for {org_url}')
        return github_api.organization(org_name)

    for oauth_cfg in oauth_cfgs:
        hostname = urllib.parse.urlparse(oauth_cfg.api_url).hostname.lower()

        for role_binding in oauth_cfg.role_bindings:
            for subject in role_binding.subjects:
                if subject.type is subject_types.GITHUB_ORG:
                    # `.get` is used so that the check does not create an empty default entry
                    if subject.name in github_orgs_by_hostname.get(hostname, ()):
                        continue  # already resolved via another role binding

                    organisation = lookup_organisation(hostname, subject.name)
                    github_orgs_by_hostname[hostname][subject.name] = {
                        member.login for member in organisation.members()
                    }

                elif subject.type is subject_types.GITHUB_TEAM:
                    if subject.name in github_teams_by_hostname.get(hostname, ()):
                        continue  # already resolved via another role binding

                    # team subjects are specified as `<org>/<team>`
                    org_name, team_name = subject.name.split('/')
                    organisation = lookup_organisation(hostname, org_name)
                    team = organisation.team_by_name(team_name)
                    github_teams_by_hostname[hostname][subject.name] = {
                        member.login for member in team.members()
                    }

    return github_orgs_by_hostname, github_teams_by_hostname
def iter_github_user_role_bindings(
    identifier: dm.GitHubUserIdentifier,
    oauth_cfgs: collections.abc.Iterable[secret_mgmt.oauth_cfg.OAuthCfg],
    github_orgs_by_hostname: dict[github_host, dict[github_org_name, github_members]],
    github_teams_by_hostname: dict[github_host, dict[github_team_name, github_members]],
) -> collections.abc.Iterable[dm.RoleBinding]:
    """
    Yields the role bindings the GitHub user described by `identifier` is entitled to according to
    those `oauth_cfgs` whose API hostname equals the hostname of the identifier.

    For each configured role binding, the first subject matching the user is decisive: either a
    user subject matching the username, or an organisation/team subject whose resolved members
    (taken from `github_orgs_by_hostname` / `github_teams_by_hostname`) contain the username. One
    `dm.RoleBinding` is yielded per role of such a role binding; its origin records the hostname and
    the name of the matched subject.
    """
    subject_types = secret_mgmt.oauth_cfg.SubjectType

    # attribute of the role binding origin which carries the subject's name, by subject type
    origin_attr_by_subject_type = {
        subject_types.GITHUB_USER: 'username',
        subject_types.GITHUB_ORG: 'organisation',
        subject_types.GITHUB_TEAM: 'team',
    }

    def is_match(subject: secret_mgmt.oauth_cfg.Subject, hostname: str) -> bool:
        if subject.type is subject_types.GITHUB_USER:
            return bool(subject.matches(identifier.username))

        if subject.type is subject_types.GITHUB_ORG:
            members = github_orgs_by_hostname[hostname][subject.name]
        elif subject.type is subject_types.GITHUB_TEAM:
            members = github_teams_by_hostname[hostname][subject.name]
        else:
            # other subject types are not relevant for GitHub users
            return False

        return any(member == identifier.username for member in members)

    for oauth_cfg in oauth_cfgs:
        hostname = urllib.parse.urlparse(oauth_cfg.api_url).hostname.lower()

        if identifier.hostname != hostname:
            continue

        for role_binding in oauth_cfg.role_bindings:
            matched_subject = next(
                (subject for subject in role_binding.subjects if is_match(subject, hostname)),
                None,
            )

            if not matched_subject:
                continue

            origin_attr = origin_attr_by_subject_type.get(matched_subject.type)
            if origin_attr is None:
                raise ValueError(matched_subject.type)

            # all three attributes are passed explicitly, only the matching one is not `None`
            origin_kwargs = dict.fromkeys(('organisation', 'team', 'username'))
            origin_kwargs[origin_attr] = matched_subject.name

            for role in role_binding.roles:
                yield dm.RoleBinding(
                    name=role,
                    origin=dm.GitHubRoleBindingOrigin(
                        hostname=hostname,
                        **origin_kwargs,
                    ),
                )
def iter_github_app_role_bindings(
    identifier: dm.GitHubAppIdentifier,
    oauth_cfgs: collections.abc.Iterable[secret_mgmt.oauth_cfg.OAuthCfg],
) -> collections.abc.Iterable[dm.RoleBinding]:
    """
    Yields the role bindings the GitHub app described by `identifier` is entitled to according to
    those `oauth_cfgs` whose API hostname equals the hostname of the identifier.

    For each configured role binding, the first GitHub app subject matching the app's name is
    decisive. One `dm.RoleBinding` is yielded per role of such a role binding; its origin records
    the hostname and the name of the matched subject.
    """
    app_subject_type = secret_mgmt.oauth_cfg.SubjectType.GITHUB_APP

    for oauth_cfg in oauth_cfgs:
        hostname = urllib.parse.urlparse(oauth_cfg.api_url).hostname.lower()

        if identifier.hostname != hostname:
            continue

        for role_binding in oauth_cfg.role_bindings:
            matched_subject = next(
                (
                    subject
                    for subject in role_binding.subjects
                    if subject.type is app_subject_type and subject.matches(identifier.app_name)
                ),
                None,
            )

            if not matched_subject:
                continue

            for role in role_binding.roles:
                yield dm.RoleBinding(
                    name=role,
                    origin=dm.GitHubRoleBindingOrigin(
                        hostname=hostname,
                        app=matched_subject.name,
                    ),
                )
def update_github_role_bindings(
    identifiers: collections.abc.Iterable[dm.UserIdentifiers],
    role_bindings: collections.abc.Sequence[dm.RoleBinding],
    oauth_cfgs: collections.abc.Iterable[secret_mgmt.oauth_cfg.OAuthCfg],
    github_orgs_by_hostname: dict[github_host, dict[github_org_name, github_members]],
    github_teams_by_hostname: dict[github_host, dict[github_team_name, github_members]],
) -> list[dm.RoleBinding]:
    """
    Returns an updated list of `role_bindings` by removing existing role bindings which originate
    from GitHub and adding new role bindings based on currently active memberships in GitHub
    organisations and/or teams.

    The passed `role_bindings` are not modified. The result consists of the retained (non-GitHub)
    role bindings in their original order, followed by the de-duplicated GitHub role bindings in
    the order in which they were determined, so that the result is stable across runs.
    """
    updated_role_bindings = [
        role_binding
        for role_binding in role_bindings
        if role_binding.origin.type != dm.RoleBindingOriginType.GITHUB
    ]

    github_oauth_cfgs = [
        oauth_cfg
        for oauth_cfg in oauth_cfgs
        if oauth_cfg.type is secret_mgmt.oauth_cfg.OAuthCfgTypes.GITHUB
    ]

    # A dict is used as insertion-ordered set: it de-duplicates like a set, but (unlike a set)
    # iterates in a deterministic order.
    github_role_bindings = {}

    for user_identifier in identifiers:
        if user_identifier.type != secret_mgmt.oauth_cfg.OAuthCfgTypes.GITHUB:
            continue

        identifier = user_identifier.deserialised_identifier

        if isinstance(identifier, dm.GitHubUserIdentifier):
            new_role_bindings = iter_github_user_role_bindings(
                identifier=identifier,
                oauth_cfgs=github_oauth_cfgs,
                github_orgs_by_hostname=github_orgs_by_hostname,
                github_teams_by_hostname=github_teams_by_hostname,
            )
        elif isinstance(identifier, dm.GitHubAppIdentifier):
            new_role_bindings = iter_github_app_role_bindings(
                identifier=identifier,
                oauth_cfgs=github_oauth_cfgs,
            )
        else:
            # other kinds of GitHub identifiers do not contribute any role bindings
            continue

        github_role_bindings.update(dict.fromkeys(new_role_bindings))

    updated_role_bindings.extend(github_role_bindings)

    return updated_role_bindings
async def update_user_role_bindings(
    oauth_cfgs: collections.abc.Iterable[secret_mgmt.oauth_cfg.OAuthCfg],
    db_session: sqlasync.session.AsyncSession,
    github_api_lookup: collections.abc.Callable[[str], github3.github.GitHub | None],
):
    """
    Recalculates the role bindings of all users stored in the database and commits the result.

    The members of all GitHub organisations and teams referenced by `oauth_cfgs` are resolved once
    up front. Afterwards, for every user, the role bindings originating from GitHub are replaced
    by the ones derived from the current memberships. All users are updated within a single
    transaction: if anything fails, the session is rolled back and the error is re-raised.
    """
    # `oauth_cfgs` is iterated once to resolve the memberships and again for every single user,
    # hence it is materialised to also support one-shot iterables (e.g. generators), which would
    # otherwise be exhausted after the first pass and silently yield no GitHub role bindings.
    oauth_cfgs = tuple(oauth_cfgs)

    db_statement = sa.select(dm.User).options(sqlalchemy.orm.selectinload(dm.User.identifiers))
    rows = (await db_session.execute(db_statement)).all()

    github_orgs_by_hostname, github_teams_by_hostname = resolve_github_orgs_and_teams(
        oauth_cfgs=oauth_cfgs,
        github_api_lookup=github_api_lookup,
    )

    # the deserialisation config is the same for all role bindings, so it is created only once
    dacite_cfg = dacite.Config(
        cast=[enum.Enum],
    )

    try:
        for row in rows:
            # each row holds the selected user entity as its only element
            user: dm.User = row[0]

            role_bindings = [
                dacite.from_dict(
                    data_class=dm.RoleBinding,
                    data=role_binding_raw,
                    config=dacite_cfg,
                )
                for role_binding_raw in user.role_bindings
            ]
            len_role_bindings_before = len(role_bindings)

            role_bindings = update_github_role_bindings(
                identifiers=user.identifiers,
                role_bindings=role_bindings,
                oauth_cfgs=oauth_cfgs,
                github_orgs_by_hostname=github_orgs_by_hostname,
                github_teams_by_hostname=github_teams_by_hostname,
            )
            len_role_bindings_after = len(role_bindings)

            user.role_bindings = util.dict_serialisation(role_bindings)
            logger.info(
                f'updated user {user.id} ({len_role_bindings_before=}, {len_role_bindings_after=})',
            )

        await db_session.commit()
    except BaseException:
        # roll back on any error (including cancellation) and propagate it unchanged
        await db_session.rollback()
        raise
async def main():
    """
    Entry point of the access manager: parses the command line arguments, sets up logging to
    Kubernetes, opens a delivery-db session and updates the role bindings of all users.

    Raises a `ValueError` unless exactly one delivery-db secret is available.
    """
    args = odg.util.parse_args(
        arguments=(
            odg.util.Arguments.K8S_CFG_NAME,
            odg.util.Arguments.KUBECONFIG,
            odg.util.Arguments.K8S_NAMESPACE,
        ),
    )
    namespace = args.k8s_namespace

    secret_factory = ctx_util.secret_factory()
    kubernetes_api = odg.util.kubernetes_api(args, secret_factory=secret_factory)

    # the logging thread and the exit hook are parametrised identically
    logging_kwargs = {
        'service': odg.extensions_cfg.Services.ACCESS_MANAGER,
        'namespace': namespace,
        'kubernetes_api': kubernetes_api,
    }
    k8s.logging.init_logging_thread(**logging_kwargs)
    atexit.register(k8s.logging.log_to_crd, **logging_kwargs)

    delivery_db_secrets = secret_factory.delivery_db()
    if len(delivery_db_secrets) != 1:
        raise ValueError(
            f'There must be exactly one delivery-db secret, found {len(delivery_db_secrets)}',
        )
    delivery_db_secret = delivery_db_secrets[0]
    db_url = delivery_db_secret.connection_url(namespace=namespace)

    oauth_cfgs = secret_factory.oauth_cfg()
    github_api_lookup = lookups.github_api_lookup()

    db_session = await deliverydb.sqlalchemy_session(db_url)
    try:
        await update_user_role_bindings(
            oauth_cfgs=oauth_cfgs,
            db_session=db_session,
            github_api_lookup=github_api_lookup,
        )
    finally:
        # always release the database session, regardless of whether the update succeeded
        await db_session.close()


if __name__ == '__main__':
    asyncio.run(main())