"""Bluesky repo storage base class and in-memory implementation.
Lightly based on:
https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/storage/repo-storage.ts
"""
from collections import namedtuple
from enum import auto, Enum
import dag_cbor
from multiformats import CID, multicodec, multihash
from .util import dag_cbor_cid, tid_to_int
# NSID of the subscribeRepos subscription. MemoryStorage.write passes it to
# allocate_seq to get sequence numbers for newly written blocks.
SUBSCRIBE_REPOS_NSID = 'com.atproto.sync.subscribeRepos'
class Action(Enum):
    """Operation type carried in the ``action`` field of :class:`CommitOp`.

    Used in :meth:`Repo.format_commit`.

    Member values are assigned by :func:`enum.auto`, so they are the integers
    1, 2, 3 in declaration order.

    TODO: switch to StrEnum once we can require Python 3.11.
    """
    CREATE = auto()
    UPDATE = auto()
    DELETE = auto()
# TODO: Should this be a subclass of Block?
#
# Fields:
#   commit: Block
#   blocks: dict of CID to Block
#   prev:   CID or None. Optional; defaults to None.
CommitData = namedtuple(
    'CommitData',
    ('commit', 'blocks', 'prev'),
    defaults=(None,),  # defaults bind right to left, so this covers only prev
)
# A single operation within a commit, for subscribeRepos.
#
# Fields:
#   action: Action
#   path:   str
#   cid:    CID, or None for DELETE
CommitOp = namedtuple('CommitOp', ('action', 'path', 'cid'))
# commit record format is:
# https://atproto.com/specs/repository#commit-objects
#
# {
# 'version': 3,
# 'did': [repo],
# 'rev': [str, TID],
# 'data': [CID],
# 'prev': [CID or None],
# 'sig': [bytes],
# }
class Block:
    r"""An ATProto block: a record, :class:`MST` entry, or commit.

    Can start from either encoded bytes or decoded object, with or without
    :class:`CID`. Decodes, encodes, and generates :class:`CID` lazily, on
    demand, on attribute access.

    Based on :class:`carbox.car.Block`.

    Equality and hashing use the CID only, so two blocks with the same CID are
    interchangeable as dict keys and set members regardless of ``seq``/``ops``.

    Attributes:
      cid (CID): lazy-loaded (dynamic property)
      decoded (dict): decoded object (dynamic property)
      encoded (bytes): DAG-CBOR encoded data (dynamic property)
      seq (int): ``com.atproto.sync.subscribeRepos`` sequence number
      ops (list): :class:`CommitOp`\s if this is a commit, otherwise None
    """

    def __init__(self, *, cid=None, decoded=None, encoded=None, seq=None,
                 ops=None):
        """Constructor.

        At least one of ``encoded`` and ``decoded`` is required; whichever is
        missing (and ``cid``) is derived on first access.

        Args:
          cid (CID): optional
          decoded (dict): optional
          encoded (bytes): optional
          seq (int): optional sequence number
          ops (list of CommitOp): optional

        Raises:
          AssertionError: if neither ``encoded`` nor ``decoded`` is provided
        """
        # The lazy properties test ``is None``, so an empty-but-present decoded
        # object (eg ``{}``) is usable and must not be rejected here. Empty
        # ``encoded`` bytes are still treated as missing.
        assert encoded or decoded is not None
        self._cid = cid
        self._encoded = encoded
        self._decoded = decoded
        self.seq = seq
        self.ops = ops

    def __str__(self):
        return f'<Block: {self.cid}>'

    @property
    def cid(self):
        """CID: computed from the SHA-256 digest of ``encoded`` if not given."""
        if self._cid is None:
            digest = multihash.digest(self.encoded, 'sha2-256')
            self._cid = CID('base58btc', 1, 'dag-cbor', digest)
        return self._cid

    @property
    def encoded(self):
        """bytes: DAG-CBOR encoding, generated from ``decoded`` if not given."""
        if self._encoded is None:
            self._encoded = dag_cbor.encode(self.decoded)
        return self._encoded

    @property
    def decoded(self):
        """Decoded object, parsed from ``encoded`` if not given."""
        if self._decoded is None:
            self._decoded = dag_cbor.decode(self.encoded)
        return self._decoded

    def __eq__(self, other):
        """Compares by CID only.

        Returns ``NotImplemented`` for objects without a ``cid`` attribute, so
        that eg ``block == None`` evaluates to False instead of raising
        :class:`AttributeError`.
        """
        if not hasattr(other, 'cid'):
            return NotImplemented
        return self.cid == other.cid

    def __hash__(self):
        return hash(self.cid)
