|
| 1 | +"""Example of using authlib crypto with file-based secrets storage.""" |
| 2 | + |
| 3 | +import asyncio |
| 4 | +import json |
| 5 | +import tempfile |
| 6 | +from pathlib import Path |
| 7 | + |
| 8 | +from authlib.jose import OKPKey |
| 9 | + |
| 10 | +from didcomm_messaging.crypto.backend.authlib import ( |
| 11 | + AuthlibCryptoService, |
| 12 | + AuthlibSecretKey, |
| 13 | +) |
| 14 | +from didcomm_messaging.crypto.backend.basic import FileBasedSecretsManager |
| 15 | +from didcomm_messaging.multiformats.multibase import Base64UrlEncoder |
| 16 | +from didcomm_messaging.packaging import PackagingService |
| 17 | +from didcomm_messaging.resolver import PrefixResolver |
| 18 | +from didcomm_messaging.resolver.jwk import JWKResolver |
| 19 | + |
| 20 | +b64 = Base64UrlEncoder() |
| 21 | + |
| 22 | + |
| 23 | +def create_jwk_did(jwk: dict) -> str: |
| 24 | + """Create a did:jwk from a JWK dict.""" |
| 25 | + encoded = b64.encode(json.dumps(jwk).encode()) |
| 26 | + return f"did:jwk:{encoded}" |
| 27 | + |
| 28 | + |
| 29 | +async def main(): |
| 30 | + """Run the example.""" |
| 31 | + # Create a temporary file for secrets storage |
| 32 | + secrets_file = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False) |
| 33 | + secrets_path = secrets_file.name |
| 34 | + secrets_file.close() |
| 35 | + |
| 36 | + # Serializer: Convert AuthlibSecretKey to JWK dict |
| 37 | + def serialize_secret(secret: AuthlibSecretKey) -> dict: |
| 38 | + return secret.key.as_dict(is_private=True) |
| 39 | + |
| 40 | + # Deserializer: Convert JWK dict back to AuthlibSecretKey |
| 41 | + def deserialize_secret(kid: str, data: dict) -> AuthlibSecretKey: |
| 42 | + key = OKPKey.import_key(data) |
| 43 | + return AuthlibSecretKey(key, kid) |
| 44 | + |
| 45 | + # Create the file-based secrets manager |
| 46 | + secrets = FileBasedSecretsManager(secrets_path, serialize_secret, deserialize_secret) |
| 47 | + |
| 48 | + # Generate keys for sender and recipient |
| 49 | + # Using X25519 for both since it supports key agreement (encryption) |
| 50 | + sender_sk = OKPKey.generate_key("X25519", is_private=True) |
| 51 | + recipient_sk = OKPKey.generate_key("X25519", is_private=True) |
| 52 | + |
| 53 | + # Get JWKs and create DIDs |
| 54 | + # For 1PU (authenticated encryption), we need key agreement keys |
| 55 | + sender_jwk = {**sender_sk.as_dict(), "use": "enc"} |
| 56 | + recipient_jwk = {**recipient_sk.as_dict(), "use": "enc"} |
| 57 | + |
| 58 | + sender_did = create_jwk_did(sender_jwk) |
| 59 | + recipient_did = create_jwk_did(recipient_jwk) |
| 60 | + |
| 61 | + # Add keys to secrets manager with proper kids |
| 62 | + sender_secret = AuthlibSecretKey(sender_sk, f"{sender_did}#0") |
| 63 | + recipient_secret = AuthlibSecretKey(recipient_sk, f"{recipient_did}#0") |
| 64 | + |
| 65 | + await secrets.add_secret(sender_secret) |
| 66 | + await secrets.add_secret(recipient_secret) |
| 67 | + |
| 68 | + # Set up crypto and resolver |
| 69 | + crypto = AuthlibCryptoService() |
| 70 | + resolver = PrefixResolver({"did:jwk": JWKResolver()}) |
| 71 | + packer = PackagingService() |
| 72 | + |
| 73 | + message = b"Hello, secure world!" |
| 74 | + |
| 75 | + # Pack the message using authenticated encryption (ECDH-1PU) |
| 76 | + # Requires both sender and recipient to have key agreement keys |
| 77 | + packed = await packer.pack( |
| 78 | + crypto=crypto, |
| 79 | + resolver=resolver, |
| 80 | + secrets=secrets, |
| 81 | + message=message, |
| 82 | + to=[recipient_did], |
| 83 | + frm=sender_did, |
| 84 | + ) |
| 85 | + print("Packed message:") |
| 86 | + print(json.dumps(json.loads(packed), indent=2)) |
| 87 | + |
| 88 | + # Flush secrets to file |
| 89 | + await secrets.flush() |
| 90 | + |
| 91 | + # Show the contents of the secrets file |
| 92 | + print("\nSecrets file contents:") |
| 93 | + with open(secrets_path) as f: |
| 94 | + for line in f: |
| 95 | + print(line.strip()) |
| 96 | + |
| 97 | + # Create a new secrets manager that loads from the file |
| 98 | + # This exercises the deserializer |
| 99 | + print("\n--- Creating new secrets manager from file ---") |
| 100 | + secrets2 = FileBasedSecretsManager(secrets_path, serialize_secret, deserialize_secret) |
| 101 | + |
| 102 | + # Unpack the message using the newly loaded secrets |
| 103 | + plaintext, metadata = await packer.unpack( |
| 104 | + crypto=crypto, |
| 105 | + resolver=resolver, |
| 106 | + secrets=secrets2, |
| 107 | + enc_message=packed, |
| 108 | + ) |
| 109 | + print("\nUnpacked message:") |
| 110 | + print(plaintext) |
| 111 | + |
| 112 | + # Verify the message matches |
| 113 | + assert plaintext == message |
| 114 | + print("\nSuccess! Round-trip completed with deserialized secrets.") |
| 115 | + |
| 116 | + # Clean up |
| 117 | + Path(secrets_path).unlink() |
| 118 | + |
| 119 | + |
| 120 | +if __name__ == "__main__": |
| 121 | + asyncio.run(main()) |
0 commit comments