arroba package

Reference documentation.

did

Utilities to create and resolve did:plcs, did:webs, and handles.

class arroba.did.DidPlc(did, signing_key, rotation_key, doc)

Bases: tuple

arroba.did.resolve(did, **kwargs)[source]

Resolves a did:plc or did:web.

Parameters:
Returns:

JSON DID document

Return type:

dict

Raises:
arroba.did.resolve_plc(did, get_fn=<function get>)[source]

Resolves a did:plc by fetching its DID document from a PLC registry.

The PLC registry hostname is specified in the PLC_HOST environment variable.

did:plc background:

Parameters:
  • did (str) –

  • get_fn (callable) – for making HTTP GET requests

Returns:

JSON DID document

Return type:

dict

Raises:
arroba.did.create_plc(handle, signing_key=None, rotation_key=None, pds_url=None, post_fn=<function post>)[source]

Creates a new did:plc in a PLC registry.

The PLC registry hostname is specified in the PLC_HOST environment variable.

did:plc background:

The DID document in the returned value uses the new DID doc format, with a fully qualified verificationMethod.id and Multikey key encoding, i.e. a did:key without its did:key: prefix. Details: https://github.com/bluesky-social/atproto/discussions/1510
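For background, the did:plc method spec derives the identifier from the signed genesis operation: the SHA-256 hash of its DAG-CBOR encoding, base32-encoded (lowercase, unpadded), truncated to 24 characters. The sketch below assumes you already have those DAG-CBOR bytes, since producing them needs a DAG-CBOR library. plc_did_from_genesis_bytes is a hypothetical helper, not part of arroba's API.

```python
import base64
import hashlib


def plc_did_from_genesis_bytes(signed_genesis_cbor: bytes) -> str:
    """Hypothetical helper: derive a did:plc from a signed genesis op's DAG-CBOR bytes."""
    digest = hashlib.sha256(signed_genesis_cbor).digest()
    # RFC 4648 base32, lowercased, with '=' padding stripped
    b32 = base64.b32encode(digest).decode("ascii").lower().rstrip("=")
    # only the first 24 characters form the DID's identifier segment
    return "did:plc:" + b32[:24]
```

Because the hash covers the signed operation, changing any field, or the signature itself, yields a different DID.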

Parameters:
  • handle (str) – domain handle to associate with this DID

  • signing_key (ec.EllipticCurvePrivateKey) – The curve must be SECP256K1. If omitted, a new keypair will be created.

  • rotation_key (ec.EllipticCurvePrivateKey) – The curve must be SECP256K1. If omitted, a new keypair will be created.

  • pds_url (str) – PDS base URL to associate with this DID. If omitted, defaults to https://[PDS_HOST]

  • post_fn (callable) – for making HTTP POST requests

Returns:

with the newly created did:plc, keys, and DID document

Return type:

DidPlc

Raises:
arroba.did.encode_did_key(pubkey)[source]

Encodes an ec.EllipticCurvePublicKey into a did:key string.

https://atproto.com/specs/did#public-key-encoding

Parameters:

pubkey (ec.EllipticCurvePublicKey) –

Returns:

encoded did:key

Return type:

str

arroba.did.decode_did_key(did_key)[source]

Decodes a did:key string into an ec.EllipticCurvePublicKey.

https://atproto.com/specs/did#public-key-encoding

Parameters:

did_key (str) –

Returns:

ec.EllipticCurvePublicKey
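The did:key encoding used by encode_did_key() and decode_did_key() can be sketched with the standard library alone: prepend the varint-encoded multicodec prefix for the curve (0xE7 0x01 for secp256k1, 0x80 0x24 for P-256, per the atproto spec), then base58btc-encode with the multibase prefix z. Unlike arroba's functions, which work with ec.EllipticCurvePublicKey objects from the cryptography package, this hypothetical sketch operates on raw 33-byte compressed points; all names here are illustrative.

```python
# Base58btc (Bitcoin) alphabet, used by the multibase prefix "z"
B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

# Multicodec prefixes, already varint-encoded
MULTICODEC_PREFIXES = {
    "secp256k1": bytes([0xE7, 0x01]),  # secp256k1-pub (0xe7)
    "p256": bytes([0x80, 0x24]),       # p256-pub (0x1200)
}


def b58encode(data: bytes) -> str:
    num = int.from_bytes(data, "big")
    out = ""
    while num > 0:
        num, rem = divmod(num, 58)
        out = B58_ALPHABET[rem] + out
    # each leading zero byte is represented by a leading "1"
    n_zeros = len(data) - len(data.lstrip(b"\x00"))
    return "1" * n_zeros + out


def b58decode(text: str) -> bytes:
    num = 0
    for ch in text:
        num = num * 58 + B58_ALPHABET.index(ch)
    n_zeros = len(text) - len(text.lstrip("1"))
    body = num.to_bytes((num.bit_length() + 7) // 8, "big") if num else b""
    return b"\x00" * n_zeros + body


def encode_compressed_key(curve: str, compressed: bytes) -> str:
    """Hypothetical: raw compressed EC point -> did:key string."""
    if len(compressed) != 33 or compressed[0] not in (2, 3):
        raise ValueError("expected a 33-byte compressed EC point")
    return "did:key:z" + b58encode(MULTICODEC_PREFIXES[curve] + compressed)


def decode_compressed_key(did_key: str) -> tuple[str, bytes]:
    """Hypothetical: did:key string -> (curve name, compressed EC point)."""
    if not did_key.startswith("did:key:z"):
        raise ValueError("expected a base58btc did:key")
    raw = b58decode(did_key[len("did:key:z"):])
    for curve, prefix in MULTICODEC_PREFIXES.items():
        if raw.startswith(prefix):
            return curve, raw[len(prefix):]
    raise ValueError("unsupported multicodec prefix")
```

The Multikey value in a DID document is the same string with the did:key: prefix removed.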

arroba.did.plc_operation_to_did_doc(op)[source]

Converts a PLC directory operation to a DID document.

https://github.com/bluesky-social/did-method-plc#presentation-as-did-document

The DID document in the returned value uses the new DID doc format, with a fully qualified verificationMethod.id and Multikey key encoding, i.e. a did:key without its did:key: prefix. Details: https://github.com/bluesky-social/atproto/discussions/1510

Parameters:

op (dict) – PLC operation, https://github.com/did-method-plc/did-method-plc#operation-serialization-signing-and-validation

Returns:

DID document, https://www.w3.org/TR/did-core/#data-model

Return type:

dict
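The conversion can be sketched roughly as follows, assuming the operation fields described in the did-method-plc README (verificationMethods, services, alsoKnownAs). Since an operation doesn't carry its own DID, this hypothetical helper takes the DID as an extra argument, whereas arroba's plc_operation_to_did_doc() takes only op. The @context list mirrors what atproto DID documents typically contain and should be treated as an assumption.

```python
def plc_op_to_did_doc_sketch(did: str, op: dict) -> dict:
    """Hypothetical: build a new-format DID document from a PLC operation."""
    # verificationMethods maps a key id (eg "atproto") to a "did:key:..." string
    methods = [
        {
            "id": f"{did}#{key_id}",  # fully qualified verificationMethod.id
            "type": "Multikey",
            "controller": did,
            # Multikey encoding: the did:key value without its "did:key:" prefix
            "publicKeyMultibase": did_key.removeprefix("did:key:"),
        }
        for key_id, did_key in op.get("verificationMethods", {}).items()
    ]
    # services maps a service id (eg "atproto_pds") to {"type", "endpoint"}
    services = [
        {"id": f"#{svc_id}", "type": svc["type"], "serviceEndpoint": svc["endpoint"]}
        for svc_id, svc in op.get("services", {}).items()
    ]
    return {
        # assumption: the contexts commonly served for atproto DID docs
        "@context": [
            "https://www.w3.org/ns/did/v1",
            "https://w3id.org/security/multikey/v1",
            "https://w3id.org/security/suites/secp256k1-2019/v1",
        ],
        "id": did,
        "alsoKnownAs": op.get("alsoKnownAs", []),
        "verificationMethod": methods,
        "service": services,
    }
```

Rotation keys appear in the operation but not in this sketch's output, since they control the PLC log rather than the account's signing key.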

arroba.did.resolve_web(did, get_fn=<function get>)[source]

Resolves a did:web by fetching its DID document.

did:web spec: https://w3c-ccg.github.io/did-method-web/
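Per the did:web spec, the DID document's URL is derived from the method-specific identifier: colons separate path segments, percent-encoding is decoded (so %3A becomes a port separator), and a bare host maps to /.well-known/did.json. Below is a minimal sketch of that mapping; did_web_to_url is a hypothetical name, and the HTTP fetch itself is left out.

```python
from urllib.parse import unquote


def did_web_to_url(did: str) -> str:
    """Hypothetical helper: map a did:web to the URL of its DID document."""
    if not did.startswith("did:web:"):
        raise ValueError(f"not a did:web: {did}")
    # the method-specific id is colon-separated; the first part is the host
    parts = did[len("did:web:"):].split(":")
    host = unquote(parts[0])  # eg "example.com%3A3000" -> "example.com:3000"
    path = [unquote(p) for p in parts[1:]]
    if path:
        return f"https://{host}/{'/'.join(path)}/did.json"
    return f"https://{host}/.well-known/did.json"
```

atproto itself only uses hostname-level did:webs, i.e. the /.well-known/did.json form.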

Parameters:
  • did (str) –

  • get_fn (callable) – for making HTTP GET requests

Returns:

JSON DID document

Return type:

dict

Raises:
arroba.did.resolve_handle(handle, get_fn=<function get>)[source]

Resolves an ATProto handle to a DID.

Supports the DNS TXT record and HTTPS well-known methods.

https://atproto.com/specs/handle#handle-resolution

Parameters:
  • handle (str) –

  • get_fn (callable) – for making HTTP GET requests

Returns:

DID, or None if the handle can’t be resolved

Return type:

str or None

Raises:

ValueError – if handle is not a domain
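The two resolution methods look in different places: a DNS TXT record at _atproto.<handle> whose value is did=<DID>, and an HTTPS GET of https://<handle>/.well-known/atproto-did whose plain-text body is the DID. The sketch below shows only the lookup targets and parsing, with no network I/O; the helper names are hypothetical, and treating multiple conflicting did= values as unresolvable is an assumption.

```python
import re
from typing import List, Optional, Tuple

# Handle syntax per the atproto handle spec: dot-separated DNS labels,
# with a final label (TLD) that starts with a letter
HANDLE_RE = re.compile(
    r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
    r"[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$"
)


def handle_lookup_targets(handle: str) -> Tuple[str, str]:
    """Hypothetical: the DNS TXT name and HTTPS URL to query for a handle."""
    if not HANDLE_RE.match(handle):
        raise ValueError(f"not a domain: {handle}")
    return f"_atproto.{handle}", f"https://{handle}/.well-known/atproto-did"


def did_from_txt_values(values: List[str]) -> Optional[str]:
    """Extract the DID from TXT record values of the form 'did=<DID>'."""
    dids = {v[len("did="):] for v in values if v.startswith("did=")}
    # assumption: zero or conflicting did= values mean "can't resolve"
    return dids.pop() if len(dids) == 1 else None


def did_from_well_known(body: str) -> Optional[str]:
    """The HTTPS method returns the DID as a plain-text response body."""
    did = body.strip()
    return did if did.startswith("did:") else None
```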

diff

AT Protocol utility for diffing two MSTs.

Heavily based on: https://github.com/bluesky/atproto/blob/main/packages/repo/src/mst/diff.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to Daniel Holmgren and Devin Ivy for this code specifically!

arroba.diff.mst_diff(cur, prev=None)[source]

Generates a diff between two MSTs.

Parameters:
  • cur (MST) –

  • prev (MST) – optional

Return type:

Diff

arroba.diff.null_diff(tree)[source]

Generates a “null” diff for a single MST with all adds and new CIDs.

Parameters:

tree (MST) –

Return type:

Diff

class arroba.diff.Change(key, cid, prev)

Bases: tuple

class arroba.diff.Diff[source]

Bases: object

A diff between two MSTs.

adds

maps str to Change

Type:

dict

updates

maps str to Change

Type:

dict

deletes

maps str to Change

Type:

dict

new_cids
Type:

set of CID

removed_cids
Type:

set of CID

__init__()[source]
static of(cur, prev=None)[source]
Parameters:
  • cur (MST) –

  • prev (MST) – optional

Return type:

Diff

record_add(key, cid)[source]
Parameters:
  • key (str) –

  • cid (CID) –

record_update(key, prev, cid)[source]
Parameters:
  • key (str) –

  • prev (CID) –

  • cid (CID) –

record_delete(key, cid)[source]
Parameters:
  • key (str) –

  • cid (CID) –

record_new_cid(cid)[source]
Parameters:

cid (CID) –

record_removed_cid(cid)[source]
Parameters:

cid (CID) –

add_diff(diff)[source]
Parameters:

diff (Diff) –
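To illustrate what adds, updates, and deletes mean, here is a simplified sketch that diffs two flat {key: CID} mappings of leaves. The real mst_diff() walks both trees, skips identical subtrees by CID, and also tracks new_cids and removed_cids for MST nodes; this sketch does none of that. CIDs are plain strings here, and storing a deleted leaf's CID in .cid (mirroring record_delete(key, cid)) is an assumption.

```python
from typing import Dict, NamedTuple, Optional


class Change(NamedTuple):
    key: str
    cid: Optional[str]
    prev: Optional[str]


def leaf_diff(cur: Dict[str, str], prev: Optional[Dict[str, str]] = None) -> Dict[str, Dict[str, Change]]:
    """Hypothetical: classify leaf changes between two key -> CID mappings."""
    prev = prev or {}
    adds, updates, deletes = {}, {}, {}
    for key, cid in cur.items():
        if key not in prev:
            adds[key] = Change(key, cid, None)
        elif prev[key] != cid:
            updates[key] = Change(key, cid, prev[key])
    for key, old_cid in prev.items():
        if key not in cur:
            # assumption: a delete records the removed CID in .cid
            deletes[key] = Change(key, old_cid, None)
    return {"adds": adds, "updates": updates, "deletes": deletes}
```

With prev omitted, every leaf shows up as an add, analogous to null_diff().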

mst

Bluesky / AT Protocol Merkle search tree implementation.

Heavily based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/mst/mst.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to Daniel Holmgren and Devin Ivy for this code specifically!

From that file:

This is an implementation of a Merkle Search Tree (MST). The data structure is described here: https://hal.inria.fr/hal-02303490/document. The MST is an ordered, insert-order-independent, deterministic tree. Keys are laid out in alphabetic order. The key insight of an MST is that each key is hashed and its leading 0s are counted to determine which layer it falls on (5 zeros for ~32 fanout). This is a merkle tree, so each subtree is referred to by its hash (CID). When a leaf is changed, every tree on the path to that leaf is changed as well, thereby updating the root hash.

For atproto, we use SHA-256 as the key hashing algorithm, and ~4 fanout (2 bits of zero per layer).
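To make the layer rule concrete, here is a stdlib-only sketch, assuming the behavior of the TypeScript reference: a key's layer is the number of leading zero bits in its SHA-256 hash, divided by 2 and rounded down. The function names are hypothetical stand-ins for arroba.mst.leading_zeros_on_hash().

```python
import hashlib


def leading_zero_pairs(digest: bytes) -> int:
    """Count leading zero bits in digest, in units of 2 bits."""
    zero_bits = 0
    for byte in digest:
        if byte == 0:
            zero_bits += 8
            continue
        # bit_length() is the position of the highest set bit
        zero_bits += 8 - byte.bit_length()
        break
    return zero_bits // 2


def layer_for_key(key) -> int:
    """Sketch of leading_zeros_on_hash(): hash the key, then count 2-bit zeros."""
    if isinstance(key, str):
        key = key.encode("utf-8")
    return leading_zero_pairs(hashlib.sha256(key).digest())
```

Layer 0 is the most common result: with 2 bits per layer, each additional layer is about 4 times rarer than the one below it, which is where the ~4 fanout comes from.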

A couple notes on CBOR encoding:

There are never two neighboring subtrees. Therefore, we can represent a node as an array of leaves & pointers to their right neighbor (possibly null), along with a pointer to the left-most subtree (also possibly null).

Most keys in a subtree will have overlap. We do compression on prefixes by describing keys as:
  • the length of the prefix that it shares in common with the preceding key

  • the rest of the string

For example:

If the first leaf in a tree is bsky/posts/abcdefg and the second is bsky/posts/abcdehi, then the first will be described as prefix: 0, key: 'bsky/posts/abcdefg', and the second will be described as prefix: 16, key: 'hi'.
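The prefix compression in this example can be sketched as a round trip within a single node. The p and k names mirror the Entry(p, k, v, t) fields; in the real encoding, as far as we understand, k is bytes and each entry also carries v (the value CID) and t (the right subtree), which are omitted here. Prefixes are counted in characters, which matches bytes for ASCII keys. All function names are hypothetical.

```python
def common_prefix_len(a: str, b: str) -> int:
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n


def compress_keys(keys: list[str]) -> list[dict]:
    """Describe each key by its shared-prefix length with the previous key, plus the rest."""
    out, prev = [], ""
    for key in keys:
        p = common_prefix_len(prev, key)
        out.append({"p": p, "k": key[p:]})
        prev = key
    return out


def decompress_keys(entries: list[dict]) -> list[str]:
    """Rebuild full keys by reusing p characters of the previous key."""
    keys, prev = [], ""
    for e in entries:
        key = prev[:e["p"]] + e["k"]
        keys.append(key)
        prev = key
    return keys


print(compress_keys(["bsky/posts/abcdefg", "bsky/posts/abcdehi"]))
# [{'p': 0, 'k': 'bsky/posts/abcdefg'}, {'p': 16, 'k': 'hi'}]
```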

class arroba.mst.Entry(p, k, v, t)

Bases: tuple

class arroba.mst.Data(l, e)

Bases: tuple

class arroba.mst.Leaf(key, value)

Bases: tuple

class arroba.mst.MST(*, storage=None, entries=None, pointer=None, layer=None)[source]

Bases: object

Merkle search tree class.

storage
Type:

Storage

entries
Type:

sequence of MST and Leaf

layer

this MST’s layer in the root MST

Type:

int

pointer
Type:

CID

outdated_pointer

whether pointer needs to be recalculated

Type:

bool

__init__(*, storage=None, entries=None, pointer=None, layer=None)[source]

Constructor.

Parameters:
  • storage (Storage) –

  • entries (sequence of MST and Leaf) –

  • pointer (CID) –

  • layer (int) –

Return type:

MST

classmethod create(*, storage=None, entries=None, layer=None)[source]
Parameters:
  • storage (Storage) –

  • entries (sequence of MST and Leaf) –

  • layer (int) –

Returns:

MST

new_tree(entries)[source]

We never mutate an MST; instead, we return a new MST with updated values.

Parameters:

entries (sequence of MST and Leaf) –

Return type:

MST

get_entries()[source]

We don’t want to load entries of every subtree, just the ones we need.

Return type:

sequence of MST and Leaf

get_pointer()[source]

Returns this MST’s root CID pointer. Calculates it if necessary.

We don’t hash the node on every mutation for performance reasons. Instead we keep track of whether the pointer is outdated and only (recursively) calculate when needed.

Return type:

CID

get_layer()[source]

Returns this MST’s layer, and sets self.layer.

In most cases, we get the layer of a node from a hint on creation. In the case of the topmost node in the tree, we look for a key in the node & determine the layer. In the case where we don’t find one, we recurse down until we do. If we still can’t find one, then we have an empty tree and the node is layer 0.

Return type:

int

attempt_get_layer()[source]

Returns this MST’s layer, and sets self.layer.

Return type:

int or None

get_unstored_blocks()[source]

Returns the blocks needed to persist the MST to repo storage.

Return type:

(CID root, dict mapping CID to Block) tuple

add(key, value=None, known_zeros=None)[source]

Adds a new leaf for the given key/value pair.

Parameters:
  • key (str) –

  • value (CID) –

  • known_zeros (int) –

Return type:

MST

Raises:

ValueError – if a leaf with that key already exists

get(key)[source]

Gets the value at the given key.

Parameters:

key (str) –

Return type:

CID or None

update(key, value)[source]

Edits the value at the given key.

Parameters:
  • key (str) –

  • value (CID) –

Return type:

MST

Raises:

KeyError – if key doesn’t exist

delete(key)[source]

Deletes the value at the given key.

Parameters:

key (str) –

Returns:

MST

Raises:

KeyError – if key doesn’t exist

delete_recurse(key)[source]

Deletes the value and subtree, if any, at the given key.

Parameters:

key (str) –

Returns:

MST

update_entry(index, entry)[source]

Replaces the entry at a given index, keeping its position in the node.

Parameters:
  • index (int) –

  • entry (MST or Leaf) –

Return type:

MST

remove_entry(index)[source]

Removes the entry at a given index.

Parameters:

index (int) –

Return type:

MST

append(entry)[source]

Appends an entry to the end of the node.

Parameters:

entry (MST or Leaf) –

Return type:

MST

prepend(entry)[source]

Prepends an entry to the start of the node.

Parameters:

entry (MST or Leaf) –

Return type:

MST

at_index(index)[source]

Returns the entry at a given index.

Parameters:

index (int) –

Return type:

MST or Leaf or None

slice(start=None, end=None)[source]

Returns a slice of this node.

Parameters:
  • start (int) – optional, inclusive

  • end (int) – optional, exclusive

Return type:

sequence of MST and Leaf

splice_in(entry, index)[source]

Inserts an entry at a given index.

Parameters:
  • entry (MST or Leaf) –

  • index (int) –

Return type:

MST

replace_with_split(index, left=None, leaf=None, right=None)[source]

Replaces an entry with [ Maybe(tree), Leaf, Maybe(tree) ].

Parameters:
  • index (int) –

  • left (MST) – optional

  • leaf (Leaf) –

  • right (MST) – optional

Return type:

MST

trim_top()[source]

Trims the top of the tree and returns its subtree, if necessary.

Only trims if the topmost node in the tree points only to another subtree. Otherwise, does nothing.

Return type:

MST

split_around(key)[source]

Recursively splits a subtree around a given key.

Parameters:

key (str) –

Return type:

(MST or None, MST or None) tuple

append_merge(to_merge)[source]

Merges another tree with this one.

The simple merge case where every key in the right tree is greater than every key in the left tree. Used primarily for deletes.

Parameters:

to_merge (MST) –

Return type:

MST

create_child()[source]
Return type:

MST

create_parent()[source]
Return type:

MST

find_gt_or_equal_leaf_index(key)[source]

Finds the index of the first leaf node greater than or equal to key.

Parameters:

key (str) –

Return type:

int

walk_leaves_from(key)[source]

Walks the tree starting at key.

Generator for leaves in the tree, starting at a given rkey.

Parameters:

key (str) –

Generates:

Leaf

list(after=None, before=None)[source]

Returns entries, optionally bounded within an rkey range.

Parameters:
  • after (str) – rkey, optional

  • before (str) – rkey, optional

Return type:

sequence of Leaf

list_with_prefix(prefix)[source]

Returns entries with a given rkey prefix.

Parameters:

prefix (str) – rkey prefix

Returns:

sequence of Leaf

walk()[source]

Walks the full tree, depth first, and emits nodes.

Return type:

generator of MST and Leaf

all_nodes()[source]

Walks the tree and returns all nodes.

Return type:

sequence of MST and Leaf

leaves()[source]

Walks tree and returns all leaves.

Return type:

sequence of Leaf

leaf_count()[source]

Returns the total number of leaves in this MST.

Return type:

int

load_all()[source]

Generator. Used in xrpc_sync.get_checkout().

(The bluesky-social/atproto TS code calls this writeToCarStream.)

Returns:

generator of (CID, bytes) tuples

arroba.mst.leading_zeros_on_hash(key)[source]

Returns the number of leading zeros in a key’s hash.

Parameters:

key (str or bytes) –

Return type:

int

arroba.mst.layer_for_entries(entries)[source]
Parameters:

entries (sequence of MST and Leaf) –

Return type:

int or None

arroba.mst.deserialize_node_data(*, storage=None, data=None, layer=None)[source]
Parameters:
Return type:

sequence of MST and Leaf

arroba.mst.serialize_node_data(entries)[source]
Parameters:

entries (sequence of MST and Leaf) –

Return type:

Data

arroba.mst.common_prefix_len(a, b)[source]
Parameters:
Return type:

int

arroba.mst.cid_for_entries(entries)[source]
Parameters:

entries (sequence of MST and Leaf) –

Returns:

CID

arroba.mst.ensure_valid_key(key)[source]
Parameters:

key (str) –

Raises:

ValueError – if key is not a valid MST key

class arroba.mst.WalkStatus(done, cur, walking, index)

Bases: tuple

class arroba.mst.Walker(tree)[source]

Bases: object

Allows walking an MST manually.

stack
Type:

sequence of WalkStatus

status

current

Type:

WalkStatus

__init__(tree)[source]

Constructor.

Parameters:

tree (MST) –

layer()[source]

Returns the current layer of the node we’re on.

step_over()[source]

Moves to the next node in the subtree, skipping over the subtree.

step_into()[source]

Steps into a subtree.

Raises:

RuntimeError – if currently on a leaf

advance()[source]

Advances to the next node in the tree.

Steps into the current node if necessary.

repo

Bluesky / AT Protocol repo implementation.

https://atproto.com/guides/data-repos

Heavily based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts

Huge thanks to the Bluesky team for working in the public, in open source, and to Daniel Holmgren and Devin Ivy for this code specifically!

class arroba.repo.Write(action, collection, rkey, record)

Bases: tuple

arroba.repo.writes_to_commit_ops(writes)[source]

Converts Writes to CommitOps.

Parameters:

writes (iterable of Write) –

Returns:

list of repo.CommitOp
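A rough sketch of the conversion, assuming each CommitOp's path is collection/rkey and its CID is computed from the record (none for deletes). Write and CommitOp below mirror the namedtuple fields documented here, but the Action values are assumed, and placeholder_cid is a hypothetical stand-in for arroba.util.dag_cbor_cid(): it hashes JSON and is not a real CID.

```python
import hashlib
import json
from enum import Enum
from typing import NamedTuple, Optional


class Action(Enum):  # assumed values
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"


class Write(NamedTuple):
    action: Action
    collection: str
    rkey: str
    record: Optional[dict]


class CommitOp(NamedTuple):
    action: Action
    path: str
    cid: Optional[str]


def placeholder_cid(record: dict) -> str:
    # NOT a real CID: stand-in for a DAG-CBOR CID, for illustration only
    return hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()


def writes_to_commit_ops_sketch(writes) -> list:
    return [
        CommitOp(
            action=w.action,
            path=f"{w.collection}/{w.rkey}",  # MST key for this record
            cid=placeholder_cid(w.record) if w.record is not None else None,
        )
        for w in writes
    ]
```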

class arroba.repo.Repo(*, storage=None, mst=None, head=None, handle=None, callback=None, signing_key=None, rotation_key=None)[source]

Bases: object

AT Protocol data repo implementation, storage agnostic.

did

repo DID (dynamic property)

Type:

str

version

AT Protocol version (dynamic property)

Type:

int

storage
Type:

Storage

mst
Type:

MST

head

head commit

Type:

Block

handle
Type:

str

callback

called on new commits. May be set directly by clients. None means no callback.

Type:

callable, (CommitData) => None

__init__(*, storage=None, mst=None, head=None, handle=None, callback=None, signing_key=None, rotation_key=None)[source]

Constructor.

Parameters:
  • storage (Storage) – required

  • mst (MST) –

  • head (Block) – head commit

  • handle (str) –

  • callback (callable, CommitData => None) –

  • signing_key (ec.EllipticCurvePrivateKey) – required

  • rotation_key (ec.EllipticCurvePrivateKey) –

get_record(collection, rkey)[source]
Parameters:
  • collection (str) –

  • rkey (str) –

Returns:

node, record or commit or serialized MST

Return type:

dict

get_contents()[source]
Return type:

dict mapping str collection to dict mapping str rkey to dict record

classmethod create_from_commit(storage, commit_data, *, signing_key, rotation_key=None, **kwargs)[source]
Parameters:
  • storage (Storage) –

  • commit_data (CommitData) –

  • signing_key (ec.EllipticCurvePrivateKey) – passed through to Storage.create_repo()

  • rotation_key (ec.EllipticCurvePrivateKey) – optional, passed through to Storage.create_repo()

  • kwargs – passed through to Repo constructor

Return type:

Repo

classmethod create(storage, did, *, signing_key, rotation_key=None, initial_writes=None, **kwargs)[source]
Parameters:
  • storage (Storage) –

  • did (str) –

  • signing_key (ec.EllipticCurvePrivateKey) – passed through to Storage.create_repo

  • rotation_key (ec.EllipticCurvePrivateKey) – optional, passed through to Storage.create_repo

  • initial_writes (sequence of Write) –

  • kwargs – passed through to Repo constructor

Returns:

the newly created repo

Return type:

Repo

classmethod load(storage, cid=None, **kwargs)[source]
Parameters:
  • storage (Storage) –

  • cid (CID) – optional

  • kwargs – passed through to Repo constructor

Return type:

Repo

classmethod format_commit(*, repo=None, storage=None, repo_did=None, signing_key=None, mst=None, cur_head=None, writes=None)[source]

Creates, but does not store, a new commit.

If repo is provided, all other kwargs should be omitted except (optionally) writes. Otherwise, storage, repo_did, and signing_key are required.

If repo is provided, its mst attribute is set to the new MST resulting from applying this commit.

Parameters:
  • repo (Repo) – optional

  • storage (Storage) – optional

  • repo_did (str) – optional

  • signing_key (ec.EllipticCurvePrivateKey) – optional

  • mst (MST) – optional

  • cur_head (CID) – optional

  • writes (sequence of Write) – optional

Return type:

CommitData

apply_commit(commit_data)[source]
Parameters:

commit_data (CommitData) –

Returns:

self

Return type:

Repo

apply_writes(writes)[source]
Parameters:

writes (Write or sequence of Write) –

Returns:

self

Return type:

Repo

storage

Bluesky repo storage base class and in-memory implementation.

Lightly based on: https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/storage/repo-storage.ts

class arroba.storage.Action(value)[source]

Bases: Enum

Used in Repo.format_commit().

TODO: switch to StrEnum once we can require Python 3.11.

class arroba.storage.CommitData(commit, blocks, prev)

Bases: tuple

class arroba.storage.CommitOp(action, path, cid)

Bases: tuple

class arroba.storage.Block(*, cid=None, decoded=None, encoded=None, seq=None, ops=None)[source]

Bases: object

An ATProto block: a record, MST entry, or commit.

Can start from either encoded bytes or decoded object, with or without CID. Decodes, encodes, and generates CID lazily, on demand, on attribute access.

Based on carbox.car.Block.

cid

lazy-loaded (dynamic property)

Type:

CID

decoded

decoded object (dynamic property)

Type:

dict

encoded

DAG-CBOR encoded data (dynamic property)

Type:

bytes

seq

com.atproto.sync.subscribeRepos sequence number

Type:

int

ops

CommitOps if this is a commit, otherwise None

Type:

list

__init__(*, cid=None, decoded=None, encoded=None, seq=None, ops=None)[source]

Constructor.

Parameters:
  • cid (CID) – optional

  • decoded (dict) – optional

  • encoded (bytes) – optional

  • seq (int) – optional

  • ops (list of CommitOp) – optional

__str__()[source]

Return str(self).
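The lazy encode/decode/CID behavior described for Block can be sketched with functools.cached_property: whichever representation is passed in is cached directly, and the others are computed only on first access. This is a hypothetical illustration, not arroba's Block; JSON and a hex SHA-256 digest stand in for DAG-CBOR and a real CID.

```python
import hashlib
import json
from functools import cached_property
from typing import Optional


class LazyBlock:
    """Hypothetical sketch of lazy decode/encode/CID on attribute access."""

    def __init__(self, *, decoded: Optional[dict] = None, encoded: Optional[bytes] = None):
        if decoded is None and encoded is None:
            raise ValueError("need decoded or encoded")
        # assigning to a cached_property name stores the value in the instance
        # dict, so the other representation is only computed if accessed
        if decoded is not None:
            self.decoded = decoded
        if encoded is not None:
            self.encoded = encoded

    @cached_property
    def decoded(self) -> dict:
        return json.loads(self.encoded)

    @cached_property
    def encoded(self) -> bytes:
        return json.dumps(self.decoded, sort_keys=True).encode()

    @cached_property
    def cid(self) -> str:
        # stand-in for a CID: hash of the encoded bytes
        return hashlib.sha256(self.encoded).hexdigest()
```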

class arroba.storage.Storage[source]

Bases: object

Abstract base class for storing nodes: records, MST entries, and commits.

Concrete subclasses should implement this on top of physical storage, e.g. a database, the filesystem, or memory.

TODO: batch operations?

head
Type:

CID

create_repo(repo, *, signing_key, rotation_key=None)[source]

Stores a new repo’s metadata in storage.

Only stores the repo’s DID, handle, and head commit CID, not blocks!

If the repo already exists in storage, this should update it instead of failing.

Parameters:
  • repo (Repo) –

  • signing_key (ec.EllipticCurvePrivateKey) –

  • rotation_key (ec.EllipticCurvePrivateKey) – optional

load_repo(did_or_handle)[source]

Loads a repo from storage.

Parameters:

did_or_handle (str) – optional

Return type:

Repo, or None if the did or handle wasn’t found

read(cid)[source]

Reads a node from storage.

Parameters:

cid (CID) –

Return type:

Block, or None if not found

read_many(cids, require_all=True)[source]

Batch read multiple nodes from storage.

Parameters:
  • cids (sequence of CID) –

  • require_all (bool) – whether to assert that all cids are found

Returns:

{CID: Block or None if not found}

Return type:

dict

read_blocks_by_seq(start=0)[source]

Batch read blocks from storage by subscribeRepos sequence number.

Parameters:

start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

Returns:

all Blocks starting from start, inclusive, in ascending seq order

Return type:

iterable or generator

read_commits_by_seq(start=0)[source]

Batch read commits from storage by subscribeRepos sequence number.

Parameters:

start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

Returns:

generator of CommitData, starting from start, inclusive, in ascending seq order

Return type:

generator

has(cid)[source]

Checks if a given CID is currently stored.

Parameters:

cid (CID) –

Return type:

bool

write(repo_did, obj)[source]

Writes a node to storage.

Generates new sequence number(s) as necessary for newly stored blocks.

TODO: remove? This seems unused.

Parameters:
  • repo_did (str) –

  • obj (dict) – a record, commit, or serialized MST node

Return type:

CID

apply_commit(commit_data)[source]

Writes a commit to storage.

Generates a new sequence number and uses it for all blocks in the commit.

Parameters:

commit_data (CommitData) –

allocate_seq(nsid)[source]

Generates and returns a sequence number for the given NSID.

Sequence numbers must be monotonically increasing positive integers, per NSID. They may have gaps. Background: https://atproto.com/specs/event-stream#sequence-numbers

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int

last_seq(nsid)[source]

Returns the last (highest) stored sequence number for the given NSID.

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int
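As a rough illustration of the allocate_seq() and last_seq() contract, here is a hypothetical single-process counter keyed by NSID, in the spirit of MemoryStorage.sequences ({str NSID: int next sequence number}). It is not thread-safe; a shared backend would need an atomic increment, which DatastoreStorage handles with AtpSequence entities. Returning 0 from last_seq() before any allocation is an assumption.

```python
class InMemorySequences:
    """Hypothetical per-NSID sequence allocator."""

    def __init__(self):
        self.next = {}  # {str NSID: int next sequence number}

    def allocate_seq(self, nsid: str) -> int:
        seq = self.next.get(nsid, 1)  # positive integers, starting at 1
        self.next[nsid] = seq + 1
        return seq

    def last_seq(self, nsid: str) -> int:
        # highest number allocated so far; 0 if none yet (an assumption)
        return self.next.get(nsid, 1) - 1
```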

class arroba.storage.MemoryStorage[source]

Bases: Storage

In memory storage implementation.

repos
Type:

list of Repo

blocks

{CID: Block}

Type:

dict

head
Type:

CID

sequences

{str NSID: int next sequence number}

Type:

dict

__init__()[source]
create_repo(repo, *, signing_key, rotation_key=None)[source]

Stores a new repo’s metadata in storage.

Only stores the repo’s DID, handle, and head commit CID, not blocks!

If the repo already exists in storage, this should update it instead of failing.

Parameters:
  • repo (Repo) –

  • signing_key (ec.EllipticCurvePrivateKey) –

  • rotation_key (ec.EllipticCurvePrivateKey) – optional

load_repo(did_or_handle)[source]

Loads a repo from storage.

Parameters:

did_or_handle (str) – optional

Return type:

Repo, or None if the did or handle wasn’t found

read(cid)[source]

Reads a node from storage.

Parameters:

cid (CID) –

Return type:

Block, or None if not found

read_many(cids, require_all=True)[source]

Batch read multiple nodes from storage.

Parameters:
  • cids (sequence of CID) –

  • require_all (bool) – whether to assert that all cids are found

Returns:

{CID: Block or None if not found}

Return type:

dict

read_blocks_by_seq(start=0)[source]

Batch read blocks from storage by subscribeRepos sequence number.

Parameters:

start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

Returns:

all Blocks starting from start, inclusive, in ascending seq order

Return type:

iterable or generator

has(cid)[source]

Checks if a given CID is currently stored.

Parameters:

cid (CID) –

Return type:

bool

write(repo_did, obj)[source]

Writes a node to storage.

Generates new sequence number(s) as necessary for newly stored blocks.

TODO: remove? This seems unused.

Parameters:
  • repo_did (str) –

  • obj (dict) – a record, commit, or serialized MST node

Return type:

CID

apply_commit(commit_data)[source]

Writes a commit to storage.

Generates a new sequence number and uses it for all blocks in the commit.

Parameters:

commit_data (CommitData) –

allocate_seq(nsid)[source]

Generates and returns a sequence number for the given NSID.

Sequence numbers must be monotonically increasing positive integers, per NSID. They may have gaps. Background: https://atproto.com/specs/event-stream#sequence-numbers

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int

last_seq(nsid)[source]

Returns the last (highest) stored sequence number for the given NSID.

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int

datastore_storage

Google Cloud Datastore implementation of repo storage.

class arroba.datastore_storage.WriteOnce[source]

Bases: object

ndb.Property mix-in that prevents changing a property’s value once it’s set.

class arroba.datastore_storage.JsonProperty(*args, **kwargs)[source]

Bases: TextProperty

Fork of ndb’s JsonProperty that subclasses ndb.TextProperty instead of ndb.BlobProperty.

This makes values show up as normal, human-readable, serialized JSON in the web console. https://github.com/googleapis/python-ndb/issues/874#issuecomment-1442753255

Duplicated in oauth-dropins/webutil: https://github.com/snarfed/webutil/blob/main/models.py

class arroba.datastore_storage.ComputedJsonProperty(*args, **kwargs)[source]

Bases: JsonProperty, ComputedProperty

Custom ndb.ComputedProperty for JSON values that stores them as strings.

…instead of storing them like ndb.StructuredProperty does, with the “entity” type, which bloats them unnecessarily in the datastore.

__init__(*args, **kwargs)[source]

Constructor.

Parameters:

func – A function that takes one argument, the model instance, and returns a calculated value.

class arroba.datastore_storage.WriteOnceBlobProperty(name=None, compressed=None, indexed=None, repeated=None, required=None, default=None, choices=None, validator=None, verbose_name=None, write_empty_list=None)[source]

Bases: WriteOnce, BlobProperty

class arroba.datastore_storage.CommitOp(**kwargs)[source]

Bases: Model

Repo operations (creates, updates, deletes) included in a commit.

Used in a StructuredProperty inside AtpBlock; not stored directly in the datastore.

https://googleapis.dev/python/python-ndb/latest/model.html#google.cloud.ndb.model.StructuredProperty

class arroba.datastore_storage.AtpRepo(**kwargs)[source]

Bases: Model

An ATProto repo.

Key name is DID. Only stores the repo’s metadata. Blocks are stored in AtpBlocks.

Attributes:
  • handles (str) – repeated, optional

  • head (str) – CID

  • signing_key (str)

  • rotation_key (str)

property signing_key

(ec.EllipticCurvePrivateKey)

property rotation_key

(ec.EllipticCurvePrivateKey or None)

class arroba.datastore_storage.AtpBlock(**kwargs)[source]

Bases: Model

A data record, MST node, or commit.

Key name is the DAG-CBOR base32 CID of the data.

Properties:
  • encoded (bytes) – DAG-CBOR encoded value

  • data (dict) – DAG-JSON value, only used for human debugging

  • seq (int) – sequence number for the subscribeRepos event stream

static create(*, repo_did, data, seq)[source]

Writes a new AtpBlock to the datastore.

If the block already exists in the datastore, leaves it untouched. Notably, leaves its existing sequence number as is, since it will be lower than the current sequence number.

Parameters:
  • repo_did (str) –

  • data (dict) – value

  • seq (int) –

Returns:

AtpBlock

to_block()[source]

Converts to Block.

Returns:

Block

classmethod from_block(*, repo_did, block)[source]

Converts a Block to an AtpBlock.

Parameters:
  • repo_did (str) –

  • block (Block) –

Returns:

AtpBlock

class arroba.datastore_storage.AtpSequence(**kwargs)[source]

Bases: Model

A sequence number for a given event stream NSID.

Sequence numbers are monotonically increasing and start at 1. They have no gaps, although ATProto doesn’t require that. Background: https://atproto.com/specs/event-stream#sequence-numbers

Key name is XRPC method NSID.

At first, I considered using datastore allocated ids for sequence numbers, but they’re not guaranteed to be monotonically increasing, so I switched to this.

classmethod allocate(nsid)[source]

Returns the next sequence number for a given NSID.

Creates a new AtpSequence entity if one doesn’t already exist for the given NSID.

Parameters:

nsid (str) – the subscription XRPC method for this sequence number

Returns:

integer, next sequence number for this NSID

classmethod last(nsid)[source]

Returns the last sequence number for a given NSID.

Creates a new AtpSequence entity if one doesn’t already exist for the given NSID.

Parameters:

nsid (str) – the subscription XRPC method for this sequence number

Returns:

integer, last sequence number for this NSID

class arroba.datastore_storage.AtpRemoteBlob(**kwargs)[source]

Bases: Model

A blob available at a public HTTP URL that we don’t store ourselves.

Key ID is the URL.

TODO:
  • follow redirects, use final URL as key id

  • abstract this in Storage

classmethod get_or_create(*, url=None, get_fn=<function get>)[source]

Returns a new or existing AtpRemoteBlob for a given URL.

If there isn’t an existing AtpRemoteBlob, fetches the URL over the network and creates a new one for it.

Parameters:
  • url (str) –

  • get_fn (callable) – for making HTTP GET requests

Returns:

existing or newly created AtpRemoteBlob

Return type:

AtpRemoteBlob

Raises:

requests.RequestException – if the HTTP request to fetch the blob failed

as_object()[source]

Returns an ATProto blob object for this blob.

https://atproto.com/specs/data-model#blob-type

Return type:

dict
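For reference, the atproto data model spec gives a blob's JSON representation as an object with $type "blob", a ref link to the blob's CID, a MIME type, and a size in bytes. A minimal sketch of that shape follows; blob_object is a hypothetical helper, and which values AtpRemoteBlob actually stores isn't shown here.

```python
def blob_object(cid: str, mime_type: str, size: int) -> dict:
    """Hypothetical helper: JSON form of an ATProto blob reference."""
    return {
        "$type": "blob",
        "ref": {"$link": cid},  # in DAG-CBOR, this is a CID link instead
        "mimeType": mime_type,
        "size": size,  # length of the blob in bytes
    }
```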

class arroba.datastore_storage.DatastoreStorage[source]

Bases: Storage

Google Cloud Datastore implementation of Storage.

Sequence numbers in AtpBlock are allocated per commit; all blocks in a given commit will have the same sequence number. They’re currently sequential counters, starting at 1, stored in an AtpSequence entity.

See Storage for method details.

create_repo(repo, *, signing_key, rotation_key=None)[source]

Stores a new repo’s metadata in storage.

Only stores the repo’s DID, handle, and head commit CID, not blocks!

If the repo already exists in storage, this should update it instead of failing.

Parameters:
  • repo (Repo) –

  • signing_key (ec.EllipticCurvePrivateKey) –

  • rotation_key (ec.EllipticCurvePrivateKey) – optional

load_repo(did_or_handle)[source]

Loads a repo from storage.

Parameters:

did_or_handle (str) – optional

Return type:

Repo, or None if the did or handle wasn’t found

read(cid)[source]

Reads a node from storage.

Parameters:

cid (CID) –

Return type:

Block, or None if not found

read_many(cids)[source]

Batch read multiple nodes from storage.

Parameters:
  • cids (sequence of CID) –

  • require_all (bool) – whether to assert that all cids are found

Returns:

{CID: Block or None if not found}

Return type:

dict

read_blocks_by_seq(start=0)[source]

Batch read blocks from storage by subscribeRepos sequence number.

Parameters:

start (int) – optional subscribeRepos sequence number to start from, inclusive. Defaults to 0.

Returns:

all Blocks starting from start, inclusive, in ascending seq order

Return type:

iterable or generator

has(cid)[source]

Checks if a given CID is currently stored.

Parameters:

cid (CID) –

Return type:

bool

write(repo_did, obj)[source]

Writes a node to storage.

Generates new sequence number(s) as necessary for newly stored blocks.

TODO: remove? This seems unused.

Parameters:
  • repo_did (str) –

  • obj (dict) – a record, commit, or serialized MST node

Return type:

CID

apply_commit(commit_data)[source]

Writes a commit to storage.

Generates a new sequence number and uses it for all blocks in the commit.

Parameters:

commit_data (CommitData) –

allocate_seq(nsid)[source]

Generates and returns a sequence number for the given NSID.

Sequence numbers must be monotonically increasing positive integers, per NSID. They may have gaps. Background: https://atproto.com/specs/event-stream#sequence-numbers

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int

last_seq(nsid)[source]

Returns the last (highest) stored sequence number for the given NSID.

Parameters:

nsid (str) – subscription XRPC method this sequence number is for

Return type:

int
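A minimal in-memory sketch of the allocate_seq() / last_seq() contract described above: one counter per NSID, strictly increasing, starting at 1. InMemorySeqs is hypothetical, and returning 0 from last_seq() for an NSID with no allocations yet is an assumption, not documented arroba behavior. A real storage backend would persist the counters, eg in a database.

```python
import threading
from typing import Dict


class InMemorySeqs:
    """Hypothetical per-NSID sequence allocator (not arroba's implementation)."""

    def __init__(self):
        self._last: Dict[str, int] = {}
        self._lock = threading.Lock()

    def allocate_seq(self, nsid: str) -> int:
        # Lock so concurrent callers never receive the same number
        with self._lock:
            seq = self._last.get(nsid, 0) + 1
            self._last[nsid] = seq
            return seq

    def last_seq(self, nsid: str) -> int:
        # Assumption: 0 means "nothing allocated yet" for this NSID
        with self._lock:
            return self._last.get(nsid, 0)
```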

util

Misc AT Protocol utils. TIDs, CIDs, etc.

arroba.util.now(tz=datetime.timezone.utc, **kwargs)[source]

Wrapper for datetime.datetime.now() that lets us mock it out in tests.

arroba.util.time_ns()[source]

Wrapper for time.time_ns() that lets us mock it out in tests.

arroba.util.dag_cbor_cid(obj)[source]

Returns the DAG-CBOR CID for a given object.

Parameters:

obj – CBOR-compatible native object or value

Return type:

CID
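The CID here is a CIDv1 with the dag-cbor codec (0x71) and a SHA-256 multihash over the object's DAG-CBOR encoding. The sketch below spells that out with only the standard library. It is not arroba's implementation: it returns the base32 string form rather than a CID object, and its hypothetical dag_cbor_encode() handles only None, bools, ints, strings, bytes, lists, and string-keyed dicts (no floats or CID links).

```python
import base64
import hashlib
import struct


def _head(major: int, n: int) -> bytes:
    """CBOR initial byte(s): major type plus argument in its shortest form."""
    if n < 24:
        return bytes([major << 5 | n])
    for info, fmt in ((24, ">B"), (25, ">H"), (26, ">I"), (27, ">Q")):
        if n < 1 << (8 * struct.calcsize(fmt)):
            return bytes([major << 5 | info]) + struct.pack(fmt, n)
    raise ValueError("integer too large for CBOR")


def dag_cbor_encode(obj) -> bytes:
    """Minimal, hypothetical DAG-CBOR encoder for a subset of types."""
    if obj is None:
        return b"\xf6"
    if obj is True:
        return b"\xf5"
    if obj is False:
        return b"\xf4"
    if isinstance(obj, int):
        return _head(0, obj) if obj >= 0 else _head(1, -1 - obj)
    if isinstance(obj, bytes):
        return _head(2, len(obj)) + obj
    if isinstance(obj, str):
        data = obj.encode("utf-8")
        return _head(3, len(data)) + data
    if isinstance(obj, list):
        return _head(4, len(obj)) + b"".join(dag_cbor_encode(x) for x in obj)
    if isinstance(obj, dict):
        # DAG-CBOR map keys are strings, sorted by encoded length, then bytewise
        keys = sorted(obj, key=lambda k: (len(k.encode("utf-8")), k.encode("utf-8")))
        return _head(5, len(obj)) + b"".join(
            dag_cbor_encode(k) + dag_cbor_encode(obj[k]) for k in keys)
    raise TypeError(f"unsupported type in this sketch: {type(obj).__name__}")


def dag_cbor_cid_sketch(obj) -> str:
    """CIDv1 (dag-cbor, sha2-256) of obj, as a multibase base32 string."""
    digest = hashlib.sha256(dag_cbor_encode(obj)).digest()
    # version 1, codec 0x71 (dag-cbor), multihash code 0x12 (sha2-256), length 32
    cid_bytes = bytes([0x01, 0x71, 0x12, 0x20]) + digest
    # multibase prefix "b" = lowercase RFC 4648 base32, no padding
    return "b" + base64.b32encode(cid_bytes).decode("ascii").lower().rstrip("=")
```

Because map keys are sorted canonically, two dicts with the same contents produce the same CID regardless of insertion order.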

arroba.util.s32encode(num)[source]

Base32-encodes an integer using the base32-sortable variant.

Based on https://github.com/bluesky-social/atproto/blob/main/packages/common-web/src/tid.ts

Parameters:

num (int or Integral) –

Return type:

str

arroba.util.s32decode(val)[source]

Decodes a base32-sortable string into an integer.

Based on https://github.com/bluesky-social/atproto/blob/main/packages/common-web/src/tid.ts

Parameters:

val (str) –

Return type:

int or Integral
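The base32-sortable variant uses the alphabet 234567abcdefghijklmnopqrstuvwxyz, which is in ASCII order, so equal-length encodings sort the same way as the integers they encode. A minimal re-implementation sketch of both directions, not arroba's source; in this sketch 0 encodes as the empty string, so callers that need fixed width must pad.

```python
S32_CHARS = "234567abcdefghijklmnopqrstuvwxyz"


def s32encode(num: int) -> str:
    """Base32-sortable encoding of a non-negative int (0 encodes as "")."""
    if num < 0:
        raise ValueError("num must be non-negative")
    chars = []
    while num:
        num, rem = divmod(num, 32)
        chars.append(S32_CHARS[rem])
    return "".join(reversed(chars))


def s32decode(val: str) -> int:
    """Inverse of s32encode()."""
    num = 0
    for ch in val:
        num = num * 32 + S32_CHARS.index(ch)  # raises ValueError on bad chars
    return num
```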

arroba.util.datetime_to_tid(dt, clock_id=None)[source]

Converts a datetime to a TID.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:
  • dt (datetime.datetime) –

  • clock_id (int) – optional. If not specified, uses this runtime’s clock id

Returns:

base32-encoded TID

Return type:

str

arroba.util.int_to_tid(num, clock_id=None)[source]

Converts an integer to a TID.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:
  • num (int) –

  • clock_id (int) – optional. If not specified, uses this runtime’s clock id

Returns:

base32-encoded TID

Return type:

str

arroba.util.tid_to_datetime(tid)[source]

Converts a TID to a datetime.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:

tid (bytes) – base32-encoded TID

Return type:

datetime.datetime

Raises:

ValueError – if tid is not bytes or not 13 characters long

arroba.util.tid_to_int(tid)[source]

Converts a TID to an integer.

https://atproto.com/specs/record-key#record-key-type-tid

Parameters:

tid (bytes) – base32-encoded TID

Return type:

int

Raises:

ValueError – if tid is not bytes or not 13 characters long
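Per the linked record-key spec, a TID packs a 64-bit integer (top bit 0, then 53 bits of microseconds since the UNIX epoch, then a 10-bit clock identifier) into 13 base32-sortable characters. The sketch below shows that layout in both directions. encode_tid(), decode_tid(), and the two datetime helpers are hypothetical names, and this is not necessarily how arroba's int_to_tid() interprets its num argument.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

S32_CHARS = "234567abcdefghijklmnopqrstuvwxyz"
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_tid(micros: int, clock_id: int = 0) -> str:
    """Packs microseconds and a clock id into a 13-char TID (hypothetical helper)."""
    if not 0 <= micros < 1 << 53:
        raise ValueError("timestamp must fit in 53 bits")
    if not 0 <= clock_id < 1 << 10:
        raise ValueError("clock id must fit in 10 bits")
    num = (micros << 10) | clock_id  # top bit stays 0
    chars = []
    for _ in range(13):  # fixed width keeps string order == numeric order
        num, rem = divmod(num, 32)
        chars.append(S32_CHARS[rem])
    return "".join(reversed(chars))


def decode_tid(tid: str) -> Tuple[int, int]:
    """Returns (microseconds, clock_id) from a 13-char TID."""
    if not isinstance(tid, str) or len(tid) != 13:
        raise ValueError("expected a 13-character TID")
    num = 0
    for ch in tid:
        if ch not in S32_CHARS:
            raise ValueError(f"invalid TID character: {ch!r}")
        num = num * 32 + S32_CHARS.index(ch)
    if num >> 63:
        raise ValueError("top bit must be 0")
    return num >> 10, num & 0x3FF


def datetime_to_tid_sketch(dt: datetime, clock_id: int = 0) -> str:
    # Integer timedelta division avoids float rounding; dt must be timezone-aware
    return encode_tid((dt - EPOCH) // timedelta(microseconds=1), clock_id)


def tid_to_datetime_sketch(tid: str) -> datetime:
    micros, _ = decode_tid(tid)
    return EPOCH + timedelta(microseconds=micros)
```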

arroba.util.next_tid()[source]

Returns the TID corresponding to the current time.

A TID is a base32-sortable-encoded UNIX timestamp (ie time since the epoch) in microseconds, followed by a clock identifier. Returned TIDs are guaranteed to increase monotonically across calls.

https://atproto.com/specs/atp#timestamp-ids-tid
https://atproto.com/specs/record-key#record-key-type-tid
https://github.com/bluesky-social/atproto/blob/main/packages/common-web/src/tid.ts

Returns:

TID

Return type:

str
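One way to get the monotonic guarantee described above is to remember the last timestamp issued and step forward by one microsecond whenever the clock hasn't advanced. A sketch under those assumptions: next_tid_sketch() is hypothetical, and picking the 10-bit clock id randomly once per process is this sketch's choice, not confirmed arroba behavior.

```python
import random
import threading
import time

S32_CHARS = "234567abcdefghijklmnopqrstuvwxyz"

# Assumption: one random 10-bit clock id per process
_CLOCK_ID = random.randrange(1 << 10)
_lock = threading.Lock()
_last_micros = 0


def next_tid_sketch() -> str:
    """Returns a TID for the current time, strictly greater than any earlier one."""
    global _last_micros
    with _lock:
        micros = time.time_ns() // 1000
        # If the clock hasn't advanced (or went backwards), step past the last value
        if micros <= _last_micros:
            micros = _last_micros + 1
        _last_micros = micros
    num = (micros << 10) | _CLOCK_ID
    chars = []
    for _ in range(13):  # fixed 13-char width so string order matches numeric order
        num, rem = divmod(num, 32)
        chars.append(S32_CHARS[rem])
    return "".join(reversed(chars))
```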

arroba.util.at_uri(did, collection, rkey)[source]

Returns the at:// URI for a given DID, collection, and rkey.

https://atproto.com/specs/at-uri-scheme

Parameters:
  • did (str) –

  • collection (str) –

  • rkey (str) –

Returns:

at:// URI

Return type:

str

arroba.util.parse_at_uri(uri)[source]

Parses the repo DID, collection, and rkey out of an at:// URI.

https://atproto.com/specs/at-uri-scheme

Parameters:

uri (str) –

Returns:

(did, collection, rkey)

Return type:

tuple of str
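For the common case of a full record URI, building and parsing are a format string and a split. The sketch below is hypothetical (at_uri_sketch() and parse_at_uri_sketch() are illustrative names) and only accepts three-segment at://DID/COLLECTION/RKEY URIs; arroba's parse_at_uri() may handle other shapes differently.

```python
from typing import Tuple

PREFIX = "at://"


def at_uri_sketch(did: str, collection: str, rkey: str) -> str:
    """Builds at://DID/COLLECTION/RKEY."""
    return f"{PREFIX}{did}/{collection}/{rkey}"


def parse_at_uri_sketch(uri: str) -> Tuple[str, str, str]:
    """Splits a full three-segment at:// URI into (did, collection, rkey)."""
    if not uri.startswith(PREFIX):
        raise ValueError(f"not an at:// URI: {uri}")
    parts = uri[len(PREFIX):].split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected at://DID/COLLECTION/RKEY, got {uri}")
    did, collection, rkey = parts
    return did, collection, rkey
```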

arroba.util.new_key(seed=None)[source]

Generates a new ECC K-256 keypair.

https://atproto.com/specs/cryptography

Parameters:

seed (int) – optional deterministic value to derive private key from. Don’t use in production!

Return type:

ec.EllipticCurvePrivateKey
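With the cryptography package, which provides the ec.EllipticCurvePrivateKey type used throughout these docs, a K-256 keypair is a secp256k1 key. A hedged sketch: new_key_sketch() is hypothetical, and using the seed directly as the private scalar is this sketch's choice; arroba may derive keys from seeds differently.

```python
from typing import Optional

from cryptography.hazmat.primitives.asymmetric import ec


def new_key_sketch(seed: Optional[int] = None) -> ec.EllipticCurvePrivateKey:
    """Generates a secp256k1 (K-256) private key; deterministic if seed is given."""
    if seed is None:
        # Fresh random key
        return ec.generate_private_key(ec.SECP256K1())
    # Assumption: the seed is used directly as the private scalar. Tests only!
    return ec.derive_private_key(seed, ec.SECP256K1())
```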

arroba.util.sign(obj, private_key)[source]

Signs an object, eg a repo commit or DID document.

Adds the signature to the sig field.

https://atproto.com/specs/cryptography

The signature is ECDSA over a SHA-256 hash of the input, including a custom second pass to enforce that it’s the “low-S” variant: https://atproto.com/specs/cryptography#ecdsa-signature-malleability

Parameters:
  • obj (dict) –

  • private_key (ec.EllipticCurvePrivateKey) –

Returns:

obj with new sig field

Return type:

dict

arroba.util.apply_low_s_mitigation(signature, curve)[source]

Low-S signature mitigation.

This prevents signature malleability. (It doesn’t guarantee deterministic signatures though!)

https://atproto.com/specs/cryptography#ecdsa-signature-malleability

From picopds. Thank you David! https://github.com/DavidBuchanan314/picopds/blob/main/signing.py

Parameters:
  • signature (bytes) –

  • curve (ec.EllipticCurve) –

Return type:

bytes
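ECDSA signatures (r, s) and (r, n - s) verify equally, where n is the curve's group order, so the mitigation keeps whichever has s ≤ n/2. The sketch below applies that to a raw 64-byte r || s signature on secp256k1. The raw format and the hypothetical low_s() name are assumptions, since the docs above don't say whether apply_low_s_mitigation() takes DER or raw bytes.

```python
# Group order n of secp256k1
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141


def low_s(signature: bytes, order: int = SECP256K1_ORDER) -> bytes:
    """Returns the low-S form of a raw 64-byte r || s ECDSA signature."""
    if len(signature) != 64:
        raise ValueError("expected a 64-byte r || s signature")
    r = int.from_bytes(signature[:32], "big")
    s = int.from_bytes(signature[32:], "big")
    # (r, s) and (r, n - s) are both valid; keep the one with s <= n // 2
    if s > order // 2:
        s = order - s
    return r.to_bytes(32, "big") + s.to_bytes(32, "big")
```

The operation is idempotent: a signature that is already low-S comes back unchanged.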

arroba.util.verify_sig(obj, public_key)[source]

Returns True if obj’s signature is valid, False otherwise.

See sign() for more background.

Parameters:
  • obj (dict) – repo commit

  • public_key (ec.EllipticCurvePublicKey) –

Raises:

KeyError – if obj isn’t signed, ie doesn’t have a sig field
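A round-trip sketch of signing and verifying with the cryptography package, producing the raw 64-byte, low-S r || s form. It signs already-encoded bytes instead of a dict: in atproto the signed bytes are the DAG-CBOR encoding of the object without its sig field, which this sketch leaves to the caller. sign_bytes() and verify_bytes() are hypothetical names, and verify_bytes() does not itself reject high-S signatures.

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import (
    decode_dss_signature,
    encode_dss_signature,
)

# Group order n of secp256k1
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141


def sign_bytes(data: bytes, private_key: ec.EllipticCurvePrivateKey) -> bytes:
    """ECDSA over SHA-256 of data, returned as a raw low-S 64-byte r || s."""
    der = private_key.sign(data, ec.ECDSA(hashes.SHA256()))
    r, s = decode_dss_signature(der)
    if s > SECP256K1_ORDER // 2:  # low-S mitigation
        s = SECP256K1_ORDER - s
    return r.to_bytes(32, "big") + s.to_bytes(32, "big")


def verify_bytes(data: bytes, sig: bytes, public_key: ec.EllipticCurvePublicKey) -> bool:
    """True if sig is a valid raw r || s signature of data, False otherwise."""
    if len(sig) != 64:
        return False
    r = int.from_bytes(sig[:32], "big")
    s = int.from_bytes(sig[32:], "big")
    try:
        # cryptography expects DER, so re-encode the raw (r, s) pair
        public_key.verify(encode_dss_signature(r, s), data, ec.ECDSA(hashes.SHA256()))
    except InvalidSignature:
        return False
    return True
```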

arroba.util.service_jwt(host, repo_did, privkey, expiration=datetime.timedelta(seconds=600))[source]

Generates an inter-service JWT, eg for a relay or AppView.

https://atproto.com/specs/xrpc#inter-service-authentication-temporary-specification

Parameters:
  • host (str) – hostname of the service this JWT is for, eg bgs.bsky-sandbox.dev

  • repo_did (str) – DID of the repo this JWT is for

  • privkey (ec.EllipticCurvePrivateKey) – repo’s signing key

  • expiration (timedelta) – length of time this JWT will be valid, defaults to 10 minutes

Returns:

JWT

Return type:

str
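Inter-service JWTs are compact JWS tokens signed with ES256K (ECDSA over secp256k1 with SHA-256, raw r || s signature). A sketch using the standard library plus the cryptography package follows. Its claims are assumptions based on the linked spec rather than arroba's code: iss is the repo DID, aud is did:web: plus the host, and exp is the expiration as a UNIX timestamp. service_jwt_sketch() is a hypothetical name.

```python
import base64
import json
import time
from datetime import timedelta

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature

# Group order n of secp256k1
SECP256K1_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141


def _b64url(data: bytes) -> str:
    # JWTs use unpadded base64url
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_json(obj: dict) -> str:
    return _b64url(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


def service_jwt_sketch(host: str, repo_did: str, privkey: ec.EllipticCurvePrivateKey,
                       expiration: timedelta = timedelta(seconds=600)) -> str:
    """Builds an ES256K-signed JWT with a hypothetical claim layout."""
    header = {"typ": "JWT", "alg": "ES256K"}
    payload = {
        "iss": repo_did,
        "aud": f"did:web:{host}",  # assumption: the service's DID is did:web:HOST
        "exp": int(time.time() + expiration.total_seconds()),
    }
    signing_input = f"{_b64url_json(header)}.{_b64url_json(payload)}"
    der = privkey.sign(signing_input.encode("ascii"), ec.ECDSA(hashes.SHA256()))
    r, s = decode_dss_signature(der)
    if s > SECP256K1_ORDER // 2:  # low-S normalization (assumption: applied to JWTs too)
        s = SECP256K1_ORDER - s
    signature = r.to_bytes(32, "big") + s.to_bytes(32, "big")  # JWS raw r || s
    return f"{signing_input}.{_b64url(signature)}"
```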

xrpc_repo

xrpc_server

xrpc_sync