add getSubjectStatus endpoint

This commit is contained in:
Rudy Fraser
2024-06-07 01:47:18 -04:00
parent 7b7d3d34ee
commit fb04770bbc
9 changed files with 249 additions and 15 deletions

View File

@ -1,3 +1,4 @@
use crate::com::atproto::repo::StrongRef;
use crate::com::atproto::server::InviteCode;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -53,13 +54,24 @@ pub struct UpdateAccountPasswordInput {
pub password: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[derive(Debug, Serialize, Clone)]
pub struct GetInviteCodesOutput {
#[serde(skip_serializing_if = "Option::is_none")]
pub cursor: Option<String>,
pub codes: Vec<InviteCode>,
}
#[derive(Debug, Serialize)]
pub struct GetSubjectStatusOutput {
pub subject: Subject,
pub takedown: Option<StatusAttr>,
#[serde(skip_serializing_if = "Option::is_none")]
pub deactivated: Option<StatusAttr>,
}
// Defs
// ----
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct AccountView {
pub did: String,
@ -79,3 +91,34 @@ pub struct AccountView {
#[serde(rename = "inviteNote")]
pub invite_note: Option<String>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct StatusAttr {
pub applied: bool,
pub r#ref: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum Subject {
RepoRef(RepoRef),
StrongRef(StrongRef),
RepoBlobRef(RepoBlobRef),
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct RepoRef {
#[serde(rename = "$type")]
pub r#type: String,
pub did: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct RepoBlobRef {
#[serde(rename = "$type")]
pub r#type: String,
pub did: String,
pub cid: String,
#[serde(rename = "recordUri")]
pub record_uri: Option<String>,
}

View File

@ -2,6 +2,8 @@ use serde_json::Value;
#[derive(Debug, Serialize, Deserialize)]
pub struct StrongRef {
#[serde(rename = "$type")]
pub r#type: String,
pub uri: String,
pub cid: String,
}

View File

@ -255,7 +255,7 @@ pub struct RequestEmailUpdateOutput {
pub token_required: bool,
}
// Refs
// Defs
// ----
#[derive(Clone, Debug, Deserialize, Serialize)]

View File

@ -13,6 +13,7 @@ use diesel::helper_types::{Eq, IntoBoxed};
use diesel::pg::Pg;
use diesel::result::{DatabaseErrorKind, Error as DieselError};
use diesel::*;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use std::ops::Add;
use std::time::SystemTime;
use thiserror::Error;
@ -30,6 +31,11 @@ pub struct AvailabilityFlags {
pub include_deactivated: Option<bool>,
}
pub struct GetAccountAdminStatusOutput {
pub takedown: StatusAttr,
pub deactivated: StatusAttr,
}
pub type ActorJoinAccount =
LeftJoinOn<ActorTable, AccountTable, Eq<ActorSchema::did, AccountSchema::did>>;
pub type BoxedQuery<'a> = IntoBoxed<'a, ActorJoinAccount, Pg>;
@ -325,3 +331,42 @@ pub async fn set_email_confirmed_at(did: &String, email_confirmed_at: String) ->
.execute(conn)?;
Ok(())
}
pub async fn get_account_admin_status(did: &String) -> Result<Option<GetAccountAdminStatusOutput>> {
let conn = &mut establish_connection()?;
let res: Option<(Option<String>, Option<String>)> = ActorSchema::actor
.filter(ActorSchema::did.eq(did))
.select((ActorSchema::takedownRef, ActorSchema::deactivatedAt))
.first(conn)
.optional()?;
match res {
None => Ok(None),
Some(res) => {
let takedown = match res.0 {
Some(takedown_ref) => StatusAttr {
applied: true,
r#ref: Some(takedown_ref),
},
None => StatusAttr {
applied: false,
r#ref: None,
},
};
let deactivated = match res.1 {
Some(_) => StatusAttr {
applied: true,
r#ref: None,
},
None => StatusAttr {
applied: false,
r#ref: None,
},
};
Ok(Some(GetAccountAdminStatusOutput {
takedown,
deactivated,
}))
}
}
}

View File

@ -1,4 +1,6 @@
use crate::account_manager::helpers::account::{ActorAccount, AvailabilityFlags};
use crate::account_manager::helpers::account::{
ActorAccount, AvailabilityFlags, GetAccountAdminStatusOutput,
};
use crate::account_manager::helpers::auth::{
AuthHelperError, CreateTokensOpts, RefreshGracePeriodOpts,
};
@ -147,6 +149,12 @@ impl AccountManager {
Ok((access_jwt, refresh_jwt))
}
pub async fn get_account_admin_status(
did: &String,
) -> Result<Option<GetAccountAdminStatusOutput>> {
account::get_account_admin_status(did).await
}
pub fn update_repo_root(did: String, cid: Cid, rev: String) -> Result<()> {
Ok(repo::update_root(did, cid, rev)?)
}

View File

@ -1,4 +1,120 @@
#[rocket::get("/xrpc/com.atproto.admin.getSubjectStatus")]
pub async fn get_subject_status() {
todo!();
use crate::account_manager::AccountManager;
use crate::auth_verifier::Moderator;
use crate::models::{InternalErrorCode, InternalErrorMessageResponse};
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use futures::try_join;
use rocket::http::Status;
use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::admin::{GetSubjectStatusOutput, RepoBlobRef, RepoRef, Subject};
use rsky_lexicon::com::atproto::repo::StrongRef;
use std::str::FromStr;
use libipld::Cid;
async fn inner_get_subject_status(
did: Option<String>,
uri: Option<String>,
blob: Option<String>,
s3_config: &State<SdkConfig>,
) -> Result<GetSubjectStatusOutput> {
let mut body: Option<GetSubjectStatusOutput> = None;
if let Some(blob) = blob {
match did {
None => bail!("Must provide a did to request blob state"),
Some(did) => {
let actor_store = ActorStore::new(
did.clone(),
S3BlobStore::new(did.clone(), s3_config),
);
let takedown = actor_store
.blob
.get_blob_takedown_status(Cid::from_str(&blob)?)
.await?;
if let Some(takedown) = takedown {
body = Some(GetSubjectStatusOutput {
subject: Subject::RepoBlobRef(RepoBlobRef {
r#type: "com.atproto.admin.defs#repoBlobRef".to_string(),
did,
cid: blob,
record_uri: None,
}),
takedown: Some(takedown),
deactivated: None,
});
}
}
}
} else if let Some(uri) = uri {
let uri_without_prefix = uri.replace("at://", "");
let parts = uri_without_prefix.split("/").collect::<Vec<&str>>();
if let (Some(uri_hostname), Some(_), Some(_)) =
(parts.get(0), parts.get(1), parts.get(2))
{
let actor_store = ActorStore::new(
uri_hostname.to_string(),
S3BlobStore::new(uri_hostname.to_string(), s3_config),
);
let (takedown, cid) = try_join!(
actor_store.record.get_record_takedown_status(uri.clone()),
actor_store.record.get_current_record_cid(uri.clone()),
)?;
if let (Some(cid), Some(takedown)) = (cid, takedown) {
body = Some(GetSubjectStatusOutput {
subject: Subject::StrongRef(StrongRef {
r#type: "com.atproto.repo.strongRef".to_string(),
uri,
cid: cid.to_string(),
}),
takedown: Some(takedown),
deactivated: None,
});
}
}
} else if let Some(did) = did {
let status = AccountManager::get_account_admin_status(&did).await?;
if let Some(status) = status {
body = Some(GetSubjectStatusOutput {
subject: Subject::RepoRef(RepoRef {
r#type: "com.atproto.admin.defs#repoRef".to_string(),
did,
}),
takedown: Some(status.takedown),
deactivated: Some(status.deactivated),
});
}
} else {
bail!("No provided subject");
}
match body {
None => bail!("NotFound: Subject not found"),
Some(body) => Ok(body),
}
}
#[rocket::get("/xrpc/com.atproto.admin.getSubjectStatus?<did>&<uri>&<blob>")]
pub async fn get_subject_status(
did: Option<String>,
uri: Option<String>,
blob: Option<String>,
s3_config: &State<SdkConfig>,
_auth: Moderator,
) -> Result<(), status::Custom<Json<InternalErrorMessageResponse>>> {
match inner_get_subject_status(did, uri, blob, s3_config).await {
Ok(_) => Ok(()),
Err(error) => {
let internal_error = InternalErrorMessageResponse {
code: Some(InternalErrorCode::InternalError),
message: Some(error.to_string()),
};
return Err(status::Custom(
Status::InternalServerError,
Json(internal_error),
));
}
}
}

View File

@ -15,6 +15,7 @@ use futures::try_join;
use libipld::Cid;
use rocket::data::{Data, ToByteUnit};
use rocket::form::validate::Contains;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use rsky_lexicon::com::atproto::repo::ListMissingBlobsRefRecordBlob;
use sha2::{Digest, Sha256};
@ -371,6 +372,30 @@ impl BlobReader {
})
.collect())
}
pub async fn get_blob_takedown_status(&self, cid: Cid) -> Result<Option<StatusAttr>> {
use crate::schema::pds::blob::dsl as BlobSchema;
let conn = &mut establish_connection()?;
let res = BlobSchema::blob
.filter(BlobSchema::cid.eq(cid.to_string()))
.select(models::Blob::as_select())
.first(conn)
.optional()?;
match res {
None => Ok(None),
Some(res) => match res.takedown_ref {
None => Ok(Some(StatusAttr {
applied: false,
r#ref: None,
})),
Some(takedown_ref) => Ok(Some(StatusAttr {
applied: true,
r#ref: Some(takedown_ref),
})),
},
}
}
}
pub async fn accepted_mime(mime: String, accepted: Vec<String>) -> bool {

View File

@ -1,12 +1,13 @@
use crate::db::establish_connection;
use crate::models::models;
use crate::repo::types::{Ids, Lex, RepoRecord, StatusAttr, WriteOpAction};
use crate::repo::types::{Ids, Lex, RepoRecord, WriteOpAction};
use crate::repo::util::cbor_to_lex_record;
use crate::storage::Ipld;
use anyhow::Result;
use diesel::*;
use futures::stream::{self, StreamExt};
use libipld::Cid;
use rsky_lexicon::com::atproto::admin::StatusAttr;
use serde_json::Value as JsonValue;
use std::env;
use std::str::FromStr;
@ -226,7 +227,7 @@ impl RecordReader {
Ok(!!record_uri.is_some())
}
pub async fn get_record_takedown_status(&mut self, uri: String) -> Result<Option<StatusAttr>> {
pub async fn get_record_takedown_status(&self, uri: String) -> Result<Option<StatusAttr>> {
use crate::schema::pds::record::dsl as RecordSchema;
let conn = &mut establish_connection()?;
@ -252,7 +253,7 @@ impl RecordReader {
}
}
pub async fn get_current_record_cid(&mut self, uri: String) -> Result<Option<Cid>> {
pub async fn get_current_record_cid(&self, uri: String) -> Result<Option<Cid>> {
use crate::schema::pds::record::dsl as RecordSchema;
let conn = &mut establish_connection()?;

View File

@ -320,12 +320,6 @@ pub struct VerifiedRepo {
pub type CarBlock = CidAndBytes;
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct StatusAttr {
pub applied: bool,
pub r#ref: Option<String>,
}
pub struct CidAndBytes {
pub cid: Cid,
pub bytes: Vec<u8>,