Move shared libraries into workspace; Spin rsky-repo out into its own crate

Rudy Fraser
2025-02-07 20:02:31 -05:00
parent d13d2cc0c3
commit 6b61e0ae47
123 changed files with 2492 additions and 2426 deletions

.idea/rsky.iml generated
View File

@ -19,6 +19,7 @@
<sourceFolder url="file://$MODULE_DIR$/rsky-common/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rsky-labeler/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rsky-jetstream-subscriber/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/rsky-repo/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" />
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>

View File

@ -1,14 +1,28 @@
[workspace]
members = [ "rsky-common", "rsky-crypto","rsky-feedgen", "rsky-firehose", "rsky-identity", "rsky-labeler", "rsky-lexicon", "rsky-pds", "rsky-syntax", "rsky-jetstream-subscriber"]
members = [ "rsky-common", "rsky-crypto","rsky-feedgen", "rsky-firehose", "rsky-identity", "rsky-labeler", "rsky-lexicon", "rsky-pds", "rsky-syntax", "rsky-jetstream-subscriber", "rsky-repo"]
resolver = "2"
[workspace.dependencies]
cargo = { version = "0.84.0",features = ["vendored-openssl"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_derive = "^1.0"
serde_ipld_dagcbor = { version = "0.6.1" ,features = ["codec"]}
lexicon_cid = { package = "cid", version = "0.10.1", features = ["serde-codec"] }
libipld = "0.16.0"
serde_cbor = "0.11.2"
serde_bytes = "0.11.15"
tokio = { version = "1.28.2",features = ["full"] }
sha2 = "0.11.0-pre.3"
rand = "0.8.5"
rand_core = "0.6.4"
secp256k1 = { version = "0.28.2", features = ["global-context", "serde", "rand", "hashes","rand-std"] }
serde_json = { version = "1.0.96",features = ["preserve_order"] }
rsky-lexicon = {path = "rsky-lexicon", version = "0.2.3"}
rsky-identity = {path = "rsky-identity", version = "0.1.0"}
rsky-crypto = {path = "rsky-crypto", version = "0.1.1"}
rsky-syntax = {path = "rsky-syntax", version = "0.1.0"}
rsky-common = {path = "rsky-common", version = "0.1.0"}
rsky-common = {path = "rsky-common", version = "0.1.1"}
rsky-repo = {path = "rsky-repo", version = "0.0.1"}
[profile.release]
debug = 2 # Or any level from 0 to 2

View File

@ -37,12 +37,14 @@ rsky (/ˈrɪski/) is intended to be a full implementation of [AT Protocol](https
| `rsky-lexicon`: schema definition language | [README](./rsky-lexicon/README.md) | [![Crate](https://img.shields.io/crates/v/rsky-lexicon?logo=rust&style=flat-square&logoColor=E05D44&color=E05D44)](https://crates.io/crates/rsky-lexicon) |
| `rsky-syntax`: string parsers for identifiers | [README](./rsky-syntax/README.md) | [![Crate](https://img.shields.io/crates/v/rsky-syntax?logo=rust&style=flat-square&logoColor=E05D44&color=E05D44)](https://crates.io/crates/rsky-syntax) |
| `rsky-common`: shared code | [README](./rsky-common/README.md) | [![Crate](https://img.shields.io/crates/v/rsky-common?logo=rust&style=flat-square&logoColor=E05D44&color=E05D44)](https://crates.io/crates/rsky-common) |
| `rsky-repo`: data storage structure, including MST | [README](./rsky-repo/README.md) | [![Crate](https://img.shields.io/crates/v/rsky-repo?logo=rust&style=flat-square&logoColor=E05D44&color=E05D44)](https://crates.io/crates/rsky-repo) |
**Rust Services:**
- `rsky-pds`: "Personal Data Server", hosting repo content for atproto accounts. It differs from the canonical TypeScript implementation by using Postgres instead of SQLite, S3-compatible blob storage instead of on-disk storage, and Mailgun for email, all of which makes the PDS easier to migrate between cloud hosting providers and more maintainable.
- `rsky-feedgen`: Bluesky feed generator that closely follows the use cases of the Blacksky community.
- `rsky-firehose`: Firehose consumer.
- `rsky-jetstream-subscriber`: Firehose consumer for Jetstream.
- `rsky-labeler`: Firehose consumer that labels content.
## About AT Protocol

View File

@ -1,6 +1,6 @@
[package]
name = "rsky-common"
version = "0.1.0"
version = "0.1.1"
authors = ["Rudy Fraser <him@rudyfraser.com>"]
description = "Shared code for rsky"
license = "Apache-2.0"
@ -13,6 +13,25 @@ documentation = "https://docs.rs/rsky-common"
[dependencies]
regex = "1.8.4"
serde = { version = "1.0.217", features = ["derive"] }
thiserror = "2.0.11"
serde_ipld_dagcbor = { workspace = true }
anyhow = "1.0.79"
chrono = "0.4.39"
rand = {workspace = true}
rand_core = { workspace = true }
url = "2.5.4"
serde_json = "1.0.138"
tracing = "0.1.41" # @TODO: Remove anyhow in lib
rsky-identity = {workspace = true}
base64ct = "1.6.0"
urlencoding = "2.1.3"
futures = "0.3.28"
libipld = {workspace = true}
indexmap = { version = "1.9.3",features = ["serde-1"] }
secp256k1 = {workspace = true}
sha2 = {workspace = true}
lexicon_cid = {workspace = true}
[dev-dependencies]
temp-env = { version = "0.3.6"}

View File

@ -1,5 +1,5 @@
use crate::common::time::SECOND;
use crate::common::wait;
use crate::time::SECOND;
use crate::wait;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use std::cmp;

View File

@ -1,4 +1,3 @@
use crate::common;
use anyhow::Result;
use lexicon_cid::Cid;
use libipld::cbor::DagCborCodec;
@ -8,7 +7,7 @@ use libipld::raw::RawCodec;
use serde::Serialize;
pub fn cid_for_cbor<T: Serialize>(data: &T) -> Result<Cid> {
let bytes = common::struct_to_cbor(data)?;
let bytes = crate::struct_to_cbor(data)?;
let cid = Cid::new_v1(
u64::from(DagCborCodec),
Code::Sha2_256.digest(bytes.as_slice()),

View File

@ -1,2 +1,177 @@
use anyhow::Result;
use base64ct::{Base64, Encoding};
use chrono::offset::Utc as UtcOffset;
use chrono::DateTime;
use rand::{distributions::Alphanumeric, Rng};
use rsky_identity::did::atproto_data::VerificationMaterial;
use rsky_identity::types::DidDocument;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::thread;
use std::time::{Duration, SystemTime};
use thiserror::Error;
use url::Url;
use urlencoding::encode;
pub const RFC3339_VARIANT: &str = "%Y-%m-%dT%H:%M:%S%.3fZ";
#[derive(Error, Debug)]
pub enum BadContentTypeError {
#[error("BadType: `{0}`")]
BadType(String),
#[error("Content-Type header is missing")]
MissingType,
}
#[derive(Debug)]
pub struct GetServiceEndpointOpts {
pub id: String,
pub r#type: Option<String>,
}
pub fn now() -> String {
let system_time = SystemTime::now();
let dt: DateTime<UtcOffset> = system_time.into();
format!("{}", dt.format(RFC3339_VARIANT))
}
pub fn wait(ms: u64) {
thread::sleep(Duration::from_millis(ms))
}
pub fn beginning_of_time() -> String {
let beginning_of_time = SystemTime::UNIX_EPOCH;
let dt: DateTime<UtcOffset> = beginning_of_time.into();
format!("{}", dt.format(RFC3339_VARIANT))
}
pub fn get_random_str() -> String {
let token: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(32)
.map(char::from)
.collect();
token
}
pub fn struct_to_cbor<T: Serialize>(obj: &T) -> Result<Vec<u8>> {
Ok(serde_ipld_dagcbor::to_vec(obj)?)
}
pub fn cbor_to_struct<T: DeserializeOwned>(bytes: Vec<u8>) -> Result<T> {
Ok(serde_ipld_dagcbor::from_slice::<T>(bytes.as_slice())?)
}
pub fn json_to_b64url<T: Serialize>(obj: &T) -> Result<String> {
Ok(Base64::encode_string((&serde_json::to_string(obj)?).as_ref()).replace("=", ""))
}
pub fn encode_uri_component(input: &String) -> String {
encode(input).to_string()
}
// based on did-doc.ts
pub fn get_did(doc: &DidDocument) -> String {
doc.id.clone()
}
pub fn get_handle(doc: &DidDocument) -> Option<String> {
match &doc.also_known_as {
None => None,
Some(aka) => {
let found = aka.into_iter().find(|name| name.starts_with("at://"));
match found {
None => None,
// strip off at:// prefix
Some(found) => Some(found[5..].to_string()),
}
}
}
}
pub fn get_verification_material(
doc: &DidDocument,
key_id: &String,
) -> Option<VerificationMaterial> {
let did = get_did(doc);
let keys = &doc.verification_method;
if let Some(keys) = keys {
let found = keys
.into_iter()
.find(|key| key.id == format!("#{key_id}") || key.id == format!("{did}#{key_id}"));
match found {
Some(found) if found.public_key_multibase.is_some() => {
let found = found.clone();
Some(VerificationMaterial {
r#type: found.r#type,
public_key_multibase: found.public_key_multibase.unwrap(),
})
}
_ => None,
}
} else {
None
}
}
pub fn get_notif_endpoint(doc: DidDocument) -> Option<String> {
get_service_endpoint(
doc,
GetServiceEndpointOpts {
id: "#bsky_notif".to_string(),
r#type: Some("BskyNotificationService".to_string()),
},
)
}
#[tracing::instrument(skip_all)]
pub fn get_service_endpoint(doc: DidDocument, opts: GetServiceEndpointOpts) -> Option<String> {
tracing::info!(
"@LOG: common::get_service_endpoint() doc: {:?}; opts: {:?}",
doc,
opts
);
let did = get_did(&doc);
match doc.service {
None => None,
Some(services) => {
let found = services.iter().find(|service| {
service.id == opts.id || service.id == format!("{}{}", did, opts.id)
});
match found {
None => None,
Some(found) => match opts.r#type {
None => None,
Some(opts_type) if found.r#type == opts_type => {
validate_url(&found.service_endpoint)
}
_ => None,
},
}
}
}
}
// Check protocol and hostname to prevent potential SSRF
pub fn validate_url(url_str: &String) -> Option<String> {
match Url::parse(url_str) {
Err(_) => None,
Ok(url) => {
return if !vec!["http", "https"].contains(&url.scheme()) {
None
} else if url.host().is_none() {
None
} else {
Some(url_str.clone())
}
}
}
}
pub mod r#async;
pub mod env;
pub mod explicit_slurs;
pub mod ipld;
pub mod sign;
pub mod tid;
pub mod time;
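
For reference, a minimal sketch (not part of the commit) of how a consumer crate can use the DAG-CBOR helpers that now live at the crate root of `rsky-common`. `ExampleRecord` is a hypothetical type, and the caller is assumed to depend on `serde` with the `derive` feature:

```rust
use anyhow::Result;
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct ExampleRecord {
    text: String,
    created_at: String,
}

fn round_trip() -> Result<()> {
    let record = ExampleRecord {
        text: "hello".to_owned(),
        created_at: rsky_common::now(), // RFC 3339 timestamp helper defined above
    };
    // struct -> DAG-CBOR bytes
    let bytes = rsky_common::struct_to_cbor(&record)?;
    // DAG-CBOR bytes -> struct
    let decoded: ExampleRecord = rsky_common::cbor_to_struct(bytes)?;
    assert_eq!(record, decoded);
    Ok(())
}
```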

View File

@ -28,7 +28,7 @@ pub fn dedash(str: String) -> String {
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct TID(pub(crate) String);
pub struct TID(pub String);
impl TID {
pub fn new(str: String) -> Result<Self> {

View File

@ -1,4 +1,4 @@
use crate::common::RFC3339_VARIANT;
use crate::RFC3339_VARIANT;
use anyhow::Result;
use chrono::offset::Utc as UtcOffset;
use chrono::{DateTime, NaiveDateTime, Utc};

View File

@ -1,17 +1,18 @@
[package]
name = "rsky-crypto"
version = "0.1.1"
version = "0.1.2"
authors = ["Rudy Fraser <him@rudyfraser.com>"]
description = "Rust library providing basic cryptographic helpers as needed in atproto"
license = "Apache-2.0"
edition = "2021"
publish = false
publish = true
homepage = "https://blackskyweb.xyz"
repository = "https://github.com/blacksky-algorithms/rsky/tree/main/rsky-crypto"
documentation = "https://docs.rs/rsky-crypto"
[dependencies]
multibase = "0.9.1"
secp256k1 = { version = "0.28.2", features = ["global-context", "serde", "rand", "hashes","rand-std"] }
secp256k1 = { workspace = true }
anyhow = "1.0.79"
p256 = { version = "0.13.2", features = ["ecdsa","arithmetic","alloc"] }
unsigned-varint = "0.8.0"

View File

@ -1,8 +1,11 @@
use crate::constants::{BASE58_MULTIBASE_PREFIX, DID_KEY_PREFIX};
use anyhow::{bail, Result};
use multibase::decode;
use multibase::Base::Base58Btc;
use secp256k1::rand::rngs::OsRng;
use secp256k1::rand::RngCore;
use secp256k1::PublicKey;
use unsigned_varint::encode::u16 as encode_varint;
pub fn extract_multikey(did: &String) -> Result<String> {
if !did.starts_with(DID_KEY_PREFIX) {
@ -28,3 +31,26 @@ pub fn random_bytes(len: usize) -> Vec<u8> {
OsRng.fill_bytes(&mut buf);
buf
}
/// https://github.com/gnunicorn/rust-multicodec/blob/master/src/lib.rs#L249-L260
pub fn multicodec_wrap(bytes: Vec<u8>) -> Vec<u8> {
let mut buf = [0u8; 3];
encode_varint(0xe7, &mut buf);
let mut v: Vec<u8> = Vec::new();
for b in &buf {
v.push(*b);
// varint uses first bit to indicate another byte follows, stop if not the case
if *b <= 127 {
break;
}
}
v.extend(bytes);
v
}
pub fn encode_did_key(pubkey: &PublicKey) -> String {
let pk_compact = pubkey.serialize();
let pk_wrapped = multicodec_wrap(pk_compact.to_vec());
let pk_multibase = multibase::encode(Base58Btc, pk_wrapped.as_slice());
format!("{DID_KEY_PREFIX}{pk_multibase}")
}
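
For reference, a minimal sketch (not part of the commit) of calling the relocated helper from another crate. The key generation is illustrative and assumes the `rand`/`rand-std` features that the workspace already enables for `secp256k1`:

```rust
use rsky_crypto::utils::encode_did_key;
use secp256k1::rand::rngs::OsRng;
use secp256k1::Secp256k1;

fn example_did_key() -> String {
    let secp = Secp256k1::new();
    // Generate a fresh secp256k1 keypair; generate_keypair and OsRng come from
    // the secp256k1 "rand"/"rand-std" features.
    let (_secret_key, public_key) = secp.generate_keypair(&mut OsRng);
    // Multicodec-wrap the compressed public key and multibase-encode it as did:key:z...
    encode_did_key(&public_key)
}
```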

View File

@ -1,6 +1,6 @@
[package]
name = "rsky-lexicon"
version = "0.2.3"
version = "0.2.4"
edition = "2021"
publish = true
description = "Bluesky API library"
@ -17,12 +17,13 @@ chrono = { version = "0.4.24", features = ["serde"] }
derive_builder = "0.12.0"
miette = "5.8.0"
parking_lot = "0.12.1"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
serde_cbor = "0.11.2"
serde = {workspace = true}
serde_json = {workspace = true}
serde_cbor = {workspace = true}
serde_derive = "^1.0"
serde_bytes = "0.11.9"
thiserror = "1.0.40"
secp256k1 = { version = "0.28.2", features = ["serde", "rand"] }
libipld = "0.16.0"
lexicon_cid = { package = "cid", version = "0.10.1", features = ["serde-codec"] }
secp256k1 = {workspace = true}
libipld = {workspace = true}
lexicon_cid = {workspace = true}
anyhow = "1.0.79" # @TODO: Remove anyhow in lib

View File

@ -5,5 +5,6 @@ extern crate serde;
extern crate serde_json;
pub mod app;
pub mod blob_refs;
pub mod chat;
pub mod com;

View File

@ -1,6 +1,6 @@
[package]
name = "rsky-pds"
version = "0.0.1"
version = "0.1.0"
authors = ["Rudy Fraser <him@rudyfraser.com>"]
description = "Rust reference implementation of an atproto PDS."
license = "Apache-2.0"
@ -11,11 +11,7 @@ repository = "https://github.com/blacksky-algorithms/rsky/tree/main/rsky-pds"
documentation = "https://docs.rs/rsky-pds"
[dependencies]
# for vendored iroh-car
integer-encoding = { version = "3", features = ["tokio_async"] }
tokio = { version = "1.28.2",features = ["full"] }
# for everything else
tokio = {workspace = true}
rocket = { version = "=0.5.1", features = ["json","tls"] }
dotenvy = "0.15"
rsky-lexicon = { workspace = true }
@ -23,31 +19,29 @@ rsky-identity = { workspace = true }
rsky-crypto = { workspace = true }
rsky-common = {workspace = true }
rsky-syntax = { workspace = true }
rsky-repo = { workspace = true }
diesel = { version = "=2.1.5", features = ["chrono", "postgres"] }
chrono = "0.4.26"
serde = { version = "1.0.160", features = ["derive"] }
serde = { workspace = true }
serde_repr = "0.1"
serde_derive = "^1.0"
rand = "0.8.5"
serde_derive = {workspace = true}
rand = {workspace = true}
email_address = "0.2.4"
anyhow = "1.0.79"
multibase = "0.9.1"
unsigned-varint = "0.8.0"
serde_cbor = "0.11.2"
serde_cbor = { workspace = true }
base64 = "0.22.0"
data-encoding = "2.5.0"
reqwest = { version = "0.12.3",features = ["json","blocking"] }
serde_json = { version = "1.0.96",features = ["preserve_order"] }
serde_ipld_dagcbor = { version = "0.6.1" ,features = ["codec"]}
serde_bytes = "0.11.15"
serde_json = {workspace = true}
serde_ipld_dagcbor = { workspace = true }
serde_bytes = { workspace = true }
base64-url = "2.0.2"
secp256k1 = { version = "0.28.2", features = ["global-context", "serde", "rand", "hashes"] }
rand_core = "0.6.4"
sha2 = "0.11.0-pre.3"
secp256k1 = {workspace = true}
rand_core = {workspace = true}
sha2 = {workspace = true}
indexmap = { version = "1.9.3",features = ["serde-1"] }
hex = "0.4.3"
libipld = "0.16.0"
ipld-cbor = { package = "libipld-cbor", version = "0.16.0" }
libipld = { workspace = true }
lazy_static = "1.4.0"
regex = "1.10.3"
thiserror = "1.0.40"
@ -61,7 +55,6 @@ mailgun-rs = "0.1.10"
mailchecker = "6.0.1"
image = "0.25.1"
infer = "0.15.0"
urlencoding = "2.1.3"
toml = "0.8.12"
ws = { package = "rocket_ws", version = "0.1.1" }
atrium-api = "0.24.6"
@ -72,13 +65,10 @@ url = "2.5.2"
async-event-emitter = "0.1.3"
event-emitter-rs = "0.1.4"
webpki-roots = { version = "0.26.0-alpha.1" }
lexicon_cid = { package = "cid", version = "0.10.1", features = ["serde-codec"] }
async-recursion = "1.1.1"
lexicon_cid = { workspace = true }
once_cell = "1.19.0"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
async-stream = "0.3.5"
[dependencies.rocket_sync_db_pools]
version = "=0.1.0"

View File

@ -1,5 +1,3 @@
use crate::common;
use crate::common::RFC3339_VARIANT;
use crate::db::establish_connection;
use crate::schema::pds::account::dsl as AccountSchema;
use crate::schema::pds::account::table as AccountTable;
@ -13,6 +11,8 @@ use diesel::helper_types::{Eq, IntoBoxed};
use diesel::pg::Pg;
use diesel::result::{DatabaseErrorKind, Error as DieselError};
use diesel::*;
use rsky_common;
use rsky_common::RFC3339_VARIANT;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use std::ops::Add;
use std::time::SystemTime;
@ -227,7 +227,7 @@ pub fn register_actor(did: String, handle: String, deactivated: Option<bool>) ->
pub fn register_account(did: String, email: String, password: String) -> Result<()> {
let conn = &mut establish_connection()?;
let created_at = common::now();
let created_at = rsky_common::now();
// @TODO record recovery key for bring your own recovery key
let _: String = insert_into(AccountSchema::account)
@ -272,7 +272,7 @@ pub async fn update_account_takedown_status(did: &String, takedown: StatusAttr)
let takedown_ref: Option<String> = match takedown.applied {
true => match takedown.r#ref {
Some(takedown_ref) => Some(takedown_ref),
None => Some(common::now()),
None => Some(rsky_common::now()),
},
false => None,
};
@ -289,7 +289,7 @@ pub async fn deactivate_account(did: &String, delete_after: Option<String>) -> R
update(ActorSchema::actor)
.filter(ActorSchema::did.eq(did))
.set((
ActorSchema::deactivatedAt.eq(common::now()),
ActorSchema::deactivatedAt.eq(rsky_common::now()),
ActorSchema::deleteAfter.eq(delete_after),
))
.execute(conn)?;

View File

@ -1,11 +1,11 @@
use crate::auth_verifier::AuthScope;
use crate::common::time::{from_micros_to_utc, MINUTE};
use crate::common::{get_random_str, json_to_b64url, RFC3339_VARIANT};
use crate::db::establish_connection;
use crate::models;
use anyhow::Result;
use diesel::*;
use jwt_simple::prelude::*;
use rsky_common::time::{from_micros_to_utc, MINUTE};
use rsky_common::{get_random_str, json_to_b64url, RFC3339_VARIANT};
use secp256k1::{Keypair, Message, SecretKey};
use sha2::{Digest, Sha256};
use std::time::SystemTime;

View File

@ -1,17 +1,17 @@
use crate::apis::com::atproto::server::get_random_token;
use crate::common;
use crate::common::time::{from_str_to_utc, less_than_ago_s, MINUTE};
use crate::db::establish_connection;
use crate::models::models::EmailTokenPurpose;
use crate::models::EmailToken;
use anyhow::{bail, Result};
use diesel::*;
use rsky_common;
use rsky_common::time::{from_str_to_utc, less_than_ago_s, MINUTE};
pub async fn create_email_token(did: &String, purpose: EmailTokenPurpose) -> Result<String> {
use crate::schema::pds::email_token::dsl as EmailTokenSchema;
let conn = &mut establish_connection()?;
let token = get_random_token().to_uppercase();
let now = common::now();
let now = rsky_common::now();
insert_into(EmailTokenSchema::email_token)
.values((

View File

@ -1,9 +1,9 @@
use crate::account_manager::DisableInviteCodesOpts;
use crate::common;
use crate::db::establish_connection;
use crate::models::models;
use anyhow::{bail, Result};
use diesel::*;
use rsky_common;
use rsky_lexicon::com::atproto::server::AccountCodes;
use rsky_lexicon::com::atproto::server::{
InviteCode as LexiconInviteCode, InviteCodeUse as LexiconInviteCodeUse,
@ -67,7 +67,7 @@ pub async fn create_invite_codes(to_create: Vec<AccountCodes>, use_count: i32) -
use crate::schema::pds::invite_code::dsl as InviteCodeSchema;
let conn = &mut establish_connection()?;
let created_at = common::now();
let created_at = rsky_common::now();
let rows: Vec<models::InviteCode> = to_create
.into_iter()
@ -102,7 +102,7 @@ pub async fn create_account_invite_codes(
use crate::schema::pds::invite_code::dsl as InviteCodeSchema;
let conn = &mut establish_connection()?;
let now = common::now();
let now = rsky_common::now();
let rows: Vec<models::InviteCode> = codes
.into_iter()

View File

@ -1,4 +1,3 @@
use crate::common::{get_random_str, now};
use crate::db::establish_connection;
use crate::models;
use crate::models::AppPassword;
@ -9,6 +8,7 @@ use argon2::{
};
use base64ct::{Base64, Encoding};
use diesel::*;
use rsky_common::{get_random_str, now};
use rsky_lexicon::com::atproto::server::CreateAppPasswordOutput;
use sha2::{Digest, Sha256};

View File

@ -1,15 +1,15 @@
use crate::common;
use crate::db::establish_connection;
use anyhow::Result;
use diesel::*;
use libipld::Cid;
use rsky_common;
pub fn update_root(did: String, cid: Cid, rev: String) -> Result<()> {
// @TODO balance risk of a race in the case of a long retry
use crate::schema::pds::repo_root::dsl as RepoRootSchema;
let conn = &mut establish_connection()?;
let now = common::now();
let now = rsky_common::now();
insert_into(RepoRootSchema::repo_root)
.values((

View File

@ -8,9 +8,6 @@ use crate::account_manager::helpers::invite::{CodeDetail, CodeUse};
use crate::account_manager::helpers::password::UpdateUserPasswordOpts;
use crate::account_manager::helpers::repo;
use crate::auth_verifier::AuthScope;
use crate::common;
use crate::common::time::{from_micros_to_str, from_str_to_micros, HOUR};
use crate::common::RFC3339_VARIANT;
use crate::models::models::EmailTokenPurpose;
use anyhow::Result;
use chrono::offset::Utc as UtcOffset;
@ -18,6 +15,9 @@ use chrono::DateTime;
use futures::try_join;
use helpers::{account, auth, email_token, invite, password};
use libipld::Cid;
use rsky_common;
use rsky_common::time::{from_micros_to_str, from_str_to_micros, HOUR};
use rsky_common::RFC3339_VARIANT;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use rsky_lexicon::com::atproto::server::{AccountCodes, CreateAppPasswordOutput};
use secp256k1::{Keypair, Secp256k1, SecretKey};
@ -136,7 +136,7 @@ impl AccountManager {
expires_in: None,
})?;
let refresh_payload = auth::decode_refresh_token(refresh_jwt.clone(), jwt_key)?;
let now = common::now();
let now = rsky_common::now();
if let Some(invite_code) = invite_code.clone() {
invite::ensure_invite_is_available(invite_code).await?;
@ -415,7 +415,7 @@ impl AccountManager {
pub async fn confirm_email<'em>(opts: ConfirmEmailOpts<'em>) -> Result<()> {
let ConfirmEmailOpts { did, token } = opts;
email_token::assert_valid_token(did, EmailTokenPurpose::ConfirmEmail, token, None).await?;
let now = common::now();
let now = rsky_common::now();
try_join!(
email_token::delete_email_token(did, EmailTokenPurpose::ConfirmEmail),
account::set_email_confirmed_at(did, now)

View File

@ -1,7 +1,5 @@
use std::str::FromStr;
// based on https://github.com/bluesky-social/atproto/blob/main/packages/aws/src/s3.ts
use crate::common::env::env_str;
use crate::common::get_random_str;
use anyhow::Result;
use aws_config::SdkConfig;
use aws_sdk_s3 as s3;
@ -9,6 +7,8 @@ use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{Delete, ObjectCannedAcl, ObjectIdentifier};
use lexicon_cid::Cid;
use rsky_common::env::env_str;
use rsky_common::get_random_str;
struct MoveObject {
from: String,

View File

@ -1,12 +1,7 @@
use crate::common::ipld::sha256_raw_to_cid;
use crate::common::now;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::db::establish_connection;
use crate::image;
use crate::models::models;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::blob_refs::BlobRef;
use crate::repo::error::BlobError;
use crate::repo::types::{PreparedBlobRef, PreparedWrite};
use crate::{common, image};
use anyhow::{bail, Result};
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::primitives::ByteStream;
@ -18,8 +13,13 @@ use futures::try_join;
use lexicon_cid::Cid;
use rocket::data::{Data, ToByteUnit};
use rocket::form::validate::Contains;
use rsky_common::ipld::sha256_raw_to_cid;
use rsky_common::now;
use rsky_lexicon::blob_refs::BlobRef;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use rsky_lexicon::com::atproto::repo::ListMissingBlobsRefRecordBlob;
use rsky_repo::error::BlobError;
use rsky_repo::types::{PreparedBlobRef, PreparedWrite};
use sha2::{Digest, Sha256};
pub struct BlobMetadata {
@ -510,7 +510,7 @@ impl BlobReader {
let takedown_ref: Option<String> = match takedown.applied {
true => match takedown.r#ref {
Some(takedown_ref) => Some(takedown_ref),
None => Some(common::now()),
None => Some(now()),
},
false => None,
};

View File

@ -0,0 +1,330 @@
// based on https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts
// also adds components from https://github.com/bluesky-social/atproto/blob/main/packages/pds/src/actor-store/repo/transactor.ts
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::blob::BlobReader;
use crate::actor_store::preference::PreferenceReader;
use crate::actor_store::record::RecordReader;
use crate::actor_store::repo::sql_repo::SqlRepoReader;
use crate::db::DbConn;
use anyhow::{bail, Result};
use diesel::*;
use futures::stream::{self, StreamExt};
use lexicon_cid::Cid;
use rsky_common;
use rsky_repo::repo::Repo;
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
use rsky_repo::storage::types::RepoStorage;
use rsky_repo::types::{
write_to_op, CommitData, PreparedCreateOrUpdate, PreparedWrite, RecordCreateOrUpdateOp,
RecordWriteEnum, RecordWriteOp, WriteOpAction,
};
use rsky_syntax::aturi::AtUri;
use secp256k1::{Keypair, Secp256k1, SecretKey};
use std::env;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct ActorStore {
pub did: String,
pub storage: Arc<RwLock<SqlRepoReader>>, // get ipld blocks from db
pub record: RecordReader, // get lexicon records from db
pub blob: BlobReader, // get blobs
pub pref: PreferenceReader, // get preferences
}
// Combination of RepoReader/Transactor, BlobReader/Transactor, SqlRepoReader/Transactor
impl ActorStore {
/// Concrete reader of an individual repo (hence S3BlobStore which takes `did` param)
pub fn new(did: String, blobstore: S3BlobStore, conn: DbConn) -> Self {
ActorStore {
storage: Arc::new(RwLock::new(SqlRepoReader::new(did.clone(), None, conn))),
record: RecordReader::new(did.clone()),
pref: PreferenceReader::new(did.clone()),
did,
blob: BlobReader::new(blobstore), // Unlike TS impl, just use blob reader vs generator
}
}
// Transactors
// -------------------
pub async fn create_repo(
&self,
keypair: Keypair,
writes: Vec<PreparedCreateOrUpdate>,
) -> Result<CommitData> {
let write_ops = writes
.clone()
.into_iter()
.map(|prepare| {
let at_uri: AtUri = prepare.uri.try_into()?;
Ok(RecordCreateOrUpdateOp {
action: WriteOpAction::Create,
collection: at_uri.get_collection(),
rkey: at_uri.get_rkey(),
record: prepare.record,
})
})
.collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
let commit = Repo::format_init_commit(
self.storage.clone(),
self.did.clone(),
keypair,
Some(write_ops),
)
.await?;
let storage_guard = self.storage.read().await;
storage_guard.apply_commit(commit.clone(), None).await?;
let writes = writes
.into_iter()
.map(|w| PreparedWrite::Create(w))
.collect::<Vec<PreparedWrite>>();
self.blob.process_write_blobs(writes).await?;
Ok(commit)
}
pub async fn process_writes(
&mut self,
writes: Vec<PreparedWrite>,
swap_commit_cid: Option<Cid>,
) -> Result<CommitData> {
let commit = self.format_commit(writes.clone(), swap_commit_cid).await?;
{
let immutable_borrow = &self;
// & send to indexing
immutable_borrow
.index_writes(writes.clone(), &commit.rev)
.await?;
}
// persist the commit to repo storage
let storage_guard = self.storage.read().await;
storage_guard.apply_commit(commit.clone(), None).await?;
// process blobs
self.blob.process_write_blobs(writes).await?;
Ok(commit)
}
pub async fn format_commit(
&mut self,
writes: Vec<PreparedWrite>,
swap_commit: Option<Cid>,
) -> Result<CommitData> {
let current_root = {
let storage_guard = self.storage.read().await;
storage_guard.get_root_detailed().await
};
if let Ok(current_root) = current_root {
if let Some(swap_commit) = swap_commit {
if !current_root.cid.eq(&swap_commit) {
bail!("BadCommitSwapError: {0}", current_root.cid)
}
}
{
let mut storage_guard = self.storage.write().await;
storage_guard.cache_rev(current_root.rev).await?;
}
let mut new_record_cids: Vec<Cid> = vec![];
let mut delete_and_update_uris = vec![];
for write in &writes {
match write.clone() {
PreparedWrite::Create(c) => new_record_cids.push(c.cid),
PreparedWrite::Update(u) => {
new_record_cids.push(u.cid);
let u_at_uri: AtUri = u.uri.try_into()?;
delete_and_update_uris.push(u_at_uri);
}
PreparedWrite::Delete(d) => {
let d_at_uri: AtUri = d.uri.try_into()?;
delete_and_update_uris.push(d_at_uri)
}
}
if write.swap_cid().is_none() {
continue;
}
let write_at_uri: &AtUri = &write.uri().try_into()?;
let record = self
.record
.get_record(write_at_uri, None, Some(true))
.await?;
let current_record = match record {
Some(record) => Some(Cid::from_str(&record.cid)?),
None => None,
};
match write {
// There should be no current record for a create
PreparedWrite::Create(_) if write.swap_cid().is_some() => {
bail!("BadRecordSwapError: `{0:?}`", current_record)
}
// There should be a current record for an update
PreparedWrite::Update(_) if write.swap_cid().is_none() => {
bail!("BadRecordSwapError: `{0:?}`", current_record)
}
// There should be a current record for a delete
PreparedWrite::Delete(_) if write.swap_cid().is_none() => {
bail!("BadRecordSwapError: `{0:?}`", current_record)
}
_ => Ok::<(), anyhow::Error>(()),
}?;
match (current_record, write.swap_cid()) {
(Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => {
Ok::<(), anyhow::Error>(())
}
_ => bail!(
"BadRecordSwapError: current record is `{0:?}`",
current_record
),
}?;
}
let mut repo = Repo::load(self.storage.clone(), Some(current_root.cid)).await?;
let write_ops: Vec<RecordWriteOp> = writes
.into_iter()
.map(write_to_op)
.collect::<Result<Vec<RecordWriteOp>>>()?;
// @TODO: Use repo signing key global config
let secp = Secp256k1::new();
let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX").unwrap();
let repo_secret_key =
SecretKey::from_slice(&hex::decode(repo_private_key.as_bytes()).unwrap()).unwrap();
let repo_signing_key = Keypair::from_secret_key(&secp, &repo_secret_key);
let mut commit = repo
.format_commit(RecordWriteEnum::List(write_ops), repo_signing_key)
.await?;
// find blocks that would be deleted but are referenced by another record
let duplicate_record_cids = self
.get_duplicate_record_cids(commit.removed_cids.to_list(), delete_and_update_uris)
.await?;
for cid in duplicate_record_cids {
commit.removed_cids.delete(cid)
}
// find blocks that are relevant to ops but not included in diff
// (for instance a record that was moved but cid stayed the same)
let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?;
if new_record_blocks.missing.len() > 0 {
let missing_blocks = {
let storage_guard = self.storage.read().await;
storage_guard.get_blocks(new_record_blocks.missing).await?
};
commit.relevant_blocks.add_map(missing_blocks.blocks)?;
}
Ok(commit)
} else {
bail!("No repo root found for `{0}`", self.did)
}
}
pub async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &String) -> Result<()> {
let now: &str = &rsky_common::now();
let _ = stream::iter(writes)
.then(|write| async move {
Ok::<(), anyhow::Error>(match write {
PreparedWrite::Create(write) => {
let write_at_uri: AtUri = write.uri.try_into()?;
self.record
.index_record(
write_at_uri.clone(),
write.cid,
Some(write.record),
Some(write.action),
rev.clone(),
Some(now.to_string()),
)
.await?
}
PreparedWrite::Update(write) => {
let write_at_uri: AtUri = write.uri.try_into()?;
self.record
.index_record(
write_at_uri.clone(),
write.cid,
Some(write.record),
Some(write.action),
rev.clone(),
Some(now.to_string()),
)
.await?
}
PreparedWrite::Delete(write) => {
let write_at_uri: AtUri = write.uri.try_into()?;
self.record.delete_record(&write_at_uri).await?
}
})
})
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
Ok(())
}
pub async fn destroy(&mut self) -> Result<()> {
let did: String = self.did.clone();
let storage_guard = self.storage.read().await;
let db: Arc<DbConn> = storage_guard.db.clone();
use crate::schema::pds::blob::dsl as BlobSchema;
let blob_rows: Vec<String> = db
.run(move |conn| {
BlobSchema::blob
.filter(BlobSchema::did.eq(did))
.select(BlobSchema::cid)
.get_results(conn)
})
.await?;
let cids = blob_rows
.into_iter()
.map(|row| Ok(Cid::from_str(&row)?))
.collect::<Result<Vec<Cid>>>()?;
let _ = stream::iter(cids.chunks(500))
.then(|chunk| async {
Ok::<(), anyhow::Error>(self.blob.blobstore.delete_many(chunk.to_vec()).await?)
})
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
Ok(())
}
pub async fn get_duplicate_record_cids(
&self,
cids: Vec<Cid>,
touched_uris: Vec<AtUri>,
) -> Result<Vec<Cid>> {
if touched_uris.len() == 0 || cids.len() == 0 {
return Ok(vec![]);
}
let did: String = self.did.clone();
let storage_guard = self.storage.read().await;
let db: Arc<DbConn> = storage_guard.db.clone();
use crate::schema::pds::record::dsl as RecordSchema;
let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect();
let res: Vec<String> = db
.run(move |conn| {
RecordSchema::record
.filter(RecordSchema::did.eq(did))
.filter(RecordSchema::cid.eq_any(cid_strs))
.filter(RecordSchema::uri.ne_all(touched_uri_strs))
.select(RecordSchema::cid)
.get_results(conn)
})
.await?;
Ok(res
.into_iter()
.map(|row| Cid::from_str(&row).map_err(|error| anyhow::Error::new(error)))
.collect::<Result<Vec<Cid>>>()?)
}
}
pub mod aws;
pub mod blob;
pub mod preference;
pub mod record;
pub mod repo;

View File

@ -1,8 +1,8 @@
use crate::actor_store::preference::util::pref_in_scope;
use crate::auth_verifier::AuthScope;
use crate::db::establish_connection;
use crate::models;
use crate::models::AccountPref;
use crate::repo::preference::util::pref_in_scope;
use anyhow::{bail, Result};
use diesel::*;
use rsky_lexicon::app::bsky::actor::RefPreferences;

View File

@ -1,14 +1,14 @@
use crate::common;
use crate::db::establish_connection;
use crate::models::{models, Backlink, Record};
use crate::repo::types::{Ids, Lex, RepoRecord, WriteOpAction};
use crate::repo::util::cbor_to_lex_record;
use crate::storage::Ipld;
use anyhow::{bail, Result};
use diesel::*;
use futures::stream::{self, StreamExt};
use lexicon_cid::Cid;
use rsky_common;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use rsky_repo::storage::Ipld;
use rsky_repo::types::{Ids, Lex, RepoRecord, WriteOpAction};
use rsky_repo::util::cbor_to_lex_record;
use rsky_syntax::aturi::AtUri;
use rsky_syntax::aturi_validation::ensure_valid_at_uri;
use rsky_syntax::did::ensure_valid_did;
@ -36,7 +36,7 @@ pub struct RecordsForCollection {
// @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs.
// For now, we just want to ensure we're tracking links from follows, blocks, likes, and reposts.
pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result<Vec<models::Backlink>> {
pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result<Vec<Backlink>> {
if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(record_type)))) = record.get("$type") {
if record_type == Ids::AppBskyGraphFollow.as_str()
|| record_type == Ids::AppBskyGraphBlock.as_str()
@ -44,7 +44,7 @@ pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result<Vec<models::Bac
if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject)))) = record.get("subject") {
match ensure_valid_did(&uri) {
Ok(_) => {
return Ok(vec![models::Backlink {
return Ok(vec![Backlink {
uri: uri.to_string(),
path: "subject".to_owned(),
link_to: subject.clone(),
@ -62,7 +62,7 @@ pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result<Vec<models::Bac
{
match ensure_valid_at_uri(&uri) {
Ok(_) => {
return Ok(vec![models::Backlink {
return Ok(vec![Backlink {
uri: uri.to_string(),
path: "subject.uri".to_owned(),
link_to: subject_uri.clone(),
@ -286,7 +286,7 @@ impl RecordReader {
collection: String,
path: String,
link_to: String,
) -> Result<Vec<models::Record>> {
) -> Result<Vec<Record>> {
use crate::schema::pds::backlink::dsl as BacklinkSchema;
use crate::schema::pds::record::dsl as RecordSchema;
let conn = &mut establish_connection()?;
@ -353,7 +353,7 @@ impl RecordReader {
let rkey = uri.get_rkey();
let hostname = uri.get_hostname().to_string();
let action = action.unwrap_or(WriteOpAction::Create);
let indexed_at = timestamp.unwrap_or_else(|| common::now());
let indexed_at = timestamp.unwrap_or_else(|| rsky_common::now());
let row = Record {
did: self.did.clone(),
uri: uri.to_string(),
@ -452,7 +452,7 @@ impl RecordReader {
let takedown_ref: Option<String> = match takedown.applied {
true => match takedown.r#ref {
Some(takedown_ref) => Some(takedown_ref),
None => Some(common::now()),
None => Some(rsky_common::now()),
},
false => None,
};

View File

@ -0,0 +1 @@
pub mod sql_repo;

View File

@ -1,14 +1,6 @@
use crate::car::blocks_to_car_file;
use crate::db::DbConn;
use crate::models;
use crate::models::RepoBlock;
use crate::repo::block_map::{BlockMap, BlocksAndMissing};
use crate::repo::cid_set::CidSet;
use crate::repo::types::CommitData;
use crate::storage::readable_blockstore::ReadableBlockstore;
use crate::storage::types::RepoStorage;
use crate::storage::CidAndRev;
use crate::storage::RepoRootError::RepoRootNotFoundError;
use crate::{common, models};
use anyhow::Result;
use diesel::dsl::sql;
use diesel::prelude::*;
@ -16,6 +8,15 @@ use diesel::sql_types::{Bool, Text};
use diesel::*;
use futures::{stream, StreamExt, TryStreamExt};
use lexicon_cid::Cid;
use rsky_common;
use rsky_repo::block_map::{BlockMap, BlocksAndMissing};
use rsky_repo::car::blocks_to_car_file;
use rsky_repo::cid_set::CidSet;
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
use rsky_repo::storage::types::RepoStorage;
use rsky_repo::storage::CidAndRev;
use rsky_repo::storage::RepoRootError::RepoRootNotFoundError;
use rsky_repo::types::CommitData;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
@ -322,7 +323,7 @@ impl RepoStorage for SqlRepoReader {
// Basically handles getting ipld blocks from db
impl SqlRepoReader {
pub fn new(did: String, now: Option<String>, db: DbConn) -> Self {
let now = now.unwrap_or_else(|| common::now());
let now = now.unwrap_or_else(|| rsky_common::now());
SqlRepoReader {
cache: Arc::new(RwLock::new(BlockMap::new())),
root: None,

View File

@ -1,8 +1,8 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandard;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -1,8 +1,8 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandard;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -3,7 +3,6 @@ use crate::auth_verifier::{AccessOutput, AccessStandard};
use crate::config::ServerConfig;
use crate::pipethrough::{pipethrough, OverrideOpts, ProxyRequest};
use crate::read_after_write::util::ReadAfterWriteResponse;
use crate::repo::types::Ids;
use crate::xrpc_server::types::{HandlerPipeThrough, InvalidRequestError};
use crate::{SharedATPAgent, SharedIdResolver};
use anyhow::{anyhow, Result};
@ -16,6 +15,7 @@ use rocket::http::Status;
use rocket::request::{FromRequest, Outcome, Request};
use rocket::State;
use rsky_lexicon::app::bsky::feed::AuthorFeed;
use rsky_repo::types::Ids;
use std::collections::BTreeMap;
#[derive(Debug, Clone, PartialEq, Serialize)]

View File

@ -1,3 +1,5 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandard;
use crate::config::ServerConfig;
@ -9,9 +11,6 @@ use crate::read_after_write::util::{
ReadAfterWriteResponse,
};
use crate::read_after_write::viewer::LocalViewer;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::Ids;
use crate::repo::ActorStore;
use crate::xrpc_server::types::{HandlerPipeThrough, InvalidRequestError, XRPCError};
use crate::{SharedLocalViewer, APP_USER_AGENT};
use anyhow::{anyhow, Result};
@ -28,6 +27,7 @@ use reqwest::header::HeaderMap;
use rocket::State;
use rsky_lexicon::app::bsky::feed::Post;
use rsky_lexicon::app::bsky::feed::{GetPostThreadOutput, ThreadViewPost, ThreadViewPostEnum};
use rsky_repo::types::Ids;
use rsky_syntax::aturi::AtUri;
use std::collections::BTreeMap;
use std::future::Future;

View File

@ -1,9 +1,7 @@
use crate::apis::app::bsky::util::get_did_doc;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardSignupQueued;
use crate::common::get_notif_endpoint;
use crate::config::ServerConfig;
use crate::repo::types::Ids;
use crate::{context, SharedIdResolver, APP_USER_AGENT};
use anyhow::{anyhow, bail, Result};
use atrium_api::app::bsky::notification::register_push::{
@ -15,7 +13,9 @@ use atrium_ipld::ipld::Ipld as AtriumIpld;
use atrium_xrpc_client::reqwest::ReqwestClientBuilder;
use rocket::serde::json::Json;
use rocket::State;
use rsky_common::get_notif_endpoint;
use rsky_lexicon::app::bsky::notification::RegisterPushInput;
use rsky_repo::types::Ids;
pub async fn inner_register_push(
body: Json<RegisterPushInput>,

View File

@ -1,10 +1,10 @@
use crate::account_manager::helpers::account::AccountStatus;
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AdminToken;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::{sequencer, SharedSequencer};
use anyhow::Result;
use aws_config::SdkConfig;

View File

@ -2,10 +2,10 @@ use crate::account_manager::helpers::account::AvailabilityFlags;
use crate::account_manager::AccountManager;
use crate::apis::ApiError;
use crate::auth_verifier::Moderator;
use crate::common::env::env_str;
use anyhow::{bail, Result};
use futures::try_join;
use rocket::serde::json::Json;
use rsky_common::env::env_str;
use rsky_lexicon::com::atproto::admin::AccountView;
use rsky_syntax::handle::INVALID_HANDLE;

View File

@ -1,8 +1,6 @@
use crate::account_manager::helpers::invite::{get_invite_codes_uses, CodeDetail};
use crate::apis::ApiError;
use crate::auth_verifier::Moderator;
use crate::common::time::{from_millis_to_utc, from_str_to_millis};
use crate::common::RFC3339_VARIANT;
use crate::db::establish_connection;
use crate::models;
use anyhow::{anyhow, bail, Result};
@ -11,6 +9,8 @@ use diesel::prelude::*;
use diesel::sql_types::{Bool, Text};
use diesel::QueryDsl;
use rocket::serde::json::Json;
use rsky_common::time::{from_millis_to_utc, from_str_to_millis};
use rsky_common::RFC3339_VARIANT;
use rsky_lexicon::com::atproto::admin::GetInviteCodesOutput;
use std::mem;

View File

@ -1,9 +1,9 @@
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::Moderator;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use futures::try_join;

View File

@ -3,13 +3,13 @@ use crate::account_manager::AccountManager;
use crate::apis::com::atproto::server::get_keys_from_private_key_str;
use crate::apis::ApiError;
use crate::auth_verifier::AdminToken;
use crate::common::env::env_str;
use crate::config::ServerConfig;
use crate::handle::{normalize_and_validate_handle, HandleValidationContext, HandleValidationOpts};
use crate::{plc, SharedIdResolver, SharedSequencer};
use anyhow::{bail, Result};
use rocket::serde::json::Json;
use rocket::State;
use rsky_common::env::env_str;
use rsky_lexicon::com::atproto::admin::UpdateAccountHandleInput;
use std::env;

View File

@ -1,9 +1,9 @@
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::Moderator;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::SharedSequencer;
use anyhow::Result;
use aws_config::SdkConfig;

View File

@ -1,11 +1,11 @@
use crate::account_manager::helpers::account::ActorAccount;
use crate::account_manager::AccountManager;
use crate::apis::ApiError;
use crate::common::env::{env_list, env_str};
use crate::{SharedIdResolver, APP_USER_AGENT};
use anyhow::{bail, Result};
use rocket::serde::json::Json;
use rocket::State;
use rsky_common::env::{env_list, env_str};
use rsky_lexicon::com::atproto::identity::ResolveHandleOutput;
async fn try_resolve_from_app_view(handle: &String) -> Result<Option<String>> {

View File

@ -3,13 +3,13 @@ use crate::account_manager::AccountManager;
use crate::apis::com::atproto::server::get_keys_from_private_key_str;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardCheckTakedown;
use crate::common::env::env_str;
use crate::config::ServerConfig;
use crate::handle::{normalize_and_validate_handle, HandleValidationContext, HandleValidationOpts};
use crate::{plc, SharedIdResolver, SharedSequencer};
use anyhow::{bail, Result};
use rocket::serde::json::Json;
use rocket::State;
use rsky_common::env::env_str;
use rsky_lexicon::com::atproto::identity::UpdateHandleInput;
use std::env;

View File

@ -1,13 +1,13 @@
use crate::account_manager::helpers::account::AvailabilityFlags;
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardIncludeChecks;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::PreparedWrite;
use crate::repo::{
prepare_create, prepare_delete, prepare_update, ActorStore, PrepareCreateOpts,
PrepareDeleteOpts, PrepareUpdateOpts,
use crate::repo::prepare::{
prepare_create, prepare_delete, prepare_update, PrepareCreateOpts, PrepareDeleteOpts,
PrepareUpdateOpts,
};
use crate::SharedSequencer;
use anyhow::{bail, Result};
@ -17,6 +17,7 @@ use libipld::Cid;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite};
use rsky_repo::types::PreparedWrite;
use std::str::FromStr;
async fn inner_apply_writes(

View File

@ -1,13 +1,11 @@
use crate::account_manager::helpers::account::AvailabilityFlags;
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardIncludeChecks;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::{PreparedDelete, PreparedWrite};
use crate::repo::{
prepare_create, prepare_delete, ActorStore, PrepareCreateOpts, PrepareDeleteOpts,
};
use crate::repo::prepare::{prepare_create, prepare_delete, PrepareCreateOpts, PrepareDeleteOpts};
use crate::SharedSequencer;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
@ -15,6 +13,7 @@ use libipld::Cid;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput};
use rsky_repo::types::{PreparedDelete, PreparedWrite};
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

View File

@ -1,11 +1,11 @@
use crate::account_manager::helpers::account::AvailabilityFlags;
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardIncludeChecks;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::PreparedWrite;
use crate::repo::{prepare_delete, ActorStore, PrepareDeleteOpts};
use crate::repo::prepare::{prepare_delete, PrepareDeleteOpts};
use crate::SharedSequencer;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
@ -13,6 +13,7 @@ use libipld::Cid;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::DeleteRecordInput;
use rsky_repo::types::PreparedWrite;
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

View File

@ -1,9 +1,9 @@
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::{common, SharedIdResolver};
use crate::SharedIdResolver;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use rocket::serde::json::Json;
@ -28,7 +28,7 @@ async fn inner_describe_repo(
Err(err) => bail!("Could not resolve DID: `{err}`"),
Ok(res) => res,
};
let handle = common::get_handle(&did_doc);
let handle = rsky_common::get_handle(&did_doc);
let handle_is_correct = handle == account.handle;
let mut actor_store = ActorStore::new(

View File

@ -1,9 +1,9 @@
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::db::DbConn;
use crate::pipethrough::{pipethrough, OverrideOpts, ProxyRequest};
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -1,9 +1,9 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::blob::ListMissingBlobsOpts;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessFull;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::blob::ListMissingBlobsOpts;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -1,8 +1,8 @@
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -1,13 +1,11 @@
use crate::account_manager::helpers::account::AvailabilityFlags;
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardIncludeChecks;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::{CommitData, PreparedWrite};
use crate::repo::{
prepare_create, prepare_update, ActorStore, PrepareCreateOpts, PrepareUpdateOpts,
};
use crate::repo::prepare::{prepare_create, prepare_update, PrepareCreateOpts, PrepareUpdateOpts};
use crate::SharedSequencer;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
@ -15,6 +13,7 @@ use libipld::Cid;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::{PutRecordInput, PutRecordOutput};
use rsky_repo::types::{CommitData, PreparedWrite};
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

View File

@ -1,16 +1,41 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessStandardIncludeChecks;
use crate::common::ContentType;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::{BlobConstraint, PreparedBlobRef};
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use rocket::data::Data;
use rocket::http::Status;
use rocket::request::{FromRequest, Outcome};
use rocket::serde::json::Json;
use rocket::State;
use rocket::{Request, State};
use rsky_common::BadContentTypeError;
use rsky_lexicon::com::atproto::repo::{Blob, BlobOutput};
use rsky_repo::types::{BlobConstraint, PreparedBlobRef};
#[derive(Clone)]
pub struct ContentType {
pub name: String,
}
/// Used mainly as a way to parse out content-type from request
#[rocket::async_trait]
impl<'r> FromRequest<'r> for ContentType {
type Error = BadContentTypeError;
async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
match req.content_type() {
None => Outcome::Error((
Status::UnsupportedMediaType,
BadContentTypeError::MissingType,
)),
Some(content_type) => Outcome::Success(ContentType {
name: content_type.to_string(),
}),
}
}
}
async fn inner_upload_blob(
auth: AccessStandardIncludeChecks,
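
For reference, a minimal sketch (not part of the commit) of using the new `ContentType` guard in a Rocket handler. The route path and handler name are hypothetical and assume the code sits in the same module as the guard above:

```rust
use rocket::post;

#[post("/example-upload")]
pub async fn example_upload(content_type: ContentType) -> String {
    // If the Content-Type header was missing, the guard already answered with
    // 415 Unsupported Media Type and this handler never runs.
    content_type.name
}
```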

View File

@ -1,17 +1,17 @@
use crate::account_manager::helpers::account::AvailabilityFlags;
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::server::assert_valid_did_documents_for_service;
use crate::apis::ApiError;
use crate::auth_verifier::AccessFull;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::cid_set::CidSet;
use crate::repo::types::CommitData;
use crate::repo::ActorStore;
use crate::storage::readable_blockstore::ReadableBlockstore;
use crate::SharedSequencer;
use aws_config::SdkConfig;
use rocket::State;
use rsky_repo::cid_set::CidSet;
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
use rsky_repo::types::CommitData;
use rsky_syntax::handle::INVALID_HANDLE;
#[tracing::instrument(skip_all)]

View File

@ -1,10 +1,10 @@
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::server::is_valid_did_doc_for_service;
use crate::apis::ApiError;
use crate::auth_verifier::AccessFull;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use futures::try_join;

View File

@ -1,24 +1,23 @@
use crate::account_manager::helpers::account::AccountStatus;
use crate::account_manager::{AccountManager, CreateAccountOpts};
use crate::apis::com::atproto::server::{
encode_did_key, get_keys_from_private_key_str, safe_resolve_did_doc,
};
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::server::safe_resolve_did_doc;
use crate::apis::ApiError;
use crate::auth_verifier::UserDidAuthOptional;
use crate::common::env::env_str;
use crate::config::ServerConfig;
use crate::db::DbConn;
use crate::handle::{normalize_and_validate_handle, HandleValidationContext, HandleValidationOpts};
use crate::plc::operations::{create_op, CreateAtprotoOpInput};
use crate::plc::types::{CompatibleOpOrTombstone, OpOrTombstone, Operation};
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::plc::types::{OpOrTombstone, Operation};
use crate::SharedSequencer;
use crate::{plc, SharedIdResolver};
use aws_config::SdkConfig;
use email_address::*;
use rocket::serde::json::Json;
use rocket::State;
use rsky_common::env::env_str;
use rsky_crypto::utils::encode_did_key;
use rsky_lexicon::com::atproto::server::{CreateAccountInput, CreateAccountOutput};
use secp256k1::{Keypair, Secp256k1, SecretKey};
use std::env;

View File

@ -1,11 +1,11 @@
use crate::account_manager::helpers::account::{AccountStatus, AvailabilityFlags};
use crate::account_manager::AccountManager;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AdminToken;
use crate::db::DbConn;
use crate::models::models::EmailTokenPurpose;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::sequencer;
use crate::SharedSequencer;
use aws_config::SdkConfig;

View File

@ -1,6 +1,6 @@
use crate::apis::ApiError;
use crate::common::env::{env_bool, env_list, env_str};
use rocket::serde::json::Json;
use rsky_common::env::{env_bool, env_list, env_str};
use rsky_lexicon::com::atproto::server::{
DescribeServerOutput, DescribeServerRefContact, DescribeServerRefLinks,
};

View File

@ -3,11 +3,11 @@ use crate::account_manager::AccountManager;
use crate::apis::com::atproto::server::gen_invite_codes;
use crate::apis::ApiError;
use crate::auth_verifier::AccessFull;
use crate::common::env::{env_bool, env_int};
use crate::common::RFC3339_VARIANT;
use anyhow::{bail, Result};
use chrono::NaiveDateTime;
use rocket::serde::json::Json;
use rsky_common::env::{env_bool, env_int};
use rsky_common::RFC3339_VARIANT;
use rsky_lexicon::com::atproto::server::GetAccountInviteCodesOutput;
use std::time::SystemTime;

View File

@ -1,12 +1,12 @@
use crate::account_manager::helpers::auth::{create_service_jwt, ServiceJwtParams};
use crate::apis::ApiError;
use crate::auth_verifier::AccessFull;
use crate::common::time::{from_micros_to_utc, HOUR, MINUTE};
use crate::pipethrough::{PRIVILEGED_METHODS, PROTECTED_METHODS};
use anyhow::{bail, Result};
use chrono::offset::Utc as UtcOffset;
use chrono::DateTime;
use rocket::serde::json::Json;
use rsky_common::time::{from_micros_to_utc, HOUR, MINUTE};
use rsky_lexicon::com::atproto::server::GetServiceAuthOutput;
use secp256k1::SecretKey;
use std::env;

View File

@ -1,20 +1,13 @@
extern crate unsigned_varint;
use crate::common::env::{env_int, env_str};
use crate::{plc, SharedIdResolver};
use anyhow::{bail, Result};
use diesel::prelude::*;
use multibase::Base::Base58Btc;
use rand::{distributions::Alphanumeric, Rng};
use reqwest;
use rocket::form::validate::Contains;
use rocket::State;
use rsky_common::env::{env_int, env_str};
use rsky_crypto::utils::encode_did_key;
use rsky_identity::types::DidDocument;
use secp256k1::{PublicKey, Secp256k1, SecretKey};
use sha2::Digest;
use std::env;
use unsigned_varint::encode::u16 as encode_varint;
const DID_KEY_PREFIX: &str = "did:key:";
#[derive(Debug, Deserialize, Serialize)]
pub struct AssertionContents {
@ -81,29 +74,6 @@ pub fn validate_handle(handle: &str) -> bool {
// Need to check the suffix here and make sure the handle doesn't include "." after trimming it
}
/// https://github.com/gnunicorn/rust-multicodec/blob/master/src/lib.rs#L249-L260
pub fn multicodec_wrap(bytes: Vec<u8>) -> Vec<u8> {
let mut buf = [0u8; 3];
encode_varint(0xe7, &mut buf);
let mut v: Vec<u8> = Vec::new();
for b in &buf {
v.push(*b);
// varint uses first bit to indicate another byte follows, stop if not the case
if *b <= 127 {
break;
}
}
v.extend(bytes);
v
}
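// For illustration: the secp256k1 multicodec code 0xe7 (231) varint-encodes to
// [0xE7, 0x01]. The first byte has the continuation bit set and the second does not,
// so the loop above copies exactly those two bytes before appending the compressed key.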
pub fn encode_did_key(pubkey: &PublicKey) -> String {
let pk_compact = pubkey.serialize();
let pk_wrapped = multicodec_wrap(pk_compact.to_vec());
let pk_multibase = multibase::encode(Base58Btc, pk_wrapped.as_slice());
format!("{DID_KEY_PREFIX}{pk_multibase}")
}
pub fn get_keys_from_private_key_str(private_key: String) -> Result<(SecretKey, PublicKey)> {
let secp = Secp256k1::new();
let decoded_key = hex::decode(private_key.as_bytes()).map_err(|error| {

View File

@ -1,10 +1,10 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::auth_verifier;
use crate::auth_verifier::OptionalAccessOrAdminToken;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use aws_sdk_s3::operation::get_object::GetObjectError;

View File

@ -1,16 +1,16 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::auth_verifier;
use crate::auth_verifier::OptionalAccessOrAdminToken;
use crate::car::blocks_to_car_file;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::storage::readable_blockstore::ReadableBlockstore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use libipld::Cid;
use rocket::{Responder, State};
use rsky_repo::car::blocks_to_car_file;
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
use std::str::FromStr;
#[derive(Responder)]

View File

@ -1,10 +1,10 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::auth_verifier;
use crate::auth_verifier::OptionalAccessOrAdminToken;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -1,16 +1,16 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::auth_verifier;
use crate::auth_verifier::OptionalAccessOrAdminToken;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::RecordPath;
use crate::repo::ActorStore;
use crate::storage::types::RepoStorage;
use crate::{auth_verifier, repo};
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use libipld::Cid;
use rocket::{Responder, State};
use rsky_repo::storage::types::RepoStorage;
use rsky_repo::types::RecordPath;
use std::str::FromStr;
#[derive(Responder)]
@ -42,7 +42,7 @@ async fn inner_get_record(
match commit {
None => bail!("Could not find repo for DID: {did}"),
Some(commit) => {
repo::sync::provider::get_records(
rsky_repo::sync::provider::get_records(
actor_store.storage.clone(),
commit,
vec![RecordPath { collection, rkey }],

View File

@ -1,10 +1,10 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::auth_verifier;
use crate::auth_verifier::OptionalAccessOrAdminToken;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use rocket::{Responder, State};

View File

@ -1,11 +1,11 @@
use crate::account_manager::helpers::account::{
format_account_status, AccountStatus, FormattedAccountStatus,
};
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -1,11 +1,11 @@
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::blob::ListBlobsOpts;
use crate::actor_store::ActorStore;
use crate::apis::com::atproto::repo::assert_repo_availability;
use crate::apis::ApiError;
use crate::auth_verifier;
use crate::auth_verifier::OptionalAccessOrAdminToken;
use crate::db::DbConn;
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::blob::ListBlobsOpts;
use crate::repo::ActorStore;
use anyhow::Result;
use aws_config::SdkConfig;
use rocket::serde::json::Json;

View File

@ -2,8 +2,6 @@ use crate::account_manager::helpers::account::{
format_account_status, AccountStatus, ActorAccount, FormattedAccountStatus,
};
use crate::apis::ApiError;
use crate::common::time::{from_millis_to_utc, from_str_to_millis};
use crate::common::RFC3339_VARIANT;
use crate::db::establish_connection;
use anyhow::{anyhow, bail, Result};
use diesel::dsl::sql;
@ -11,6 +9,8 @@ use diesel::prelude::*;
use diesel::sql_types::{Bool, Text};
use diesel::QueryDsl;
use rocket::serde::json::Json;
use rsky_common::time::{from_millis_to_utc, from_str_to_millis};
use rsky_common::RFC3339_VARIANT;
use rsky_lexicon::com::atproto::sync::{ListReposOutput, RefRepo as LexiconRepo, RepoStatus};
#[derive(Debug, Clone)]

View File

@ -1,5 +1,3 @@
use crate::common::time::from_str_to_utc;
use crate::common::RFC3339_VARIANT;
use crate::config::ServerConfig;
use crate::crawlers::Crawlers;
use crate::sequencer::events::{
@ -15,6 +13,8 @@ use chrono::{DateTime, Duration};
use futures::{pin_mut, StreamExt};
use rocket::tokio::select;
use rocket::{Shutdown, State};
use rsky_common::time::from_str_to_utc;
use rsky_common::RFC3339_VARIANT;
use rsky_lexicon::com::atproto::sync::{
SubscribeReposAccount, SubscribeReposCommit, SubscribeReposCommitOperation,
SubscribeReposHandle, SubscribeReposIdentity, SubscribeReposTombstone,

View File

@ -1,8 +1,6 @@
use crate::account_manager::helpers::account::{ActorAccount, AvailabilityFlags};
use crate::account_manager::helpers::auth::CustomClaimObj;
use crate::account_manager::AccountManager;
use crate::common::env::env_str;
use crate::common::get_verification_material;
use crate::xrpc_server::auth::{verify_jwt as verify_service_jwt_server, ServiceJwtPayload};
use crate::SharedIdResolver;
use anyhow::{bail, Result};
@ -12,6 +10,8 @@ use jwt_simple::prelude::*;
use rocket::http::Status;
use rocket::request::{FromRequest, Outcome, Request};
use rocket::State;
use rsky_common::env::env_str;
use rsky_common::get_verification_material;
use rsky_identity::did::atproto_data::get_did_key_from_multibase;
use rsky_identity::types::DidDocument;
use secp256k1::{Keypair, Secp256k1, SecretKey};

View File

@ -1,33 +0,0 @@
use std::env;
pub fn env_int(name: &str) -> Option<usize> {
match env::var(name) {
Ok(str) => match str.parse::<usize>() {
Ok(int) => Some(int),
_ => None,
},
_ => None,
}
}
pub fn env_str(name: &str) -> Option<String> {
match env::var(name) {
Ok(str) => Some(str),
_ => None,
}
}
pub fn env_bool(name: &str) -> Option<bool> {
match env::var(name) {
Ok(str) if str == "true" || str == "1" => Some(true),
Ok(str) if str == "false" || str == "0" => Some(false),
_ => None,
}
}
pub fn env_list(name: &str) -> Vec<String> {
match env::var(name) {
Ok(str) => str.split(",").into_iter().map(|s| s.to_string()).collect(),
_ => Vec::new(),
}
}
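These helpers now live in rsky_common::env under the same names, as the new imports throughout this diff show. A minimal usage sketch, assuming the behavior shown above carries over unchanged (the environment variable names are arbitrary examples):
use rsky_common::env::{env_bool, env_int, env_list, env_str};

fn main() {
    // Sketch only: the variable names below are arbitrary examples.
    std::env::set_var("EXAMPLE_PORT", "2583");
    std::env::set_var("EXAMPLE_FLAG", "true");
    std::env::set_var("EXAMPLE_LIST", "did:plc:aaa,did:plc:bbb");

    assert_eq!(env_int("EXAMPLE_PORT"), Some(2583));   // parsed as usize
    assert_eq!(env_bool("EXAMPLE_FLAG"), Some(true));  // "true"/"1" => true, "false"/"0" => false
    assert_eq!(env_list("EXAMPLE_LIST").len(), 2);     // split on commas
    assert_eq!(env_str("EXAMPLE_MISSING"), None);      // unset => None
}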

View File

@ -1,202 +0,0 @@
use anyhow::Result;
use base64ct::{Base64, Encoding};
use chrono::offset::Utc as UtcOffset;
use chrono::DateTime;
use rand::{distributions::Alphanumeric, Rng};
use rocket::http::Status;
use rocket::request::{FromRequest, Outcome};
use rocket::Request;
use rsky_identity::did::atproto_data::VerificationMaterial;
use rsky_identity::types::DidDocument;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::thread;
use std::time::{Duration, SystemTime};
use thiserror::Error;
use url::Url;
use urlencoding::encode;
pub const RFC3339_VARIANT: &str = "%Y-%m-%dT%H:%M:%S%.3fZ";
#[derive(Error, Debug)]
pub enum BadContentTypeError {
#[error("BadType: `{0}`")]
BadType(String),
#[error("Content-Type header is missing")]
MissingType,
}
#[derive(Clone)]
pub struct ContentType {
pub name: String,
}
#[derive(Debug)]
pub struct GetServiceEndpointOpts {
pub id: String,
pub r#type: Option<String>,
}
/// Used mainly as a way to parse out content-type from request
#[rocket::async_trait]
impl<'r> FromRequest<'r> for ContentType {
type Error = BadContentTypeError;
async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
match req.content_type() {
None => Outcome::Error((
Status::UnsupportedMediaType,
BadContentTypeError::MissingType,
)),
Some(content_type) => Outcome::Success(ContentType {
name: content_type.to_string(),
}),
}
}
}
pub fn now() -> String {
let system_time = SystemTime::now();
let dt: DateTime<UtcOffset> = system_time.into();
format!("{}", dt.format(RFC3339_VARIANT))
}
pub fn wait(ms: u64) {
thread::sleep(Duration::from_millis(ms))
}
pub fn beginning_of_time() -> String {
let beginning_of_time = SystemTime::UNIX_EPOCH;
let dt: DateTime<UtcOffset> = beginning_of_time.into();
format!("{}", dt.format(RFC3339_VARIANT))
}
pub fn get_random_str() -> String {
let token: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(32)
.map(char::from)
.collect();
token
}
pub fn struct_to_cbor<T: Serialize>(obj: T) -> Result<Vec<u8>> {
Ok(serde_ipld_dagcbor::to_vec(&obj)?)
}
pub fn cbor_to_struct<T: DeserializeOwned>(bytes: Vec<u8>) -> Result<T> {
Ok(serde_ipld_dagcbor::from_slice::<T>(bytes.as_slice())?)
}
pub fn json_to_b64url<T: Serialize>(obj: &T) -> Result<String> {
Ok(Base64::encode_string((&serde_json::to_string(obj)?).as_ref()).replace("=", ""))
}
pub fn encode_uri_component(input: &String) -> String {
encode(input).to_string()
}
// based on did-doc.ts
pub fn get_did(doc: &DidDocument) -> String {
doc.id.clone()
}
pub fn get_handle(doc: &DidDocument) -> Option<String> {
match &doc.also_known_as {
None => None,
Some(aka) => {
let found = aka.into_iter().find(|name| name.starts_with("at://"));
match found {
None => None,
// strip off at:// prefix
Some(found) => Some(found[5..].to_string()),
}
}
}
}
pub fn get_verification_material(
doc: &DidDocument,
key_id: &String,
) -> Option<VerificationMaterial> {
let did = get_did(doc);
let keys = &doc.verification_method;
if let Some(keys) = keys {
let found = keys
.into_iter()
.find(|key| key.id == format!("#{key_id}") || key.id == format!("{did}#{key_id}"));
match found {
Some(found) if found.public_key_multibase.is_some() => {
let found = found.clone();
Some(VerificationMaterial {
r#type: found.r#type,
public_key_multibase: found.public_key_multibase.unwrap(),
})
}
_ => None,
}
} else {
None
}
}
pub fn get_notif_endpoint(doc: DidDocument) -> Option<String> {
get_service_endpoint(
doc,
GetServiceEndpointOpts {
id: "#bsky_notif".to_string(),
r#type: Some("BskyNotificationService".to_string()),
},
)
}
#[tracing::instrument(skip_all)]
pub fn get_service_endpoint(doc: DidDocument, opts: GetServiceEndpointOpts) -> Option<String> {
tracing::info!(
"@LOG: common::get_service_endpoint() doc: {:?}; opts: {:?}",
doc,
opts
);
let did = get_did(&doc);
match doc.service {
None => None,
Some(services) => {
let found = services.iter().find(|service| {
service.id == opts.id || service.id == format!("{}{}", did, opts.id)
});
match found {
None => None,
Some(found) => match opts.r#type {
None => None,
Some(opts_type) if found.r#type == opts_type => {
validate_url(&found.service_endpoint)
}
_ => None,
},
}
}
}
}
// Check protocol and hostname to prevent potential SSRF
pub fn validate_url(url_str: &String) -> Option<String> {
match Url::parse(url_str) {
Err(_) => None,
Ok(url) => {
return if !vec!["http", "https"].contains(&url.scheme()) {
None
} else if url.host().is_none() {
None
} else {
Some(url_str.clone())
}
}
}
}
pub mod r#async;
pub mod env;
pub mod ipld;
pub mod sign;
pub mod tid;
pub mod time;
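The same helpers resurface in rsky_common, as the new imports in this diff show (rsky_common::now, rsky_common::encode_uri_component, rsky_common::get_verification_material, and so on). A minimal sketch of a few of them, assuming they remain re-exported at the crate root exactly as they were here:
use rsky_common::{encode_uri_component, get_random_str, now, validate_url};

fn main() {
    // Sketch only: assumes rsky_common keeps these at its crate root, like the old crate::common.
    println!("{}", now()); // RFC 3339 with millisecond precision, e.g. "2025-02-07T20:02:31.000Z"
    assert_eq!(get_random_str().len(), 32); // 32-character alphanumeric token
    assert_eq!(
        encode_uri_component(&"did:plc:abc/xyz".to_string()),
        "did%3Aplc%3Aabc%2Fxyz"
    );
    // SSRF guard: only http/https URLs with a host pass through
    assert_eq!(validate_url(&"ftp://internal".to_string()), None);
    assert!(validate_url(&"https://bsky.social".to_string()).is_some());
}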

View File

@ -1,8 +1,8 @@
use crate::common::env::{env_bool, env_int, env_list, env_str};
use crate::common::time::{DAY, HOUR, SECOND};
use crate::context;
use anyhow::{bail, Result};
use reqwest::header::HeaderMap;
use rsky_common::env::{env_bool, env_int, env_list, env_str};
use rsky_common::time::{DAY, HOUR, SECOND};
#[derive(Debug, Clone, PartialEq)]
pub struct ServerConfig {

View File

@ -1,7 +1,7 @@
use crate::common::time::MINUTE;
use crate::APP_USER_AGENT;
use anyhow::Result;
use futures::stream::{self, StreamExt};
use rsky_common::time::MINUTE;
use std::time::SystemTime;
const NOTIFY_THRESHOLD: i32 = 20 * MINUTE; // 20 minutes;

View File

@ -45,10 +45,9 @@ lazy_static! {
}
pub mod account_manager;
pub mod actor_store;
pub mod apis;
pub mod auth_verifier;
pub mod car;
pub mod common;
pub mod config;
pub mod context;
pub mod crawlers;
@ -64,7 +63,5 @@ pub mod read_after_write;
pub mod repo;
pub mod schema;
pub mod sequencer;
pub mod storage;
mod vendored;
pub mod well_known;
pub mod xrpc_server;

View File

@ -17,11 +17,11 @@ use rocket::response::status;
use rocket::serde::json::Json;
use rocket::shield::{NoSniff, Shield};
use rocket::{Request, Response};
use rsky_common::env::env_list;
use rsky_identity::types::{DidCache, IdentityResolverOpts};
use rsky_identity::IdResolver;
use rsky_pds::account_manager::AccountManager;
use rsky_pds::apis::*;
use rsky_pds::common::env::env_list;
use rsky_pds::config::env_to_cfg;
use rsky_pds::crawlers::Crawlers;
use rsky_pds::db::DbConn;

View File

@ -1,7 +1,5 @@
use crate::auth_verifier::{AccessOutput, AccessStandard};
use crate::common::{get_service_endpoint, GetServiceEndpointOpts};
use crate::config::{ServerConfig, ServiceConfig};
use crate::repo::types::Ids;
use crate::xrpc_server::types::{HandlerPipeThrough, InvalidRequestError, XRPCError};
use crate::{context, SharedIdResolver, APP_USER_AGENT};
use anyhow::{bail, Result};
@ -11,6 +9,8 @@ use reqwest::{Client, RequestBuilder, Response};
use rocket::http::{Method, Status};
use rocket::request::{FromRequest, Outcome, Request};
use rocket::State;
use rsky_common::{get_service_endpoint, GetServiceEndpointOpts};
use rsky_repo::types::Ids;
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, HashSet};

View File

@ -1,8 +1,8 @@
use crate::common::encode_uri_component;
use crate::plc::operations::update_handle_op;
use crate::plc::types::{CompatibleOp, OpOrTombstone};
use crate::APP_USER_AGENT;
use anyhow::{bail, Result};
use rsky_common::encode_uri_component;
use secp256k1::SecretKey;
use serde::de::DeserializeOwned;
use types::{CompatibleOpOrTombstone, DocumentData};

View File

@ -1,10 +1,10 @@
use crate::common::ipld::cid_for_cbor;
use crate::common::sign::atproto_sign;
use crate::plc::types::{CompatibleOp, CompatibleOpOrTombstone, Operation, Service, Tombstone};
use anyhow::Result;
use data_encoding::BASE32;
use indexmap::IndexMap;
use libipld::Cid;
use rsky_common::ipld::cid_for_cbor;
use rsky_common::sign::atproto_sign;
use secp256k1::SecretKey;
use serde_json::{Value as JsonValue, Value};
use sha2::{Digest, Sha256};

View File

@ -1,10 +1,9 @@
use crate::common::time::from_str_to_utc;
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::db::DbConn;
use crate::pipethrough::parse_res;
use crate::read_after_write::types::LocalRecords;
use crate::read_after_write::viewer::{get_records_since_rev, LocalViewer};
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use crate::xrpc_server::types::HandlerPipeThrough;
use crate::SharedLocalViewer;
use anyhow::Result;
@ -15,6 +14,7 @@ use rocket::http::Status;
use rocket::request::Request;
use rocket::response::{self, Responder, Response};
use rocket::State;
use rsky_common::time::from_str_to_utc;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::BTreeMap;

View File

@ -1,12 +1,10 @@
use crate::account_manager::helpers::auth::ServiceJwtParams;
use crate::account_manager::AccountManager;
use crate::common::beginning_of_time;
use crate::actor_store::ActorStore;
use crate::db::establish_connection;
use crate::models::models;
use crate::read_after_write::types::{LocalRecords, RecordDescript};
use crate::read_after_write::util;
use crate::repo::types::Ids;
use crate::repo::ActorStore;
use crate::xrpc_server::auth::create_service_auth_headers;
use crate::APP_USER_AGENT;
use anyhow::{bail, Result};
@ -29,6 +27,7 @@ use diesel::*;
use futures::stream::{self, StreamExt};
use libipld::Cid;
use reqwest::header::HeaderMap;
use rsky_common::beginning_of_time;
use rsky_lexicon::app::bsky::actor::{Profile, ProfileView, ProfileViewBasic, ProfileViewDetailed};
use rsky_lexicon::app::bsky::embed::external::{
ExternalObject, View as ExternalView, ViewExternal,
@ -43,6 +42,7 @@ use rsky_lexicon::app::bsky::embed::record_with_media::{
use rsky_lexicon::app::bsky::embed::{record, EmbedViews, Embeds, MediaUnion, MediaViewUnion};
use rsky_lexicon::app::bsky::feed::{FeedViewPost, GeneratorView, Post, PostView};
use rsky_lexicon::app::bsky::graph::ListView;
use rsky_repo::types::Ids;
use rsky_syntax::aturi::AtUri;
use rsky_syntax::handle::INVALID_HANDLE;
use secp256k1::SecretKey;

File diff suppressed because it is too large

View File

@ -0,0 +1,267 @@
use crate::lexicon::LEXICONS;
use anyhow::bail;
use lazy_static::lazy_static;
use lexicon_cid::Cid;
use rsky_common::ipld::cid_for_cbor;
use rsky_common::tid::Ticker;
use rsky_lexicon::blob_refs::{BlobRef, JsonBlobRef};
use rsky_repo::storage::Ipld;
use rsky_repo::types::{
BlobConstraint, Ids, Lex, PreparedBlobRef, PreparedCreateOrUpdate, PreparedDelete, RepoRecord,
WriteOpAction,
};
use rsky_repo::util::{cbor_to_lex, lex_to_ipld};
use rsky_syntax::aturi::AtUri;
use serde_json::{json, Value as JsonValue};
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct FoundBlobRef {
pub r#ref: BlobRef,
pub path: Vec<String>,
}
pub struct PrepareCreateOpts {
pub did: String,
pub collection: String,
pub rkey: Option<String>,
pub swap_cid: Option<Cid>,
pub record: RepoRecord,
pub validate: Option<bool>,
}
pub struct PrepareUpdateOpts {
pub did: String,
pub collection: String,
pub rkey: String,
pub swap_cid: Option<Cid>,
pub record: RepoRecord,
pub validate: Option<bool>,
}
pub struct PrepareDeleteOpts {
pub did: String,
pub collection: String,
pub rkey: String,
pub swap_cid: Option<Cid>,
}
pub fn blobs_for_write(record: RepoRecord, validate: bool) -> anyhow::Result<Vec<PreparedBlobRef>> {
let refs = find_blob_refs(Lex::Map(record.clone()), None, None);
let record_type = match record.get("$type") {
Some(Lex::Ipld(Ipld::String(t))) => Some(t),
_ => None,
};
for r#ref in refs.clone() {
if matches!(r#ref.r#ref.original, JsonBlobRef::Untyped(_)) {
bail!("Legacy blob ref at `{}`", r#ref.path.join("/"))
}
}
refs.into_iter()
.map(|FoundBlobRef { r#ref, path }| {
let constraints: BlobConstraint = match (validate, record_type) {
(true, Some(record_type)) => {
let properties: crate::lexicon::lexicons::Image2 = serde_json::from_value(
crate::repo::prepare::CONSTRAINTS[record_type.as_str()][path.join("/")]
.clone(),
)?;
BlobConstraint {
max_size: Some(properties.max_size as usize),
accept: Some(properties.accept),
}
}
(_, _) => BlobConstraint {
max_size: None,
accept: None,
},
};
Ok(PreparedBlobRef {
cid: r#ref.get_cid()?,
mime_type: r#ref.get_mime_type().to_string(),
constraints,
})
})
.collect::<anyhow::Result<Vec<PreparedBlobRef>>>()
}
pub fn find_blob_refs(val: Lex, path: Option<Vec<String>>, layer: Option<u8>) -> Vec<FoundBlobRef> {
let layer = layer.unwrap_or_else(|| 0);
let path = path.unwrap_or_else(|| vec![]);
if layer > 32 {
return vec![];
}
// walk arrays
match val {
Lex::List(list) => list
.into_iter()
.flat_map(|item| find_blob_refs(item, Some(path.clone()), Some(layer + 1)))
.collect::<Vec<FoundBlobRef>>(),
Lex::Blob(blob) => vec![FoundBlobRef { r#ref: blob, path }],
Lex::Ipld(Ipld::Json(JsonValue::Array(list))) => list
.into_iter()
.flat_map(|item| match serde_json::from_value::<RepoRecord>(item) {
Ok(item) => find_blob_refs(Lex::Map(item), Some(path.clone()), Some(layer + 1)),
Err(_) => vec![],
})
.collect::<Vec<FoundBlobRef>>(),
Lex::Ipld(Ipld::Json(json)) => match serde_json::from_value::<JsonBlobRef>(json.clone()) {
Ok(blob) => vec![FoundBlobRef {
r#ref: BlobRef { original: blob },
path,
}],
Err(_) => match serde_json::from_value::<RepoRecord>(json) {
Ok(record) => record
.into_iter()
.flat_map(|(key, item)| {
find_blob_refs(
item,
Some([path.as_slice(), [key].as_slice()].concat()),
Some(layer + 1),
)
})
.collect::<Vec<FoundBlobRef>>(),
Err(_) => vec![],
},
},
Lex::Ipld(_) => vec![],
Lex::Map(map) => map
.into_iter()
.flat_map(|(key, item)| {
find_blob_refs(
item,
Some([path.as_slice(), [key].as_slice()].concat()),
Some(layer + 1),
)
})
.collect::<Vec<FoundBlobRef>>(),
}
}
pub fn assert_valid_record(record: &RepoRecord) -> anyhow::Result<()> {
match record.get("$type") {
Some(Lex::Ipld(Ipld::String(_))) => Ok(()),
_ => bail!("No $type provided"),
}
}
pub fn set_collection_name(
collection: &String,
mut record: RepoRecord,
validate: bool,
) -> anyhow::Result<RepoRecord> {
if record.get("$type").is_none() {
record.insert(
"$type".to_string(),
Lex::Ipld(Ipld::Json(JsonValue::String(collection.clone()))),
);
}
if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(record_type)))) = record.get("$type") {
if validate && record_type.to_string() != *collection {
bail!("Invalid $type: expected {collection}, got {record_type}")
}
}
Ok(record)
}
pub async fn cid_for_safe_record(record: RepoRecord) -> anyhow::Result<Cid> {
let lex = lex_to_ipld(Lex::Map(record));
let block = serde_ipld_dagcbor::to_vec(&lex)?;
// Confirm whether Block properly transforms between lex and cbor
let _ = cbor_to_lex(block)?;
cid_for_cbor(&lex)
}
pub async fn prepare_create(opts: PrepareCreateOpts) -> anyhow::Result<PreparedCreateOrUpdate> {
let PrepareCreateOpts {
did,
collection,
rkey,
swap_cid,
validate,
..
} = opts;
let validate = validate.unwrap_or_else(|| true);
let record = set_collection_name(&collection, opts.record, validate)?;
if validate {
assert_valid_record(&record)?;
}
// assert_no_explicit_slurs(rkey, record).await?;
let next_rkey = Ticker::new().next(None);
let rkey = rkey.unwrap_or(next_rkey.to_string());
let uri = AtUri::make(did, Some(collection), Some(rkey))?;
Ok(PreparedCreateOrUpdate {
action: WriteOpAction::Create,
uri: uri.to_string(),
cid: cid_for_safe_record(record.clone()).await?,
swap_cid,
record: record.clone(),
blobs: blobs_for_write(record, validate)?,
})
}
pub async fn prepare_update(opts: PrepareUpdateOpts) -> anyhow::Result<PreparedCreateOrUpdate> {
let PrepareUpdateOpts {
did,
collection,
rkey,
swap_cid,
validate,
..
} = opts;
let validate = validate.unwrap_or_else(|| true);
let record = set_collection_name(&collection, opts.record, validate)?;
if validate {
assert_valid_record(&record)?;
}
// assert_no_explicit_slurs(rkey, record).await?;
let uri = AtUri::make(did, Some(collection), Some(rkey))?;
Ok(PreparedCreateOrUpdate {
action: WriteOpAction::Update,
uri: uri.to_string(),
cid: cid_for_safe_record(record.clone()).await?,
swap_cid,
record: record.clone(),
blobs: blobs_for_write(record, validate)?,
})
}
pub fn prepare_delete(opts: PrepareDeleteOpts) -> anyhow::Result<PreparedDelete> {
let PrepareDeleteOpts {
did,
collection,
rkey,
swap_cid,
} = opts;
let uri = AtUri::make(did, Some(collection), Some(rkey))?;
Ok(PreparedDelete {
action: WriteOpAction::Delete,
uri: uri.to_string(),
swap_cid,
})
}
lazy_static! {
static ref CONSTRAINTS: JsonValue = {
json!({
Ids::AppBskyActorProfile.as_str(): {
"avatar": LEXICONS.app_bsky_actor_profile.defs.main.record.properties.avatar,
"banner": LEXICONS.app_bsky_actor_profile.defs.main.record.properties.banner
},
Ids::AppBskyFeedGenerator.as_str(): {
"avatar": LEXICONS.app_bsky_feed_generator.defs.main.record.properties.avatar
},
Ids::AppBskyGraphList.as_str(): {
"avatar": LEXICONS.app_bsky_graph_list.defs.main.record.properties.avatar
},
Ids::AppBskyFeedPost.as_str(): {
"embed/images/image": LEXICONS.app_bsky_embed_images.defs.image.properties.image,
"embed/external/thumb": LEXICONS.app_bsky_embed_external.defs.external.properties.thumb,
"embed/media/images/image": LEXICONS.app_bsky_embed_images.defs.image.properties.image,
"embed/media/external/thumb": LEXICONS.app_bsky_embed_external.defs.external.properties.thumb
}
})
};
}

View File

@ -1,15 +1,15 @@
use crate::account_manager::helpers::account::AccountStatus;
use crate::car::blocks_to_car_file;
use crate::common;
use crate::common::struct_to_cbor;
use crate::models::models;
use crate::repo::block_map::BlockMap;
use crate::repo::cid_set::CidSet;
use crate::repo::types::{CommitData, PreparedWrite};
use crate::repo::util::format_data_key;
use anyhow::Result;
use lexicon_cid::Cid;
use rsky_common;
use rsky_common::struct_to_cbor;
use rsky_lexicon::com::atproto::sync::AccountStatus as LexiconAccountStatus;
use rsky_repo::block_map::BlockMap;
use rsky_repo::car::blocks_to_car_file;
use rsky_repo::cid_set::CidSet;
use rsky_repo::types::{CommitData, PreparedWrite};
use rsky_repo::util::format_data_key;
use rsky_syntax::aturi::AtUri;
use serde::de::Error as DeserializerError;
use serde::{Deserialize, Deserializer};
@ -95,7 +95,7 @@ impl Default for TypedCommitEvt {
Self {
r#type: "commit".to_string(),
seq: 0,
time: common::now(),
time: rsky_common::now(),
evt: CommitEvt {
rebase: false,
too_big: false,
@ -125,7 +125,7 @@ impl Default for TypedHandleEvt {
Self {
r#type: "handle".to_string(),
seq: 0,
time: common::now(),
time: rsky_common::now(),
evt: HandleEvt {
did: "".to_string(),
handle: "".to_string(),
@ -281,8 +281,8 @@ pub async fn format_seq_commit(
Ok(models::RepoSeq::new(
did,
"append".to_string(),
struct_to_cbor(evt)?,
common::now(),
struct_to_cbor(&evt)?,
rsky_common::now(),
))
}
@ -294,8 +294,8 @@ pub async fn format_seq_handle_update(did: String, handle: String) -> Result<mod
Ok(models::RepoSeq::new(
did,
"handle".to_string(),
struct_to_cbor(evt)?,
common::now(),
struct_to_cbor(&evt)?,
rsky_common::now(),
))
}
@ -313,8 +313,8 @@ pub async fn format_seq_identity_evt(
Ok(models::RepoSeq::new(
did,
"identity".to_string(),
struct_to_cbor(evt)?,
common::now(),
struct_to_cbor(&evt)?,
rsky_common::now(),
))
}
@ -337,8 +337,8 @@ pub async fn format_seq_account_evt(did: String, status: AccountStatus) -> Resul
Ok(models::RepoSeq::new(
did,
"account".to_string(),
struct_to_cbor(evt)?,
common::now(),
struct_to_cbor(&evt)?,
rsky_common::now(),
))
}
@ -347,7 +347,7 @@ pub async fn format_seq_tombstone(did: String) -> Result<models::RepoSeq> {
Ok(models::RepoSeq::new(
did,
"tombstone".to_string(),
struct_to_cbor(evt)?,
common::now(),
struct_to_cbor(&evt)?,
rsky_common::now(),
))
}

View File

@ -1,10 +1,7 @@
use crate::account_manager::helpers::account::AccountStatus;
use crate::common::time::SECOND;
use crate::common::{cbor_to_struct, wait};
use crate::crawlers::Crawlers;
use crate::db::establish_connection;
use crate::models;
use crate::repo::types::{CommitData, PreparedWrite};
use crate::sequencer::events::{
format_seq_account_evt, format_seq_commit, format_seq_handle_update, format_seq_identity_evt,
format_seq_tombstone, SeqEvt, TypedAccountEvt, TypedCommitEvt, TypedHandleEvt,
@ -14,6 +11,9 @@ use crate::EVENT_EMITTER;
use anyhow::Result;
use diesel::*;
use futures::{Stream, StreamExt};
use rsky_common::time::SECOND;
use rsky_common::{cbor_to_struct, wait};
use rsky_repo::types::{CommitData, PreparedWrite};
use std::cmp;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

View File

@ -1,4 +1,3 @@
use crate::common::r#async::{AsyncBuffer, AsyncBufferFullError};
use crate::sequencer::events::SeqEvt;
use crate::sequencer::{RequestSeqRangeOpts, Sequencer};
use crate::EVENT_EMITTER;
@ -6,6 +5,7 @@ use anyhow::{anyhow, Result};
use futures::stream::Stream;
use futures::{pin_mut, StreamExt};
use rocket::async_stream::try_stream;
use rsky_common::r#async::{AsyncBuffer, AsyncBufferFullError};
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::{Mutex, RwLock};

View File

@ -1,8 +1,8 @@
use crate::common::struct_to_cbor;
use crate::xrpc_server::stream::types::{
ErrorFrameBody, ErrorFrameHeader, FrameType, MessageFrameHeader,
};
use anyhow::Result;
use rsky_common::struct_to_cbor;
use serde_json::Value;
pub trait Frame {

40
rsky-repo/Cargo.toml Normal file
View File

@ -0,0 +1,40 @@
[package]
name = "rsky-repo"
version = "0.0.1"
authors = ["Rudy Fraser <him@rudyfraser.com>"]
description = "Rust crate for atproto repositories, an in particular the Merkle Search Tree (MST) data structure."
license = "Apache-2.0"
edition = "2021"
publish = true
homepage = "https://blackskyweb.xyz"
repository = "https://github.com/blacksky-algorithms/rsky/tree/main/rsky-repo"
documentation = "https://docs.rs/rsky-repo"
[dependencies]
rsky-syntax = { workspace = true }
rsky-common = { workspace = true }
rsky-crypto = {workspace = true}
rsky-lexicon = {workspace = true}
serde = {workspace = true}
serde_derive = {workspace = true}
serde_cbor = { workspace = true }
serde_ipld_dagcbor = {workspace = true}
serde_json = {workspace = true}
serde_bytes = {workspace = true}
lexicon_cid = {workspace = true}
tokio = {workspace = true}
async-recursion = "1.1.1"
sha2 = {workspace = true}
anyhow = "1.0.79" # @TODO: Remove anyhow in lib
futures = "0.3.28"
thiserror = "1.0.40"
async-stream = "0.3.5"
async-trait = "0.1.86"
integer-encoding = { version = "3", features = ["tokio_async"] }
rand = "0.8.5"
rand_core = "0.6.4"
secp256k1 = {workspace = true}
libipld = {workspace = true}
regex = "1.10.3"
lazy_static = "1.4.0"

7
rsky-repo/README.md Normal file
View File

@ -0,0 +1,7 @@
# rsky-repo: Repository and MST
Rust crate for atproto repositories, and in particular the Merkle Search Tree (MST) data structure.
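For orientation, the crate's public modules match the `src/lib.rs` listing later in this diff; these are some of the paths rsky-pds now pulls in (a sketch of typical imports drawn from this diff, not an exhaustive list):
use rsky_repo::block_map::BlockMap;
use rsky_repo::car::blocks_to_car_file;
use rsky_repo::cid_set::CidSet;
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
use rsky_repo::storage::types::RepoStorage;
use rsky_repo::types::{CommitData, PreparedWrite, RecordPath};
use rsky_repo::util::format_data_key;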
## License
rsky is released under the [Apache License 2.0](../LICENSE).

View File

@ -1,8 +1,8 @@
use crate::common;
use crate::common::ipld;
use crate::repo::types::CidAndBytes;
use crate::types::CidAndBytes;
use anyhow::Result;
use lexicon_cid::Cid;
use rsky_common;
use rsky_common::ipld;
use serde::Serialize;
use std::collections::BTreeMap;
use std::str::FromStr;
@ -30,7 +30,7 @@ impl BlockMap {
let cid = ipld::cid_for_cbor(&value)?;
self.set(
cid,
common::struct_to_cbor(value)?, //bytes
rsky_common::struct_to_cbor(&value)?, //bytes
);
Ok(cid)
}

View File

@ -1,5 +1,5 @@
use crate::repo::block_map::BlockMap;
use crate::repo::util::stream_to_buffer;
use crate::block_map::BlockMap;
use crate::util::stream_to_buffer;
use crate::vendored::iroh_car::{CarHeader, CarReader, CarWriter};
use anyhow::{bail, Result};
use async_stream::stream;

View File

@ -1,7 +1,7 @@
use crate::repo::block_map::BlockMap;
use crate::repo::cid_set::CidSet;
use crate::repo::mst::diff::mst_diff;
use crate::repo::mst::{NodeEntry, MST};
use crate::block_map::BlockMap;
use crate::cid_set::CidSet;
use crate::mst::diff::mst_diff;
use crate::mst::{NodeEntry, MST};
use anyhow::Result;
use lexicon_cid::Cid;
use std::collections::HashMap;

19
rsky-repo/src/lib.rs Normal file
View File

@ -0,0 +1,19 @@
#[macro_use]
extern crate serde_derive;
extern crate core;
extern crate serde;
pub mod block_map;
pub mod car;
pub mod cid_set;
pub mod data_diff;
pub mod error;
pub mod mst;
pub mod parse;
pub mod readable_repo;
pub mod repo;
pub mod storage;
pub mod sync;
pub mod types;
pub mod util;
mod vendored;

View File

@ -1,6 +1,6 @@
use crate::repo::data_diff::DataDiff;
use crate::repo::mst::walker::{MstWalker, WalkerStatus};
use crate::repo::mst::{NodeEntry, MST};
use crate::data_diff::DataDiff;
use crate::mst::walker::{MstWalker, WalkerStatus};
use crate::mst::{NodeEntry, MST};
use anyhow::{bail, Result};
use futures::StreamExt;

View File

@ -12,23 +12,23 @@
* For atproto, we use SHA-256 as the key hashing algorithm, and ~4 fanout
* (2-bits of zero per layer).
*/
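// For intuition (a sketch using the SHA-256 + 2-bit scheme described above, not code
// copied from this crate): a key's layer is the number of leading 2-bit zero chunks
// in its SHA-256 hash, so each successive layer is roughly 4x less likely ("~4 fanout").
use sha2::{Digest, Sha256};

fn leading_zeros_on_hash(key: &[u8]) -> u32 {
    let hash = Sha256::digest(key);
    let mut zeros = 0u32;
    for &byte in hash.iter() {
        if byte < 64 { zeros += 1 } else { break };  // 0b00xxxxxx
        if byte < 16 { zeros += 1 } else { break };  // 0b0000xxxx
        if byte < 4  { zeros += 1 } else { break };  // 0b000000xx
        if byte == 0 { zeros += 1 } else { break };  // 0b00000000, keep scanning the next byte
    }
    zeros
}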
use crate::common;
use crate::common::ipld;
use crate::common::tid::Ticker;
use crate::repo::block_map::BlockMap;
use crate::repo::cid_set::CidSet;
use crate::repo::error::DataStoreError;
use crate::repo::parse;
use crate::repo::types::CidAndBytes;
use crate::block_map::BlockMap;
use crate::cid_set::CidSet;
use crate::error::DataStoreError;
use crate::parse;
use crate::storage::types::RepoStorage;
use crate::storage::ObjAndBytes;
use crate::types::CidAndBytes;
use crate::vendored::iroh_car::CarWriter;
use anyhow::{anyhow, Result};
use async_recursion::async_recursion;
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use lexicon_cid::Cid;
use rocket::async_stream::stream;
use rocket::async_trait;
use rsky_common;
use rsky_common::ipld;
use rsky_common::tid::Ticker;
use serde_cbor::Value as CborValue;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
@ -691,7 +691,7 @@ impl MST {
let data = util::serialize_node_data(entries.as_slice()).await?;
Ok(CidAndBytes {
cid: ipld::cid_for_cbor(&data)?,
bytes: common::struct_to_cbor(data)?,
bytes: rsky_common::struct_to_cbor(&data)?,
})
}
@ -1566,7 +1566,7 @@ pub mod walker;
mod tests {
use super::util::*;
use super::*;
use crate::repo::data_diff::{DataAdd, DataDelete, DataDiff, DataUpdate};
use crate::data_diff::{DataAdd, DataDelete, DataDiff, DataUpdate};
use crate::storage::memory_blockstore::MemoryBlockstore;
use anyhow::Result;
use rand::seq::SliceRandom;

View File

@ -1,13 +1,13 @@
use super::{Leaf, NodeData, NodeEntry, TreeEntry, MST};
use crate::common;
use crate::common::ipld::cid_for_cbor;
use crate::common::tid::Ticker;
use crate::storage::types::RepoStorage;
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use lexicon_cid::Cid;
use rand::{thread_rng, Rng};
use regex::Regex;
use rsky_common;
use rsky_common::ipld::cid_for_cbor;
use rsky_common::tid::Ticker;
use serde_json::json;
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
@ -194,7 +194,7 @@ pub async fn random_cid(
) -> Result<Cid> {
let record = json!({ "test": random_str(50) });
let cid = cid_for_cbor(&record)?;
let bytes = common::struct_to_cbor(record)?;
let bytes = rsky_common::struct_to_cbor(&record)?;
if let Some(ref mut storage) = storage {
if let Some(rev) = rev {
storage.put_block(cid, bytes, rev).await?;

Some files were not shown because too many files have changed in this diff