Skip to content

Commit 771677f

Browse files
committed
Sign packages concurrently when adding to repo
Assisted By: Claude Sonnet 4.5
1 parent d47ed1f commit 771677f

File tree

3 files changed

+133
-72
lines changed

3 files changed

+133
-72
lines changed

pulp_deb/app/models/signing_service.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,26 @@ def sign(
198198
_env_vars["PULP_SIGNING_KEY_FINGERPRINT"] = pubkey_fingerprint
199199
return super().sign(filename, _env_vars)
200200

201+
async def asign(
202+
self,
203+
filename: str,
204+
env_vars: Optional[dict] = None,
205+
pubkey_fingerprint: Optional[str] = None,
206+
):
207+
"""
208+
Asynchronously sign a package @filename using @pubkey_fingerprint.
209+
210+
Args:
211+
filename: The absolute path to the package to be signed.
212+
env_vars: (optional) Dict of env_vars to be passed to the signing script.
213+
pubkey_fingerprint: The V4 fingerprint that correlates with the private key to use.
214+
"""
215+
if not pubkey_fingerprint:
216+
raise ValueError("A pubkey_fingerprint must be provided.")
217+
_env_vars = env_vars or {}
218+
_env_vars["PULP_SIGNING_KEY_FINGERPRINT"] = pubkey_fingerprint
219+
return await super().asign(filename, _env_vars)
220+
201221
def validate(self):
202222
"""
203223
Validate a signing service for an Apt package signature.

pulp_deb/app/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@
99
FORBIDDEN_CHECKSUM_WARNINGS = True
1010
FORCE_IGNORE_MISSING_PACKAGE_INDICES = False
1111
PERMISSIVE_SYNC = False
12+
MAX_PACKAGE_SIGNING_WORKERS = 5

pulp_deb/app/tasks/signing.py

Lines changed: 112 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
12
from pathlib import Path
23
from tempfile import NamedTemporaryFile
34

5+
from django.conf import settings
46
from django.db.models import Q
57

68
from pulpcore.plugin.models import (
@@ -43,12 +45,7 @@ def _save_upload(uploadobj, final_package):
4345
final_package.flush()
4446

4547

46-
def _sign_file(package_file, signing_service, signing_fingerprint):
47-
logging.info(
48-
_("Signing package %s with fingerprint %s"), package_file.name, signing_fingerprint
49-
)
50-
result = signing_service.sign(package_file.name, pubkey_fingerprint=signing_fingerprint)
51-
signed_package_path = Path(result["deb_package"])
48+
def _create_signed_artifact(signed_package_path, result):
5249
if not signed_package_path.exists():
5350
raise Exception(f"Signing script did not create the signed package: {result}")
5451
artifact = Artifact.init_and_validate(str(signed_package_path))
@@ -58,6 +55,15 @@ def _sign_file(package_file, signing_service, signing_fingerprint):
5855
return artifact
5956

6057

58+
async def _sign_file(package_file, signing_service, signing_fingerprint):
59+
logging.info(
60+
_("Signing package %s with fingerprint %s"), package_file.name, signing_fingerprint
61+
)
62+
result = await signing_service.asign(package_file.name, pubkey_fingerprint=signing_fingerprint)
63+
signed_package_path = Path(result["deb_package"])
64+
return await asyncio.to_thread(_create_signed_artifact, signed_package_path, result)
65+
66+
6167
def sign_and_create(
6268
app_label,
6369
serializer_name,
@@ -79,7 +85,9 @@ def sign_and_create(
7985
uploaded_package = Upload.objects.get(pk=temporary_file_pk)
8086
_save_upload(uploaded_package, final_package)
8187

82-
artifact = _sign_file(final_package, package_signing_service, signing_fingerprint)
88+
artifact = asyncio.run(
89+
_sign_file(final_package, package_signing_service, signing_fingerprint)
90+
)
8391
uploaded_package.delete()
8492
# Create Package content
8593
data["artifact"] = get_url(artifact)
@@ -93,35 +101,66 @@ def sign_and_create(
93101
general_create(app_label, serializer_name, data=data, context=context, *args, **kwargs)
94102

95103

96-
def _update_content_units(content_units, old_pk, new_pk):
97-
while str(old_pk) in content_units:
98-
content_units.remove(str(old_pk))
104+
def _sign_package(package, signing_service, signing_fingerprint, package_release_map):
105+
"""
106+
Sign a package or reuse an existing signed result.
99107
100-
if str(new_pk) not in content_units:
101-
content_units.append(str(new_pk))
108+
Returns None if already signed with the fingerprint, otherwise a
109+
tuple of (original_package_id, new_package_id, prcs_to_update).
110+
"""
111+
content_artifact = package.contentartifact_set.first()
112+
artifact_obj = content_artifact.artifact
113+
package_id = str(package.pk)
102114

103-
# Repoint PackageReleaseComponents included in this transaction to the new package.
104-
for prc in PackageReleaseComponent.objects.filter(pk__in=content_units, package_id=old_pk):
105-
new_prc, _ = PackageReleaseComponent.objects.get_or_create(
106-
release_component=prc.release_component,
107-
package_id=new_pk,
108-
_pulp_domain=prc._pulp_domain,
109-
)
110-
111-
while str(prc.pk) in content_units:
112-
content_units.remove(str(prc.pk))
115+
with NamedTemporaryFile(mode="wb", dir=".", delete=False) as final_package:
116+
artifact_file = artifact_obj.file
117+
_save_file(artifact_file, final_package)
113118

114-
if str(new_prc.pk) not in content_units:
115-
content_units.append(str(new_prc.pk))
119+
# check if the package is already signed with our fingerprint
120+
try:
121+
signing_service.validate_signature(final_package.name)
122+
return None
123+
except (UnsignedPackage, InvalidSignature, FingerprintMismatch):
124+
pass
125+
126+
# Collect PackageReleaseComponents that need to be updated
127+
prcs_to_update = list(
128+
PackageReleaseComponent.objects.filter(
129+
package_id=package_id, _pulp_domain=package._pulp_domain
130+
)
131+
)
116132

133+
# check if the package has been signed in the past with our fingerprint
134+
if existing_result := DebPackageSigningResult.objects.filter(
135+
sha256=content_artifact.artifact.sha256,
136+
package_signing_fingerprint=signing_fingerprint,
137+
).first():
138+
return (package_id, str(existing_result.result.pk), prcs_to_update)
139+
140+
# create a new signed version of the package
141+
log.info(f"Signing package {package.name}.")
142+
artifact = asyncio.run(_sign_file(final_package, signing_service, signing_fingerprint))
143+
signed_package = package
144+
signed_package.pk = None
145+
signed_package.pulp_id = None
146+
signed_package.sha256 = artifact.sha256
147+
signed_package.save()
148+
ContentArtifact.objects.create(
149+
artifact=artifact,
150+
content=signed_package,
151+
relative_path=content_artifact.relative_path,
152+
)
153+
DebPackageSigningResult.objects.create(
154+
sha256=artifact_obj.sha256,
155+
package_signing_fingerprint=signing_fingerprint,
156+
result=signed_package,
157+
)
117158

118-
def _check_package_signature(repository, package_path):
119-
try:
120-
repository.package_signing_service.validate_signature(package_path)
121-
except (UnsignedPackage, InvalidSignature, FingerprintMismatch):
122-
return False
159+
resource = CreatedResource(content_object=signed_package)
160+
resource.save()
161+
log.info(f"Signed package {package.name}.")
123162

124-
return True
163+
return (package_id, str(signed_package.pk), prcs_to_update)
125164

126165

127166
def signed_add_and_remove(
@@ -136,53 +175,54 @@ def signed_add_and_remove(
136175
).select_related("package", "release_component")
137176
package_release_map = {prc.package_id: prc.release_component.distribution for prc in prcs}
138177

139-
# sign each package and replace it in the add_content_units list
178+
# Prepare package list with their fingerprints
179+
packages = []
140180
for package in Package.objects.filter(pk__in=add_content_units):
141-
content_artifact = package.contentartifact_set.first()
142-
artifact_obj = content_artifact.artifact
143-
package_id = package.pk
144-
145181
# match the package's release to a fingerprint override if one exists
146182
fingerprint = repo.release_package_signing_fingerprint(
147-
package_release_map.get(package_id)
183+
package_release_map.get(package.pk)
148184
)
149-
150-
with NamedTemporaryFile(mode="wb", dir=".", delete=False) as final_package:
151-
artifact_file = artifact_obj.file
152-
_save_file(artifact_file, final_package)
153-
154-
# check if the package is already signed with our fingerprint
155-
if _check_package_signature(repo, final_package.name):
156-
continue
157-
158-
# check if the package has been signed in the past with our fingerprint
159-
if existing_result := DebPackageSigningResult.objects.filter(
160-
sha256=content_artifact.artifact.sha256,
161-
package_signing_fingerprint=fingerprint,
162-
).first():
163-
_update_content_units(add_content_units, package_id, existing_result.result.pk)
164-
continue
165-
166-
# create a new signed version of the package
167-
artifact = _sign_file(final_package, repo.package_signing_service, fingerprint)
168-
signed_package = package
169-
signed_package.pk = None
170-
signed_package.pulp_id = None
171-
signed_package.sha256 = artifact.sha256
172-
signed_package.save()
173-
ContentArtifact.objects.create(
174-
artifact=artifact,
175-
content=signed_package,
176-
relative_path=content_artifact.relative_path,
177-
)
178-
DebPackageSigningResult.objects.create(
179-
sha256=artifact_obj.sha256,
180-
package_signing_fingerprint=fingerprint,
181-
result=signed_package,
185+
packages.append((package, fingerprint))
186+
187+
async def _sign_packages():
188+
semaphore = asyncio.Semaphore(settings.MAX_PACKAGE_SIGNING_WORKERS)
189+
190+
async def _bounded_sign(pkg_tuple):
191+
pkg, fingerprint = pkg_tuple
192+
async with semaphore:
193+
return await asyncio.to_thread(
194+
_sign_package,
195+
pkg,
196+
repo.package_signing_service,
197+
fingerprint,
198+
package_release_map,
199+
)
200+
201+
return await asyncio.gather(*(_bounded_sign(pkg_tuple) for pkg_tuple in packages))
202+
203+
for result in asyncio.run(_sign_packages()):
204+
if not result:
205+
continue
206+
old_id, new_id, prcs_to_update = result
207+
208+
# Update the add_content_units list with the new package
209+
while old_id in add_content_units:
210+
add_content_units.remove(old_id)
211+
if new_id not in add_content_units:
212+
add_content_units.append(new_id)
213+
214+
# Repoint PackageReleaseComponents that were collected during signing
215+
for prc in prcs_to_update:
216+
new_prc, _ = PackageReleaseComponent.objects.get_or_create(
217+
release_component=prc.release_component,
218+
package_id=new_id,
219+
_pulp_domain=prc._pulp_domain,
182220
)
183221

184-
_update_content_units(add_content_units, package_id, signed_package.pk)
185-
resource = CreatedResource(content_object=signed_package)
186-
resource.save()
222+
while str(prc.pk) in add_content_units:
223+
add_content_units.remove(str(prc.pk))
224+
225+
if str(new_prc.pk) not in add_content_units:
226+
add_content_units.append(str(new_prc.pk))
187227

188228
return add_and_remove(repository_pk, add_content_units, remove_content_units, base_version_pk)

0 commit comments

Comments
 (0)