# Source code for arroba.storage

"""Bluesky repo storage base class and in-memory implementation.

Lightly based on:
https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/storage/repo-storage.ts
"""
from collections import namedtuple
from enum import auto, Enum

import dag_cbor
from multiformats import CID, multicodec, multihash

from .util import dag_cbor_cid, tid_to_int

# XRPC subscription method (NSID) whose sequence numbers blocks are stamped
# with when they are written to storage.
SUBSCRIBE_REPOS_NSID: str = 'com.atproto.sync.subscribeRepos'


class Action(Enum):
    """Kind of change recorded by a :class:`CommitOp`.

    Used in :meth:`Repo.format_commit`.

    TODO: switch to StrEnum once we can require Python 3.11.
    """
    # Spelled out explicitly; these are the same values auto() assigns.
    CREATE = 1
    UPDATE = 2
    DELETE = 3
# TODO: Should this be a subclass of Block?
#
# Fields:
#   commit: Block
#   blocks: dict of CID to Block
#   prev:   CID or None (defaults to None)
CommitData = namedtuple('CommitData', ('commit', 'blocks', 'prev'),
                        defaults=(None,))

# One operation within a commit, for subscribeRepos.
#
# Fields:
#   action: Action
#   path:   str
#   cid:    CID, or None for DELETE
CommitOp = namedtuple('CommitOp', ('action', 'path', 'cid'))

# commit record format is:
# https://atproto.com/specs/repository#commit-objects
#
# {
#   'version': 3,
#   'did': [repo],
#   'rev': [str, TID],
#   'data': [CID],
#   'prev': [CID or None],
#   'sig': [bytes],
# }
class Block:
    r"""An ATProto block: a record, :class:`MST` entry, or commit.

    Can start from either encoded bytes or decoded object, with or without
    :class:`CID`.

    Decodes, encodes, and generates :class:`CID` lazily, on demand, on
    attribute access.

    Based on :class:`carbox.car.Block`.

    Attributes:
      cid (CID): lazy-loaded (dynamic property)
      decoded (dict): decoded object (dynamic property)
      encoded (bytes): DAG-CBOR encoded data (dynamic property)
      seq (int): ``com.atproto.sync.subscribeRepos`` sequence number
      ops (list): :class:`CommitOp`\s if this is a commit, otherwise None
    """

    def __init__(self, *, cid=None, decoded=None, encoded=None, seq=None,
                 ops=None):
        """Constructor.

        At least one of ``decoded`` or ``encoded`` is required.

        Args:
          cid (CID): optional; if omitted, computed from ``encoded`` on first
            access
          decoded (dict): optional
          encoded (bytes): optional
          seq (int): optional ``subscribeRepos`` sequence number
          ops (list of CommitOp): optional
        """
        # Compare against None rather than relying on truthiness so that
        # empty-but-valid objects, eg an empty dict, are accepted.
        assert encoded is not None or decoded is not None
        self._cid = cid
        self._encoded = encoded
        self._decoded = decoded
        self.seq = seq
        self.ops = ops

    def __str__(self):
        return f'<Block: {self.cid}>'

    @property
    def cid(self):
        """CIDv1 (dag-cbor, sha2-256) of :attr:`encoded`, computed once and cached."""
        if self._cid is None:
            digest = multihash.digest(self.encoded, 'sha2-256')
            self._cid = CID('base58btc', 1, 'dag-cbor', digest)
        return self._cid

    @property
    def encoded(self):
        """DAG-CBOR bytes, encoded from :attr:`decoded` once and cached."""
        if self._encoded is None:
            self._encoded = dag_cbor.encode(self.decoded)
        return self._encoded

    @property
    def decoded(self):
        """Decoded object, decoded from :attr:`encoded` once and cached."""
        if self._decoded is None:
            self._decoded = dag_cbor.decode(self.encoded)
        return self._decoded

    def __eq__(self, other):
        """Compares by CID only.

        Returns NotImplemented for non-Blocks so Python falls back to its
        default handling instead of raising AttributeError on ``other.cid``.
        """
        if not isinstance(other, Block):
            return NotImplemented
        return self.cid == other.cid

    def __hash__(self):
        # Must agree with __eq__, which compares by CID only.
        return hash(self.cid)
