"""Bluesky / AT Protocol repo implementation.
https://atproto.com/guides/data-repos
Heavily based on:
https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts
Huge thanks to the Bluesky team for working in the public, in open source, and to
Daniel Holmgren and Devin Ivy for this code specifically!
"""
from collections import defaultdict, namedtuple
import logging
from cryptography.hazmat.primitives.asymmetric import ec
import dag_cbor
from multiformats import CID
from . import util
from .diff import Diff
from .mst import MST
from .storage import (
Action,
Block,
CommitData,
CommitOp,
Storage,
SUBSCRIBE_REPOS_NSID,
)
logger = logging.getLogger(__name__)
# One write to a single repo record. Every field defaults to None.
#
#   action      :class:`Action`
#   collection  str
#   rkey        str
#   record      dict
Write = namedtuple(
    'Write',
    ('action', 'collection', 'rkey', 'record'),
    defaults=(None, None, None, None),
)
def writes_to_commit_ops(writes):
    """Builds one :class:`CommitOp` for each :class:`Write`.

    Each op's path is ``{collection}/{rkey}``. Its CID is computed from the
    write's record with :func:`util.dag_cbor_cid`, or is None if the write's
    record is None or otherwise falsy.

    Args:
      writes (iterable): of :class:`Write`. May be None or empty.

    Returns:
      list of :class:`CommitOp`, or ``writes`` itself, unchanged, if it's falsy
    """
    if not writes:
        return writes

    ops = []
    for w in writes:
        path = f'{w.collection}/{w.rkey}'
        record_cid = util.dag_cbor_cid(w.record) if w.record else None
        ops.append(CommitOp(action=w.action, path=path, cid=record_cid))

    return ops
[docs]class Repo:
"""AT Protocol data repo implementation, storage agnostic.
Attributes:
did (str): repo DID, read from the head commit. None if there is no head
commit. (dynamic property)
version (int): AT Protocol version, read from the head commit. None if
there is no head commit. (dynamic property)
storage (Storage)
mst (MST)
head (Block): head commit
handle (str)
callback (callable: (CommitData) => None): called on new commits.
May be set directly by clients. None means no callback.
signing_key (ec.EllipticCurvePrivateKey)
rotation_key (ec.EllipticCurvePrivateKey)
"""
# class-level defaults; the constructor sets each of these on the instance
storage = None
mst = None
head = None
handle = None
callback = None
signing_key = None
rotation_key = None
[docs] def __init__(self, *, storage=None, mst=None, head=None, handle=None,
callback=None, signing_key=None, rotation_key=None):
"""Constructor.
Args:
storage (Storage): required
mst (MST)
commit (dict): head commit
cid (CID): head CID
callback (callable, CommitData => None)
signing_key (ec.EllipticCurvePrivateKey): required
rotation_key (ec.EllipticCurvePrivateKey)
"""
assert storage
assert signing_key
self.storage = storage
self.mst = mst
self.head = head
self.handle = handle
self.callback = callback
self.signing_key = signing_key
self.rotation_key = rotation_key
def __eq__(self, other):
return (self.head and other.head
and self.version == other.version
and self.did == other.did
and self.head == other.head)
@property
def did(self):
if self.head:
return self.head.decoded['did']
@property
def version(self):
if self.head:
return self.head.decoded['version']
def get_record(self, collection, rkey):
    """Looks up a single record by collection and record key.

    Args:
      collection (str)
      rkey (str)

    Returns:
      dict: the decoded block stored for the record, or None if the MST has
      no CID for ``{collection}/{rkey}``
    """
    cid = self.mst.get(f'{collection}/{rkey}')
    if not cid:
        return None

    block = self.storage.read(cid)
    return block.decoded
def get_contents(self):
    """Reads every record in the repo, grouped by collection.

    Lists all MST entries, reads their blocks from storage in one
    ``read_many`` call, and files each decoded block under its collection
    and rkey.

    Returns:
      dict mapping str collection to dict mapping str rkey to dict record.
      The outer dict is a :class:`collections.defaultdict`.

    Raises:
      ValueError: if an MST key contains no ``/``
    """
    entries = self.mst.list()
    blocks = self.storage.read_many([e.value for e in entries])

    contents = defaultdict(dict)
    for entry in entries:
        # keys are '{collection}/{rkey}'. maxsplit=1 always yields exactly two
        # parts; maxsplit=2 could yield three and break the unpacking for a
        # key that has a second slash.
        collection, rkey = entry.key.split('/', 1)
        contents[collection][rkey] = blocks[entry.value].decoded

    return contents
