Skip to content

Commit 15984ae

Browse files
committed
feat: refactor GithubPolicyRepo by removing SSH key handling and cleanup methods
1 parent 5717770 commit 15984ae

File tree

2 files changed

+399
-186
lines changed

2 files changed

+399
-186
lines changed
Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
# import codecs
2+
# import logging
3+
# import os
4+
# import random
5+
# import shutil
6+
# import subprocess
7+
8+
# import requests
9+
# from git import GitCommandError, Repo
10+
# from github import Auth, Github
11+
# from testcontainers.core.utils import setup_logger
12+
13+
# from tests import utils
14+
# from tests.policy_repos.github_policy_settings import GithubPolicyRepoSettings
15+
# from tests.policy_repos.policy_repo_base import PolicyRepoBase
16+
# from tests.policy_repos.policy_repo_settings import PolicyRepoSettings
17+
18+
19+
# class GithubPolicyRepo(PolicyRepoBase):
20+
# def __init__(
21+
# self,
22+
# settings: GithubPolicyRepoSettings,
23+
# logger: logging.Logger = setup_logger(__name__),
24+
# ):
25+
# self.logger = logger
26+
# self.settings = settings
27+
28+
# self.load_ssh_key()
29+
30+
# def load_ssh_key(self):
31+
32+
# if self.settings.opal_policy_repo_ssh_key_public and self.settings.opal_policy_repo_ssh_key_private:
33+
# self.ssh_key = self.settings.opal_policy_repo_ssh_key_public
34+
# self.private_key = self.settings.opal_policy_repo_ssh_key_private
35+
# return
36+
37+
# if self.settings.ssh_key_path.startswith("~"):
38+
# self.settings.ssh_key_path = os.path.expanduser(self.settings.ssh_key_path)
39+
40+
# if not os.path.exists(self.settings.ssh_key_path):
41+
# self.logger.debug(f"SSH key file not found at {self.settings.ssh_key_path}")
42+
43+
# self.logger.debug("Generating new SSH key...")
44+
# ssh_keys = utils.generate_ssh_key_pair()
45+
# self.ssh_key = ssh_keys["public"]
46+
# self.private_key = ssh_keys["private"]
47+
# return
48+
49+
# try:
50+
# with open(self.settings.ssh_key_path, "r") as ssh_key_file:
51+
# self.ssh_key = ssh_key_file.read().strip()
52+
53+
# os.environ["OPAL_POLICY_REPO_SSH_KEY"] = self.ssh_key
54+
# except Exception as e:
55+
# self.logger.error(f"Error loading SSH key: {e}")
56+
57+
# def setup_webhook(self, host, port):
58+
# self.webhook_host = host
59+
# self.webhook_port = port
60+
61+
# def set_envvars(self):
62+
# # Update .env file
63+
# with open(".env", "a") as env_file:
64+
# env_file.write(f'OPAL_POLICY_REPO_URL="{self.get_repo_url()}"\n')
65+
# env_file.write(f'OPAL_POLICY_REPO_BRANCH="{self.test_branch}"\n')
66+
67+
# with open(".env", "a") as env_file:
68+
# env_file.write(f'OPAL_POLICY_REPO_SSH_KEY="{self.ssh_key}"\n')
69+
70+
# def get_repo_url(self):
71+
# return self.build_repo_url(self.settings.owner, self.settings.repo)
72+
73+
# def build_repo_url(self, owner, repo) -> str:
74+
# if owner is None:
75+
# raise Exception("Owner not set")
76+
77+
# if self.settings.protocol == "ssh" or self.settings.protocol == "git":
78+
# return f"git@{self.settings.host}:{owner}/{repo}.git"
79+
80+
# if self.settings.protocol == "http" or self.settings.protocol == "https":
81+
# if self.settings.github_pat:
82+
# return f"{self.settings.protocol}://{self.settings.host}/{owner}/{repo}.git"
83+
84+
# if self.settings.password is None and self.settings.github_pat is None and self.ssh_key is None:
85+
# raise Exception("No authentication method set")
86+
87+
# return f"{self.settings.protocol}://{self.settings.owner}:{self.settings.password}@{self.settings.host}:{self.settings.port}/{owner}/{repo}"
88+
89+
# def get_source_repo_url(self):
90+
# return self.build_repo_url(self.settings.source_repo_owner, self.settings.source_repo_name)
91+
92+
# def clone_initial_repo(self):
93+
# Repo.clone_from(self.get_source_repo_url(), self.settings.local_repo_path)
94+
95+
# def check_repo_exists(self):
96+
# try:
97+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
98+
# repo_list = gh.get_user().get_repos()
99+
# for repo in repo_list:
100+
# if repo.full_name == self.settings.repo:
101+
# self.logger.debug(f"Repository {self.settings.repo} already exists.")
102+
# return True
103+
104+
# except Exception as e:
105+
# self.logger.error(f"Error checking repository existence: {e}")
106+
107+
# return False
108+
109+
# def create_target_repo(self):
110+
# if self.check_repo_exists():
111+
# return
112+
113+
# try:
114+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
115+
# gh.get_user().create_repo(self.settings.repo)
116+
# self.logger.info(f"Repository {self.settings.repo} created successfully.")
117+
# except Exception as e:
118+
# self.logger.error(f"Error creating repository: {e}")
119+
120+
# def fork_target_repo(self):
121+
# if self.check_repo_exists():
122+
# return
123+
124+
# self.logger.debug(f"Forking repository {self.settings.source_repo_name}...")
125+
126+
# if self.settings.github_pat is None:
127+
# try:
128+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
129+
# gh.get_user().create_fork(self.settings.source_repo_owner, self.settings.source_repo_name)
130+
# self.logger.info(
131+
# f"Repository {self.settings.source_repo_name} forked successfully."
132+
# )
133+
# except Exception as e:
134+
# self.logger.error(f"Error forking repository: {e}")
135+
# return
136+
137+
# # Try with PAT
138+
# try:
139+
# headers = {"Authorization": f"token {self.settings.github_pat}"}
140+
# response = requests.post(
141+
# f"https://api.github.com/repos/{self.settings.source_repo_owner}/{self.settings.source_repo_name}/forks",
142+
# headers=headers,
143+
# )
144+
# if response.status_code == 202:
145+
# self.logger.info("Fork created successfully!")
146+
# else:
147+
# self.logger.error(f"Error creating fork: {response.status_code}")
148+
# self.logger.debug(response.json())
149+
150+
# except Exception as e:
151+
# self.logger.error(f"Error forking repository: {str(e)}")
152+
153+
# def cleanup(self):
154+
# self.delete_test_branches()
155+
156+
# def delete_test_branches(self):
157+
# """Deletes all branches starting with 'test-' from the specified
158+
# repository."""
159+
160+
# try:
161+
# self.logger.info(f"Deleting test branches from {self.settings.repo}...")
162+
163+
# # Initialize Github API
164+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
165+
166+
# # Get the repository
167+
# repo = gh.get_user().get_repo(self.settings.repo)
168+
169+
# # Enumerate branches and delete pytest- branches
170+
# branches = repo.get_branches()
171+
# for branch in branches:
172+
# if branch.name.startswith("test-"):
173+
# ref = f"heads/{branch.name}"
174+
# repo.get_git_ref(ref).delete()
175+
# self.logger.info(f"Deleted branch: {branch.name}")
176+
# else:
177+
# self.logger.info(f"Skipping branch: {branch.name}")
178+
179+
# self.logger.info("All test branches have been deleted successfully.")
180+
# except Exception as e:
181+
# self.logger.error(f"An error occurred: {e}")
182+
183+
# return
184+
185+
# def generate_test_branch(self):
186+
# self.test_branch = (
187+
# f"test-{random.randint(1000, 9999)}{random.randint(1000, 9999)}"
188+
# )
189+
# os.environ["OPAL_POLICY_REPO_BRANCH"] = self.test_branch
190+
191+
# def create_test_branch(self):
192+
# try:
193+
# # Initialize the repository
194+
# repo = Repo(self.settings.local_repo_path)
195+
196+
# # Ensure the repository is clean
197+
# if repo.is_dirty(untracked_files=True):
198+
# raise RuntimeError(
199+
# "The repository has uncommitted changes. Commit or stash them before proceeding."
200+
# )
201+
202+
# # Set the origin remote URL
203+
# remote_url = f"https://github.com/{self.settings.owner}/{self.settings.repo}.git"
204+
# if "origin" in repo.remotes:
205+
# origin = repo.remote(name="origin")
206+
# origin.set_url(remote_url) # Update origin URL if it exists
207+
# else:
208+
# origin = repo.create_remote(
209+
# "origin", remote_url
210+
# ) # Create origin remote if it doesn't exist
211+
212+
# self.logger.debug(f"Origin set to: {remote_url}")
213+
214+
# # Create and checkout the new branch
215+
# new_branch = repo.create_head(self.test_branch) # Create branch
216+
# new_branch.checkout() # Switch to the new branch
217+
218+
# # Push the new branch to the remote
219+
# origin.push(refspec=f"{self.test_branch}:{self.test_branch}")
220+
221+
# self.logger.info(
222+
# f"Branch '{self.test_branch}' successfully created and pushed."
223+
# )
224+
# except GitCommandError as e:
225+
# self.logger.error(f"Git command failed: {e}")
226+
# except Exception as e:
227+
# self.logger.error(f"An error occurred: {e}")
228+
229+
# def cleanup(self, delete_repo=True, delete_ssh_key=True):
230+
# subprocess.run(["rm", "-rf", self.settings.local_repo_path], check=True)
231+
232+
# self.delete_test_branches()
233+
234+
# if delete_repo:
235+
# self.delete_repo()
236+
237+
# if delete_ssh_key:
238+
# self.delete_ssh_key()
239+
240+
# def delete_ssh_key(self):
241+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
242+
# user = gh.get_user()
243+
# keys = user.get_keys()
244+
# for key in keys:
245+
# if key.title == self.settings.ssh_key_name:
246+
# key.delete()
247+
# self.logger.debug(f"SSH key deleted: {key.title}")
248+
# break
249+
250+
# self.logger.debug("All OPAL SSH keys have been deleted successfully.")
251+
252+
# return
253+
254+
# def delete_repo(self):
255+
# try:
256+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
257+
# repo = gh.get_user().get_repo(self.settings.repo)
258+
# repo.delete()
259+
# self.logger.debug(f"Repository {self.settings.repo} deleted successfully.")
260+
# except Exception as e:
261+
# self.logger.error(f"Error deleting repository: {e}")
262+
263+
# def setup(self):
264+
# self.clone_initial_repo()
265+
266+
# if self.settings.should_fork:
267+
# self.fork_target_repo()
268+
# else:
269+
# self.create_target_repo()
270+
271+
# self.generate_test_branch()
272+
# self.create_test_branch()
273+
274+
# def add_ssh_key(self):
275+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
276+
# user = gh.get_user()
277+
# keys = user.get_keys()
278+
# for key in keys:
279+
# if key.title == self.settings.ssh_key_name:
280+
# return
281+
282+
# key = user.create_key(self.settings.ssh_key_name, self.ssh_key)
283+
# self.logger.info(f"SSH key added: {key.title}")
284+
285+
# def create_webhook(self):
286+
# try:
287+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
288+
# self.logger.info(
289+
# f"Creating webhook for repository {self.settings.owner}/{self.settings.repo}"
290+
# )
291+
# repo = gh.get_user().get_repo(f"{self.settings.repo}")
292+
# url = utils.create_localtunnel(self.webhook_port)
293+
# self.logger.info(f"Webhook URL: {url}")
294+
# self.github_webhook = repo.create_hook(
295+
# "web",
296+
# {
297+
# "url": f"{url}/webhook",
298+
# "content_type": "json",
299+
# f"secret": "abc123",
300+
# "insecure_ssl": "1",
301+
# },
302+
# events=["push"],
303+
# active=True,
304+
# )
305+
# self.logger.info("Webhook created successfully.")
306+
# except Exception as e:
307+
# self.logger.error(f"Error creating webhook: {e}")
308+
309+
# def delete_webhook(self):
310+
# try:
311+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
312+
# repo = gh.get_user().get_repo(f"{self.settings.repo}")
313+
# repo.delete_hook(self.github_webhook.id)
314+
# self.logger.info("Webhook deleted successfully.")
315+
# except Exception as e:
316+
# self.logger.error(f"Error deleting webhook: {e}")
317+
318+
# def update_branch(self, file_name, file_content):
319+
# self.logger.info(
320+
# f"Updating branch '{self.test_branch}' with file '{file_name}' content..."
321+
# )
322+
323+
# # Decode escape sequences in the file content
324+
# if file_content is not None:
325+
# file_content = codecs.decode(file_content, "unicode_escape")
326+
327+
# # Create or update the specified file with the provided content
328+
# file_path = os.path.join(self.settings.local_repo_path, file_name)
329+
# with open(file_path, "w") as f:
330+
# f.write(file_content)
331+
332+
# # if file_content is None:
333+
# # with open(file_path, "r") as f:
334+
# # file_content = f.read()
335+
336+
# try:
337+
# # Stage the changes
338+
# self.logger.debug(f"Staging changes for branch {self.test_branch}...")
339+
# gh = Github(auth=Auth.Token(self.settings.github_pat))
340+
# repo = gh.get_user().get_repo(self.settings.repo)
341+
# branch_ref = f"heads/{self.test_branch}"
342+
# ref = repo.get_git_ref(branch_ref)
343+
# latest_commit = repo.get_git_commit(ref.object.sha)
344+
# base_tree = latest_commit.tree
345+
# from github.InputGitTreeElement import InputGitTreeElement
346+
# new_tree = repo.create_git_tree(
347+
# [
348+
# InputGitTreeElement(
349+
# path=file_name,
350+
# mode="100644",
351+
# type="blob",
352+
# content=file_content,
353+
# )
354+
# ],
355+
# base_tree,
356+
# )
357+
# new_commit = repo.create_git_commit(
358+
# f"Commit changes for branch {self.test_branch}",
359+
# new_tree,
360+
# [latest_commit],
361+
# )
362+
# ref.edit(new_commit.sha)
363+
# self.logger.debug(f"Changes pushed for branch {self.test_branch}.")
364+
365+
# except Exception as e:
366+
# self.logger.error(f"Error updating branch: {e}")
367+
# return False
368+
369+
# return True
370+
371+
# def remove_webhook(self):
372+
# self.github_webhook.delete()

0 commit comments

Comments
 (0)