|
1 | 1 | import os |
| 2 | +from pathlib import Path |
| 3 | +import shutil |
| 4 | +import subprocess |
| 5 | +from typing import Optional |
2 | 6 | import gnupg |
3 | 7 | import tempfile |
4 | 8 |
|
5 | 9 | from pulpcore.plugin.models import SigningService |
| 10 | +from importlib.resources import files |
| 11 | + |
| 12 | + |
| 13 | +def prepare_gpg(temp_directory_name, public_key, pubkey_fingerprint): |
| 14 | + # Prepare GPG: |
| 15 | + # gpg = gnupg.GPG(gnupghome=temp_directory_name) |
| 16 | + gpg = gnupg.GPG(keyring=str(Path(temp_directory_name) / ".keyring")) |
| 17 | + gpg.import_keys(public_key) |
| 18 | + imported_keys = gpg.list_keys() |
| 19 | + |
| 20 | + if len(imported_keys) != 1: |
| 21 | + message = "We have imported more than one key! Aborting validation!" |
| 22 | + raise RuntimeError(message) |
| 23 | + |
| 24 | + if imported_keys[0]["fingerprint"] != pubkey_fingerprint: |
| 25 | + message = "The signing service fingerprint does not appear to match its public key!" |
| 26 | + raise RuntimeError(message) |
| 27 | + return gpg |
6 | 28 |
|
7 | 29 |
|
8 | 30 | class AptReleaseSigningService(SigningService): |
@@ -70,19 +92,7 @@ def validate(self): |
70 | 92 | raise RuntimeError(message.format(signature_file, signature_type)) |
71 | 93 |
|
72 | 94 | # Prepare GPG: |
73 | | - gpg = gnupg.GPG(gnupghome=temp_directory_name) |
74 | | - gpg.import_keys(self.public_key) |
75 | | - imported_keys = gpg.list_keys() |
76 | | - |
77 | | - if len(imported_keys) != 1: |
78 | | - message = "We have imported more than one key! Aborting validation!" |
79 | | - raise RuntimeError(message) |
80 | | - |
81 | | - if imported_keys[0]["fingerprint"] != self.pubkey_fingerprint: |
82 | | - message = ( |
83 | | - "The signing service fingerprint does not appear to match its public key!" |
84 | | - ) |
85 | | - raise RuntimeError(message) |
| 95 | + gpg = prepare_gpg(temp_directory_name, self.public_key, self.pubkey_fingerprint) |
86 | 96 |
|
87 | 97 | # Verify InRelease file |
88 | 98 | inline_path = signatures.get("inline") |
@@ -138,3 +148,110 @@ def validate(self): |
138 | 148 | if verified.pubkey_fingerprint != self.pubkey_fingerprint: |
139 | 149 | message = "'{}' appears to have been signed using the wrong key!" |
140 | 150 | raise RuntimeError(message.format(detached_path)) |
| 151 | + |
| 152 | + |
| 153 | +class AptPackageSigningService(SigningService): |
| 154 | + """ |
| 155 | + A model used for signing Apt packages. |
| 156 | +
|
| 157 | + The pubkey_fingerprint should be passed explicitly in the sign method. |
| 158 | + """ |
| 159 | + |
| 160 | + def _env_variables(self, env_vars=None): |
| 161 | + # Prevent the signing service pubkey to be used for signing a package. |
| 162 | + # The pubkey should be provided explicitly. |
| 163 | + _env_vars = {"PULP_SIGNING_KEY_FINGERPRINT": None} |
| 164 | + if env_vars: |
| 165 | + _env_vars.update(env_vars) |
| 166 | + return super()._env_variables(_env_vars) |
| 167 | + |
| 168 | + def sign( |
| 169 | + self, |
| 170 | + filename: str, |
| 171 | + env_vars: Optional[dict] = None, |
| 172 | + pubkey_fingerprint: Optional[str] = None, |
| 173 | + ): |
| 174 | + """ |
| 175 | + Sign a package @filename using @pubkey_fingerprint. |
| 176 | +
|
| 177 | + Args: |
| 178 | + filename: The absolute path to the package to be signed. |
| 179 | + env_vars: (optional) Dict of env_vars to be passed to the signing script. |
| 180 | + pubkey_fingerprint: The V4 fingerprint that correlates with the private key to use. |
| 181 | + """ |
| 182 | + if not pubkey_fingerprint: |
| 183 | + raise ValueError("A pubkey_fingerprint must be provided.") |
| 184 | + _env_vars = env_vars or {} |
| 185 | + _env_vars["PULP_SIGNING_KEY_FINGERPRINT"] = pubkey_fingerprint |
| 186 | + return super().sign(filename, _env_vars) |
| 187 | + |
| 188 | + def validate(self): |
| 189 | + """ |
| 190 | + Validate a signing service for an Apt package signature. |
| 191 | +
|
| 192 | + Specifically, it validates that self.signing_script can sign an apt package with |
| 193 | + the sample key self.pubkey and that the self.sign() method returns: |
| 194 | +
|
| 195 | + ```json |
| 196 | + {"apt_package": "<path/to/package.deb>"} |
| 197 | + ``` |
| 198 | +
|
| 199 | + Recreates the check that "debsig-verify" would be doing because debsig-verify is |
| 200 | + complicated to set up correctly, and doing so would add a dependency that is not available |
| 201 | + on rpm-based systems. |
| 202 | + """ |
| 203 | + with tempfile.TemporaryDirectory() as temp_directory_name: |
| 204 | + # copy test deb package |
| 205 | + sample_deb = shutil.copy( |
| 206 | + files("pulp_deb").joinpath("tests/functional/data/packages/frigg_1.0_ppc64.deb"), |
| 207 | + temp_directory_name, |
| 208 | + ) |
| 209 | + return_value = self.sign(sample_deb, pubkey_fingerprint=self.pubkey_fingerprint) |
| 210 | + try: |
| 211 | + signed_deb = return_value["deb_package"] |
| 212 | + except KeyError: |
| 213 | + raise Exception(f"Malformed output from signing script: {return_value}") |
| 214 | + |
| 215 | + # Prepare GPG: |
| 216 | + gpg = prepare_gpg(temp_directory_name, self.public_key, self.pubkey_fingerprint) |
| 217 | + |
| 218 | + self._validate_deb_package( |
| 219 | + signed_deb, self.pubkey_fingerprint, temp_directory_name, gpg |
| 220 | + ) |
| 221 | + |
| 222 | + @staticmethod |
| 223 | + def _validate_deb_package( |
| 224 | + deb_package_path: str, pubkey_fingerprint: str, temp_directory_name: str, gpg: gnupg.GPG |
| 225 | + ): |
| 226 | + """ |
| 227 | + Validate that the deb package at @deb_package_path is correctly signed. |
| 228 | +
|
| 229 | + This is a placeholder for future validation logic if needed. |
| 230 | + """ |
| 231 | + # unpack the archive |
| 232 | + cmd = ["ar", "x", deb_package_path] |
| 233 | + res = subprocess.run(cmd, cwd=temp_directory_name, capture_output=True) |
| 234 | + if res.returncode != 0: |
| 235 | + raise Exception(f"Failed to read package {deb_package_path}. Please check the package.") |
| 236 | + |
| 237 | + # cat the unpacked archive bits together |
| 238 | + temp_dir = Path(temp_directory_name) |
| 239 | + with (temp_dir / "combined").open("wb") as combined: |
| 240 | + for filename in ("debian-binary", "control.*", "data.*"): |
| 241 | + # There will only be one control.tar.gz (or whatever) file, but we have to glob |
| 242 | + # and iterate because the compression type can vary. |
| 243 | + for x in temp_dir.glob(filename): |
| 244 | + with x.open("rb") as f: |
| 245 | + shutil.copyfileobj(f, combined) |
| 246 | + |
| 247 | + # verify combined data with _gpgorigin detached signature |
| 248 | + with (temp_dir / "_gpgorigin").open("rb") as gpgorigin: |
| 249 | + verified = gpg.verify_file(gpgorigin, str(temp_dir / "combined")) |
| 250 | + if not verified.valid: |
| 251 | + raise Exception( |
| 252 | + f"GPG Verification of the signed package {deb_package_path} failed!" |
| 253 | + ) |
| 254 | + if verified.pubkey_fingerprint != pubkey_fingerprint: |
| 255 | + raise Exception( |
| 256 | + f"'{deb_package_path}' appears to have been signed using the wrong key!" |
| 257 | + ) |
0 commit comments