Source code for arroba.repo

"""Bluesky / AT Protocol repo implementation.

https://atproto.com/guides/data-repos

Heavily based on:
https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to
Daniel Holmgren and Devin Ivy for this code specifically!
"""
from collections import defaultdict, namedtuple
import copy
import logging

from carbox import car
from cryptography.hazmat.primitives.asymmetric import ec
import dag_cbor
from multiformats import CID

from . import util
from . import mst
from . import storage as storage_mod

logger = logging.getLogger(__name__)


Write = namedtuple('Write', [
    'action',      # :class:`Action`
    'collection',  # str
    'rkey',        # str
    'record',      # dict
], defaults=[None] * 4)


[docs] class Repo: """AT Protocol data repo implementation, storage agnostic. Attributes: did (str): repo DID (dynamic property) version (int): AT Protocol version (dynamic property) storage (Storage) mst (MST) head (Block): head commit handle (str) status (str): None (if active) or ``'deactivated'``, ``'deleted'``, or ``'tombstoned'`` (deprecated) callback (callable: (data=CommitData | dict, lost_seq=int) => None): called on new commits and other repo events. May be set directly by clients. None means no callback. Both kwargs are optional. ``data`` is a :class:`CommitData` for commits, or a dict record with ``$type`` for other ``com.atproto.sync.subscribeRepos`` messages. ``lost_seq`` is an integer sequence number that we allocated but then didn't use, ie "lost." """ storage = None mst = None head = None handle = None callback = None signing_key = None rotation_key = None status = None def __init__(self, *, storage=None, mst=None, head=None, handle=None, status=None, callback=None, signing_key=None, rotation_key=None): """Constructor. Args: storage (Storage): required mst (MST) head (Block): head commit handle (str) status (str): None (if active) or ``'deactivated'``, ``'deleted'``, or ``'tombstoned'`` (deprecated) callback (callable, (CommitData | dict) => None) signing_key (ec.EllipticCurvePrivateKey): required rotation_key (ec.EllipticCurvePrivateKey) """ assert storage assert signing_key self.storage = storage self.mst = mst self.head = head self.handle = handle self.status = status self.callback = callback self.signing_key = signing_key self.rotation_key = rotation_key def __eq__(self, other): return (self.head and other.head and self.version == other.version and self.did == other.did and self.head == other.head) @property def did(self): if self.head: return self.head.decoded['did'] @property def version(self): if self.head: return self.head.decoded['version']
[docs] def get_record(self, collection, rkey): """ Args: collection (str) rkey (str) Returns: dict: node, record or commit or serialized :class:`MST` """ cid = self.mst.get(f'{collection}/{rkey}') if cid: return self.storage.read(cid).decoded
[docs] def get_contents(self): """ Returns: dict mapping str collection to dict mapping str rkey to dict record: """ entries = self.mst.list() blocks = self.storage.read_many([e.value for e in entries]) contents = defaultdict(dict) for entry in entries: collection, rkey = entry.key.split('/', 2) contents[collection][rkey] = blocks[entry.value].decoded return contents
[docs] @classmethod def create(cls, storage, did, *, signing_key, rotation_key=None, **kwargs): """ Args: did (str) storage (Storage) signing_key (ec.EllipticCurvePrivateKey): rotation_key (ec.EllipticCurvePrivateKey): kwargs: passed through to :class:`Repo` constructor Returns: Repo: """ repo = Repo(storage=storage, mst=mst.MST.create(storage=storage), signing_key=signing_key, rotation_key=rotation_key, **kwargs) initial_commit = storage.commit(repo, [], repo_did=did) assert repo.head assert repo.did storage.write_event(repo=repo, type='identity', handle=kwargs.get('handle')) storage.write_event(repo=repo, type='account', active=True) # TODO: #sync event should be after #account/#identity but before first #commit # https://github.com/bluesky-social/proposals/tree/main/0006-sync-iteration#staying-synchronized-sync-event-auto-repair-and-account-status # https://github.com/snarfed/arroba/issues/52#issuecomment-2816324912 commit = initial_commit.commit sync_blocks = [car.Block(cid=commit.cid, data=commit.encoded, decoded=commit.decoded)] blocks_bytes = car.write_car([commit.cid], sync_blocks) storage.write_event(repo=repo, type='sync', blocks=blocks_bytes, rev=commit.decoded['rev']) storage.create_repo(repo) if repo.callback: repo.callback(data=initial_commit) return repo
[docs] @classmethod def load(cls, storage, cid=None, **kwargs): """ Args: storage (Storage) cid (CID): optional kwargs: passed through to :class:`Repo` constructor Returns: Repo: """ commit_cid = cid or storage.head assert commit_cid, 'No cid provided and none in storage' commit_block = storage.read(commit_cid) tree = mst.MST.load(storage=storage, cid=commit_block.decoded['data']) logger.info(f'loaded repo for {commit_block.decoded["did"]} at commit {commit_cid}') return Repo(storage=storage, mst=tree, head=commit_block, **kwargs)