class Storage:
    """Abstract base class for storing nodes: records, MST entries, and commits.

    Concrete subclasses should implement this on top of physical storage,
    eg database, filesystem, in memory.

    TODO: batch operations?

    Attributes:
      head (CID)
    """
    head = None

    # Exact set of top-level keys in a decoded commit object, per
    # https://atproto.com/specs/repository#commit-objects .
    # read_commits_by_seq treats a block whose decoded dict has exactly these
    # keys as a commit. Built once here instead of once per block.
    _COMMIT_FIELDS = frozenset(('version', 'did', 'rev', 'prev', 'data', 'sig'))

    def create_repo(self, repo, *, signing_key, rotation_key=None):
        """Stores a new repo's metadata in storage.

        Only stores the repo's DID, handle, and head commit :class:`CID`, not
        blocks!

        If the repo already exists in storage, this should update it instead
        of failing.

        Args:
          repo (Repo)
          signing_key (ec.EllipticCurvePrivateKey)
          rotation_key (ec.EllipticCurvePrivateKey): optional
        """
        raise NotImplementedError()

    def load_repo(self, did_or_handle):
        """Loads a repo from storage.

        Args:
          did_or_handle (str): the repo's DID or handle

        Returns:
          Repo, or None if the DID or handle wasn't found
        """
        raise NotImplementedError()

    def read(self, cid):
        """Reads a node from storage.

        Args:
          cid (CID)

        Returns:
          Block, or None if not found
        """
        raise NotImplementedError()

    def read_many(self, cids, require_all=True):
        """Batch read multiple nodes from storage.

        Args:
          cids (sequence of CID)
          require_all (bool): whether to assert that all cids are found

        Returns:
          dict: {:class:`CID`: :class:`Block` or None if not found}
        """
        raise NotImplementedError()

    def read_blocks_by_seq(self, start=0):
        """Batch read blocks from storage by ``subscribeRepos`` sequence number.

        Args:
          start (int): optional ``subscribeRepos`` sequence number to start
            from, inclusive. Defaults to 0.

        Returns:
          iterable or generator: all :class:`Block` s starting from ``start``,
          inclusive, in ascending ``seq`` order
        """
        raise NotImplementedError()

    def read_commits_by_seq(self, start=0):
        """Batch read commits from storage by ``subscribeRepos`` sequence number.

        Groups the output of :meth:`read_blocks_by_seq` into runs of blocks
        that share the same ``seq``. Each run must contain exactly one commit
        block, identified by its decoded keys.

        Args:
          start (int): optional ``subscribeRepos`` sequence number to start
            from, inclusive. Defaults to 0.

        Returns:
          generator: generator of :class:`CommitData`, starting from
          ``start``, inclusive, in ascending ``seq`` order
        """
        assert start >= 0

        seq = commit_block = blocks = None
        for block in self.read_blocks_by_seq(start=start):
            assert block.seq
            if block.seq != seq:  # switching to a new commit's blocks
                if commit_block is not None:
                    assert blocks
                    yield CommitData(blocks=blocks, commit=commit_block,
                                     prev=commit_block.decoded.get('prev'))
                else:
                    assert blocks is None  # only the first commit
                seq = block.seq
                blocks = {}  # maps CID to Block
                commit_block = None

            blocks[block.cid] = block

            # Records and MST nodes are dicts too, so only an exact key match
            # marks a commit. Skip non-dict values instead of crashing on
            # .keys().
            decoded = block.decoded
            if (isinstance(decoded, dict)
                    and decoded.keys() == self._COMMIT_FIELDS):
                commit_block = block

        # final commit
        if blocks:
            assert commit_block is not None
            yield CommitData(blocks=blocks, commit=commit_block,
                             prev=commit_block.decoded.get('prev'))

    def has(self, cid):
        """Checks if a given :class:`CID` is currently stored.

        Args:
          cid (CID)

        Returns:
          bool
        """
        raise NotImplementedError()

    def write(self, repo_did, obj):
        """Writes a node to storage.

        Generates new sequence number(s) as necessary for newly stored blocks.

        TODO: remove? This seems unused.

        Args:
          repo_did (str):
          obj (dict): a record, commit, or serialized :class:`MST` node

        Returns:
          CID
        """
        raise NotImplementedError()

    def apply_commit(self, commit_data):
        """Writes a commit to storage.

        Generates a new sequence number and uses it for all blocks in the
        commit.

        Args:
          commit_data (CommitData)
        """
        raise NotImplementedError()

    def allocate_seq(self, nsid):
        """Generates and returns a sequence number for the given NSID.

        Sequence numbers must be monotonically increasing positive integers,
        per NSID. They may have gaps. Background:
        https://atproto.com/specs/event-stream#sequence-numbers

        Args:
          nsid (str): subscription XRPC method this sequence number is for

        Returns:
          int
        """
        raise NotImplementedError()

    def last_seq(self, nsid):
        """Returns the last (highest) stored sequence number for the given NSID.

        Args:
          nsid (str): subscription XRPC method this sequence number is for

        Returns:
          int
        """
        raise NotImplementedError()