class Storage:
    """Abstract base class for storing nodes: records, MST entries, and commits.

    Concrete subclasses should implement this on top of physical storage,
    eg database, filesystem, in memory.

    Every method except :meth:`read_commits_by_seq` raises
    :class:`NotImplementedError` here and must be overridden.

    TODO: batch operations?

    Attributes:
      head (CID)
    """
    head = None

    def create_repo(self, repo, *, signing_key, rotation_key=None):
        """Stores a new repo's metadata in storage.

        Only stores the repo's DID, handle, and head commit :class:`CID`, not
        blocks!

        If the repo already exists in storage, this should update it instead of
        failing.

        Args:
          repo (Repo)
          signing_key (ec.EllipticCurvePrivateKey)
          rotation_key (ec.EllipticCurvePrivateKey): optional
        """
        raise NotImplementedError()

    def load_repo(self, did_or_handle):
        """Loads a repo from storage.

        Args:
          did_or_handle (str): the repo's DID or handle

        Returns:
          Repo, or None if the did or handle wasn't found:
        """
        raise NotImplementedError()

    def read(self, cid):
        """Reads a node from storage.

        Args:
          cid (CID)

        Returns:
          Block, or None if not found:
        """
        raise NotImplementedError()

    def read_many(self, cids, require_all=True):
        """Batch read multiple nodes from storage.

        Args:
          cids (sequence of CID)
          require_all (bool): whether to assert that all cids are found

        Returns:
          dict: {:class:`CID`: :class:`Block` or None if not found}
        """
        raise NotImplementedError()

    def read_blocks_by_seq(self, start=0):
        """Batch read blocks from storage by ``subscribeRepos`` sequence number.

        Args:
          start (int): optional ``subscribeRepos`` sequence number to start
            from, inclusive. Defaults to 0.

        Returns:
          iterable or generator: all :class:`Block` s starting from ``start``,
          inclusive, in ascending ``seq`` order
        """
        raise NotImplementedError()

    def read_commits_by_seq(self, start=0):
        """Batch read commits from storage by ``subscribeRepos`` sequence number.

        Groups the output of :meth:`read_blocks_by_seq` into runs of
        consecutive blocks that share a ``seq``, and emits one
        :class:`CommitData` per run. Every run must contain a commit block, ie
        one whose decoded keys are exactly the commit object's fields.

        This is a generator, so the assertions below only fire once iteration
        starts.

        Args:
          start (int): optional ``subscribeRepos`` sequence number to start
            from, inclusive. Defaults to 0.

        Returns:
          generator: generator of :class:`CommitData`, starting from ``start``,
          inclusive, in ascending ``seq`` order

        Raises:
          AssertionError: if ``start`` is negative, a block has no ``seq``, or
            a run of same-``seq`` blocks doesn't include a commit block
        """
        assert start >= 0

        # A block is treated as a commit iff its keys are exactly these. Built
        # once here rather than per block.
        commit_fields = {'version', 'did', 'rev', 'prev', 'data', 'sig'}

        seq = commit_block = blocks = None

        for block in self.read_blocks_by_seq(start=start):
            assert block.seq
            if block.seq != seq:  # switching to a new commit's blocks
                if commit_block:
                    assert blocks
                    yield CommitData(blocks=blocks, commit=commit_block,
                                     prev=commit_block.decoded.get('prev'))
                else:
                    # Only legal before the first run. Otherwise the previous
                    # run had blocks but no commit among them.
                    assert blocks is None
                seq = block.seq
                blocks = {}  # maps CID to Block
                commit_block = None

            blocks[block.cid] = block
            if block.decoded.keys() == commit_fields:
                commit_block = block

        # final commit
        if blocks:
            assert blocks and commit_block
            yield CommitData(blocks=blocks, commit=commit_block,
                             prev=commit_block.decoded.get('prev'))

    def has(self, cid):
        """Checks if a given :class:`CID` is currently stored.

        Args:
          cid (CID)

        Returns:
          bool:
        """
        raise NotImplementedError()

    def write(self, repo_did, obj):
        """Writes a node to storage.

        Generates new sequence number(s) as necessary for newly stored blocks.

        TODO: remove? This seems unused.

        Args:
          repo_did (str):
          obj (dict): a record, commit, or serialized :class:`MST` node

        Returns:
          CID:
        """
        raise NotImplementedError()

    def apply_commit(self, commit_data):
        """Writes a commit to storage.

        Generates a new sequence number and uses it for all blocks in the commit.

        Args:
          commit_data (CommitData)
        """
        raise NotImplementedError()

    def allocate_seq(self, nsid):
        """Generates and returns a sequence number for the given NSID.

        Sequence numbers must be monotonically increasing positive integers, per
        NSID. They may have gaps. Background:
        https://atproto.com/specs/event-stream#sequence-numbers

        Args:
          nsid (str): subscription XRPC method this sequence number is for

        Returns:
          int:
        """
        raise NotImplementedError()

    def last_seq(self, nsid):
        """Returns the last (highest) stored sequence number for the given NSID.

        Args:
          nsid (str): subscription XRPC method this sequence number is for

        Returns:
          int:
        """
        raise NotImplementedError()
