arroba package

Reference documentation.

did

Utilities to create and resolve did:plcs, did:webs, and handles.

class DidPlc(did, signing_key, rotation_key, doc)

Bases: tuple

requests_get(*args, **kwargs)[source]

Used as get_fn below. Wrapped so that we can mock requests.get in tests.

resolve(did, **kwargs)[source]

Resolves a did:plc or did:web.

Parameters:
Returns:

JSON DID document

Return type:

dict

Raises:
resolve_plc(did, get_fn=<function requests_get>)[source]

Resolves a did:plc by fetching its DID document from a PLC directory.

The PLC directory hostname is specified in the PLC_HOST environment variable.
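
As a rough illustration of the lookup described above, the following sketch builds the URL that a PLC directory serves a DID document from. The helper name plc_url and the plc.directory fallback host are assumptions for this example, not part of arroba's API.

```python
import os


def plc_url(did, default_host="plc.directory"):
    """Builds the PLC directory URL for a did:plc (hypothetical helper)."""
    if not did.startswith("did:plc:"):
        raise ValueError(f"{did} is not a did:plc")
    # PLC_HOST is the environment variable named in the docs above.
    # The fallback host is an assumption made for this sketch.
    host = os.environ.get("PLC_HOST", default_host)
    return f"https://{host}/{did}"
```

A GET request to the resulting URL, for example via get_fn, would then be expected to return the JSON DID document.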

did:plc background:

Parameters:
  • did (str)

  • get_fn (callable) – for making HTTP GET requests

Returns:

JSON DID document

Return type:

dict

Raises:
create_plc(handle, **kwargs)[source]

Creates a new did:plc in a PLC directory.

Args are documented in write_plc().

update_plc(did, **kwargs)[source]

Updates an existing did:plc in a PLC directory.

Args are documented in write_plc().

write_plc(did=None, handle=None, signing_key=None, rotation_key=None, pds_url=None, also_known_as=None, prev=None, get_fn=<function requests_get>, post_fn=<function post>)[source]

Writes a PLC operation to a PLC directory.

Generally used to create a new did:plc or update an existing one.

The PLC directory hostname is specified in the PLC_HOST environment variable.

did:plc background:

The DID document in the returned value is the new format DID doc, with the fully qualified verificationMethod.id and Multikey key encoding, ie did:key without the prefix. Details: https://github.com/bluesky-social/atproto/discussions/1510

Parameters:
  • did (str) – if provided, updates an existing DID, otherwise creates a new one.

  • handle (str) – domain handle to associate with this DID

  • signing_key (ec.EllipticCurvePrivateKey) – The curve must be SECP256K1. If omitted, a new keypair will be created.

  • rotation_key (ec.EllipticCurvePrivateKey) – The curve must be SECP256K1. If omitted, a new keypair will be created.

  • pds_url (str) – PDS base URL to associate with this DID. If omitted, defaults to https://[$PDS_HOST]

  • also_known_as (str or sequence of str) – additional URI or URIs to add to alsoKnownAs

  • prev (str) – if an update, the CID of the previous operation for this DID

  • get_fn (callable) – for making HTTP GET requests

  • post_fn (callable) – for making HTTP POST requests

Returns:

with the DID, keys, and DID document

Return type:

DidPlc

Raises:
write_plc_operation(op, rotation_key, did=None, post_fn=<function post>)[source]

Signs and sends a PLC operation to the directory.

Parameters:
  • op (dict) – PLC operation

  • rotation_key (ec.EllipticCurvePrivateKey) – The curve must be SECP256K1. Must match the DID’s current rotation key in the PLC directory.

  • did (str) – if provided, updates an existing DID, otherwise creates a new one.

  • post_fn (callable) – for making HTTP POST requests

Returns:

with the DID and DID document, but not keys

Return type:

DidPlc

rollback_plc(did, rotation_key, num_operations=1, get_fn=<function requests_get>, post_fn=<function post>)[source]

Reverts a DID PLC document to its last version.

Reads a did:plc’s audit log from the directory, extracts its previous operation, and re-applies that operation. Reverts the DID to its last document, before the current one.

Parameters:
  • did (str)

  • rotation_key (ec.EllipticCurvePrivateKey) – The curve must be SECP256K1. Must match the DID’s current rotation key in the PLC directory.

  • num_operations (int) – how many operations back to revert. Defaults to 1.

  • get_fn (callable) – for making HTTP GET requests

  • post_fn (callable) – for making HTTP POST requests

Returns:

with the DID and DID document, but not keys

Return type:

DidPlc

encode_did_key(pubkey)[source]

Encodes an ec.EllipticCurvePublicKey into a did:key string.

https://atproto.com/specs/did#public-key-encoding

Parameters:

pubkey (ec.EllipticCurvePublicKey)

Returns:

encoded did:key

Return type:

str
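
The encoding in the linked spec can be sketched without any crypto library: a did:key is the multicodec prefix for the curve, followed by the compressed public key bytes, base58btc encoded with a leading z. The sketch below works on raw 33-byte compressed keys rather than ec.EllipticCurvePublicKey objects, and all function names in it are hypothetical.

```python
B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
# Multicodec code 0xe7 (secp256k1-pub) as an unsigned varint.
SECP256K1_PREFIX = bytes([0xE7, 0x01])


def b58encode(data):
    """Base58btc encodes bytes."""
    num = int.from_bytes(data, "big")
    out = ""
    while num:
        num, rem = divmod(num, 58)
        out = B58_ALPHABET[rem] + out
    # Each leading zero byte is written as a literal '1'.
    pad = len(data) - len(data.lstrip(b"\x00"))
    return "1" * pad + out


def b58decode(text):
    """Base58btc decodes a string back to bytes."""
    num = 0
    for char in text:
        num = num * 58 + B58_ALPHABET.index(char)
    pad = len(text) - len(text.lstrip("1"))
    return b"\x00" * pad + num.to_bytes((num.bit_length() + 7) // 8, "big")


def encode_did_key_bytes(compressed_pubkey):
    """Encodes a 33-byte compressed secp256k1 public key as a did:key."""
    if len(compressed_pubkey) != 33:
        raise ValueError("expected a 33-byte compressed public key")
    return "did:key:z" + b58encode(SECP256K1_PREFIX + compressed_pubkey)


def decode_did_key_bytes(did_key):
    """Returns the compressed public key bytes inside a secp256k1 did:key."""
    prefix = "did:key:z"
    if not did_key.startswith(prefix):
        raise ValueError("not a base58btc did:key")
    raw = b58decode(did_key[len(prefix):])
    if raw[:2] != SECP256K1_PREFIX:
        raise ValueError("not a secp256k1 key")
    return raw[2:]
```

Converting between these bytes and a key object is left out here; that step depends on the cryptography library and is not shown in this reference.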

decode_did_key(did_key)[source]

Decodes a did:key string into an ec.EllipticCurvePublicKey.

https://atproto.com/specs/did#public-key-encoding

Parameters:

did_key (str)

Returns:

ec.EllipticCurvePublicKey

get_handle(did_doc)[source]

Extracts and returns a DID’s handle.

Doesn’t do bidirectional handle resolution! Just returns the handle in the first at:// URI in alsoKnownAs.

Parameters:

did_doc (dict) – DID document

Returns:

handle, or None if the DID doc doesn’t have one

Return type:

str
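
The extraction described above amounts to taking the first at:// URI in alsoKnownAs and dropping the scheme. A minimal sketch, using a hypothetical helper name and assuming did_doc is a plain dict:

```python
def handle_from_did_doc(did_doc):
    """Returns the handle in the first at:// URI in alsoKnownAs, or None."""
    for uri in did_doc.get("alsoKnownAs", []):
        if uri.startswith("at://"):
            return uri[len("at://"):]
    return None
```

As the docs note, this does not confirm that the handle resolves back to the same DID.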

get_signing_key(did_doc)[source]

Extracts and returns a DID’s signing key.

Parameters:

did_doc (dict) – DID document

Returns:

ec.EllipticCurvePublicKey, or None if the DID doc has no ATProto signing key

plc_operation_to_did_doc(op)[source]

Converts a PLC directory operation to a DID document.

https://github.com/bluesky-social/did-method-plc#presentation-as-did-document

The DID document in the returned value is the new format DID doc, with the fully qualified verificationMethod.id and Multikey key encoding, ie did:key without the prefix. Details: https://github.com/bluesky-social/atproto/discussions/1510

Parameters:

op – dict, PLC operation, https://github.com/did-method-plc/did-method-plc#operation-serialization-signing-and-validation

Returns:

DID document, https://www.w3.org/TR/did-core/#data-model

Return type:

dict
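
To make the conversion concrete, here is a sketch of the mapping from PLC operation fields to the new format DID document described above. It is not arroba's implementation: the function name is hypothetical, the DID is passed in explicitly as an assumption, and only the base @context entry is included.

```python
def did_doc_from_plc_op(did, op):
    """Builds a new-format DID document from a PLC operation (sketch)."""
    methods = [{
        # Fully qualified id: the DID plus a fragment.
        "id": f"{did}#{name}",
        "type": "Multikey",
        "controller": did,
        # Multikey encoding: the did:key value without its prefix.
        "publicKeyMultibase": did_key.removeprefix("did:key:"),
    } for name, did_key in op.get("verificationMethods", {}).items()]

    services = [{
        "id": f"#{name}",
        "type": service["type"],
        "serviceEndpoint": service["endpoint"],
    } for name, service in op.get("services", {}).items()]

    return {
        "@context": ["https://www.w3.org/ns/did/v1"],
        "id": did,
        "alsoKnownAs": list(op.get("alsoKnownAs", [])),
        "verificationMethod": methods,
        "service": services,
    }
```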

resolve_web(did, get_fn=<function requests_get>)[source]

Resolves a did:web by fetching its DID document.

did:web spec: https://w3c-ccg.github.io/did-method-web/
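
For reference, the did:web spec maps a DID to an HTTPS URL as sketched below. The helper name is hypothetical, and this follows the general spec; arroba itself may only accept the bare hostname form that ATProto uses.

```python
from urllib.parse import unquote


def did_web_url(did):
    """Returns the URL that serves a did:web's DID document (sketch)."""
    prefix = "did:web:"
    if not did.startswith(prefix):
        raise ValueError(f"{did} is not a did:web")
    # Colons separate the host from optional path segments. A port's colon
    # is percent-encoded as %3A inside the host segment.
    parts = [unquote(part) for part in did[len(prefix):].split(":")]
    host, path = parts[0], parts[1:]
    if not host or any(not segment for segment in path):
        raise ValueError(f"{did} is malformed")
    if path:
        return f"https://{host}/{'/'.join(path)}/did.json"
    return f"https://{host}/.well-known/did.json"
```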

Parameters:
  • did (str)

  • get_fn (callable) – for making HTTP GET requests

Returns:

JSON DID document

Return type:

dict

Raises:
resolve_handle(handle, get_fn=<function requests_get>)[source]

Resolves an ATProto handle to a DID.

Supports the DNS TXT record and HTTPS well-known methods.

https://atproto.com/specs/handle#handle-resolution

Parameters:
  • handle (str)

  • get_fn (callable) – for making HTTP GET requests

Returns:

DID, or None if the handle can’t be resolved

Return type:

str or None

Raises:

ValueError – if handle is not a valid handle
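
The two resolution methods named above look in fixed places, per the linked handle spec: a DNS TXT record at _atproto.[handle] whose value is did=[DID], and an HTTPS document at /.well-known/atproto-did. The sketch below only computes those lookup targets and parses a TXT value; it makes no network calls, and its function names are hypothetical.

```python
def handle_lookup_targets(handle):
    """Returns where to look up a handle's DID via DNS and HTTPS."""
    return {
        "dns_txt_name": f"_atproto.{handle}",
        "https_url": f"https://{handle}/.well-known/atproto-did",
    }


def did_from_txt_record(value):
    """Extracts the DID from a TXT record value like 'did=did:plc:abc'."""
    value = value.strip().strip('"')
    if value.startswith("did="):
        return value[len("did="):]
    return None
```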

diff

AT Protocol utility for diffing two MSTs.

Heavily based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/mst/diff.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to Daniel Holmgren and Devin Ivy for this code specifically!

mst_diff(cur, prev=None)[source]

Generates a diff between two MSTs.

Parameters:
  • cur (MST)

  • prev (MST) – optional

Return type:

Diff
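
The shape of a diff can be illustrated on plain dicts instead of trees. The sketch below compares two key-to-CID mappings and sorts the differences into adds, updates, and deletes. It is a simplification with a hypothetical function name: the real mst_diff walks two MSTs and also tracks new and removed CIDs, and the field usage for deletes here is an assumption.

```python
from collections import namedtuple

Change = namedtuple("Change", ["key", "cid", "prev"])


def diff_mappings(cur, prev=None):
    """Diffs two {key: cid} dicts into (adds, updates, deletes)."""
    prev = prev or {}
    adds, updates, deletes = {}, {}, {}
    for key, cid in cur.items():
        if key not in prev:
            adds[key] = Change(key, cid, None)
        elif prev[key] != cid:
            updates[key] = Change(key, cid, prev[key])
    for key, old_cid in prev.items():
        if key not in cur:
            # Assumption for this sketch: a delete has no current cid.
            deletes[key] = Change(key, None, old_cid)
    return adds, updates, deletes
```

With prev omitted, every key is reported as an add, which mirrors the idea of a null diff.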

null_diff(tree)[source]

Generates a “null” diff for a single MST with all adds and new CIDs.

Parameters:

tree (MST)

Return type:

Diff

class Change(key, cid, prev)

Bases: tuple

class Diff[source]

Bases: object

A diff between two MSTs.

adds

maps str to Change

Type:

dict

updates

maps str to Change

Type:

dict

deletes

maps str to Change

Type:

dict

new_cids
Type:

set of CID

removed_cids
Type:

set of CID

static of(cur, prev=None)[source]
Parameters:
  • cur (MST)

  • prev (MST) – optional

Return type:

Diff

record_add(key, cid)[source]
Parameters:
  • key (str)

  • cid (CID)

record_update(key, prev, cid)[source]
Parameters:
  • key (str)

  • prev (CID)

  • cid (CID)

record_delete(key, cid)[source]
Parameters:
  • key (str)

  • cid (CID)

record_new_cid(cid)[source]
Parameters:

cid (CID)

record_removed_cid(cid)[source]
Parameters:

cid (CID)

add_diff(diff)[source]
Parameters:

diff (Diff)

mst

Bluesky / AT Protocol Merkle search tree implementation.

Heavily based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/mst/mst.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to Daniel Holmgren and Devin Ivy for this code specifically!

From that file:

This is an implementation of a Merkle Search Tree (MST). The data structure is described here: https://hal.inria.fr/hal-02303490/document The MST is an ordered, insert-order-independent, deterministic tree. Data keys are laid out in alphabetic order. The key insight of an MST is that each key is hashed and starting 0s are counted to determine which layer it falls on (5 zeros for ~32 fanout). This is a merkle tree, so each subtree is referred to by its hash (CID). When a leaf is changed, every tree on the path to that leaf is changed as well, thereby updating the root hash.

For atproto, we use SHA-256 as the key hashing algorithm, and ~4 fanout (2-bits of zero per layer).
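
The layer rule above can be sketched as follows: hash the key with SHA-256, then count how many leading 2-bit groups of the digest are zero. The function names are chosen for this example and are not arroba's API.

```python
import hashlib


def leading_zero_pairs(digest):
    """Counts the leading 2-bit groups that are zero in a byte string."""
    count = 0
    for byte in digest:
        # Check the byte's four 2-bit groups, most significant first.
        for shift in (6, 4, 2, 0):
            if (byte >> shift) & 0b11:
                return count
            count += 1
    return count


def layer_for_key(key):
    """Returns the MST layer for a key: zero bit pairs on its SHA-256 hash."""
    if isinstance(key, str):
        key = key.encode("utf-8")
    return leading_zero_pairs(hashlib.sha256(key).digest())
```

Each extra zero pair has probability 1/4, which is where the ~4 fanout comes from.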

A couple notes on CBOR encoding:

There are never two neighboring subtrees. Therefore, we can represent a node as an array of leaves & pointers to their right neighbor (possibly null), along with a pointer to the left-most subtree (also possibly null).

Most keys in a subtree will have overlap. We do compression on prefixes by describing keys as:
  • the length of the prefix that it shares in common with the preceding key

  • the rest of the string

For example:

If the first leaf in a tree is bsky/posts/abcdefg and the second is bsky/posts/abcdehi, then the first will be described as prefix: 0, key: 'bsky/posts/abcdefg', and the second will be described as prefix: 16, key: 'hi'.
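
The example above can be reproduced with a short sketch of the prefix compression. It works on str keys for readability and uses hypothetical function names; a real serializer would likely operate on bytes.

```python
def shared_prefix_len(a, b):
    """Returns the length of the longest common prefix of a and b."""
    length = 0
    for x, y in zip(a, b):
        if x != y:
            break
        length += 1
    return length


def compress_keys(keys):
    """Describes each key as (shared prefix length, rest of the string)."""
    compressed = []
    prev = ""
    for key in keys:
        prefix = shared_prefix_len(prev, key)
        compressed.append((prefix, key[prefix:]))
        prev = key
    return compressed


def expand_keys(compressed):
    """Reverses compress_keys()."""
    keys = []
    prev = ""
    for prefix, rest in compressed:
        prev = prev[:prefix] + rest
        keys.append(prev)
    return keys
```

Because each entry depends on the one before it, entries have to be expanded in order.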

class Entry(p, k, v, t)

Bases: tuple

class Data(l, e)

Bases: tuple

class Leaf(key, value)

Bases: tuple

class MST(*, storage=None, entries=None, pointer=None, layer=None)[source]

Bases: object

Merkle search tree class.

storage
Type:

Storage

entries
Type:

sequence of MST and Leaf

layer

this MST’s layer in the root MST

Type:

int

pointer
Type:

CID

outdated_pointer

whether pointer needs to be recalculated

Type:

bool

classmethod create(*, storage=None, entries=None, layer=None)[source]
Parameters:
  • storage (Storage)

  • entries (sequence of MST and Leaf)

  • layer (int)

Returns:

MST

new_tree(entries)[source]

We never mutate an MST, we just return a new MST with updated values.

Parameters:

entries (sequence of MST and Leaf)

Return type:

MST

get_entries()[source]

We don’t want to load entries of every subtree, just the ones we need.

Return type:

sequence of MST and Leaf

get_pointer()[source]

Returns this MST’s root CID pointer. Calculates it if necessary.

We don’t hash the node on every mutation for performance reasons. Instead we keep track of whether the pointer is outdated and only (recursively) calculate when needed.

Return type:

CID
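
The two design choices described above, never mutating a node and hashing lazily, can be sketched together. This toy class uses a SHA-256 hex digest as a stand-in for a CID and strings as entries; the class and its hash_count counter are illustrative only.

```python
import hashlib


class LazyNode:
    """Toy immutable node with a lazily computed pointer."""

    def __init__(self, entries=()):
        self.entries = tuple(entries)
        self.pointer = None
        self.outdated_pointer = True
        self.hash_count = 0  # only here to show how often we hash

    def append(self, entry):
        # Never mutate: return a new node with the updated entries.
        return LazyNode(self.entries + (entry,))

    def get_pointer(self):
        # Only hash when the cached pointer is missing or outdated.
        if self.outdated_pointer:
            payload = "\n".join(self.entries).encode("utf-8")
            self.pointer = hashlib.sha256(payload).hexdigest()
            self.outdated_pointer = False
            self.hash_count += 1
        return self.pointer
```

Chaining several changes before asking for the pointer hashes only the final node, once.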

get_layer()[source]

Returns this MST’s layer, and sets self.layer.

In most cases, we get the layer of a node from a hint on creation. In the case of the topmost node in the tree, we look for a key in the node & determine the layer. In the case where we don’t find one, we recurse down until we do. If we still can’t find one, then we have an empty tree and the node is layer 0.

Return type:

int

attempt_get_layer()[source]

Returns this MST’s layer, and sets self.layer.

Return type:

int or None

get_unstored_blocks()[source]

Return the necessary blocks to persist the MST to repo storage.

Return type:

(CID root, dict mapping CID to Block) tuple

add(key, value=None, known_zeros=None)[source]

Adds a new leaf for the given key/value pair.

Parameters:
  • key (str)

  • value (CID)

  • known_zeros (int)

Return type:

MST

Raises:

ValueError – if a leaf with that key already exists

get(key)[source]

Gets the value at the given key.

Parameters:

key (str)

Return type:

CID or None

update(key, value)[source]

Edits the value at the given key.

Parameters:
  • key (str)

  • value (CID)

Return type:

MST

Raises:

KeyError – if key doesn’t exist

delete(key)[source]

Deletes the value at the given key.

Parameters:

key (str)

Returns:

MST

Raises:

KeyError – if key doesn’t exist

delete_recurse(key)[source]

Deletes the value and subtree, if any, at the given key.

Parameters:

key (str)

Returns:

MST

update_entry(index, entry)[source]

Updates an entry in place.

Parameters:
  • index (int)

  • entry (MST or Leaf)

Return type:

MST

remove_entry(index)[source]

Removes the entry at a given index.

Parameters:

index (int)

Return type:

MST

append(entry)[source]

Appends an entry to the end of the node.

Parameters:

entry (MST or Leaf)

Return type:

MST

prepend(entry)[source]

Prepends an entry to the start of the node.

Parameters:

entry (MST or Leaf)

Return type:

MST

at_index(index)[source]

Returns the entry at a given index.

Parameters:

index (int)

Return type:

MST or Leaf or None

slice(start=None, end=None)[source]

Returns a slice of this node.

Parameters:
  • start (int) – optional, inclusive

  • end (int) – optional, exclusive

Return type:

sequence of MST and Leaf

splice_in(entry, index)[source]

Inserts an entry at a given index.

Parameters:
  • entry (MST or Leaf)

  • index (int)

Return type:

MST

replace_with_split(index, left=None, leaf=None, right=None)[source]

Replaces an entry with [ Maybe(tree), Leaf, Maybe(tree) ].

Parameters:
Return type:

MST

trim_top()[source]

Trims the top and returns its subtree, if necessary.

Only applies if the topmost node in the tree points only to another tree. Otherwise, does nothing.

Return type:

MST

split_around(key)[source]

Recursively splits a subtree around a given key.

Parameters:

key (str)

Return type:

(MST or None, MST or None) tuple

append_merge(to_merge)[source]

Merges another tree with this one.

The simple merge case where every key in the right tree is greater than every key in the left tree. Used primarily for deletes.

Parameters:

to_merge (MST)

Return type:

MST

create_child()[source]
Return type:

MST

create_parent()[source]
Return type:

MST

find_gt_or_equal_leaf_index(key)[source]

Finds the index of the first leaf node greater than or equal to key.

Parameters:

key (str)

Return type:

int

walk_leaves_from(key)[source]

Walk tree starting at key.

Generator for leaves in the tree, starting at a given key.

Parameters:

key (str)

Generates:

Leaf

list(after=None, before=None)[source]

Returns entries, optionally bounded within a key range.

Parameters:
  • after (str) – key, optional

  • before (str) – key, optional

Return type:

sequence of Leaf

list_with_prefix(prefix)[source]

Returns entries with a given key prefix.

Parameters:

prefix (str) – key prefix

Returns:

sequence of Leaf

walk()[source]

Walk full tree, depth first, and emit nodes.

Return type:

generator of MST and Leaf

all_nodes()[source]

Walks the tree and returns all nodes.

Return type:

sequence of MST and Leaf

leaves()[source]

Walks tree and returns all leaves.

Return type:

sequence of Leaf

leaf_count()[source]

Returns the total number of leaves in this MST.

Return type:

int

load_all(start=0)[source]

Generator. Used in xrpc_sync.get_repo().

(The bluesky-social/atproto TS code calls this writeToCarStream.)

Parameters:

start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

Returns:

generator of (CID, bytes) tuples

cids_for_path(key)[source]

Returns the CIDs in a given key path.

Parameters:

key (str)

Returns:

sequence of CID

add_covering_proofs(commit, blocks=None)[source]

Finds and adds blocks needed for covering proofs of a commit’s operations.

https://github.com/bluesky-social/proposals/tree/main/0006-sync-iteration#commit-validation-mst-operation-inversion

Parameters:
  • commit (CommitData)

  • blocks (dict, CID => Block) – optional; if provided, covering proof blocks will be added to this dict, in place.

Returns:

all blocks in covering proofs for this commit. If the blocks arg was provided, this is it.

Return type:

dict, CID => Block

leading_zeros_on_hash(key)[source]

Returns the number of leading zeros in a key’s hash.

Parameters:

key (str or bytes)

Return type:

int

layer_for_entries(entries)[source]
Parameters:

entries (MST or Leaf)

Return type:

int or None

deserialize_node_data(*, storage=None, data=None, layer=None)[source]
Parameters:
Return type:

sequence of MST and Leaf

serialize_node_data(entries)[source]
Parameters:

entries (sequence of MST and Leaf)

Return type:

Data

common_prefix_len(a, b)[source]
Parameters:
Return type:

int

cid_for_entries(entries)[source]
Parameters:

entries (sequence of MST and Leaf)

Returns:

CID

ensure_valid_key(key)[source]
Parameters:

key (str)

Raises:

ValueError – if key is not a valid MST key

class WalkStatus(done, cur, walking, index)

Bases: tuple

class Walker(tree)[source]

Bases: object

Allows walking an MST manually.

stack
Type:

sequence of WalkStatus

status

current

Type:

WalkStatus

layer()[source]

Returns the current layer of the node we’re on.

step_over()[source]

Moves to the next node in the subtree, skipping over the subtree.

step_into()[source]

Steps into a subtree.

Raises:

RuntimeError – if currently on a leaf

advance()[source]

Advances to the next node in the tree.

Steps into the current node if necessary.

repo

Bluesky / AT Protocol repo implementation.

https://atproto.com/guides/data-repos

Heavily based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to Daniel Holmgren and Devin Ivy for this code specifically!

class Write(action, collection, rkey, record)

Bases: tuple

class Repo(*, storage=None, mst=None, head=None, handle=None, status=None, callback=None, signing_key=None, rotation_key=None)[source]

Bases: object

AT Protocol data repo implementation, storage agnostic.

did

repo DID (dynamic property)

Type:

str

version

AT Protocol version (dynamic property)

Type:

int

storage
Type:

Storage

mst
Type:

MST

head

head commit

Type:

Block

handle
Type:

str

status

None (if active) or 'deactivated', 'deleted', or 'tombstoned' (deprecated)

Type:

str

callback

called on new commits and other repo events. May be set directly by clients. None means no callback. Both kwargs are optional. data is a CommitData for commits, or a dict record with $type for other com.atproto.sync.subscribeRepos messages. lost_seq is an integer sequence number that we allocated but then didn’t use, ie “lost.”

Type:

callable, (data=CommitData | dict, lost_seq=int) => None

get_record(collection, rkey)[source]
Parameters:
Returns:

node, record or commit or serialized MST

Return type:

dict

get_contents()[source]
Return type:

dict mapping str collection to dict mapping str rkey to dict record

classmethod create(storage, did, *, signing_key, rotation_key=None, **kwargs)[source]
Parameters:
  • did (str)

  • storage (Storage)

  • signing_key (ec.EllipticCurvePrivateKey)

  • rotation_key (ec.EllipticCurvePrivateKey)

  • kwargs – passed through to Repo constructor

Return type:

Repo

classmethod load(storage, cid=None, **kwargs)[source]
Parameters:
  • storage (Storage)

  • cid (CID) – optional

  • kwargs – passed through to Repo constructor

Return type:

Repo

storage

Bluesky repo storage base class and in-memory implementation.

Lightly based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/storage/repo-storage.ts

class Action(*values)[source]

Bases: Enum

Used in Storage.commit().

TODO: switch to StrEnum once we can require Python 3.11.

class CommitData(commit, blocks, prev)

Bases: tuple

class CommitOp(action, path, cid, prev_cid)

Bases: tuple

class Block(*, cid=None, decoded=None, encoded=None, seq=None, ops=None, time=None, repo=None)[source]

Bases: object

An ATProto block: a record, MST entry, commit, or other event.

Can start from either encoded bytes or decoded object, with or without CID. Decodes, encodes, and generates CID lazily, on demand, on attribute access.

Events should have a fully-qualified $type field that’s one of the message types in com.atproto.sync.subscribeRepos, eg com.atproto.sync.subscribeRepos#tombstone.

Based on carbox.car.Block.

cid

lazy-loaded (dynamic property)

Type:

CID

decoded

decoded object (dynamic property)

Type:

dict

encoded

DAG-CBOR encoded data (dynamic property)

Type:

bytes

seq

com.atproto.sync.subscribeRepos sequence number

Type:

int

ops

CommitOps if this is a commit, otherwise None

Type:

list

time

when this block was first created

Type:

datetime

repo

DID of a repo that includes this block. Occasionally, blocks may be included in more than one repo, so this may be any repo that includes it. In practice, it’s often the first or last repo that included it.

Type:

str

class Sequences(**kwargs)[source]

Bases: object

Abstract base class for managing sequence numbers for event streams.

…eg the com.atproto.sync.subscribeRepos firehose.

Background: https://atproto.com/specs/event-stream#sequence-numbers

allocate(nsid)[source]

Generates and returns a sequence number for a given NSID.

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int

last(nsid)[source]

Returns the last (highest) allocated sequence number for a given NSID.

…or None if no sequence number has ever been allocated for this NSID.

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int or None

class Storage(*, sequences, **kwargs)[source]

Bases: object

Abstract base class for storing nodes: records, MST entries, commits, etc.

Concrete subclasses should implement this on top of physical storage, eg database, filesystem, in memory.

sequences

Sequences subclass for allocating sequence numbers

Type:

class

create_repo(repo)[source]

Stores a new repo’s metadata in storage.

Only stores the repo’s handle and head commit CID, not blocks!

If the repo already exists in storage, this should update it instead of failing.

Parameters:

repo (Repo)

load_repo(did_or_handle)[source]

Loads a repo from storage.

Parameters:

did_or_handle (str) – optional

Return type:

Repo, or None if the did or handle wasn’t found

store_repo(repo)[source]

Writes a repo to storage.

Right now only writes some metadata:
  • handle

  • status

  • head

Parameters:

repo (Repo)

load_repos(after=None, limit=500)[source]

Loads multiple repos from storage.

Repos are returned in lexicographic order of their DIDs, ascending. Tombstoned repos are included.

Parameters:
  • after (str) – optional DID to start at, exclusive

  • limit (int) – maximum number of repos to return

Return type:

sequence of Repo
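
The paging contract above, ascending DID order with an exclusive after cursor and a limit, can be sketched over a plain list of DIDs. The function name is hypothetical, and a real storage backend would run this as a query rather than sorting in memory.

```python
import bisect


def page_of_dids(dids, after=None, limit=500):
    """Returns up to limit DIDs in ascending order, starting after a DID."""
    ordered = sorted(dids)
    # bisect_right skips past `after` itself, making the cursor exclusive.
    start = bisect.bisect_right(ordered, after) if after else 0
    return ordered[start:start + limit]
```

A caller pages through everything by passing the last DID of each page as the next after value until an empty page comes back.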

deactivate_repo(repo)[source]

Marks a repo as deactivated.

  • Stores a com.atproto.sync.subscribeRepos#account block with its own sequence number.

  • If Repo.callback is populated, calls it with the com.atproto.sync.subscribeRepos#account message.

  • Calls Repo._set_status() to mark the repo as deactivated in storage.

After this, any attempt to write to this repo will raise InactiveRepo.

Parameters:

repo (Repo)

activate_repo(repo)[source]

Marks a repo as active.

Only needed after deactivating. Does nothing if the repo is tombstoned.

  • Stores a com.atproto.sync.subscribeRepos#account block with its own sequence number.

  • If Repo.callback is populated, calls it with the com.atproto.sync.subscribeRepos#account message.

  • Calls Repo._set_status() to mark the repo as active in storage.

Parameters:

repo (Repo)

tombstone_repo(repo)[source]

Marks a repo as tombstoned.

  • Stores a com.atproto.sync.subscribeRepos#tombstone block with its own sequence number.

  • If Repo.callback is populated, calls it with the com.atproto.sync.subscribeRepos#tombstone message.

  • Calls Repo._set_status() to mark the repo as tombstoned in storage.

After this, any attempt to write to this repo will raise InactiveRepo.

Parameters:

repo (Repo)

read(cid)[source]

Reads a node from storage.

Parameters:

cid (CID)

Return type:

Block, or None if not found

read_many(cids, require_all=True)[source]

Batch read multiple nodes from storage.

Parameters:
  • cids (sequence of CID)

  • require_all (bool) – whether to assert that all cids are found

Returns:

{CID: Block or None if not found}

Return type:

dict

read_blocks_by_seq(start=0, repo=None)[source]

Batch read blocks from storage by subscribeRepos sequence number.

Parameters:
  • start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

  • repo (str) – optional repo DID. If not provided, all repos are included.

Returns:

all Blocks starting from start, inclusive, in ascending seq order

Return type:

iterable or generator

read_events_by_seq(start=0, repo=None)[source]

Batch read commits and other events by subscribeRepos sequence number.

Parameters:
  • start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

  • repo (str) – optional repo DID. If not provided, all repos are included.

Returns:

generator of CommitData for commits and dict messages for other events, starting from start, inclusive, in ascending seq order

Return type:

generator

has(cid)[source]

Checks if a given CID is currently stored.

Parameters:

cid (CID)

Return type:

bool

write(repo_did, obj, seq=None)[source]

Writes a node to storage.

Parameters:
  • repo_did (str)

  • obj (dict) – a record, commit, serialized MST node, or subscribeRepos event/message

  • seq (int or None) – sequence number. If not provided, a new one will be allocated.

Return type:

Block

Raises:

InactiveError – if the repo is not active

write_event(repo, type, **kwargs)[source]

Writes a subscribeRepos event to storage.

Parameters:
  • repo (Repo)

  • type (str) – account or identity

  • kwargs – included in the event, eg active, status

Return type:

Block

Raises:

InactiveError – if the repo is not active

write_blocks(blocks)[source]

Batch write blocks to storage.

Does not allocate sequence numbers!

Parameters:

blocks (sequence of Block)

commit(repo, writes, repo_did=None)[source]

Commits zero or more writes to storage.

Allocates a new sequence number and uses it for all blocks in the commit.

Parameters:
  • repo (Repo)

  • writes (Write or sequence of Write)

  • repo_did (str) – optional, used if this is the repo’s first commit

Return type:

CommitData

Raises:
  • InactiveError – if the repo is not active

  • ValueError – if the commit is invalid, eg the path for an update or delete doesn’t currently exist

class MemorySequences[source]

Bases: Sequences

In memory sequence numbers.

sequences

{str NSID: int next sequence number}

Type:

dict
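
A minimal sketch of the allocate/last contract, backed by a dict that maps each NSID to its next sequence number as described above. The class name is hypothetical, and starting at 1 is an assumption for this example.

```python
class SimpleSequences:
    """Toy in-memory sequence numbers, one counter per NSID."""

    def __init__(self):
        self.sequences = {}  # {str NSID: int next sequence number}

    def allocate(self, nsid):
        # Assumption for this sketch: the first allocated number is 1.
        seq = self.sequences.setdefault(nsid, 1)
        self.sequences[nsid] = seq + 1
        return seq

    def last(self, nsid):
        next_seq = self.sequences.get(nsid)
        return None if next_seq is None else next_seq - 1
```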

class MemoryStorage(*, sequences=None)[source]

Bases: Storage

In memory storage implementation.

repos
Type:

dict mapping str DID to Repo

blocks

{CID: Block}

Type:

dict

datastore_storage

Google Cloud Datastore implementation of repo storage.

class WriteOnce[source]

Bases: object

ndb.Property mix-in, prevents changing it once it’s set.

class JsonProperty(*args, **kwargs)[source]

Bases: TextProperty

Fork of ndb’s that subclasses ndb.TextProperty instead of ndb.BlobProperty.

This makes values show up as normal, human-readable, serialized JSON in the web console. https://github.com/googleapis/python-ndb/issues/874#issuecomment-1442753255

Duplicated in oauth-dropins/webutil: https://github.com/snarfed/webutil/blob/main/models.py

class ComputedJsonProperty(*args, **kwargs)[source]

Bases: JsonProperty, ComputedProperty

Custom ndb.ComputedProperty for JSON values that stores them as strings.

…instead of like ndb.StructuredProperty, with “entity” type, which bloats them unnecessarily in the datastore.

class CommitOp(**kwargs)[source]

Bases: Model

Repo operations - creates, updates, deletes - included in a commit.

Used in a StructuredProperty inside AtpBlock; not stored directly in the datastore.

https://googleapis.dev/python/python-ndb/latest/model.html#google.cloud.ndb.model.StructuredProperty

class AtpRepo(**kwargs)[source]

Bases: Model

An ATProto repo.

Key name is DID. Only stores the repo’s metadata. Blocks are stored in AtpBlocks.

Attributes:
  • handles (str): repeated, optional

  • head (str): CID

  • signing_key (str)

  • rotation_key (str)

  • status (str)

property signing_key

(ec.EllipticCurvePrivateKey)

property rotation_key

(ec.EllipticCurvePrivateKey or None)

class AtpBlock(**kwargs)[source]

Bases: Model

A data record, MST node, repo commit, or other event.

Key name is the DAG-CBOR base32 CID of the data.

Events should have a fully-qualified $type field that’s one of the message types in com.atproto.sync.subscribeRepos, eg com.atproto.sync.subscribeRepos#tombstone.

Properties:
  • repo (google.cloud.ndb.Key): DID of the first repo that included this block

  • encoded (bytes): DAG-CBOR encoded value

  • data (dict): DAG-JSON value, only used for human debugging

  • seq (int): sequence number for the subscribeRepos event stream

static create(*, repo_did, data, seq)[source]

Writes a new AtpBlock to the datastore.

If the block already exists in the datastore, leave it untouched. Notably, leave its sequence number as is, since it will be lower than this current sequence number.

Parameters:
Returns:

AtpBlock

to_block()[source]

Converts to Block.

Returns:

Block

classmethod from_block(block)[source]

Converts a Block to an AtpBlock.

Parameters:

block (Block)

Return type:

AtpBlock

class AtpSequence(**kwargs)[source]

Bases: Model

A sequence number for a given event stream NSID.

Sequence numbers are monotonically increasing, without gaps (which ATProto doesn’t require), starting at 1.

Key name is XRPC method NSID.

At first, I considered using datastore allocated ids for sequence numbers, but they’re not guaranteed to be monotonically increasing, so I switched to this.

class NdbMixin(*, ndb_client=None, ndb_context_kwargs=None, **kwargs)[source]

Bases: object

Mixin class that supports the ndb_context() decorator.

ndb_context(fn)[source]

Enters an ndb context if one isn’t already active.

Must be used on NdbMixin subclasses’ methods.

class DatastoreSequences(**kwargs)[source]

Bases: Sequences, NdbMixin

Datastore-based sequence numbers.

Sequences are stored in AtpSequences.

allocate(nsid)[source]

Returns the next sequence number for a given NSID.

Creates a new AtpSequence entity if one doesn’t already exist for the given NSID.

Parameters:

nsid (str) – the subscription XRPC method for this sequence number

Returns:

integer, next sequence number for this NSID

last(nsid)[source]

Returns the last allocated sequence number for a given NSID.

Parameters:

nsid (str) – the subscription XRPC method for this sequence number

Returns:

integer, last sequence number for this NSID, or None if we don’t know it

class MemcacheSequences(*, memcache, **kwargs)[source]

Bases: Sequences, NdbMixin

Memcache-backed sequence numbers.

Allocates sequence numbers from memcache for better performance, backed by AtpSequences in the datastore that allocate in batches.

Attributes:
  • memcache (pymemcache.client.base.Client)

  • max_seqs (dict): maps string nsid to integer max sequence number, the lower bound on the AtpSequence’s current value. This is the highest seq we can allocate from memcache without allocating a new batch from the datastore and updating the stored AtpSequence’s value.

  • max_seqs_lock (threading.Lock): for modifying max_seqs

allocate(nsid)[source]

Allocates a single sequence number from memcache.

The memcache key is [nsid]-last-seq. Its value is the last sequence number we’ve allocated.

Parameters:

nsid (str) – the subscription XRPC method for this sequence number

Returns:

integer, next sequence number for this NSID
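
The batching idea can be sketched with plain dicts standing in for memcache and the datastore. This is an illustration under stated assumptions, not the class's actual logic: the class name, batch_size, and durable_writes counter are hypothetical, and a single lock guards everything for simplicity.

```python
import threading


class BatchedSequences:
    """Toy allocator: fast counter, with batches reserved durably."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.durable = {}   # stand-in for the stored AtpSequence value
        self.fast = {}      # stand-in for memcache: last allocated seq
        self.max_seqs = {}  # highest seq usable without a new batch
        self.max_seqs_lock = threading.Lock()
        self.durable_writes = 0  # only here to show how rarely we write

    def allocate(self, nsid):
        with self.max_seqs_lock:
            seq = self.fast.get(nsid, self.durable.get(nsid, 0)) + 1
            if seq > self.max_seqs.get(nsid, 0):
                # Out of reserved numbers: reserve a whole new batch.
                reserved = self.durable.get(nsid, 0) + self.batch_size
                self.durable[nsid] = reserved
                self.max_seqs[nsid] = reserved
                self.durable_writes += 1
            self.fast[nsid] = seq
            return seq
```

The durable value always stays at or ahead of every number handed out, so after losing the fast counter an allocator could resume from it without reusing a number, at the cost of skipping the unused rest of the batch.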

last(nsid)[source]

Returns the last allocated sequence number for a given NSID.

Parameters:

nsid (str) – the subscription XRPC method for this sequence number

Returns:

integer, last sequence number for this NSID, or None if we don’t know it

class AtpRemoteBlob(**kwargs)[source]

Bases: Model

A blob available at a public HTTP URL that we don’t store ourselves.

Key ID is the URL, truncated if necessary.

TODO:
  • follow redirects, use final URL as key id

  • abstract this in Storage

url

full length URL

duration

in ms

status

None means active

classmethod get_or_create(*, url=None, repo=None, get_fn=<function get>, max_size=None, accept_types=None, name='')[source]

Returns a new or existing AtpRemoteBlob for a given URL.

If there isn’t an existing AtpRemoteBlob, or if the existing one needs to be reloaded, fetches the URL over the network.

Parameters:
  • url (str)

  • repo (AtpRepo) – optional

  • get_fn (callable) – for making HTTP GET requests

  • max_size (int, optional) – the maxSize parameter for this blob field in its lexicon, if any

  • accept_types (sequence of str, optional) – the accept parameter for this blob field in its lexicon, if any. The set of allowed MIME types.

  • name (str, optional) – blob field name in lexicon

Returns:

existing or newly created blob

Return type:

AtpRemoteBlob

Raises:
  • RequestException – if the HTTP request to fetch the blob failed

  • lexrpc.ValidationError – if the blob is over max_size, its type is not in accept_types, or it is a video with a duration above the 3-minute limit

maybe_fetch(get_fn=<function get>)[source]

Fetches the blob from its URL and updates its metadata, if necessary.

Parameters:

get_fn (callable, optional) – for making HTTP GET requests

as_object()[source]

Returns an ATProto blob object for this blob.

https://atproto.com/specs/data-model#blob-type

Returns:

Object with $type: blob and ref, mimeType, and size fields. If cid is unset, returns None.

Return type:

dict or None
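As a rough illustration of the shape described above, here is a sketch of a blob object in its JSON form, following the linked data model spec. The helper name `blob_object` and its arguments are hypothetical; the real method reads these values from the stored entity.

```python
def blob_object(cid, mime_type, size):
    """Builds the JSON form of an ATProto blob object, or None without a CID."""
    if cid is None:
        return None
    return {
        '$type': 'blob',
        'ref': {'$link': cid},  # JSON encoding of a CID link
        'mimeType': mime_type,
        'size': size,
    }
```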

generate_metadata(content)[source]

Extracts and stores metadata from an image or video.

Uses self.mime_type to determine whether/how to parse the content.

Parameters:

content (bytes)

validate(max_size=None, accept_types=None, name='')[source]

Checks that this blob satisfies size and type constraints.

Parameters:
  • max_size (int, optional) – the maxSize parameter for this blob field in its lexicon, if any

  • accept_types (sequence of str, optional) – the accept parameter for this blob field in its lexicon, if any. The set of allowed MIME types.

  • name (str, optional) – blob field name in lexicon
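The size and type checks can be sketched as follows. This is an assumption-laden illustration: `check_blob` is a hypothetical standalone function, it raises ValueError rather than lexrpc.ValidationError to stay dependency-free, and it treats accept entries such as image/* as glob patterns, as the lexicon spec allows.

```python
from fnmatch import fnmatchcase


def check_blob(size, mime_type, max_size=None, accept_types=None, name=''):
    """Raises ValueError if the blob violates the lexicon constraints."""
    if max_size is not None and size > max_size:
        raise ValueError(f'{name} blob size {size} is over maxSize {max_size}')
    if accept_types and not any(
            fnmatchcase(mime_type, pattern) for pattern in accept_types):
        raise ValueError(f'{name} blob type {mime_type} is not in {accept_types}')
```

Omitting max_size or accept_types skips the corresponding check, matching the "if any" wording in the parameter descriptions.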

class DatastoreStorage(*, sequences=None, ndb_client=None, ndb_context_kwargs=None)[source]

Bases: Storage, NdbMixin

Google Cloud Datastore implementation of Storage.

Sequence numbers in AtpBlocks are allocated per commit; all blocks in a given commit will have the same sequence number.

See Storage for method details.

util

Misc AT Protocol utils. TIDs, CIDs, etc.

exception InactiveRepo(did, status, *args, **kwargs)[source]

Bases: ValueError

Raised when loading a repo that’s not active.

status
Type:

str

now(tz=datetime.timezone.utc, **kwargs)[source]

Wrapper for datetime.datetime.now() that lets us mock it out in tests.

time_ns()[source]

Wrapper for time.time_ns() that lets us mock it out in tests.

dag_cbor_cid(obj)[source]

Returns the DAG-CBOR CID for a given object.

Parameters:

obj – CBOR-compatible native object or value

Return type:

CID
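To show what such a CID contains, here is a standard-library sketch that builds a CIDv1 string from bytes that are already DAG-CBOR encoded. The function name `cid_for_dag_cbor_bytes` is hypothetical, and the encoding step itself (object to DAG-CBOR bytes) is left out; dag_cbor_cid() handles that for native objects.

```python
import base64
import hashlib


def cid_for_dag_cbor_bytes(encoded):
    """Returns the base32 CIDv1 string for already-encoded DAG-CBOR bytes."""
    digest = hashlib.sha256(encoded).digest()
    # CID version 1, dag-cbor codec (0x71), then a multihash:
    # sha2-256 code (0x12), digest length 32 (0x20), digest bytes.
    raw = bytes([0x01, 0x71, 0x12, 0x20]) + digest
    # Multibase prefix 'b' means lowercase base32 without padding.
    return 'b' + base64.b32encode(raw).decode('ascii').lower().rstrip('=')
```

For example, `cid_for_dag_cbor_bytes(b'\xa0')` gives the CID of an empty map, since 0xa0 is the CBOR encoding of a map with zero entries.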

s32encode(num)[source]

Base32 encodes an integer using the sortable encoding variant (“base32-sortable”).

Based on https://github.com/bluesky-social/atproto/blob/main/packages/common-web/src/tid.ts

Parameters:

num (int or Integral)

Return type:

str

s32decode(val)[source]

Base32 decodes a string that uses the sortable encoding variant (“base32-sortable”).

Based on https://github.com/bluesky-social/atproto/blob/main/packages/common-web/src/tid.ts

Parameters:

val (str)

Return type:

int or Integral
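A minimal sketch of the sortable base32 variant, modeled on the linked tid.ts. The `_sketch` suffixes mark these as illustrations rather than the library's own code.

```python
# Alphabet in ASCII order, so equal-length encodings sort like their integers.
S32_CHARS = '234567abcdefghijklmnopqrstuvwxyz'


def s32encode_sketch(num):
    """Encodes a non-negative integer; 0 encodes to the empty string."""
    encoded = ''
    while num > 0:
        num, rem = divmod(num, 32)
        encoded = S32_CHARS[rem] + encoded
    return encoded


def s32decode_sketch(val):
    """Decodes a string produced by s32encode_sketch back to an integer."""
    num = 0
    for ch in val:
        num = num * 32 + S32_CHARS.index(ch)
    return num
```

Because the alphabet is in ASCII order, strings of the same length compare the same way as the numbers they encode.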

datetime_to_tid(dt, clock_id=None)[source]

Converts a datetime to a TID.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:
  • dt (datetime)

  • clock_id (int) – optional. If not specified, uses this runtime’s clock id

Returns:

base32-encoded TID

Return type:

str

int_to_tid(num, clock_id=None)[source]

Converts an integer to a TID.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:
  • num (int)

  • clock_id (int) – optional. If not specified, uses this runtime’s clock id

Returns:

base32-encoded TID

Return type:

str

tid_to_datetime(tid)[source]

Converts a TID to a datetime.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:

tid (bytes) – base32-encoded TID

Return type:

datetime

Raises:

ValueError – if tid is not bytes or not 13 characters long

tid_to_int(tid)[source]

Converts a TID to an integer.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:

tid (bytes) – base32-encoded TID

Return type:

int

Raises:

ValueError – if tid is not bytes or not 13 characters long
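The conversions above can be sketched from the layout in the TID spec: a 64-bit integer whose top bit is 0, followed by 53 bits of microseconds since the UNIX epoch and a 10-bit clock id, encoded as 13 base32-sortable characters. The function names below are hypothetical and arroba's own encoding may differ in detail.

```python
from datetime import datetime, timedelta, timezone

S32_CHARS = '234567abcdefghijklmnopqrstuvwxyz'
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_tid(micros, clock_id=0):
    """Packs a microsecond timestamp and 10-bit clock id into a 13-char TID."""
    num = (micros << 10) | (clock_id & 0x3FF)
    chars = []
    for _ in range(13):  # 13 characters of 5 bits each, least significant first
        chars.append(S32_CHARS[num & 31])
        num >>= 5
    return ''.join(reversed(chars))


def tid_to_micros(tid):
    """Extracts the microsecond timestamp from a 13-character TID."""
    if len(tid) != 13:
        raise ValueError(f'Expected 13 characters, got {tid!r}')
    num = 0
    for ch in tid:
        num = (num << 5) | S32_CHARS.index(ch)
    return num >> 10  # drop the clock id bits


def datetime_to_tid_sketch(dt, clock_id=0):
    # Integer arithmetic avoids float rounding of large timestamps.
    return micros_to_tid((dt - EPOCH) // timedelta(microseconds=1), clock_id)


def tid_to_datetime_sketch(tid):
    return EPOCH + timedelta(microseconds=tid_to_micros(tid))
```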

next_tid()[source]

Returns the TID corresponding to the current time.

A TID is a base32-sortable-encoded UNIX timestamp (ie time since the epoch) in microseconds. Returned TIDs are guaranteed to increase monotonically across calls.

https://atproto.com/specs/atp#timestamp-ids-tid

https://atproto.com/specs/record-key#record-key-type-tid

https://github.com/bluesky-social/atproto/blob/main/packages/common-web/src/tid.ts

Returns:

TID

Return type:

str
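One common way to get the monotonic guarantee is to remember the last timestamp issued and bump it whenever the clock has not advanced. The sketch below assumes that approach; `MonotonicMicros` is a hypothetical name and the library's actual mechanism is not shown in this reference.

```python
import threading
import time


class MonotonicMicros:
    """Hands out strictly increasing microsecond timestamps."""

    def __init__(self, time_ns=time.time_ns):
        self._time_ns = time_ns  # injectable so tests can fake the clock
        self._last = 0
        self._lock = threading.Lock()

    def next(self):
        with self._lock:
            micros = self._time_ns() // 1000
            if micros <= self._last:
                # Clock stalled or went backward: step past the last value.
                micros = self._last + 1
            self._last = micros
            return micros
```

The injectable `time_ns` argument mirrors the purpose of the time_ns() wrapper above, which exists so the clock can be mocked in tests.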

at_uri(did, collection, rkey)[source]

Returns the at:// URI for a given DID, collection, and rkey.

https://atproto.com/specs/at-uri-scheme

Parameters:
  • did (str)

  • collection (str)

  • rkey (str)

Returns:

at:// URI

Return type:

str

parse_at_uri(uri)[source]

Parses the repo DID, collection, and rkey out of an at:// URI.

https://atproto.com/specs/at-uri-scheme

Parameters:

uri (str)

Returns:

(did, collection, rkey)

Return type:

tuple of str
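A minimal sketch of building and splitting record-level at:// URIs. The names `build_at_uri` and `split_at_uri` are hypothetical, and the pattern only covers the three-part did/collection/rkey form described here, not every URI the at:// scheme allows.

```python
import re

# Matches only the three-part form: at://<did>/<collection>/<rkey>
AT_URI_RE = re.compile(
    r'^at://(?P<did>[^/]+)/(?P<collection>[^/]+)/(?P<rkey>[^/]+)$')


def build_at_uri(did, collection, rkey):
    return f'at://{did}/{collection}/{rkey}'


def split_at_uri(uri):
    """Returns (did, collection, rkey), or raises ValueError."""
    match = AT_URI_RE.match(uri)
    if not match:
        raise ValueError(f'Not a record at:// URI: {uri!r}')
    return match.group('did', 'collection', 'rkey')
```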

new_key(seed=None)[source]

Generates a new ECC K-256 keypair.

https://atproto.com/specs/cryptography

Parameters:

seed (int) – optional deterministic value to derive private key from. Don’t use in production!

Return type:

ec.EllipticCurvePrivateKey

sign(obj, private_key)[source]

Signs an object, eg a repo commit or DID document.

Adds the signature in the sig field.

https://atproto.com/specs/cryptography

The signature is ECDSA over the SHA-256 hash of the input, including a custom second pass to enforce that it’s the “low-S” variant: https://atproto.com/specs/cryptography#ecdsa-signature-malleability

Parameters:
  • obj (dict)

  • private_key (ec.EllipticCurvePrivateKey)

Returns:

obj with new sig field

Return type:

dict

apply_low_s_mitigation(signature, curve)[source]

Low-S signature mitigation.

This prevents signature malleability. (It doesn’t guarantee deterministic signatures though!)

https://atproto.com/specs/cryptography#ecdsa-signature-malleability

From picopds. Thank you David! https://github.com/DavidBuchanan314/picopds/blob/main/signing.py

Parameters:
  • signature (bytes)

  • curve (ec.EllipticCurve)

Return type:

bytes
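The core of the low-S rule is a single comparison on the signature's s value. The sketch below works on the integer pair (r, s) rather than on encoded signature bytes, so it illustrates the arithmetic only. `low_s` is a hypothetical helper, and the constant is the group order of secp256k1 (K-256).

```python
# Group order n of the secp256k1 (K-256) curve.
SECP256K1_ORDER = (
    0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141)


def low_s(r, s, order=SECP256K1_ORDER):
    """Returns (r, s) with s normalized into the lower half of the order."""
    if s > order // 2:
        # (r, n - s) verifies for the same message and key as (r, s),
        # so picking the smaller of the two removes the ambiguity.
        s = order - s
    return r, s
```

Applying the function twice gives the same result as applying it once, so already-normalized signatures pass through unchanged.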

verify_sig(obj, public_key)[source]

Returns True if obj’s signature is valid, False otherwise.

See sign() for more background.

Parameters:
  • obj (dict) – repo commit

  • public_key (ec.EllipticCurvePublicKey)

Raises:

KeyError – if obj isn’t signed, ie doesn’t have a sig field

service_jwt(host, repo_did, privkey, expiration=datetime.timedelta(seconds=600), aud=None, **claims)[source]

Generates an inter-service JWT, eg for a relay or AppView.

https://atproto.com/specs/xrpc#inter-service-authentication-temporary-specification

Parameters:
  • host (str) – hostname of the service this JWT is for, eg bsky.network

  • repo_did (str) – DID of the repo this JWT is for

  • privkey (ec.EllipticCurvePrivateKey) – repo’s signing key

  • expiration (timedelta) – length of time this JWT will be valid, defaults to 10m

  • aud (str) – JWT audience. Default is did:web:[host], which works for relays and AppViews, but others (eg mod services) have did:plc instead.

  • claims (dict) – additional claims to include in the JWT, eg lxm

Returns:

JWT

Return type:

str
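The claims for such a token can be sketched with the standard library alone. Everything here is an assumption for illustration: `service_jwt_claims` is a hypothetical helper, the use of the registered JWT claims iss, aud, and exp is inferred from the parameters above, and signing with the repo's key is omitted.

```python
from datetime import datetime, timedelta, timezone


def service_jwt_claims(host, repo_did, expiration=timedelta(minutes=10),
                       aud=None, now=None, **claims):
    """Builds the (unsigned) claims dict for an inter-service JWT."""
    now = now or datetime.now(timezone.utc)
    return {
        'iss': repo_did,
        # Default audience is the service's did:web, per the aud parameter.
        'aud': aud or f'did:web:{host}',
        'exp': int((now + expiration).timestamp()),
        **claims,  # extra claims, eg lxm
    }
```

For example, `service_jwt_claims('bsky.network', 'did:plc:abc', lxm='com.atproto.sync.getRepo')` adds an lxm claim alongside the defaults.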

xrpc_repo

com.atproto.repo.* XRPC methods.

create_record(input)[source]

Handler for com.atproto.repo.createRecord XRPC method.

get_record(input, repo=None, collection=None, rkey=None, cid=None)[source]

Handler for com.atproto.repo.getRecord XRPC method.

delete_record(input)[source]

Handler for com.atproto.repo.deleteRecord XRPC method.

list_records(input, repo=None, collection=None, limit=50, cursor=None, reverse=None, rkeyStart=None, rkeyEnd=None)[source]

Handler for com.atproto.repo.listRecords XRPC method.

KNOWN ISSUE: cursor is interpreted as inclusive, so whenever a cursor is used, the response includes the last record returned in the previous response.
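A client can work around the inclusive cursor by dropping the repeated record at the start of each follow-up page. The sketch below assumes a hypothetical `fetch_page(cursor)` callable that returns `(records, next_cursor)`, where each record is a dict with a uri field as in listRecords output.

```python
def iter_records(fetch_page):
    """Yields each record once, skipping inclusive-cursor duplicates."""
    cursor = None
    last_uri = None
    while True:
        records, next_cursor = fetch_page(cursor)
        for record in records:
            if cursor is not None and record['uri'] == last_uri:
                continue  # repeat of the previous page's last record
            yield record
            last_uri = record['uri']
        if not next_cursor or next_cursor == cursor:
            break  # no more pages, or the cursor stopped advancing
        cursor = next_cursor
```

Stopping when the cursor no longer advances guards against looping forever on a final page that contains only the repeated record.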

put_record(input)[source]

Handler for com.atproto.repo.putRecord XRPC method.

describe_repo(input, repo=None)[source]

Handler for com.atproto.repo.describeRepo XRPC method.

import_repo(input)[source]

Handler for com.atproto.repo.importRepo XRPC method.

Requires that a repo doesn’t already exist for this DID.

apply_writes(input)[source]

Handler for com.atproto.repo.applyWrites XRPC method.

upload_blob(input)[source]

Handler for com.atproto.repo.uploadBlob XRPC method.

xrpc_server

com.atproto.server.* XRPC methods.

create_session(input)[source]

Handler for com.atproto.server.createSession XRPC method.

get_session(input)[source]

Handler for com.atproto.server.getSession XRPC method.

refresh_session(input, did=None, commit=None)[source]

Handler for com.atproto.server.refreshSession XRPC method.

describe_server(input)[source]

Handler for com.atproto.server.describeServer XRPC method.

get_account_invite_codes(input, includeUsed=None, createAvailable=None)[source]

Handler for com.atproto.server.getAccountInviteCodes XRPC method.

list_app_passwords(input)[source]

Handler for com.atproto.server.listAppPasswords XRPC method.

xrpc_sync

com.atproto.sync.* XRPC methods.

get_checkout(input, did=None)[source]

Handler for com.atproto.sync.getCheckout XRPC method.

Deprecated! Use getRepo instead.

Gets a checkout, either head or a specific commit.

get_repo(input, did=None, since=None)[source]

Handler for com.atproto.sync.getRepo XRPC method.

get_repo_status(input, did=None)[source]

Handler for com.atproto.sync.getRepoStatus XRPC method.

list_repos(input, limit=500, cursor=None)[source]

Handler for com.atproto.sync.listRepos XRPC method.

subscribe_repos(cursor=None)[source]

Firehose event stream XRPC (ie type: subscription) for all new commits.

Event stream details: https://atproto.com/specs/event-stream#framing

This function serves forever, which ties up a runtime context, so it’s not automatically registered with the XRPC server. Instead, clients should choose how to register and serve it themselves, eg asyncio vs threads vs WSGI workers.

See firehose.send_events() for an example thread-based callback to register with repo.Repo to deliver all new commits to subscribers. Here’s how to register that callback and this XRPC method in a threaded context:

    def callback(data=None, lost_seq=None):
        if data:
            firehose.send_events()
        elif lost_seq:
            firehose.mark_seq_lost(lost_seq)

    server.repo.callback = callback
    server.server.register('com.atproto.sync.subscribeRepos',
                           xrpc_sync.subscribe_repos)

Parameters:

cursor (int) – try to serve commits from this sequence number forward

Returns:

(header, payload)

Return type:

(dict, dict) tuple

get_blocks(input, did=None, cids=())[source]

Handler for com.atproto.sync.getBlocks XRPC method.

get_head(input, did=None)[source]

Handler for com.atproto.sync.getHead XRPC method.

Deprecated! Use getLatestCommit instead.

get_latest_commit(input, did=None)[source]

Handler for com.atproto.sync.getLatestCommit XRPC method.

get_record(input, did=None, collection=None, rkey=None)[source]

Handler for com.atproto.sync.getRecord XRPC method.

get_blob(input, did=None, cid=None)[source]

Handler for com.atproto.sync.getBlob XRPC method.

Right now only supports redirecting to “remote” blobs based on stored AtpRemoteBlobs.

list_blobs(input, did=None, since=None, limit=500, cursor=None)[source]

Handler for com.atproto.sync.listBlobs XRPC method.