class MemoryStorage(Storage):
    """In memory storage implementation.

    Attributes:
      repos (list of :class:`Repo`)
      blocks (dict): {:class:`CID`: :class:`Block`}
      head (CID)
      sequences (dict): {str NSID: int next sequence number}
    """
    repos = None
    blocks = None
    head = None
    sequences = None

    def __init__(self):
        self.blocks = {}
        self.repos = []
        self.sequences = {}

    def create_repo(self, repo, *, signing_key, rotation_key=None):
        # The keys are accepted for interface compatibility; this
        # implementation only tracks the Repo object itself.
        if repo not in self.repos:
            self.repos.append(repo)

    def load_repo(self, did_or_handle):
        assert did_or_handle
        for repo in self.repos:
            if did_or_handle in (repo.did, repo.handle):
                return repo
        return None

    def read(self, cid):
        return self.blocks.get(cid)

    def read_many(self, cids, require_all=True):
        found = {cid: self.blocks.get(cid) for cid in cids}
        if require_all:
            # Check for missing values explicitly. A length comparison would
            # never catch misses, since every requested CID is a key in
            # `found`, and it would wrongly fail on duplicate CIDs.
            missing = [cid for cid, block in found.items() if block is None]
            assert not missing, f'{len(missing)} blocks not found: {missing}'
        return found

    def read_blocks_by_seq(self, start=0):
        assert start >= 0
        return sorted((b for b in self.blocks.values() if b.seq >= start),
                      key=lambda b: b.seq)

    def has(self, cid):
        return cid in self.blocks

    def write(self, repo_did, obj):
        block = Block(decoded=obj)
        # self.blocks is a dict keyed by CID, so look up and store by CID.
        # Only newly stored blocks get a sequence number. An existing block
        # keeps its original seq.
        if block.cid not in self.blocks:
            block.seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID)
            self.blocks[block.cid] = block
        return block.cid

    def apply_commit(self, commit_data):
        seq = tid_to_int(commit_data.commit.decoded['rev'])
        assert seq

        for block in commit_data.blocks.values():
            block.seq = seq

        # only add new blocks so we don't wipe out any existing blocks' sequence
        # numbers. (occasionally we see existing blocks recur, eg MST nodes.)
        for cid, block in commit_data.blocks.items():
            self.blocks.setdefault(cid, block)

        self.head = commit_data.commit.cid
        # the Repo will generally already be in self.repos, and it updates its
        # own head cid, so no need to do that here manually.

    def allocate_seq(self, nsid):
        assert nsid
        # Sequences start at 1. self.sequences stores the *next* value to
        # hand out.
        seq = self.sequences.setdefault(nsid, 1)
        self.sequences[nsid] = seq + 1
        return seq

    def last_seq(self, nsid):
        assert nsid
        # Returns 0 if nothing has been allocated for this NSID yet, instead
        # of raising KeyError.
        return self.sequences.get(nsid, 1) - 1