"""Utilities to create and resolve did:plcs, did:webs, and handles.
* https://www.w3.org/TR/did-core/
* https://atproto.com/specs/did-plc
* https://github.com/bluesky-social/did-method-plc
* https://w3c-ccg.github.io/did-method-web/
* https://atproto.com/specs/handle#handle-resolution
"""
import base64
from collections import namedtuple
import json
import logging
import os
import urllib.parse
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.hashes import Hash, SHA256
from cryptography.hazmat.primitives import serialization
from dns.exception import DNSException
from dns.rdatatype import TXT
import dns.resolver
import dag_cbor
from multiformats import multibase, multicodec
import requests
from . import util
DidPlc = namedtuple('DidPlc', [
'did', # str
'signing_key', # ec.EllipticCurvePrivateKey
'rotation_key', # ec.EllipticCurvePrivateKey
'doc', # dict, DID document
])
logger = logging.getLogger(__name__)
[docs]def resolve(did, **kwargs):
"""Resolves a ``did:plc`` or ``did:web``.
Args:
did (str):
kwargs: passed through to :func:`resolve_plc`/:func:`resolve_web`
Returns:
dict: JSON DID document
Raises:
ValueError: if the input is not a ``did:plc`` or ``did:web``
requests.RequestException: if an HTTP request fails
"""
if did:
if did.startswith('did:plc:'):
return resolve_plc(did, **kwargs)
elif did.startswith('did:web:'):
return resolve_web(did, **kwargs)
raise ValueError(f'{did} is not a did:plc or did:web')
[docs]def resolve_plc(did, get_fn=requests.get):
"""Resolves a ``did:plc`` by fetching its DID document from a PLC registry.
The PLC registry hostname is specified in the ``PLC_HOST`` environment
variable.
``did:plc`` background:
* https://atproto.com/specs/did-plc
* https://github.com/bluesky-social/did-method-plc
Args:
did (str)
get_fn (callable): for making HTTP GET requests
Returns:
dict: JSON DID document
Raises:
ValueError: if the input did is not a ``did:plc`` str
requests.RequestException: if the HTTP request fails
"""
if not isinstance(did, str) or not did.startswith('did:plc:'):
raise ValueError(f'{did} is not a did:plc')
resp = get_fn(f'https://{os.environ["PLC_HOST"]}/{did}')
resp.raise_for_status()
return resp.json()
[docs]def create_plc(handle, signing_key=None, rotation_key=None, pds_url=None,
post_fn=requests.post):
"""Creates a new ``did:plc`` in a PLC registry.
The PLC registry hostname is specified in the ``PLC_HOST`` environment
variable.
``did:plc`` background:
* https://atproto.com/specs/did-plc
* https://github.com/bluesky-social/did-method-plc
The DID document in the returned value is the *new format* DID doc, with the
fully qualified ``verificationMethod.id`` and ``Multikey`` key encoding, ie
``did:key`` without the prefix. Details:
https://github.com/bluesky-social/atproto/discussions/1510
Args:
handle (str): domain handle to associate with this DID
signing_key (ec.EllipticCurvePrivateKey): The curve must be SECP256K1.
If omitted, a new keypair will be created.
rotation_key (ec.EllipticCurvePrivateKey): The curve must be SECP256K1.
If omitted, a new keypair will be created.
pds_url (str): PDS base URL to associate with this DID. If omitted,
defaults to ``https://[PDS_HOST]``
post_fn (callable): for making HTTP POST requests
Returns:
DidPlc: with the newly created ``did:plc``, keys, and DID document
Raises:
ValueError: if any inputs are invalid
requests.RequestException: if the HTTP request to the PLC registry fails
"""
plc_host = os.environ.get('PLC_WRITE_HOST') or os.environ.get('PLC_HOST')
if not isinstance(handle, str) or not handle:
raise ValueError(f'{handle} is not a valid handle')
if not pds_url:
pds_url = f'https://{os.environ["PDS_HOST"]}'
for key in signing_key, rotation_key:
if key and not isinstance(key.curve, ec.SECP256K1):
raise ValueError(f'Expected SECP256K1 key; got {key.curve}')
if not signing_key:
logger.info('Generating new k256 signing key')
signing_key = util.new_key()
if not rotation_key:
logger.info('Generating new k256 rotation key')
rotation_key = util.new_key()
logger.info('Generating and signing PLC directory genesis operation...')
# this is a PLC directory genesis operation for creating a new DID.
# it's *not* a DID document. similar but not the same!
# https://github.com/bluesky-social/did-method-plc#presentation-as-did-document
create = {
'type': 'plc_operation',
'rotationKeys': [encode_did_key(rotation_key.public_key())],
'verificationMethods': {
'atproto': encode_did_key(signing_key.public_key()),
},
'alsoKnownAs': [
f'at://{handle}',
],
'services': {
'atproto_pds': {
'type': 'AtprotoPersonalDataServer',
'endpoint': f'{pds_url}',
}
},
'prev': None,
}
create = util.sign(create, rotation_key)
create['sig'] = base64.urlsafe_b64encode(create['sig']).decode().rstrip('=')
sha256 = Hash(SHA256())
sha256.update(dag_cbor.encode(create))
hash = sha256.finalize()
did_plc = 'did:plc:' + base64.b32encode(hash)[:24].lower().decode()
logger.info(f' {did_plc}')
plc_url = f'https://{plc_host}/{did_plc}'
logger.info(f'Publishing to {plc_url} ...')
resp = post_fn(plc_url, json=create)
resp.raise_for_status()
logger.info(f'{resp} {resp.content}')
create['did'] = did_plc
return DidPlc(did=did_plc, doc=plc_operation_to_did_doc(create),
signing_key=signing_key, rotation_key=rotation_key)
[docs]def encode_did_key(pubkey):
"""Encodes an :class:`ec.EllipticCurvePublicKey` into a ``did:key`` string.
https://atproto.com/specs/did#public-key-encoding
Args:
pubkey (ec.EllipticCurvePublicKey)
Returns:
str: encoded ``did:key``
"""
if isinstance(pubkey.curve, ec.SECP256K1):
codec = 'secp256k1-pub'
elif isinstance(pubkey.curve, ec.SECP256R1):
codec = 'p256-pub'
else:
raise ValueError(f'Expected secp256k1 or secp256r1 curve, got {pubkey.curve}')
pubkey_bytes = pubkey.public_bytes(serialization.Encoding.X962,
serialization.PublicFormat.CompressedPoint)
pubkey_multibase = multibase.encode(multicodec.wrap(codec, pubkey_bytes),
'base58btc')
return f'did:key:{pubkey_multibase}'
[docs]def decode_did_key(did_key):
"""Decodes a ``did:key`` string into an :class:`ec.EllipticCurvePublicKey`.
https://atproto.com/specs/did#public-key-encoding
Args:
did_key (str)
Returns:
ec.EllipticCurvePublicKey
"""
assert did_key.startswith('did:key:')
wrapped_bytes = multibase.decode(did_key.removeprefix('did:key:'))
codec, data = multicodec.unwrap(wrapped_bytes)
if codec.name == 'secp256k1-pub':
curve = ec.SECP256K1()
elif codec.name == 'p256-pub':
curve = ec.SECP256R1()
else:
raise ValueError(f'Expected secp256k1 or secp256r1 curve, got {codec.name}')
return ec.EllipticCurvePublicKey.from_encoded_point(curve, data)
[docs]def plc_operation_to_did_doc(op):
"""Converts a PLC directory operation to a DID document.
https://github.com/bluesky-social/did-method-plc#presentation-as-did-document
The DID document in the returned value is the *new format* DID doc, with the
fully qualified ``verificationMethod.id`` and ``Multikey`` key encoding, ie
``did:key`` without the prefix. Details:
https://github.com/bluesky-social/atproto/discussions/1510
Args:
op: dict, PLC operation, https://github.com/did-method-plc/did-method-plc#operation-serialization-signing-and-validation
Returns:
dict: DID document, https://www.w3.org/TR/did-core/#data-model
"""
assert op
signing_did_key = op['verificationMethods']['atproto']
return {
'@context': [
'https://www.w3.org/ns/did/v1',
'https://w3id.org/security/multikey/v1',
'https://w3id.org/security/suites/secp256k1-2019/v1',
],
'id': op['did'],
'alsoKnownAs': op['alsoKnownAs'],
'verificationMethod': [{
'id': f'{op["did"]}#atproto',
'type': 'EcdsaSecp256r1VerificationKey2019',
'controller': op['did'],
'publicKeyMultibase': signing_did_key.removeprefix('did:key:'),
}],
'service': [{
'id': '#atproto_pds',
'type': 'AtprotoPersonalDataServer',
'serviceEndpoint': op['services']['atproto_pds']['endpoint'],
}],
}
[docs]def resolve_web(did, get_fn=requests.get):
"""Resolves a ``did:web`` by fetching its DID document.
``did:web`` spec: https://w3c-ccg.github.io/did-method-web/
Args:
did (str)
get_fn (callable): for making HTTP GET requests
Returns:
dict: JSON DID document
Raises:
ValueError: if the input did is not a ``did:web`` str
requests.RequestException: if the HTTP request fails
"""
if not isinstance(did, str) or not did.startswith('did:web:'):
raise ValueError(f'{did} is not a did:web')
did = did.removeprefix('did:web:')
if ':' in did:
did = did.replace(':', '/')
else:
did += '/.well-known'
resp = get_fn(f'https://{urllib.parse.unquote(did)}/did.json')
resp.raise_for_status()
return resp.json()
[docs]def resolve_handle(handle, get_fn=requests.get):
"""Resolves an ATProto handle to a DID.
Supports the DNS TXT record and HTTPS well-known methods.
https://atproto.com/specs/handle#handle-resolution
Args:
handle (str)
get_fn (callable): for making HTTP GET requests
Returns:
str or None: DID, or None if the handle can't be resolved
Raises:
ValueError: if handle is not a domain
"""
if not handle or not isinstance(handle, str) or not util.DOMAIN_RE.match(handle):
raise ValueError(f"{handle} doesn't look like a domain")
logger.info(f'Resolving handle {handle}')
# DNS method
name = f'_atproto.{handle}.'
try:
logger.info(f'Querying DNS TXT for {name}')
answer = dns.resolver.resolve(name, TXT)
logger.info(f'Got: {answer.response}')
if answer.canonical_name.to_text() == name:
for rdata in answer:
if rdata.rdtype == TXT:
text = rdata.to_text()
if text.startswith('"did=did:'):
return text.strip('"').removeprefix('did=')
except DNSException as e:
logger.info(repr(e))
# HTTPS well-known method
try:
resp = get_fn(f'https://{handle}/.well-known/atproto-did')
except requests.RequestException as e:
logger.info(f'HTTPS handle resolution failed: {e}')
return None
if resp.ok and resp.headers.get('Content-Type', '').split(';')[0] == 'text/plain':
return resp.text.strip()
return None