mirror of
https://github.com/blacksky-algorithms/rsky.git
synced 2025-03-14 13:59:02 +00:00
Correct broken MST
This commit is contained in:
@ -106,12 +106,14 @@ CREATE TABLE IF NOT EXISTS pds.repo_root (
|
||||
|
||||
-- Create Repo Block Table
|
||||
CREATE TABLE IF NOT EXISTS pds.repo_block (
|
||||
cid character varying PRIMARY KEY,
|
||||
cid character varying NOT NULL,
|
||||
did character varying NOT NULL,
|
||||
"repoRev" character varying NOT NULL,
|
||||
size integer NOT NULL,
|
||||
content bytea NOT NULL
|
||||
);
|
||||
ALTER TABLE ONLY pds.repo_block
|
||||
ADD CONSTRAINT repo_block_pkey PRIMARY KEY (cid, did);
|
||||
CREATE INDEX repo_block_repo_rev_idx
|
||||
ON pds.repo_block("repoRev", cid);
|
||||
|
||||
@ -135,7 +137,7 @@ CREATE INDEX record_repo_rev_idx
|
||||
|
||||
-- Create Blob Table
|
||||
CREATE TABLE IF NOT EXISTS pds.blob (
|
||||
cid character varying PRIMARY KEY,
|
||||
cid character varying NOT NULL,
|
||||
did character varying NOT NULL,
|
||||
"mimeType" character varying NOT NULL,
|
||||
size integer NOT NULL,
|
||||
@ -145,6 +147,8 @@ CREATE TABLE IF NOT EXISTS pds.blob (
|
||||
"createdAt" character varying NOT NULL,
|
||||
"takedownRef" character varying
|
||||
);
|
||||
ALTER TABLE ONLY pds.blob
|
||||
ADD CONSTRAINT blob_pkey PRIMARY KEY (cid, did);
|
||||
CREATE INDEX blob_tempkey_idx
|
||||
ON pds.blob("tempKey");
|
||||
|
||||
|
@ -59,6 +59,7 @@ async fn inner_resolve_handle(
|
||||
}
|
||||
|
||||
// this is not someone on our server, but we help with resolving anyway
|
||||
// @TODO: Weird error about Tokio received when this fails that leads to panic
|
||||
if did.is_none() && env_str("PDS_BSKY_APP_VIEW_URL").is_some() {
|
||||
did = try_resolve_from_app_view(&handle).await?;
|
||||
}
|
||||
|
@ -73,6 +73,7 @@ async fn inner_create_record(
|
||||
}
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
// @TODO: Use ATUri
|
||||
let backlink_deletions: Vec<PreparedDelete> = backlink_conflicts
|
||||
.into_iter()
|
||||
|
@ -1,18 +1,19 @@
|
||||
use anyhow::{Result, bail};
|
||||
use crate::account_manager::AccountManager;
|
||||
use crate::account_manager::helpers::account::{ActorAccount, AvailabilityFlags};
|
||||
use crate::account_manager::AccountManager;
|
||||
use anyhow::{bail, Result};
|
||||
|
||||
pub async fn assert_repo_availability(
|
||||
did: &String,
|
||||
is_admin_of_self: bool
|
||||
is_admin_of_self: bool,
|
||||
) -> Result<ActorAccount> {
|
||||
let account = AccountManager::get_account(
|
||||
did,
|
||||
Some(AvailabilityFlags {
|
||||
include_deactivated: Some(true),
|
||||
include_taken_down: Some(true)
|
||||
})
|
||||
).await?;
|
||||
include_taken_down: Some(true),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
match account {
|
||||
None => bail!("RepoNotFound: Could not find repo for DID: {did}"),
|
||||
Some(account) => {
|
||||
|
@ -39,7 +39,7 @@ async fn inner_upload_blob(
|
||||
.await?;
|
||||
|
||||
if records_for_blob.len() > 0 {
|
||||
actor_store
|
||||
let res = actor_store
|
||||
.blob
|
||||
.verify_blob_and_make_permanent(PreparedBlobRef {
|
||||
cid: blobref.get_cid()?,
|
||||
@ -50,6 +50,7 @@ async fn inner_upload_blob(
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
println!("Debugging make permanent: {:?}", res);
|
||||
}
|
||||
|
||||
Ok(BlobOutput {
|
||||
|
@ -48,6 +48,7 @@ async fn create(
|
||||
body.recovery_key = Some(input_recovery_key.to_owned());
|
||||
}
|
||||
//@TODO: Lookup user by email as well
|
||||
//@TODO: Validate and require invite code
|
||||
|
||||
let secp = Secp256k1::new();
|
||||
let private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX").unwrap();
|
||||
|
@ -1,36 +1,35 @@
|
||||
use std::str::FromStr;
|
||||
use crate::apis::com::atproto::repo::assert_repo_availability;
|
||||
use crate::auth_verifier;
|
||||
use crate::auth_verifier::OptionalAccessOrAdminToken;
|
||||
use anyhow::{Result};
|
||||
use crate::models::{InternalErrorCode, InternalErrorMessageResponse};
|
||||
use crate::repo::aws::s3::S3BlobStore;
|
||||
use crate::repo::ActorStore;
|
||||
use anyhow::Result;
|
||||
use aws_config::SdkConfig;
|
||||
use aws_sdk_s3::primitives::ByteStream as AwsStream;
|
||||
use rocket::response::stream::ByteStream;
|
||||
use libipld::Cid;
|
||||
use rocket::http::Status;
|
||||
use rocket::response::status;
|
||||
use rocket::response::stream::ByteStream;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::State;
|
||||
use crate::apis::com::atproto::repo::assert_repo_availability;
|
||||
use crate::auth_verifier;
|
||||
use crate::models::{InternalErrorCode, InternalErrorMessageResponse};
|
||||
use crate::repo::ActorStore;
|
||||
use crate::repo::aws::s3::S3BlobStore;
|
||||
use std::str::FromStr;
|
||||
|
||||
async fn inner_get_blob(
|
||||
did: String,
|
||||
cid: String,
|
||||
s3_config: &State<SdkConfig>,
|
||||
auth: OptionalAccessOrAdminToken
|
||||
auth: OptionalAccessOrAdminToken,
|
||||
) -> Result<AwsStream> {
|
||||
let _ = assert_repo_availability(
|
||||
&did,
|
||||
auth_verifier::is_user_or_admin(auth.access.unwrap(), &did)
|
||||
).await?;
|
||||
let is_user_or_admin = if let Some(access) = auth.access {
|
||||
auth_verifier::is_user_or_admin(access, &did)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
let _ = assert_repo_availability(&did, is_user_or_admin).await?;
|
||||
|
||||
let cid = Cid::from_str(&cid)?;
|
||||
let actor_store = ActorStore::new(
|
||||
did.clone(),
|
||||
S3BlobStore::new(did.clone(), s3_config),
|
||||
);
|
||||
let actor_store = ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config));
|
||||
|
||||
let found = actor_store.blob.get_blob(cid).await?;
|
||||
Ok(found.stream)
|
||||
@ -43,23 +42,23 @@ pub async fn get_blob(
|
||||
did: String,
|
||||
cid: String,
|
||||
s3_config: &State<SdkConfig>,
|
||||
auth: OptionalAccessOrAdminToken
|
||||
auth: OptionalAccessOrAdminToken,
|
||||
) -> Result<ByteStream![Vec<u8>], status::Custom<Json<InternalErrorMessageResponse>>> {
|
||||
match inner_get_blob(did, cid, s3_config, auth).await {
|
||||
Ok(mut stream) => {
|
||||
Ok(ByteStream! {
|
||||
while let Some(byte_stream) = stream.next().await {
|
||||
match byte_stream {
|
||||
Ok(byte_stream) => yield byte_stream.to_vec(),
|
||||
Err(e) => {
|
||||
eprintln!("error while streaming: {}", e);
|
||||
break;
|
||||
}
|
||||
Ok(mut stream) => Ok(ByteStream! {
|
||||
while let Some(byte_stream) = stream.next().await {
|
||||
match byte_stream {
|
||||
Ok(byte_stream) => yield byte_stream.to_vec(),
|
||||
Err(e) => {
|
||||
eprintln!("error while streaming: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
},
|
||||
}
|
||||
}),
|
||||
Err(error) => {
|
||||
// @TODO: Need to update error handling to return 404 if we have it but it's in tmp
|
||||
eprintln!("Error: {}", error);
|
||||
let internal_error = InternalErrorMessageResponse {
|
||||
code: Some(InternalErrorCode::InternalError),
|
||||
message: Some(error.to_string()),
|
||||
|
@ -73,6 +73,10 @@ impl TID {
|
||||
pub fn older_than(&self, other: &TID) -> bool {
|
||||
self.compare_to(other) < 0
|
||||
}
|
||||
|
||||
pub fn to_string(self) -> String {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Based on adenosine/adenosine/src/identifiers.rs
|
||||
|
@ -14,8 +14,7 @@ use rocket::http::Status;
|
||||
use rocket::response::status;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::{Request, Response};
|
||||
use rsky_identity::did::did_resolver::DidResolver;
|
||||
use rsky_identity::types::{DidCache, DidResolverOpts, IdentityResolverOpts};
|
||||
use rsky_identity::types::{DidCache, IdentityResolverOpts};
|
||||
use rsky_identity::IdResolver;
|
||||
use rsky_pds::apis::*;
|
||||
use rsky_pds::common::env::env_list;
|
||||
|
@ -89,7 +89,16 @@ pub struct AppPassword {
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Queryable, Identifiable, Selectable, Clone, Debug, PartialEq, Default, Serialize, Deserialize,
|
||||
Queryable,
|
||||
Identifiable,
|
||||
Insertable,
|
||||
Selectable,
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Default,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
)]
|
||||
#[diesel(primary_key(uri, path))]
|
||||
#[diesel(table_name = crate::schema::pds::backlink)]
|
||||
@ -265,7 +274,16 @@ pub struct InviteCodeUse {
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Queryable, Identifiable, Selectable, Clone, Debug, PartialEq, Default, Serialize, Deserialize,
|
||||
Queryable,
|
||||
Identifiable,
|
||||
Insertable,
|
||||
Selectable,
|
||||
Clone,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Default,
|
||||
Serialize,
|
||||
Deserialize,
|
||||
)]
|
||||
#[diesel(primary_key(uri))]
|
||||
#[diesel(table_name = crate::schema::pds::record)]
|
||||
@ -273,6 +291,7 @@ pub struct InviteCodeUse {
|
||||
pub struct Record {
|
||||
pub uri: String,
|
||||
pub cid: String,
|
||||
pub did: String,
|
||||
pub collection: String,
|
||||
pub rkey: String,
|
||||
#[diesel(column_name = repoRev)]
|
||||
|
@ -43,12 +43,12 @@ pub struct ListMissingBlobsOpts {
|
||||
pub struct GetBlobOutput {
|
||||
pub size: i32,
|
||||
pub mime_type: Option<String>,
|
||||
pub stream: ByteStream
|
||||
pub stream: ByteStream,
|
||||
}
|
||||
|
||||
pub struct GetBlobMetadataOutput {
|
||||
pub size: i32,
|
||||
pub mime_type: Option<String>
|
||||
pub mime_type: Option<String>,
|
||||
}
|
||||
|
||||
// Basically handles getting blob records from db
|
||||
@ -59,7 +59,7 @@ impl BlobReader {
|
||||
blobstore,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn get_blob_metadata(&self, cid: Cid) -> Result<GetBlobMetadataOutput> {
|
||||
use crate::schema::pds::blob::dsl as BlobSchema;
|
||||
let conn = &mut establish_connection()?;
|
||||
@ -72,15 +72,15 @@ impl BlobReader {
|
||||
.first(conn)
|
||||
.optional()?;
|
||||
|
||||
match found {
|
||||
match found {
|
||||
None => bail!("Blob not found"),
|
||||
Some(found) => Ok(GetBlobMetadataOutput {
|
||||
size: found.size,
|
||||
mime_type: Some(found.mime_type)
|
||||
})
|
||||
mime_type: Some(found.mime_type),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn get_blob(&self, cid: Cid) -> Result<GetBlobOutput> {
|
||||
let metadata = self.get_blob_metadata(cid).await?;
|
||||
let blob_stream = self.blobstore.get_stream(cid).await?;
|
||||
@ -173,7 +173,7 @@ impl BlobReader {
|
||||
let upsert = sql_query("INSERT INTO pds.blob (cid, did, \"mimeType\", size, \"tempKey\", width, height, \"createdAt\", \"takedownRef\") \
|
||||
VALUES \
|
||||
($1, $2, $3, $4, $5, $6, $7, $8, $9) \
|
||||
ON CONFLICT (cid) DO UPDATE \
|
||||
ON CONFLICT (cid, did) DO UPDATE \
|
||||
SET \"tempKey\" = EXCLUDED.\"tempKey\" \
|
||||
WHERE pds.blob.\"tempKey\" is not null;");
|
||||
upsert
|
||||
|
@ -123,6 +123,7 @@ impl BlockMap {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BlocksAndMissing {
|
||||
pub blocks: BlockMap,
|
||||
pub missing: Vec<Cid>,
|
||||
|
@ -6,22 +6,26 @@ use anyhow::Result;
|
||||
use libipld::Cid;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DataAdd {
|
||||
key: String,
|
||||
cid: Cid,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DataUpdate {
|
||||
key: String,
|
||||
prev: Cid,
|
||||
cid: Cid,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DataDelete {
|
||||
key: String,
|
||||
cid: Cid,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DataDiff {
|
||||
pub adds: BTreeMap<String, DataAdd>,
|
||||
pub updates: BTreeMap<String, DataUpdate>,
|
||||
|
@ -73,6 +73,7 @@ pub struct CommitRecord {
|
||||
record: RepoRecord,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Repo {
|
||||
storage: SqlRepoReader, // get ipld blocks from db
|
||||
data: MST,
|
||||
@ -272,7 +273,7 @@ impl ActorStore {
|
||||
Some(write.record),
|
||||
Some(write.action),
|
||||
rev.clone(),
|
||||
now,
|
||||
Some(now.to_string()),
|
||||
)
|
||||
.await?
|
||||
}
|
||||
@ -284,7 +285,7 @@ impl ActorStore {
|
||||
Some(write.record),
|
||||
Some(write.action),
|
||||
rev.clone(),
|
||||
now,
|
||||
Some(now.to_string()),
|
||||
)
|
||||
.await?
|
||||
}
|
||||
@ -376,7 +377,6 @@ impl Repo {
|
||||
})?;
|
||||
let commit: VersionedCommit = serde_cbor::value::from_value(commit)?;
|
||||
let data = MST::load(storage.clone(), commit.data(), None)?;
|
||||
println!("Loaded repo for did: `{:?}`", commit.did());
|
||||
Ok(Repo::new(
|
||||
storage.clone(),
|
||||
data,
|
||||
@ -544,6 +544,7 @@ impl Repo {
|
||||
|
||||
let data_cid = data.get_pointer()?;
|
||||
let diff = DataDiff::of(&mut data, Some(&mut self.data.clone()))?;
|
||||
|
||||
let mut new_blocks = diff.new_mst_blocks;
|
||||
let mut removed_cids = diff.removed_cids;
|
||||
|
||||
@ -755,10 +756,13 @@ pub async fn prepare_create(opts: PrepareCreateOpts) -> Result<PreparedCreateOrU
|
||||
if validate {
|
||||
assert_valid_record(&record)?;
|
||||
}
|
||||
|
||||
// assert_no_explicit_slurs(rkey, record).await?;
|
||||
let next_rkey = Ticker::new().next(None);
|
||||
let rkey = rkey.unwrap_or(next_rkey.to_string());
|
||||
Ok(PreparedCreateOrUpdate {
|
||||
action: WriteOpAction::Create,
|
||||
uri: make_aturi(did, Some(collection), rkey),
|
||||
uri: make_aturi(did, Some(collection), Some(rkey)),
|
||||
cid: cid_for_safe_record(record.clone()).await?,
|
||||
swap_cid,
|
||||
record: record.clone(),
|
||||
|
@ -11,6 +11,7 @@ use libipld::Cid;
|
||||
use serde_cbor::Value as CborValue;
|
||||
use std::mem;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NodeIter {
|
||||
entries: Vec<NodeEntry>, // Contains the remaining children of a node,
|
||||
// The iterator of the parent node, if present
|
||||
@ -86,6 +87,7 @@ impl Iterator for NodeIter {
|
||||
}
|
||||
|
||||
/// Alternative implementation of iterator
|
||||
#[derive(Debug)]
|
||||
pub struct NodeIterReachable {
|
||||
entries: Vec<NodeEntry>,
|
||||
parent: Option<Box<NodeIterReachable>>,
|
||||
@ -185,7 +187,7 @@ pub struct Leaf {
|
||||
/// Following the Typescript implementation, this is basically a flexible
|
||||
/// "TreeEntry" (aka "leaf") which might also be the "Left" pointer on a
|
||||
/// NodeData (aka "tree").
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum NodeEntry {
|
||||
MST(MST),
|
||||
Leaf(Leaf),
|
||||
@ -258,7 +260,7 @@ pub struct UnstoredBlocks {
|
||||
/// (pointer is defined, no entries have been pulled from block store)
|
||||
///
|
||||
/// MerkleSearchTree values are immutable. Methods return copies with changes.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MST {
|
||||
pub entries: Option<Vec<NodeEntry>>,
|
||||
pub layer: Option<u32>,
|
||||
@ -340,8 +342,11 @@ impl MST {
|
||||
|
||||
// can compute the layer on the first KeySuffix, because
|
||||
// for the first entry that field is a complete key
|
||||
let leaf = &data.e[0];
|
||||
let layer = Some(util::leading_zeros_on_hash(&leaf.k)?);
|
||||
let first_leaf = data.e.get(0);
|
||||
let layer: Option<u32> = match first_leaf {
|
||||
Some(first_leaf) => Some(util::leading_zeros_on_hash(&first_leaf.k)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
self.entries = Some(util::deserialize_node_data(
|
||||
&self.storage,
|
||||
@ -464,13 +469,16 @@ impl MST {
|
||||
key_zeros = util::leading_zeros_on_hash(&key.clone().into_bytes())?;
|
||||
}
|
||||
let layer = self.get_layer()?;
|
||||
|
||||
let new_leaf = Leaf {
|
||||
key: key.clone(),
|
||||
value,
|
||||
};
|
||||
|
||||
return if key_zeros == layer {
|
||||
// it belongs in this layer
|
||||
let index = self.find_gt_or_equal_leaf_index(&key)?;
|
||||
|
||||
let found = self.at_index(index)?;
|
||||
if let Some(NodeEntry::Leaf(l)) = found {
|
||||
if l.key == *key {
|
||||
@ -634,7 +642,7 @@ impl MST {
|
||||
// -------------------
|
||||
|
||||
/// update entry in place
|
||||
pub fn update_entry(&mut self, index: usize, entry: NodeEntry) -> Result<MST> {
|
||||
pub fn update_entry(&mut self, index: isize, entry: NodeEntry) -> Result<MST> {
|
||||
let mut update = Vec::new();
|
||||
for e in self.slice(Some(0), Some(index))? {
|
||||
update.push(e);
|
||||
@ -648,7 +656,7 @@ impl MST {
|
||||
}
|
||||
|
||||
/// remove entry at index
|
||||
pub fn remove_entry(&mut self, index: usize) -> Result<MST> {
|
||||
pub fn remove_entry(&mut self, index: isize) -> Result<MST> {
|
||||
let mut updated = Vec::new();
|
||||
updated.append(&mut self.slice(Some(0), Some(index))?);
|
||||
updated.append(&mut self.slice(Some(index + 1), None)?);
|
||||
@ -671,27 +679,79 @@ impl MST {
|
||||
}
|
||||
|
||||
/// returns entry at index
|
||||
pub fn at_index(&mut self, index: usize) -> Result<Option<NodeEntry>> {
|
||||
pub fn at_index(&mut self, index: isize) -> Result<Option<NodeEntry>> {
|
||||
let entries = self.get_entries()?;
|
||||
Ok(entries.into_iter().nth(index))
|
||||
if index >= 0 {
|
||||
Ok(entries.into_iter().nth(index as usize))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// returns a slice of the node
|
||||
pub fn slice(&mut self, start: Option<usize>, end: Option<usize>) -> Result<Vec<NodeEntry>> {
|
||||
pub fn slice(&mut self, start: Option<isize>, end: Option<isize>) -> Result<Vec<NodeEntry>> {
|
||||
let entries = self.get_entries()?;
|
||||
if start.is_some() && end.is_some() {
|
||||
Ok(entries[start.unwrap()..end.unwrap()].to_vec())
|
||||
} else if start.is_some() && end.is_none() {
|
||||
Ok(entries[start.unwrap()..].to_vec())
|
||||
} else if start.is_none() && end.is_some() {
|
||||
Ok(entries[..end.unwrap()].to_vec())
|
||||
} else {
|
||||
Ok(entries)
|
||||
let entry_len = entries.len() as isize;
|
||||
match (start, end) {
|
||||
(Some(start), Some(end)) => {
|
||||
// Adapted from Javascript Array.prototype.slice()
|
||||
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice
|
||||
let start: usize = if start < 0 && start >= (-1 * entry_len) {
|
||||
(start + entry_len) as usize
|
||||
} else if start < (-1 * entry_len) {
|
||||
0
|
||||
} else if start >= entry_len {
|
||||
return Ok(vec![]);
|
||||
} else {
|
||||
start as usize
|
||||
};
|
||||
|
||||
let end: usize = if end < 0 && end >= (-1 * entry_len) {
|
||||
(end + entry_len) as usize
|
||||
} else if end < (-1 * entry_len) {
|
||||
0
|
||||
} else if end >= entry_len {
|
||||
entries.len()
|
||||
} else if end <= start as isize {
|
||||
return Ok(vec![]);
|
||||
} else {
|
||||
end as usize
|
||||
};
|
||||
|
||||
Ok(entries[start..end].to_vec())
|
||||
}
|
||||
(Some(start), None) => {
|
||||
let start: usize = if start < 0 && start >= (-1 * entry_len) {
|
||||
(start + entry_len) as usize
|
||||
} else if start < (-1 * entry_len) {
|
||||
0
|
||||
} else if start >= entry_len {
|
||||
return Ok(vec![]);
|
||||
} else {
|
||||
start as usize
|
||||
};
|
||||
Ok(entries[start..].to_vec())
|
||||
}
|
||||
(None, Some(end)) => {
|
||||
let end: usize = if end < 0 && end >= (-1 * entry_len) {
|
||||
(end + entry_len) as usize
|
||||
} else if end < (-1 * entry_len) {
|
||||
0
|
||||
} else if end >= entry_len {
|
||||
entries.len()
|
||||
} else if end <= 0 {
|
||||
return Ok(vec![]);
|
||||
} else {
|
||||
end as usize
|
||||
};
|
||||
Ok(entries[..end].to_vec())
|
||||
}
|
||||
(None, None) => Ok(entries),
|
||||
}
|
||||
}
|
||||
|
||||
/// inserts entry at index
|
||||
pub fn splice_in(&mut self, entry: NodeEntry, index: usize) -> Result<MST> {
|
||||
pub fn splice_in(&mut self, entry: NodeEntry, index: isize) -> Result<MST> {
|
||||
let mut update = Vec::new();
|
||||
for e in self.slice(Some(0), Some(index))? {
|
||||
update.push(e.clone());
|
||||
@ -707,7 +767,7 @@ impl MST {
|
||||
/// replaces an entry with [ Some(tree), Leaf, Some(tree) ]
|
||||
pub fn replace_with_split(
|
||||
&mut self,
|
||||
index: usize,
|
||||
index: isize,
|
||||
left: Option<MST>,
|
||||
leaf: Leaf,
|
||||
right: Option<MST>,
|
||||
@ -755,9 +815,13 @@ impl MST {
|
||||
// if the far right of the left side is a subtree,
|
||||
// we need to split it on the key as well
|
||||
let left_len = left_data.len();
|
||||
let last_in_left = left_data.into_iter().nth(left_len - 1);
|
||||
let last_in_left: Option<NodeEntry> = if let [.., last] = left_data.as_slice() {
|
||||
Some(last.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if let Some(NodeEntry::MST(mut last)) = last_in_left {
|
||||
left = left.remove_entry(left_len - 1)?;
|
||||
left = left.remove_entry(left_len as isize - 1)?;
|
||||
let split = last.split_around(key)?;
|
||||
if let Some(s0) = split.0 {
|
||||
left = left.append(NodeEntry::MST(s0.clone()))?;
|
||||
@ -837,7 +901,7 @@ impl MST {
|
||||
// -------------------
|
||||
|
||||
/// finds index of first leaf node that is greater than or equal to the value
|
||||
pub fn find_gt_or_equal_leaf_index(&mut self, key: &String) -> Result<usize> {
|
||||
pub fn find_gt_or_equal_leaf_index(&mut self, key: &String) -> Result<isize> {
|
||||
let entries = self.get_entries()?;
|
||||
let maybe_index = entries
|
||||
.clone()
|
||||
@ -852,9 +916,9 @@ impl MST {
|
||||
.position(|entry| entry.key >= *key);
|
||||
// if we can't find, we're on the end
|
||||
if let Some(i) = maybe_index {
|
||||
Ok(i)
|
||||
Ok(i as isize)
|
||||
} else {
|
||||
Ok(entries.len())
|
||||
Ok(entries.len() as isize)
|
||||
}
|
||||
}
|
||||
|
||||
@ -866,7 +930,7 @@ impl MST {
|
||||
/// controls might stop earlier.
|
||||
pub fn walk_leaves_from(&mut self, key: &String) -> impl Iterator<Item = Leaf> {
|
||||
let mut iter: Vec<Leaf> = Vec::new();
|
||||
let index = self.find_gt_or_equal_leaf_index(key).unwrap();
|
||||
let index = self.find_gt_or_equal_leaf_index(key).unwrap() as usize;
|
||||
let entries = self.get_entries().unwrap();
|
||||
let prev = entries.get(index - 1).unwrap().clone();
|
||||
if let NodeEntry::MST(mut p) = prev {
|
||||
|
@ -8,11 +8,12 @@ use regex::Regex;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::str;
|
||||
|
||||
/// Returns `true` when every character of `input` is an ASCII letter, an
/// ASCII digit, or one of `_`, `-`, `:`, `.`. The empty string is accepted.
///
/// This is the same set the former pattern `^[a-zA-Z0-9_\-:.]*$` matched,
/// checked directly so no regex has to be compiled or stored.
// @TODO (carried over from the regex version): the allowed character set still needs review
fn is_valid_chars(input: &str) -> bool {
    input
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | ':' | '.'))
}
|
||||
|
||||
// * Restricted to a subset of ASCII characters — the allowed characters are
|
||||
@ -21,12 +22,13 @@ fn is_valid_chars(input: String) -> bool {
|
||||
// * The specific record key values . and .. are not allowed
|
||||
pub fn is_valid_repo_mst_path(key: &String) -> Result<bool> {
|
||||
let split: Vec<&str> = key.split("/").collect();
|
||||
|
||||
return if key.len() <= 256
|
||||
&& split.len() == 2
|
||||
&& split[0].len() > 0
|
||||
&& split[1].len() > 0
|
||||
&& is_valid_chars(split[0].to_owned())
|
||||
&& is_valid_chars(split[1].to_owned())
|
||||
&& is_valid_chars(split[0])
|
||||
&& is_valid_chars(split[1])
|
||||
{
|
||||
Ok(true)
|
||||
} else {
|
||||
@ -50,10 +52,10 @@ pub fn cid_for_entries(entries: Vec<NodeEntry>) -> Result<Cid> {
|
||||
pub fn count_prefix_len(a: String, b: String) -> Result<usize> {
|
||||
let mut x = 0;
|
||||
for i in 0..a.len() {
|
||||
if a.chars().nth(i).unwrap() != b.chars().nth(i).unwrap() {
|
||||
break;
|
||||
match (a.chars().nth(i), b.chars().nth(i)) {
|
||||
(Some(a), Some(b)) if a == b => x += 1,
|
||||
_ => break,
|
||||
}
|
||||
x += 1;
|
||||
}
|
||||
Ok(x)
|
||||
}
|
||||
@ -71,14 +73,14 @@ pub fn serialize_node_data(entries: Vec<NodeEntry>) -> Result<NodeData> {
|
||||
let mut last_key = "";
|
||||
while i < entries.len() {
|
||||
let leaf = &entries[i];
|
||||
let next = &entries[i + 1];
|
||||
let next = entries.get(i + 1);
|
||||
if !leaf.is_leaf() {
|
||||
return Err(anyhow!("Not a valid node: two subtrees next to each other"));
|
||||
};
|
||||
i += 1;
|
||||
let mut subtree: Option<Cid> = None;
|
||||
match next {
|
||||
NodeEntry::MST(tree) => {
|
||||
Some(NodeEntry::MST(tree)) => {
|
||||
subtree = Some(tree.pointer);
|
||||
i += 1;
|
||||
}
|
||||
@ -89,7 +91,7 @@ pub fn serialize_node_data(entries: Vec<NodeEntry>) -> Result<NodeData> {
|
||||
let prefix_len = count_prefix_len(last_key.to_owned(), l.key.to_owned())?;
|
||||
data.e.push(TreeEntry {
|
||||
p: u8::try_from(prefix_len)?,
|
||||
k: l.key[0..prefix_len].to_owned().into_bytes(),
|
||||
k: l.key[prefix_len..].to_owned().into_bytes(),
|
||||
v: l.value,
|
||||
t: subtree,
|
||||
});
|
||||
|
@ -1,10 +1,10 @@
|
||||
use crate::common;
|
||||
use crate::db::establish_connection;
|
||||
use crate::models::models;
|
||||
use crate::models::{models, Backlink, Record};
|
||||
use crate::repo::types::{Ids, Lex, RepoRecord, WriteOpAction};
|
||||
use crate::repo::util::cbor_to_lex_record;
|
||||
use crate::storage::Ipld;
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Result};
|
||||
use diesel::*;
|
||||
use futures::stream::{self, StreamExt};
|
||||
use libipld::Cid;
|
||||
@ -282,11 +282,11 @@ impl RecordReader {
|
||||
|
||||
let res = RecordSchema::record
|
||||
.inner_join(BacklinkSchema::backlink.on(BacklinkSchema::uri.eq(RecordSchema::uri)))
|
||||
.select(models::Record::as_select())
|
||||
.select(Record::as_select())
|
||||
.filter(BacklinkSchema::path.eq(path))
|
||||
.filter(BacklinkSchema::linkTo.eq(link_to))
|
||||
.filter(RecordSchema::collection.eq(collection))
|
||||
.load::<models::Record>(conn)?;
|
||||
.load::<Record>(conn)?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@ -303,9 +303,9 @@ impl RecordReader {
|
||||
.into_iter()
|
||||
.nth(0)
|
||||
.unwrap_or("");
|
||||
let conflicts: Vec<Vec<models::Record>> = stream::iter(record_backlinks)
|
||||
let conflicts: Vec<Vec<Record>> = stream::iter(record_backlinks)
|
||||
.then(|backlink| async move {
|
||||
Ok::<Vec<models::Record>, anyhow::Error>(
|
||||
Ok::<Vec<Record>, anyhow::Error>(
|
||||
self.get_record_backlinks(
|
||||
collection.to_owned(),
|
||||
backlink.path,
|
||||
@ -337,21 +337,110 @@ impl RecordReader {
|
||||
|
||||
pub async fn index_record(
|
||||
&self,
|
||||
_uri: String, // @TODO: Use AtUri
|
||||
_cid: Cid,
|
||||
_record: Option<RepoRecord>,
|
||||
_action: Option<WriteOpAction>, // Create or update with a default of create
|
||||
_repo_rev: String,
|
||||
_timestamp: &str,
|
||||
uri: String, // @TODO: Use AtUri
|
||||
cid: Cid,
|
||||
record: Option<RepoRecord>,
|
||||
action: Option<WriteOpAction>, // Create or update with a default of create
|
||||
repo_rev: String,
|
||||
timestamp: Option<String>,
|
||||
) -> Result<()> {
|
||||
todo!()
|
||||
println!("@LOG DEBUG RecordReader::index_record, indexing record {uri}");
|
||||
let action = action.unwrap_or(WriteOpAction::Create);
|
||||
let uri_without_prefix = uri.replace("at://", "");
|
||||
let parts = uri_without_prefix.split("/").collect::<Vec<&str>>();
|
||||
match (parts.get(0), parts.get(1), parts.get(2)) {
|
||||
(Some(hostname), Some(collection), Some(rkey)) => {
|
||||
let indexed_at = timestamp.unwrap_or_else(|| common::now());
|
||||
let row = Record {
|
||||
did: self.did.clone(),
|
||||
uri: uri.clone(),
|
||||
cid: cid.to_string(),
|
||||
collection: collection.to_string(),
|
||||
rkey: rkey.to_string(),
|
||||
repo_rev: Some(repo_rev.clone()),
|
||||
indexed_at: indexed_at.clone(),
|
||||
takedown_ref: None,
|
||||
};
|
||||
|
||||
if !hostname.starts_with("did:") {
|
||||
bail!("Expected indexed URI to contain DID")
|
||||
} else if collection.len() < 1 {
|
||||
bail!("Expected indexed URI to contain a collection")
|
||||
} else if rkey.len() < 1 {
|
||||
bail!("Expected indexed URI to contain a record key")
|
||||
}
|
||||
|
||||
use crate::schema::pds::record::dsl as RecordSchema;
|
||||
let conn = &mut establish_connection()?;
|
||||
|
||||
// Track current version of record
|
||||
insert_into(RecordSchema::record)
|
||||
.values(row)
|
||||
.on_conflict(RecordSchema::uri)
|
||||
.do_update()
|
||||
.set((
|
||||
RecordSchema::cid.eq(cid.to_string()),
|
||||
RecordSchema::repoRev.eq(&repo_rev),
|
||||
RecordSchema::indexedAt.eq(&indexed_at),
|
||||
))
|
||||
.execute(conn)?;
|
||||
|
||||
if let Some(record) = record {
|
||||
// Maintain backlinks
|
||||
let backlinks = get_backlinks(&uri, &record)?;
|
||||
if let WriteOpAction::Update = action {
|
||||
// On update just recreate backlinks from scratch for the record, so we can clear out
|
||||
// the old ones. E.g. for weird cases like updating a follow to be for a different did.
|
||||
self.remove_backlinks_by_uri(&uri).await?;
|
||||
}
|
||||
self.add_backlinks(backlinks).await?;
|
||||
}
|
||||
println!("@LOG DEBUG RecordReader::index_record, indexed record {uri}");
|
||||
Ok(())
|
||||
}
|
||||
_ => bail!("Issue parsing uri: {uri}"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_record(
|
||||
&self,
|
||||
_uri: String, // @TODO: Use AtUri
|
||||
uri: String, // @TODO: Use AtUri
|
||||
) -> Result<()> {
|
||||
todo!()
|
||||
println!("@LOG DEBUG RecordReader::delete_record, deleting indexed record {uri}");
|
||||
use crate::schema::pds::backlink::dsl as BacklinkSchema;
|
||||
use crate::schema::pds::record::dsl as RecordSchema;
|
||||
let conn = &mut establish_connection()?;
|
||||
delete(RecordSchema::record)
|
||||
.filter(RecordSchema::uri.eq(&uri))
|
||||
.execute(conn)?;
|
||||
delete(BacklinkSchema::backlink)
|
||||
.filter(BacklinkSchema::uri.eq(&uri))
|
||||
.execute(conn)?;
|
||||
println!("@LOG DEBUG RecordReader::delete_record, deleted indexed record {uri}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_backlinks_by_uri(&self, uri: &String) -> Result<()> {
|
||||
use crate::schema::pds::backlink::dsl as BacklinkSchema;
|
||||
let conn = &mut establish_connection()?;
|
||||
delete(BacklinkSchema::backlink)
|
||||
.filter(BacklinkSchema::uri.eq(uri))
|
||||
.execute(conn)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_backlinks(&self, backlinks: Vec<Backlink>) -> Result<()> {
|
||||
if backlinks.len() == 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
use crate::schema::pds::backlink::dsl as BacklinkSchema;
|
||||
let conn = &mut establish_connection()?;
|
||||
insert_into(BacklinkSchema::backlink)
|
||||
.values(&backlinks)
|
||||
.on_conflict_do_nothing()
|
||||
.execute(conn)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_record_takedown_status(
|
||||
|
@ -39,6 +39,7 @@ pub struct LegacyV2Commit {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum VersionedCommit {
|
||||
Commit(Commit),
|
||||
LegacyV2Commit(LegacyV2Commit),
|
||||
@ -68,6 +69,7 @@ impl VersionedCommit {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum Lex {
|
||||
Ipld(Ipld),
|
||||
Blob(BlobRef),
|
||||
@ -214,6 +216,7 @@ pub fn delete_write_to_op(write: PreparedDelete) -> RecordWriteOp {
|
||||
}
|
||||
|
||||
pub fn write_to_op(write: PreparedWrite) -> RecordWriteOp {
|
||||
println!("Write: {write:?}");
|
||||
match write {
|
||||
PreparedWrite::Create(c) => create_write_to_op(c),
|
||||
PreparedWrite::Update(u) => update_write_to_op(u),
|
||||
|
@ -6,7 +6,7 @@ use anyhow::{bail, Result};
|
||||
use secp256k1::Keypair;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::str::FromStr;
|
||||
|
||||
pub fn sign_commit(unsigned: UnsignedCommit, keypair: Keypair) -> Result<Commit> {
|
||||
@ -21,8 +21,8 @@ pub fn sign_commit(unsigned: UnsignedCommit, keypair: Keypair) -> Result<Commit>
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds the data key `"<collection>/<rkey>"` from the `Display` output of
/// both arguments (no quoting or escaping is applied).
pub fn format_data_key<T: FromStr + Display>(collection: T, rkey: T) -> String {
    let mut data_key = collection.to_string();
    data_key.push('/');
    data_key.push_str(&rkey.to_string());
    data_key
}
|
||||
|
||||
pub fn lex_to_ipld(val: Lex) -> Ipld {
|
||||
|
@ -51,7 +51,7 @@ pub mod pds {
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
pds.blob (cid) {
|
||||
pds.blob (cid, did) {
|
||||
cid -> Varchar,
|
||||
did -> Varchar,
|
||||
mimeType -> Varchar,
|
||||
@ -132,7 +132,7 @@ pub mod pds {
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
pds.repo_block (cid) {
|
||||
pds.repo_block (cid, did) {
|
||||
cid -> Varchar,
|
||||
did -> Varchar,
|
||||
repoRev -> Varchar,
|
||||
|
@ -23,6 +23,7 @@ use std::str::FromStr;
|
||||
|
||||
/// Ipld
|
||||
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum Ipld {
|
||||
/// Represents a Json Value
|
||||
Json(JsonValue),
|
||||
@ -34,6 +35,8 @@ pub enum Ipld {
|
||||
Map(BTreeMap<String, Ipld>),
|
||||
/// Represents a Cid.
|
||||
Link(Cid),
|
||||
/// String
|
||||
String(String),
|
||||
}
|
||||
|
||||
impl Encode<DagCborCodec> for Ipld {
|
||||
@ -57,6 +60,7 @@ impl Encode<DagCborCodec> for Ipld {
|
||||
Self::List(l) => l.encode(c, w),
|
||||
Self::Map(m) => m.encode(c, w),
|
||||
Self::Link(cid) => cid.encode(c, w),
|
||||
Self::String(s) => s.encode(c, w),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -73,7 +77,7 @@ pub struct CidAndRev {
|
||||
pub rev: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SqlRepoReader {
|
||||
pub cache: BlockMap,
|
||||
pub blocks: BlockMap,
|
||||
|
Reference in New Issue
Block a user