Python implementation of Bluesky PDS and AT Protocol, including data repository, Merkle search tree, and XRPC methods.
You can build your own PDS on top of arroba with just a few lines of Python and run it in any WSGI server. You can build a more involved PDS with custom logic and behavior. Or you can build a different ATProto service, eg an AppView, relay (née BGS), or something entirely new!
Install from PyPI with pip install arroba.
Arroba is the Spanish word for the @ character ("at sign").
License: This project is placed in the public domain. You may also use it under the CC0 License.
Here's minimal example code for a multi-repo PDS on top of arroba and Flask:
```python
from flask import Flask
from google.cloud import ndb
from lexrpc.flask_server import init_flask

from arroba import server
from arroba.datastore_storage import DatastoreStorage
from arroba.firehose import send_events

# for Google Cloud Datastore
ndb_client = ndb.Client()

server.storage = DatastoreStorage(ndb_client=ndb_client)
server.repo.callback = lambda _: send_events()  # to subscribeRepos

app = Flask('my-pds')
init_flask(server.server, app)

# run each WSGI request inside an ndb context so Datastore calls work
def ndb_context_middleware(wsgi_app):
    def wrapper(environ, start_response):
        with ndb_client.context():
            return wsgi_app(environ, start_response)
    return wrapper

app.wsgi_app = ndb_context_middleware(app.wsgi_app)
```
See app.py for a more comprehensive example, including a CORS handler for OPTIONS preflight requests and a catch-all app.bsky.* XRPC handler that proxies requests to the AppView.
Arroba consists of these parts:
- Data structures:
- Storage:
  - `Storage` abstract base class
  - `DatastoreStorage` (uses Google Cloud Datastore)
  - TODO: filesystem storage
- XRPC handlers:
- Utilities:
  - `did`: create and resolve `did:plc`s, `did:web`s, and domain handles
  - `diff`: find the deterministic minimal difference between two `MST`s
  - `util`: miscellaneous utilities for TIDs, AT URIs, signing and verifying signatures, generating JWTs, encoding/decoding, and more
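TIDs, listed above under `util`, are ATProto timestamp identifiers, used for record keys and commit `rev`s. As a rough sketch based on the ATProto TID spec (not arroba's own code; `encode_tid`, `decode_tid`, and `new_tid` are hypothetical names), a TID packs 53 bits of microseconds since the UNIX epoch and a 10-bit clock identifier into a 64-bit integer whose top bit is zero, then writes it as 13 characters of "base32-sortable" so that string order matches time order:

```python
import random
import time

# base32-sortable alphabet from the ATProto TID spec; ASCII order matches numeric order
B32_SORTABLE = '234567abcdefghijklmnopqrstuvwxyz'


def encode_tid(timestamp_us, clock_id):
    """Hypothetical helper: pack a TID and encode it as 13 base32-sortable characters."""
    if not 0 <= timestamp_us < 2**53:
        raise ValueError('timestamp must fit in 53 bits')
    if not 0 <= clock_id < 2**10:
        raise ValueError('clock id must fit in 10 bits')
    value = (timestamp_us << 10) | clock_id  # top bit of the 64 stays 0
    chars = []
    for _ in range(13):  # 13 chars * 5 bits = 65 bits, enough for 64
        chars.append(B32_SORTABLE[value & 0x1f])
        value >>= 5
    return ''.join(reversed(chars))


def decode_tid(tid):
    """Hypothetical helper: return (timestamp_us, clock_id) from a TID string."""
    if len(tid) != 13:
        raise ValueError('TIDs are 13 characters long')
    value = 0
    for char in tid:
        value = (value << 5) | B32_SORTABLE.index(char)  # ValueError on bad chars
    return value >> 10, value & 0x3ff


def new_tid():
    """Hypothetical helper: TID for the current time with a random clock id."""
    return encode_tid(time.time_ns() // 1000, random.randrange(2**10))
```

Because the alphabet is in ASCII order and the length is fixed, comparing two TIDs as strings compares their timestamps, which is why a plain integer timestamp (as in the changelog's TID bug fix) is not a valid substitute.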
Configure arroba with these environment variables:
- `APPVIEW_HOST`, default `api.bsky-sandbox.dev`
- `RELAY_HOST`, default `bgs.bsky-sandbox.dev`
- `PLC_HOST`, default `plc.bsky-sandbox.dev`
- `PDS_HOST`, where you're running your PDS
Optional, only used in com.atproto.repo, .server, and .sync XRPC handlers:
- `REPO_TOKEN`, static token to use as both `accessJwt` and `refreshJwt`; defaults to the contents of the `repo_token` file. Not required to be an actual JWT. If not set, XRPC methods that require auth will return HTTP 501 Not Implemented.
- `ROLLBACK_WINDOW`, number of events to serve in the `subscribeRepos` rollback window, as an integer. Defaults to 50k.
- `PRELOAD_WINDOW`, number of events to preload into the `subscribeRepos` rollback window at startup, as an integer. Defaults to 4k.
- `SUBSCRIBE_REPOS_BATCH_DELAY`, minimum time to wait between datastore queries in `com.atproto.sync.subscribeRepos`, in seconds, as a float. Defaults to 0 if unset.
- `BLOB_MAX_BYTES`, maximum allowed size of blobs, in bytes. Defaults to 100MB.
- `BLOB_REFETCH_DAYS`, how often, in days, to refetch remote URL-based blobs stored in the datastore to check that they're still serving. May be an integer or float. Defaults to 7. These refetches happen on demand, during `com.atproto.sync.getBlob` requests.
- `BLOB_REFETCH_TYPES`, comma-separated list of MIME types to refetch blobs for, without subtypes, i.e. without the part after `/`. Defaults to `image`.
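To make the defaults and semantics above concrete, here is a hypothetical sketch of reading these variables. `load_config` and `should_refetch` are illustrative names, not arroba's API; arroba reads its settings itself, and this only mirrors the documented behavior. Whether "100MB" means 10^8 bytes is an assumption here.

```python
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path


def load_config(env=os.environ, token_path='repo_token'):
    """Hypothetical helper: read the settings above, applying the documented defaults."""
    token = env.get('REPO_TOKEN')
    if token is None and Path(token_path).is_file():
        token = Path(token_path).read_text().strip()

    refetch_types = env.get('BLOB_REFETCH_TYPES', 'image')
    return {
        'APPVIEW_HOST': env.get('APPVIEW_HOST', 'api.bsky-sandbox.dev'),
        # BGS_HOST is the older name for RELAY_HOST, still honored as a fallback
        'RELAY_HOST': env.get('RELAY_HOST') or env.get('BGS_HOST') or 'bgs.bsky-sandbox.dev',
        'PLC_HOST': env.get('PLC_HOST', 'plc.bsky-sandbox.dev'),
        'PDS_HOST': env.get('PDS_HOST'),  # no default
        'REPO_TOKEN': token,  # None means auth-requiring XRPCs return HTTP 501
        'ROLLBACK_WINDOW': int(env.get('ROLLBACK_WINDOW', 50_000)),
        'PRELOAD_WINDOW': int(env.get('PRELOAD_WINDOW', 4_000)),
        'SUBSCRIBE_REPOS_BATCH_DELAY': float(env.get('SUBSCRIBE_REPOS_BATCH_DELAY', 0)),
        # "100MB"; treating it as 10**8 bytes is an assumption of this sketch
        'BLOB_MAX_BYTES': int(env.get('BLOB_MAX_BYTES', 100 * 1000 * 1000)),
        'BLOB_REFETCH_DAYS': float(env.get('BLOB_REFETCH_DAYS', 7)),
        'BLOB_REFETCH_TYPES': {t.strip() for t in refetch_types.split(',') if t.strip()},
    }


def should_refetch(mime_type, last_fetched, config, now=None):
    """Hypothetical helper: is a remote blob due for a re-check during getBlob?"""
    now = now or datetime.now(timezone.utc)
    main_type = mime_type.split('/', 1)[0]  # 'image/png' -> 'image'
    if main_type not in config['BLOB_REFETCH_TYPES']:
        return False
    return now - last_fetched >= timedelta(days=config['BLOB_REFETCH_DAYS'])
```

Since only the part before `/` is compared, the default `image` covers `image/png`, `image/jpeg`, and so on, while e.g. `video/mp4` blobs are never refetched unless `video` is added to the list.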
Breaking changes:
- When creating a new repo, the first commit is now always empty.
- `Repo.create_from_commit` has been removed; all repos should now be created with `Repo.create`.
- Removed `Repo.apply_writes`, `format_commit`, `apply_commit`, and `writes_to_commit_ops`. Use the new `Storage.commit` method instead.
Non-breaking changes:
- `AtpRemoteBlob`:
  - Add `repos` property to track which repos have which blobs.
  - Switch image handling to pymediainfo, drop Pillow dependency.
- `did`:
  - Add new `rollback_plc` function.
- `xrpc_sync`:
  - `get_blob`: periodically check remote blobs with HTTP GET requests to see if they're still serving.
  - `get_record`: include MST covering proof blocks for record.
  - Implement `listBlobs`.
  - `subscribeRepos`/firehose: handle uncaught exceptions and continue serving (snarfed/bridgy-fed#2150).
- Add server side support for sync v1.1 aka inductive firehose.
  - `xrpc_sync.subscribe_repos` now includes covering proof blocks and new `prev` and `prevData` fields.
  - `MST`:
    - Add new `cids_for_path`, `add_covering_proofs` methods.
  - `Repo`:
    - `apply_writes`: skip no-op update operations where the new record value is the same as the existing stored record. (No-op updates are evidently illegal in ATProto.)
    - Emit new `#sync` event when a new repo is created.
  - `Storage`:
    - `read_events_by_seq`: always include the MST root block in every commit event.
- `DatastoreStorage`:
  - `AtpRemoteBlob.get_or_create`: truncate URLs to 1500 characters.
  - Extract out new `AtpRemoteBlob.generate_private_key` method.
  - Optimize `write_blocks` to batch get and put.
- `did`:
  - `resolve_handle`: support `did:web`s in the HTTPS `/.well-known/atproto-did` method.
- `xrpc_sync`:
  - Drastically redesign `subscribeRepos` to unify event stream generation across all subscribers. This significantly improves scalability and reduces CPU and I/O to near constant, with minimal additional overhead per subscriber (#52).
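The `subscribeRepos` redesign above replaces per-subscriber event generation with one shared stream. Below is a minimal sketch of that fan-out pattern, assuming a single producer loop that reads each new batch of events from storage once and publishes it. `Firehose`, `publish`, and `subscribe` are hypothetical names, and this is not arroba's actual implementation.

```python
import collections
import queue
import threading


class Firehose:
    """Hypothetical sketch: one shared event stream fanned out to many subscribers."""

    def __init__(self, rollback_window=50_000):
        # recent (seq, event) pairs, oldest first; old ones fall off when full
        self.rollback = collections.deque(maxlen=rollback_window)
        self.subscribers = []
        self.lock = threading.Lock()

    def subscribe(self, cursor=None):
        """Return a queue of (seq, event). With a cursor, first replay events after it."""
        q = queue.Queue()
        with self.lock:  # replay and registration are atomic, so nothing is missed
            if cursor is not None:
                for seq, event in self.rollback:
                    if seq > cursor:
                        q.put((seq, event))
            self.subscribers.append(q)
        return q

    def unsubscribe(self, q):
        with self.lock:
            self.subscribers.remove(q)

    def publish(self, events):
        """Called by the single producer after it reads new events from storage."""
        with self.lock:
            for seq, event in events:
                self.rollback.append((seq, event))
                for q in self.subscribers:
                    q.put((seq, event))
```

With this shape, storage is read once per batch no matter how many clients are connected; each extra subscriber costs one queue insertion per event. The bounded deque plays the role of the rollback window: a client reconnecting with a cursor receives whatever later events are still in it.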
Breaking changes:
- `repo`:
  - `apply_commit`, `apply_writes`: raise an exception if the repo is inactive.
- `storage`:
  - `create_repo`: remove `signing_key` and `rotation_key` kwargs, read them from input repo instead.
  - `load_repo`: don't raise an exception if the repo is tombstoned.
- `datastore_storage`:
  - Stop storing `AtpBlock.decoded` in the datastore; it's now just an in-memory `@property`.
- `util`:
  - Rename `TombstonedRepo` to `InactiveRepo`.
Non-breaking changes:
- `datastore_storage`:
  - `DatastoreStorage`:
    - Add new `ndb_context_kwargs` constructor kwarg.
    - `apply_commit`: handle deactivated repos.
    - `create_repo`: propagate `Repo.status` into `AtpRepo`.
  - `AtpRemoteBlob`:
    - `get_or_create`: drop datastore transaction.
    - Add `width` and `height` properties, populated for images and videos, to be used in image/video embed `aspectRatio` (snarfed/bridgy-fed#1571).
    - Check video length, raise `ValidationError` on videos over 3 minutes.
- `did`:
  - Add new `get_signing_key`, `get_handle` functions.
  - `create_plc`: remove trailing slash from `services.atproto_pds.endpoint`.
- `storage`:
  - `Storage`: add new `write_blocks` method, implement in `MemoryStorage` and `DatastoreStorage`.
- `xrpc_repo`:
  - `describe_server`: include all `app.bsky` collections and others like `chat.bsky.actor.declaration`; fetch and include DID doc.
  - Implement `com.atproto.repo.importRepo`.
- `xrpc_sync`:
  - `get_blob`:
    - If we have more than one blob URL for the same CID, serve the latest one (bridgy-fed#1650).
    - Add HTTP `Cache-Control` to cache for 1h.
  - `list_repos`:
    - Bug fix: Use string TID for `rev`, not integer sequence number.
    - Bug fix: don't set status to `null` if the account is active.
Breaking changes:
- Add much more lexicon schema validation for records and XRPC method input, output, and parameters.
- `storage`:
  - Switch `Storage.write` to return `Block` instead of `CID`.
Non-breaking changes:
- `did`:
  - Add new `update_plc` method.
  - `create_plc`: add new `also_known_as` kwarg.
  - `resolve_handle`: drop `Content-Type: text/plain` requirement for HTTPS method.
- `mst`:
  - Add new optional `start` kwarg to `load_all`.
- `repo`:
  - Emit new `#identity` and `#account` events to `subscribeRepos` when creating new repos.
- `storage`:
  - Add new `deactivate_repo`, `activate_repo`, and `write_event` methods.
  - Add new optional `repo` kwarg to `read_blocks_by_seq` and `read_events_by_seq` to limit returned results to a single repo.
- `datastore_storage`:
  - Add new `max_size` and `accept_types` kwargs to `AtpRemoteBlob.get_or_create` for the blob's `maxSize` and `accept` parameters in its lexicon. If the fetched file doesn't satisfy those constraints, it raises `lexrpc.ValidationError`.
  - `DatastoreStorage.read_blocks_by_seq`: use strong consistency for datastore query. May fix occasional `AssertionError` when serving `subscribeRepos`.
- `xrpc_sync`:
  - Switch `getBlob` from returning HTTP 302 to 301.
  - Implement `since` param in `getRepo`.
  - `subscribeRepos`: wait up to 60s on a skipped sequence number before giving up and emitting it as a gap.
- `util`:
  - `service_jwt`: add new `**claims` parameter for additional JWT claims, eg `lxm`.
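The `max_size`/`accept_types` entry above corresponds to a lexicon blob's `maxSize` and `accept` constraints, where `accept` entries may be exact MIME types or wildcards like `image/*` or `*/*`. A minimal standalone sketch of that check; `check_blob` and `mime_matches` are hypothetical, and `ValidationError` is a local stand-in for `lexrpc.ValidationError` so the example runs without lexrpc:

```python
class ValidationError(ValueError):
    """Local stand-in for lexrpc.ValidationError, so this sketch has no dependencies."""


def mime_matches(mime_type, pattern):
    """True if mime_type matches an accept pattern: exact, 'type/*', or '*/*'."""
    if pattern in ('*/*', mime_type):
        return True
    if pattern.endswith('/*'):
        return mime_type.split('/', 1)[0] == pattern[:-2]
    return False


def check_blob(mime_type, size, max_size=None, accept_types=None):
    """Hypothetical helper: raise ValidationError if a blob violates maxSize or accept."""
    if max_size is not None and size > max_size:
        raise ValidationError(f'blob is {size} bytes, over maxSize {max_size}')
    if accept_types and not any(mime_matches(mime_type, p) for p in accept_types):
        raise ValidationError(f'{mime_type} is not one of {accept_types}')
```

Omitting either constraint (`None` or an empty list) skips that check, matching a lexicon that doesn't declare `maxSize` or `accept`.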
Breaking changes:
- `datastore_storage`:
  - `DatastoreStorage`: add new required `ndb_client` kwarg to constructor, used to get new context in lexrpc websocket subscription handlers that run server methods like `subscribeRepos` in separate threads (snarfed/lexrpc#8).
  - `DatastoreStorage.read_blocks_by_seq`: if the ndb context gets closed while we're still running, log a warning and return. (This can happen in eg `flask_server` if the websocket client disconnects early.)
  - `AtpRemoteBlob`: if the blob URL doesn't return the `Content-Type` header, infer type from the URL, or fall back to `application/octet-stream` (bridgy-fed#1073).
- `did`:
  - Cache `resolve_plc`, `resolve_web`, and `resolve_handle` for 6h, up to 5000 total results per function.
- `storage`: rename `Storage.read_commits_by_seq` to `read_events_by_seq` for new account tombstone support.
- `xrpc_sync`: rename `send_new_commits` to `send_events`, ditto.
- `xrpc_repo`: stop requiring auth for read methods: `getRecord`, `listRecords`, `describeRepo`.
Non-breaking changes:
- `did`:
  - Add `HANDLE_RE` regexp for handle validation.
- `storage`:
  - Add new `Storage.tombstone_repo` method, implemented in `MemoryStorage` and `DatastoreStorage`. Used to delete accounts. (bridgy-fed#783)
  - Add new `Storage.load_repos` method, implemented in `MemoryStorage` and `DatastoreStorage`. Used for `com.atproto.sync.listRepos`.
- `util`:
  - `service_jwt`: add optional `aud` kwarg.
- `xrpc_sync`:
  - `subscribeRepos`:
    - Add support for non-commit events, starting with account tombstones.
    - Add `ROLLBACK_WINDOW` environment variable to limit size of rollback window. Defaults to no limit.
    - For commits with create or update operations, always include the record block, even if it already existed in the repo beforehand (snarfed/bridgy-fed#1016).
    - Bug fix: populate the time each commit was created in `time` instead of the current time (snarfed/bridgy-fed#1015).
  - Start serving `getRepo` queries with the `since` parameter. `since` still isn't actually implemented, but we now serve the entire repo instead of returning an error.
  - Implement `getRepoStatus` method.
  - Implement `listRepos` method.
  - `getRepo` bug fix: include the repo head commit block.
- `xrpc_repo`:
  - `getRecord`: encode returned records correctly as ATProto-flavored DAG-JSON.
- `xrpc_*`: return `RepoNotFound` and `RepoDeactivated` errors when appropriate (snarfed/bridgy-fed#1083).
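The `did` caching entry above (results kept for 6 hours, at most 5000 per function) describes a time-to-live cache with a size cap. Libraries such as cachetools provide this as `TTLCache`; the stdlib-only decorator below is a hypothetical sketch of the same idea, not arroba's code. It treats expired entries as misses and evicts the least recently used entry when full.

```python
import functools
import time
from collections import OrderedDict


def ttl_cache(maxsize=5000, ttl=6 * 60 * 60, clock=time.monotonic):
    """Hypothetical decorator: cache results for `ttl` seconds, at most `maxsize` entries."""
    def decorator(fn):
        cache = OrderedDict()  # args -> (expires_at, result), least recently used first

        @functools.wraps(fn)
        def wrapper(*args):
            now = clock()
            hit = cache.get(args)
            if hit is not None and hit[0] > now:
                cache.move_to_end(args)  # mark as recently used
                return hit[1]
            result = fn(*args)
            cache[args] = (now + ttl, result)
            cache.move_to_end(args)
            while len(cache) > maxsize:
                cache.popitem(last=False)  # evict least recently used
            return result

        wrapper.cache = cache
        return wrapper
    return decorator
```

Note that this sketch also caches `None` results, e.g. failed resolutions, for the full TTL.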
- Bug fix: base32-encode TIDs in record keys, `at://` URIs, commit `rev`s, etc. Before, we were using the integer UNIX timestamp directly, which happened to be the same 13 character length. Oops.
- Switch from `BGS_HOST` environment variable to `RELAY_HOST`. `BGS_HOST` is still supported for backward compatibility.
- `datastore_storage`:
  - Bug fix for `DatastoreStorage.last_seq`, handle new NSID.
  - Add new `AtpRemoteBlob` class for storing "remote" blobs, available at public HTTP URLs, that we don't store ourselves.
- `did`:
  - `create_plc`: strip padding from genesis operation signature (for did-method-plc#54, atproto#1839).
  - `resolve_handle`: return None on bad domain, eg `.foo.com`.
  - `resolve_handle` bug fix: handle `charset` specifier in HTTPS method response `Content-Type`.
- `util`:
  - `new_key`: add `seed` kwarg to allow deterministic key generation.
- `xrpc_repo`:
  - `getRecord`: try to load record locally first; if not available, forward to AppView.
- `xrpc_sync`:
  - Implement `getBlob`, right now only based on "remote" blobs stored in `AtpRemoteBlob`s in datastore storage.
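The `resolve_handle` fixes above concern the HTTPS method of handle resolution: fetch `https://<handle>/.well-known/atproto-did` and read a DID from the response body. Below is a hypothetical sketch of that method (the other method, a DNS TXT record, isn't shown). The regex is the one given in the ATProto handle spec, and arroba's `HANDLE_RE` may differ; `is_valid_handle`, `parse_atproto_did`, and `resolve_handle_https` are illustrative names. It ignores `Content-Type` entirely, so neither `text/plain` nor a `charset` parameter matters.

```python
import http.client
import re
import urllib.request

# Handle syntax regex from the ATProto handle spec: dot-separated labels of 1-63
# letters, digits, or hyphens (no leading/trailing hyphen); the last label starts with a letter.
HANDLE_RE = re.compile(
    r'^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
    r'[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$')


def is_valid_handle(handle):
    """Check handle syntax; the spec also caps total length at 253 characters."""
    return len(handle) <= 253 and HANDLE_RE.fullmatch(handle) is not None


def parse_atproto_did(body):
    """Hypothetical helper: pull a did:plc or did:web out of a well-known response body."""
    did = body.strip()
    return did if did.startswith(('did:plc:', 'did:web:')) else None


def resolve_handle_https(handle, timeout=10):
    """Hypothetical helper for the HTTPS method. Never looks at Content-Type."""
    if not is_valid_handle(handle):
        return None  # eg '.foo.com'
    url = f'https://{handle}/.well-known/atproto-did'
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            body = resp.read(1024).decode('utf-8', errors='replace')
    except (OSError, ValueError, http.client.HTTPException):
        return None  # network errors, non-2xx responses, malformed replies
    return parse_atproto_did(body)
```

Validating syntax before making any request means malformed handles like `.foo.com` return `None` immediately instead of triggering a network call.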
- Migrate to ATProto repo v3. Specifically, the existing `subscribeRepos` sequence number is reused as the new `rev` field in commits. (Discussion.).
- Add new `did` module with utilities to create and resolve `did:plc`s and resolve `did:web`s.
- Add new `util.service_jwt` function that generates ATProto inter-service JWTs.
- `Repo`:
  - Add new `signing_key`/`rotation_key` attributes. Generate, store, and load both in `datastore_storage`.
  - Remove `format_init_commit`, migrate existing calls to `format_commit`.
- `Storage`:
  - Rename `read_from_seq` => `read_blocks_by_seq` (and in `MemoryStorage` and `DatastoreStorage`), add new `read_commits_by_seq` method.
  - Merge `load_repo` `did`/`handle` kwargs into `did_or_handle`.
- XRPCs:
  - Make `subscribeRepos` check storage for all new commits every time it wakes up.
    - As part of this, replace `xrpc_sync.enqueue_commit` with new `send_new_commits` function that takes no parameters.
  - Drop bundled `app.bsky`/`com.atproto` lexicons, use lexrpc's instead.
Big milestone: arroba is successfully federating with the ATProto sandbox! See app.py for the minimal demo code needed to wrap arroba in a fully functional PDS.
- Add Google Cloud Datastore implementation of repo storage.
- Implement `com.atproto` XRPC methods needed to federate with sandbox, including most of `repo` and `sync`.
  - Notably, includes `subscribeRepos` server side over websocket.
- ...and much more.
Implement repo and commit chain in new Repo class, including pluggable storage. This completes the first pass at all PDS data structures. Next release will include initial implementations of the com.atproto.sync.* XRPC methods.
Initial release! Still very in progress. MST, Walker, and Diff classes are mostly complete and working. Repo, commits, and sync XRPC methods are still in progress.
Here's how to package, test, and ship a new release.
- Pull from remote to make sure we're at head.
  ```sh
  git checkout main
  git pull
  ```
- Run the unit tests.
  ```sh
  source local/bin/activate.csh
  python -m unittest discover
  python -m unittest arroba.tests.mst_test_suite  # more extensive, slower tests (deliberately excluded from autodiscovery)
  ```
- Bump the version number in `pyproject.toml` and `docs/conf.py`. `git grep` the old version number to make sure it only appears in the changelog. Change the current changelog entry in `README.md` for this new version from unreleased to the current date.
- Build the docs. If you added any new modules, add them to the appropriate file(s) in `docs/source/`. Then run `./docs/build.sh`. Check that the generated HTML looks fine by opening `docs/_build/html/index.html` and looking around.
- Commit the version bump:
  ```sh
  setenv ver X.Y
  git commit -am "release v$ver"
  ```
- Upload to test.pypi.org for testing.
  ```sh
  python -m build
  twine upload -r pypitest dist/arroba-$ver*
  ```
- Install from test.pypi.org.
  ```sh
  cd /tmp
  python -m venv local
  source local/bin/activate.csh
  # make sure we force pip to use the uploaded version
  pip uninstall arroba
  pip install --upgrade pip
  pip install -i https://test.pypi.org/simple --extra-index-url https://pypi.org/simple arroba==$ver
  ```
- Smoke test that the code trivially loads and runs.
  ```sh
  python
  from arroba import did
  did.resolve_handle('snarfed.org')
  ```
- Tag the release in git. In the tag message editor, delete the generated comments at the bottom, leave the first line blank (to omit the release "title" in GitHub), put `### Notable changes` on the second line, then copy and paste this version's changelog contents below it.
  ```sh
  git tag -a v$ver --cleanup=verbatim
  git push && git push --tags
  ```
- Click here to draft a new release on GitHub. Enter `vX.Y` in the Tag version box. Leave Release title empty. Copy `### Notable changes` and the changelog contents into the description text box.
- Upload to pypi.org!
  ```sh
  twine upload dist/arroba-$ver*
  ```
- Wait for the docs to build on Read the Docs, then check that they look ok.
- On the Versions page, check that the new version is active. If it's not, activate it in the Activate a Version section.