class MemoryStorage(Storage):
    """In memory storage implementation.

    Attributes:
      repos (list of :class:`Repo`)
      blocks (dict): {:class:`CID`: :class:`Block`}
      head (CID)
      sequences (dict): {str NSID: int next sequence number}
    """
    repos = None
    blocks = None
    head = None
    sequences = None

    def __init__(self):
        self.blocks = {}
        self.repos = []
        self.sequences = {}

    def create_repo(self, repo, *, signing_key, rotation_key=None):
        """Registers ``repo`` if it isn't already registered.

        ``signing_key`` and ``rotation_key`` are accepted for interface
        compatibility with :class:`Storage` but not used here.
        """
        if repo not in self.repos:
            self.repos.append(repo)

    def load_repo(self, did_or_handle):
        """Returns the first stored repo whose DID or handle matches, or None."""
        assert did_or_handle

        for repo in self.repos:
            if did_or_handle in (repo.did, repo.handle):
                return repo

        return None

    def read(self, cid):
        """Returns the stored :class:`Block` for ``cid``, or None."""
        return self.blocks.get(cid)

    def read_many(self, cids, require_all=True):
        """Batch read multiple blocks.

        Args:
          cids (sequence of CID)
          require_all (bool): whether to assert that all cids are found

        Returns:
          dict: {:class:`CID`: :class:`Block` or None if not found}

        Raises:
          AssertionError: if ``require_all`` is set and any CID is missing
        """
        found = {cid: self.blocks.get(cid) for cid in cids}

        if require_all:
            # Check the looked-up values, not the dict size: every requested
            # CID gets a key even when its block is missing, and duplicate
            # CIDs in the input collapse into a single key.
            missing = [cid for cid, block in found.items() if block is None]
            assert not missing, f'blocks not found: {missing}'

        return found

    def read_blocks_by_seq(self, start=0):
        """Returns a list of stored blocks with ``seq >= start``, ascending."""
        assert start >= 0
        return sorted((b for b in self.blocks.values() if b.seq >= start),
                      key=lambda b: b.seq)

    def has(self, cid):
        """Returns True if a block with this CID is stored."""
        return cid in self.blocks

    def write(self, repo_did, obj):
        """Stores ``obj`` as a new block and returns its CID.

        A sequence number is always allocated, even if an identical block is
        already stored; in that case the existing block is kept and the new
        number is simply skipped.

        Args:
          repo_did (str): unused here
          obj (dict): a record, commit, or serialized :class:`MST` node

        Returns:
          CID:
        """
        block = Block(decoded=obj, seq=self.allocate_seq(SUBSCRIBE_REPOS_NSID))
        # self.blocks is a dict keyed by CID. Only add new blocks, so an
        # existing block's sequence number isn't wiped out, as in apply_commit.
        self.blocks.setdefault(block.cid, block)
        return block.cid

    def apply_commit(self, commit_data):
        """Stores a commit's blocks and updates ``head`` to the commit's CID.

        The sequence number for all of the commit's blocks is derived from the
        commit's ``rev`` via ``tid_to_int``; ``self.sequences`` is not touched.

        Args:
          commit_data (CommitData)
        """
        seq = tid_to_int(commit_data.commit.decoded['rev'])
        assert seq

        for block in commit_data.blocks.values():
            block.seq = seq

        # only add new blocks so we don't wipe out any existing blocks' sequence
        # numbers. (occasionally we see existing blocks recur, eg MST nodes.)
        for cid, block in commit_data.blocks.items():
            self.blocks.setdefault(cid, block)

        self.head = commit_data.commit.cid

        # the Repo will generally already be in self.repos, and it updates its
        # own head cid, so no need to do that here manually.

    def allocate_seq(self, nsid):
        """Returns the next sequence number for ``nsid``, starting at 1."""
        assert nsid
        seq = self.sequences.setdefault(nsid, 1)
        self.sequences[nsid] = seq + 1
        return seq

    def last_seq(self, nsid):
        """Returns the last sequence number allocated for ``nsid``.

        Raises:
          KeyError: if no sequence number has been allocated for ``nsid`` yet
        """
        assert nsid
        return self.sequences[nsid] - 1