Source code for arroba.xrpc_repo

"""```com.atproto.repo.*`` XRPC methods."""
import itertools
import json
import logging
import os

from carbox import read_car
import dag_json
from flask import abort, make_response
from lexrpc import Client
from multiformats import CID
from requests import HTTPError, RequestException

from . import did
from .mst import MST
from .repo import Repo, Write
from . import server
from .storage import Action, Block
from . import server
from .util import at_uri, dag_cbor_cid, new_key, next_tid, USER_AGENT, verify_sig

logger = logging.getLogger(__name__)


def validate(input, **params):
    input.update(params)

    for field in 'swapCommit', 'swapRecord':
        if input.get(field):
            raise ValueError(f'{field} not supported yet')

    if not input.get('repo'):
        raise ValueError('Missing repo param')


[docs] @server.server.method('com.atproto.repo.createRecord') def create_record(input): """Handler for ``com.atproto.repo.createRecord`` XRPC method.""" validate(input) server.auth() repo = server.load_repo(input['repo']) input.setdefault('rkey', next_tid()) return put_record(input)
[docs] @server.server.method('com.atproto.repo.getRecord') def get_record(input, repo=None, collection=None, rkey=None, cid=None): """Handler for `com.atproto.repo.getRecord` XRPC method.""" # Largely duplicates xrpc_sync.get_record validate(input, repo=repo, collection=collection, rkey=rkey, cid=cid) if cid: raise ValueError(f'cid not supported yet') try: repo = server.load_repo(input['repo']) record = repo.get_record(collection, rkey) if record is not None: return json.loads(dag_json.encode({ 'uri': at_uri(repo.did, collection, rkey), 'cid': dag_cbor_cid(record).encode('base32'), 'value': record, }, dialect='atproto')) except ValueError as e: logger.info(e) pass # fall back to AppView if available av_host = os.environ.get('APPVIEW_HOST') jwt = os.environ.get('APPVIEW_JWT') if not av_host or not jwt: raise ValueError(f'{collection} {rkey} not found') logger.info(f'Falling back to AppView at {av_host}') appview = Client(f'https://{av_host}', access_token=jwt, headers={'User-Agent': USER_AGENT}) try: return appview.com.atproto.repo.getRecord( {}, repo=input['repo'], collection=collection, rkey=rkey) except HTTPError as e: body = e.response.json() logger.info(f'Returning AppView error to client: {e} {body}') status = e.response.status_code abort(status, response=make_response(body, status))
[docs] @server.server.method('com.atproto.repo.deleteRecord') def delete_record(input): """Handler for ``com.atproto.repo.deleteRecord`` XRPC method.""" validate(input) server.auth() repo = server.load_repo(input['repo']) record = repo.get_record(input['collection'], input['rkey']) if record is None: return # noop server.storage.commit(repo, [Write( action=Action.DELETE, collection=input['collection'], rkey=input['rkey'], )])
[docs] @server.server.method('com.atproto.repo.listRecords') def list_records(input, repo=None, collection=None, limit=50, cursor=None, reverse=None, # DEPRECATED rkeyStart=None, rkeyEnd=None): """Handler for `com.atproto.repo.listRecords` XRPC method. KNOWN ISSUE: cursor is interpreted as inclusive, so whenever a cursor is used, the response includes the last record returned in the previous response. """ validate(input, repo=repo, collection=collection, limit=limit, cursor=cursor) if rkeyStart or rkeyEnd: raise ValueError(f'rkeyStart/rkeyEnd not supported') elif reverse: raise ValueError(f'reverse not supported yet') elif not collection: raise ValueError(f'collection is required') repo = server.load_repo(input['repo']) start = cursor or f'{collection}/' entries = list(itertools.islice( itertools.takewhile(lambda entry: entry.key.startswith(f'{collection}/'), repo.mst.walk_leaves_from(key=start)), limit)) blocks = server.storage.read_many([e.value for e in entries]) records = [blocks[e.value].decoded for e in entries] records = [ json.loads(dag_json.encode({ 'uri': at_uri(repo.did, *entry.key.split('/', 2)), # collection, rkey 'cid': dag_cbor_cid(record).encode('base32'), 'value': record, }, dialect='atproto')) for entry, record in zip(entries, records)] ret = {'records': records} if len(entries) == limit: ret['cursor'] = entries[-1].key return ret
[docs] @server.server.method('com.atproto.repo.putRecord') def put_record(input): """Handler for ``com.atproto.repo.putRecord`` XRPC method.""" validate(input) server.auth() repo = server.load_repo(input['repo']) existing = repo.get_record(input['collection'], input['rkey']) server.storage.commit(repo, [Write( action=Action.CREATE if existing is None else Action.UPDATE, collection=input['collection'], rkey=input['rkey'], record=input['record'], )]) return { 'uri': at_uri(repo.did, input['collection'], input['rkey']), 'cid': dag_cbor_cid(input['record']).encode('base32'), }
[docs] @server.server.method('com.atproto.repo.describeRepo') def describe_repo(input, repo=None): """Handler for ``com.atproto.repo.describeRepo`` XRPC method.""" validate(input, repo=repo) repo = server.load_repo(input['repo']) try: did_doc = did.resolve(repo.did) except (ConnectionError, OSError, RequestException, TimeoutError) as e: raise ValueError(f"Couldn't resolve {repo.did}") return { 'did': repo.did, 'handle': repo.handle, 'didDoc': did_doc, 'collections': [ 'app.bsky.actor.profile', 'app.bsky.feed.like', 'app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.graph.block', 'app.bsky.graph.follow', 'app.bsky.graph.listblock', 'chat.bsky.actor.declaration', ], 'handleIsCorrect': True, }
[docs] @server.server.method('com.atproto.repo.importRepo') def import_repo(input): """Handler for ``com.atproto.repo.importRepo`` XRPC method. Requires that a repo doesn't already exist for this DID. """ server.auth() roots, car_blocks = read_car(input) if not roots: raise ValueError("CAR missing root CID") head_cid = roots[0] # read and prepare blocks blocks = [] repo_did = None head = None for car_block in car_blocks: # note seq 0, since we won't emit these over the firehose block = Block(cid=car_block.cid, encoded=car_block.data, seq=0) blocks.append(block) if block.cid == head_cid: # this is the commit. note that its signature is generated by the # old PDS's signing key. that doesn't matter since we make a new # commit below when we create the repo. head = block repo_did = car_block.decoded['did'] if server.storage.load_repo(repo_did): raise ValueError(f'repo already exists for DID {repo_did}') did_doc = did.resolve(repo_did) signing_key = did.get_signing_key(did_doc) if not signing_key or not verify_sig(car_block.decoded, signing_key): raise ValueError(f"Couldn't verify signature on head commit {head_cid.encode('base32')}") if not head: raise ValueError("Couldn't find head commit block") elif not repo_did: raise ValueError("Head commit block missing DID") logger.info(f'importing repo for {repo_did}') for block in blocks: block.repo = repo_did server.storage.write_blocks(blocks) mst = MST.load(storage=server.storage, cid=head.decoded['data']) handle = did.get_handle(did_doc) repo = Repo(storage=server.storage, mst=mst, head=head, handle=handle, status='deactivated', signing_key=new_key(), rotation_key=new_key()) server.storage.create_repo(repo)
[docs] @server.server.method('com.atproto.repo.applyWrites') def apply_writes(input): """Handler for ``com.atproto.repo.applyWrites`` XRPC method.""" validate(input) server.auth() return 'Not implemented', 501
[docs] @server.server.method('com.atproto.repo.uploadBlob') def upload_blob(input): """Handler for ``com.atproto.repo.uploadBlob`` XRPC method.""" # input: binary validate({}) server.auth() return 'Not implemented', 501