@classmethod
def create_from_commit(cls, storage, commit_data, *,
                       signing_key, rotation_key=None, **kwargs):
    """Stores a commit and creates a new repo with that commit as its head.

    Applies the commit to storage, builds the repo in memory, registers it
    with :meth:`Storage.create_repo`, and finally calls the repo's callback,
    if it has one, with the commit data.

    Args:
      storage (Storage)
      commit_data (CommitData)
      signing_key (ec.EllipticCurvePrivateKey): passed through to
        :meth:`Storage.create_repo`
      rotation_key (ec.EllipticCurvePrivateKey): optional, passed
        through to :meth:`Storage.create_repo`
      kwargs: passed through to the constructor

    Returns:
      Repo: the new repo, an instance of ``cls``
    """
    storage.apply_commit(commit_data)

    # avoid reading from storage, since if we're in a transaction, those
    # reads won't see writes that happened in apply_commit. instead,
    # construct Repo and MST in memory from existing data.
    mst = MST(storage=storage, pointer=commit_data.commit.decoded['data'])

    # instantiate cls, not Repo, so that calling this on a subclass returns
    # an instance of that subclass
    repo = cls(storage=storage, mst=mst, head=commit_data.commit,
               signing_key=signing_key, rotation_key=rotation_key,
               **kwargs)

    storage.create_repo(repo, signing_key=signing_key, rotation_key=rotation_key)

    if repo.callback:
        repo.callback(commit_data)

    return repo
@classmethod
def create(cls, storage, did, *, signing_key, rotation_key=None,
           initial_writes=None, **kwargs):
    """Creates a new repo.

    Formats an initial commit with :meth:`format_commit`, then hands it to
    :meth:`create_from_commit`.

    Args:
      storage (Storage)
      did (string)
      signing_key (ec.EllipticCurvePrivateKey): used for the initial commit
        and passed through to :meth:`create_from_commit`
      rotation_key (ec.EllipticCurvePrivateKey): optional, passed
        through to :meth:`create_from_commit`
      initial_writes (sequence of Write): optional, included in the initial
        commit
      kwargs: passed through to :meth:`create_from_commit`

    Returns:
      Repo: the new repo
    """
    initial_commit = cls.format_commit(
        storage=storage,
        repo_did=did,
        signing_key=signing_key,
        writes=initial_writes,
    )
    return cls.create_from_commit(
        storage,
        initial_commit,
        signing_key=signing_key,
        rotation_key=rotation_key,
        **kwargs,
    )
@classmethod
def load(cls, storage, cid=None, **kwargs):
    """Loads an existing repo from storage.

    Args:
      storage (Storage)
      cid (CID): optional, the commit to load. Defaults to ``storage.head``.
      kwargs: passed through to the constructor. Must include
        ``signing_key``, which the constructor requires.

    Returns:
      Repo: the loaded repo, an instance of ``cls``

    Raises:
      AssertionError: if no ``cid`` is provided and ``storage.head`` is unset
    """
    commit_cid = cid or storage.head
    assert commit_cid, 'No cid provided and none in storage'

    commit_block = storage.read(commit_cid)
    commit = commit_block.decoded
    mst = MST.load(storage=storage, cid=commit['data'])
    logger.info(f'loaded repo for {commit["did"]} at commit {commit_cid}')

    # instantiate cls, not Repo, so that calling this on a subclass returns
    # an instance of that subclass
    return cls(storage=storage, mst=mst, head=commit_block, **kwargs)
def apply_commit(self, commit_data):
    """Stores a commit and makes it this repo's head.

    Passes the commit data to the storage's ``apply_commit``, points
    :attr:`head` at the new commit, then calls :attr:`callback`, if set, with
    the commit data.

    Args:
      commit_data (CommitData)

    Returns:
      Repo: self
    """
    self.storage.apply_commit(commit_data)
    self.head = commit_data.commit

    notify = self.callback
    if notify:
        notify(commit_data)

    return self
def apply_writes(self, writes):
    """Formats a new commit from one or more writes and applies it to this repo.

    Args:
      writes (Write or sequence of Write): a single :class:`Write` is
        wrapped in a list first

    Returns:
      Repo: self
    """
    batch = [writes] if isinstance(writes, Write) else writes
    self.apply_commit(Repo.format_commit(repo=self, writes=batch))
    return self