Improve MST performance around memory allocation (to_vec(), clone(), etc.); Begin unit test for diffs

This commit is contained in:
Rudy Fraser
2025-01-07 18:00:04 -05:00
parent 1fa2032b45
commit fbe0bcfa01
6 changed files with 374 additions and 157 deletions

View File

@ -2,8 +2,6 @@
Rust implementation of an atproto PDS.
[![Crate](https://img.shields.io/crates/v/rsky-pds?logo=rust&style=flat-square&logoColor=E05D44&color=E05D44)](https://crates.io/crates/rsky-pds)
## License
rsky is released under the [Apache License 2.0](../LICENSE).

View File

@ -6,26 +6,26 @@ use anyhow::Result;
use lexicon_cid::Cid;
use std::collections::BTreeMap;
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct DataAdd {
key: String,
cid: Cid,
pub(crate) key: String,
pub(crate) cid: Cid,
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct DataUpdate {
key: String,
prev: Cid,
cid: Cid,
pub(crate) key: String,
pub(crate) prev: Cid,
pub(crate) cid: Cid,
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct DataDelete {
key: String,
cid: Cid,
pub(crate) key: String,
pub(crate) cid: Cid,
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct DataDiff {
pub adds: BTreeMap<String, DataAdd>,
pub updates: BTreeMap<String, DataUpdate>,
@ -149,4 +149,16 @@ impl DataDiff {
}
Ok(())
}
pub fn add_list(&self) -> Vec<DataAdd> {
self.adds.values().cloned().collect()
}
pub fn update_list(&self) -> Vec<DataUpdate> {
self.updates.values().cloned().collect()
}
pub fn delete_list(&self) -> Vec<DataDelete> {
self.deletes.values().cloned().collect()
}
}

View File

@ -43,8 +43,8 @@ pub fn mst_diff(curr: &mut MST, prev: Option<&mut MST>) -> Result<DataDiff> {
WalkerStatus::WalkerStatusProgress(ref l),
WalkerStatus::WalkerStatusProgress(ref r),
) => {
let left = l.curr.clone();
let right = r.curr.clone();
let mut left = l.curr.clone();
let mut right = r.curr.clone();
// if both pointers are leaves, record an update & advance both or record
// the lowest key and advance that pointer
@ -96,9 +96,10 @@ pub fn mst_diff(curr: &mut MST, prev: Option<&mut MST>) -> Result<DataDiff> {
// if we're on the same level, and both pointers are trees, do a comparison
// if they're the same, step over. if they're different, step in to
// find the subdiff
if let (NodeEntry::MST(left_tree), NodeEntry::MST(right_tree)) = (&left, &right)
if let (NodeEntry::MST(left_tree), NodeEntry::MST(right_tree)) =
(&mut left, &mut right)
{
if left_tree.pointer.eq(&right_tree.pointer) {
if left_tree.get_pointer()?.eq(&right_tree.get_pointer()?) {
left_walker.step_over()?;
right_walker.step_over()?;
} else {

View File

@ -20,10 +20,10 @@ use crate::repo::error::DataStoreError;
use crate::repo::parse;
use crate::repo::types::{BlockWriter, CidAndBytes};
use crate::storage::{ObjAndBytes, SqlRepoReader};
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, Result};
use lexicon_cid::Cid;
use serde_cbor::Value as CborValue;
use std::mem;
use std::{fmt, mem};
#[derive(Debug)]
pub struct NodeIter {
@ -58,9 +58,8 @@ impl Iterator for NodeIter {
None => {
match self.this {
Some(NodeEntry::MST(_)) => {
let this = self.this.clone().unwrap();
self.this = None;
Some(this)
let this = mem::replace(&mut self.this, None);
Some(this.unwrap())
}
_ => {
match self.parent.take() {
@ -78,19 +77,25 @@ impl Iterator for NodeIter {
// If it is a NodeEntry::Leaf, we return its content.
Some(NodeEntry::Leaf(_)) => {
let leaf = self.entries.get(0).unwrap().clone();
self.entries = self.entries[1..].to_vec();
self.entries.drain(..1);
Some(leaf)
}
// If it is a NodeEntry::MST, we create a new iterator for the child entries.
// The parent field is set to self, and self is replaced with the newly created iterator
Some(NodeEntry::MST(ref mst)) => {
let mut subtree = mst.clone();
let this = self.entries.get(0).unwrap().clone();
self.entries = self.entries[1..].to_vec();
Some(NodeEntry::MST(_)) => {
let mut this = self.entries.get(0).unwrap().clone();
self.entries.drain(..1);
let entries = if let NodeEntry::MST(ref mut subtree) = this {
match subtree.get_entries() {
Err(_) => vec![],
Ok(e) => e.to_vec(),
}
} else {
vec![]
};
// start iterating the child trees
*self = NodeIter {
entries: subtree.get_entries().unwrap_or(vec![]),
entries,
parent: Some(Box::new(mem::take(self))),
this: Some(this),
};
@ -124,10 +129,9 @@ impl Iterator for NodeIterReachable {
fn next(&mut self) -> Option<Self::Item> {
match self.entries.get(0) {
None => match self.this {
Some(NodeEntry::MST(ref t)) => {
let this = NodeEntry::MST(t.clone());
self.this = None;
Some(Ok(this))
Some(NodeEntry::MST(_)) => {
let this = mem::replace(&mut self.this, None);
Some(Ok(this.unwrap()))
}
_ => match self.parent.take() {
Some(parent) => {
@ -139,13 +143,13 @@ impl Iterator for NodeIterReachable {
},
Some(NodeEntry::Leaf(_)) => {
let leaf = self.entries.get(0).unwrap().clone();
self.entries = self.entries[1..].to_vec();
self.entries.drain(..1);
Some(Ok(leaf))
}
Some(NodeEntry::MST(_)) => {
let this = self.entries.get(0).unwrap().clone();
self.entries = self.entries[1..].to_vec();
let entries = if let NodeEntry::MST(mut r) = this.clone() {
let mut this = self.entries.get(0).unwrap().clone();
self.entries.drain(..1);
let entries = if let NodeEntry::MST(ref mut r) = this {
r.get_entries()
} else {
Err(anyhow::Error::new(DataStoreError::MissingBlock(
@ -161,9 +165,9 @@ impl Iterator for NodeIterReachable {
}
_ => {
*self = NodeIterReachable {
entries: entries.unwrap(),
entries: entries.unwrap().to_vec(),
parent: Some(Box::new(mem::take(self))),
this: Some(this.clone()),
this: Some(this),
};
self.next()
}
@ -192,7 +196,7 @@ impl Iterator for NodeIterReachable {
* and the second will be described as `prefix: 16, key: 'hi'.`
*/
/// treeEntry are elements of nodeData's Entries.
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
#[derive(PartialEq, Clone, Deserialize, Serialize)]
pub struct TreeEntry {
pub p: u8, // count of characters shared with previous path/key in tree
#[serde(with = "serde_bytes")]
@ -201,20 +205,49 @@ pub struct TreeEntry {
pub t: Option<Cid>, // [optional] pointer to lower-level subtree to the "right" of this path/key entry
}
impl fmt::Debug for TreeEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TreeEntry")
.field("p", &self.p)
.field("k", &self.k)
.field("v", &self.v.to_string())
.field("t", &self.t)
.finish()
}
}
/// MST tree node as gets serialized to CBOR. Note that the CBOR fields are all
/// single-character.
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
#[derive(PartialEq, Clone, Deserialize, Serialize)]
pub struct NodeData {
pub l: Option<Cid>, // [optional] pointer to lower-level subtree to the "left" of this path/key
pub e: Vec<TreeEntry>, // ordered list of entries at this node
}
#[derive(Debug, Deserialize, Serialize, Clone)]
impl fmt::Debug for NodeData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TreeEntry")
.field("l", &self.l)
.field("e", &self.e)
.finish()
}
}
#[derive(Deserialize, Serialize, Clone)]
pub struct Leaf {
pub key: String, // record key
pub value: Cid,
}
impl fmt::Debug for Leaf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TreeEntry")
.field("key", &self.key)
.field("value", &self.value.to_string())
.finish()
}
}
impl PartialEq for Leaf {
fn eq(&self, other: &Self) -> bool {
self.key == other.key && self.value == other.value
@ -302,8 +335,8 @@ impl PartialEq for NodeEntry {
(NodeEntry::Leaf(_), NodeEntry::MST(_)) => false,
(NodeEntry::MST(_), NodeEntry::Leaf(_)) => false,
(NodeEntry::MST(this), NodeEntry::MST(other)) => {
let this_pointer = this.clone().get_pointer().unwrap();
let other_pointer = other.clone().get_pointer().unwrap();
let this_pointer = this.get_pointer_ref().unwrap();
let other_pointer = other.get_pointer_ref().unwrap();
this_pointer == other_pointer
}
}
@ -315,8 +348,8 @@ impl PartialEq<MST> for NodeEntry {
match self {
NodeEntry::Leaf(_) => false,
NodeEntry::MST(this) => {
let this_pointer = this.clone().get_pointer().unwrap();
let other_pointer = other.clone().get_pointer().unwrap();
let this_pointer = this.get_pointer_ref().unwrap();
let other_pointer = other.get_pointer_ref().unwrap();
this_pointer == other_pointer
}
}
@ -354,7 +387,7 @@ pub struct UnstoredBlocks {
/// (pointer is defined, no entries have been pulled from block store)
///
/// MerkleSearchTree values are immutable. Methods return copies with changes.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct MST {
pub entries: Option<Vec<NodeEntry>>,
pub layer: Option<u32>,
@ -363,6 +396,17 @@ pub struct MST {
pub storage: SqlRepoReader,
}
impl fmt::Debug for MST {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MST")
.field("entries", &self.entries)
.field("layer", &self.layer)
.field("pointer", &self.pointer.to_string())
.field("outdated_pointer", &self.outdated_pointer)
.finish()
}
}
impl MST {
pub fn new(
storage: SqlRepoReader,
@ -385,12 +429,12 @@ impl MST {
layer: Option<u32>,
) -> Result<MST> {
let entries = entries.unwrap_or(Vec::new());
let pointer = util::cid_for_entries(entries.clone())?;
let pointer = util::cid_for_entries(entries.as_slice())?;
Ok(MST::new(storage, pointer, Some(entries), layer))
}
pub fn from_data(storage: SqlRepoReader, data: NodeData, layer: Option<u32>) -> Result<MST> {
let entries = util::deserialize_node_data(&storage, data.clone(), layer)?;
pub fn from_data(storage: SqlRepoReader, data: &NodeData, layer: Option<u32>) -> Result<MST> {
let entries = util::deserialize_node_data(&storage, data, layer)?;
let pointer = ipld::cid_for_cbor(&data)?;
Ok(MST::new(storage, pointer, Some(entries), layer))
}
@ -419,39 +463,58 @@ impl MST {
// === "Getters (lazy load)" ===
/// "We don't want to load entries of every subtree, just the ones we need"
pub fn get_entries(&mut self) -> Result<Vec<NodeEntry>> {
// if we are "hydrated", entries are available
if let Some(entries) = self.entries.clone() {
return Ok(entries);
};
// otherwise this is a virtual/pointer struct, and we need to hydrate from
// block store before returning entries
let data: CborValue = self.storage.read_obj(&self.pointer, |obj: &CborValue| {
match serde_cbor::value::from_value::<NodeData>(obj.clone()) {
Ok(_) => true,
Err(_) => false,
}
})?;
let data: NodeData = serde_cbor::value::from_value(data)?;
pub fn get_entries(&mut self) -> Result<&mut [NodeEntry]> {
// If `self.entries` is not populated, hydrate it first
if self.entries.is_none() {
// Read from storage (block store) to get the node data
let data: CborValue = self.storage.read_obj(&self.pointer, |obj: &CborValue| {
match serde_cbor::value::from_value::<NodeData>(obj.clone()) {
Ok(_) => true,
Err(_) => false,
}
})?;
let data: NodeData = serde_cbor::value::from_value(data)?;
// can compute the layer on the first KeySuffix, because
// for the first entry that field is a complete key
let first_leaf = data.e.get(0);
let layer: Option<u32> = match first_leaf {
Some(first_leaf) => Some(util::leading_zeros_on_hash(&first_leaf.k)?),
None => None,
};
// Compute the layer
let first_leaf = data.e.get(0);
let layer = match first_leaf {
Some(first_leaf) => Some(util::leading_zeros_on_hash(&first_leaf.k)?),
None => None,
};
self.entries = Some(util::deserialize_node_data(
&self.storage,
data.clone(),
layer,
)?);
// Deserialize into self.entries
self.entries = Some(util::deserialize_node_data(&self.storage, &data, layer)?);
}
if let Some(entries) = self.entries.clone() {
Ok(entries)
} else {
bail!("No entries")
// Now, unwrapping is safe because we either already had entries
// or we just set them above
Ok(self.entries.as_mut().ok_or_else(|| anyhow::anyhow!("No entries"))?)
}
pub fn get_entries_ref(&self) -> Result<Vec<NodeEntry>> {
// If `self.entries` is not populated, hydrate it first
return match self.entries {
None => {
// Read from storage (block store) to get the node data
let data: CborValue = self.storage.clone().read_obj(&self.pointer, |obj: &CborValue| {
match serde_cbor::value::from_value::<NodeData>(obj.clone()) {
Ok(_) => true,
Err(_) => false,
}
})?;
let data: NodeData = serde_cbor::value::from_value(data)?;
// Compute the layer
let first_leaf = data.e.get(0);
let layer = match first_leaf {
Some(first_leaf) => Some(util::leading_zeros_on_hash(&first_leaf.k)?),
None => None,
};
// Deserialize into self.entries
util::deserialize_node_data(&self.storage, &data, layer)
},
Some(ref entries) => Ok(entries.clone())
}
}
@ -467,10 +530,10 @@ impl MST {
pub fn serialize(&mut self) -> Result<CidAndBytes> {
let mut entries = self.get_entries()?;
let mut outdated: Vec<&mut MST> = entries
.iter_mut()
let mut outdated: Vec<MST> = entries
.iter()
.filter_map(|e| match e {
NodeEntry::MST(e) if e.outdated_pointer => Some(e),
NodeEntry::MST(ref e) if e.outdated_pointer => Some(e.clone()),
_ => None,
})
.collect::<Vec<_>>();
@ -489,6 +552,38 @@ impl MST {
})
}
pub fn get_pointer_ref(&self) -> Result<Cid> {
if !self.outdated_pointer {
return Ok(self.pointer);
}
let CidAndBytes { cid, .. } = self.serialize_ref()?;
Ok(cid)
}
pub fn serialize_ref(&self) -> Result<CidAndBytes> {
let mut entries = self.get_entries_ref()?;
let mut outdated: Vec<MST> = entries
.iter()
.filter_map(|e| match e {
NodeEntry::MST(ref e) if e.outdated_pointer => Some(e.clone()),
_ => None,
})
.collect::<Vec<_>>();
if outdated.len() > 0 {
let _ = outdated
.iter_mut()
.map(|e| e.get_pointer())
.collect::<Result<Vec<Cid>>>()?;
entries = self.get_entries_ref()?;
}
let data = util::serialize_node_data(entries.as_slice())?;
Ok(CidAndBytes {
cid: ipld::cid_for_cbor(&data)?,
bytes: common::struct_to_cbor(data)?,
})
}
/// In most cases, we get the layer of a node from a hint on creation
/// In the case of the topmost node in the tree, we look for a key in the node & determine the layer
/// In the case where we don't find one, we recurse down until we do.
@ -506,10 +601,10 @@ impl MST {
return Ok(self.layer);
};
let entries = self.get_entries()?;
let mut layer = util::layer_for_entries(entries.clone())?;
let mut layer = util::layer_for_entries(entries)?;
if layer.is_none() {
for entry in entries {
if let NodeEntry::MST(mut tree) = entry {
for entry in entries.iter_mut() {
if let NodeEntry::MST(ref mut tree) = entry {
let child_layer = tree.attempt_get_layer()?;
if let Some(c) = child_layer {
layer = Some(c + 1);
@ -538,10 +633,10 @@ impl MST {
});
}
let entries = self.get_entries()?;
let data = util::serialize_node_data(entries.clone())?;
let data = util::serialize_node_data(entries)?;
let _ = blocks.add(data)?;
for entry in entries {
if let NodeEntry::MST(mut e) = entry {
for entry in entries.iter_mut() {
if let NodeEntry::MST(ref mut e) = entry {
let subtree = e.get_unstored_blocks()?;
blocks.add_map(subtree.blocks)?;
}
@ -554,18 +649,18 @@ impl MST {
/// Adds a new leaf for the given key/value pair
/// Throws if a leaf with that key already exists
pub fn add(&mut self, key: &String, value: Cid, known_zeros: Option<u32>) -> Result<MST> {
pub fn add(&mut self, key: &str, value: Cid, known_zeros: Option<u32>) -> Result<MST> {
util::ensure_valid_mst_key(&key)?;
let key_zeros: u32;
if let Some(z) = known_zeros {
key_zeros = z;
} else {
key_zeros = util::leading_zeros_on_hash(&key.clone().into_bytes())?;
key_zeros = util::leading_zeros_on_hash(key.as_bytes())?;
}
let layer = self.get_layer()?;
let new_leaf = Leaf {
key: key.clone(),
key: key.to_string(),
value,
};
@ -633,14 +728,14 @@ impl MST {
}
let mut updated: Vec<NodeEntry> = Vec::new();
if let Some(l) = left {
updated.push(NodeEntry::MST(l.clone()));
updated.push(NodeEntry::MST(l));
}
updated.push(NodeEntry::Leaf(Leaf {
key: key.clone(),
key: key.to_string(),
value,
}));
if let Some(r) = right {
updated.push(NodeEntry::MST(r.clone()));
updated.push(NodeEntry::MST(r));
}
let mut new_root = MST::create(self.storage.clone(), Some(updated), Some(key_zeros))?;
new_root.outdated_pointer = true;
@ -666,7 +761,7 @@ impl MST {
/// Edits the value at the given key
/// Throws if the given key does not exist
pub fn update(&mut self, key: &String, value: Cid) -> Result<MST> {
pub fn update(&mut self, key: &str, value: Cid) -> Result<MST> {
util::ensure_valid_mst_key(key)?;
let index = self.find_gt_or_equal_leaf_index(key)?;
let found = self.at_index(index)?;
@ -675,7 +770,7 @@ impl MST {
return self.update_entry(
index,
NodeEntry::Leaf(Leaf {
key: key.clone(),
key: key.to_string(),
value,
}),
);
@ -684,7 +779,7 @@ impl MST {
let prev = self.at_index(index - 1)?;
if let Some(NodeEntry::MST(mut p)) = prev {
let updated_tree = p.update(key, value)?;
return self.update_entry(index - 1, NodeEntry::MST(updated_tree.clone()));
return self.update_entry(index - 1, NodeEntry::MST(updated_tree));
}
Err(anyhow!("Could not find a record with key: {}", key))
}
@ -692,7 +787,7 @@ impl MST {
/// Deletes the value at the given key
pub fn delete(&mut self, key: &String) -> Result<MST> {
let altered = self.delete_recurse(key)?;
Ok(altered.clone().trim_top()?)
Ok(altered.trim_top()?)
}
pub fn delete_recurse(&mut self, key: &String) -> Result<MST> {
@ -707,9 +802,10 @@ impl MST {
(Some(NodeEntry::MST(mut p)), Some(NodeEntry::MST(n))) => {
let merged = p.append_merge(n)?;
let mut new_tree_entries: Vec<NodeEntry> = Vec::new();
new_tree_entries.append(&mut self.slice(Some(0), Some(index - 1))?);
new_tree_entries
.append(&mut self.slice(Some(0), Some(index - 1))?.to_vec());
new_tree_entries.push(NodeEntry::MST(merged));
new_tree_entries.append(&mut self.slice(Some(index + 2), None)?);
new_tree_entries.append(&mut self.slice(Some(index + 2), None)?.to_vec());
self.new_tree(new_tree_entries)
}
(_, _) => self.remove_entry(index),
@ -737,12 +833,12 @@ impl MST {
/// update entry in place
pub fn update_entry(&mut self, index: isize, entry: NodeEntry) -> Result<MST> {
let mut update = Vec::new();
for e in self.slice(Some(0), Some(index))? {
for e in self.slice(Some(0), Some(index))?.to_vec() {
update.push(e);
}
update.push(entry);
for e in self.slice(Some(index + 1), None)? {
update.push(e.clone());
for e in self.slice(Some(index + 1), None)?.to_vec() {
update.push(e);
}
Ok(self.new_tree(update)?)
}
@ -750,22 +846,22 @@ impl MST {
/// remove entry at index
pub fn remove_entry(&mut self, index: isize) -> Result<MST> {
let mut updated = Vec::new();
updated.append(&mut self.slice(Some(0), Some(index))?);
updated.append(&mut self.slice(Some(index + 1), None)?);
updated.append(&mut self.slice(Some(0), Some(index))?.to_vec());
updated.append(&mut self.slice(Some(index + 1), None)?.to_vec());
Ok(self.new_tree(updated)?)
}
/// append entry to end of the node
/// append entry to end of the node / Vec is allowed here.
pub fn append(&mut self, entry: NodeEntry) -> Result<MST> {
let mut entries = self.get_entries()?;
let mut entries = self.get_entries()?.to_vec();
entries.push(entry);
Ok(self.new_tree(entries)?)
}
/// prepend entry to end of the node
pub fn prepend(&mut self, entry: NodeEntry) -> Result<MST> {
let mut entries = self.get_entries()?;
let mut entries = self.get_entries()?.to_vec();
entries.splice(0..0, vec![entry]);
Ok(self.new_tree(entries)?)
}
@ -774,14 +870,17 @@ impl MST {
pub fn at_index(&mut self, index: isize) -> Result<Option<NodeEntry>> {
let entries = self.get_entries()?;
if index >= 0 {
Ok(entries.into_iter().nth(index as usize))
Ok(entries
.into_iter()
.nth(index as usize)
.map(|entry| entry.clone()))
} else {
Ok(None)
}
}
/// returns a slice of the node
pub fn slice(&mut self, start: Option<isize>, end: Option<isize>) -> Result<Vec<NodeEntry>> {
pub fn slice(&mut self, start: Option<isize>, end: Option<isize>) -> Result<&[NodeEntry]> {
let entries = self.get_entries()?;
let entry_len = entries.len() as isize;
match (start, end) {
@ -793,7 +892,7 @@ impl MST {
} else if start < (-1 * entry_len) {
0
} else if start >= entry_len {
return Ok(vec![]);
return Ok(&entries[0..0]);
} else {
start as usize
};
@ -805,12 +904,12 @@ impl MST {
} else if end >= entry_len {
entries.len()
} else if end <= start as isize {
return Ok(vec![]);
return Ok(&entries[0..0]);
} else {
end as usize
};
Ok(entries[start..end].to_vec())
Ok(&entries[start..end])
}
(Some(start), None) => {
let start: usize = if start < 0 && start >= (-1 * entry_len) {
@ -818,11 +917,11 @@ impl MST {
} else if start < (-1 * entry_len) {
0
} else if start >= entry_len {
return Ok(vec![]);
return Ok(&entries[0..0]);
} else {
start as usize
};
Ok(entries[start..].to_vec())
Ok(&entries[start..])
}
(None, Some(end)) => {
let end: usize = if end < 0 && end >= (-1 * entry_len) {
@ -832,11 +931,11 @@ impl MST {
} else if end >= entry_len {
entries.len()
} else if end <= 0 {
return Ok(vec![]);
return Ok(&entries[0..0]);
} else {
end as usize
};
Ok(entries[..end].to_vec())
Ok(&entries[..end])
}
(None, None) => Ok(entries),
}
@ -845,11 +944,11 @@ impl MST {
/// inserts entry at index
pub fn splice_in(&mut self, entry: NodeEntry, index: isize) -> Result<MST> {
let mut update = Vec::new();
for e in self.slice(Some(0), Some(index))? {
for e in self.slice(Some(0), Some(index))?.to_vec() {
update.push(e);
}
update.push(entry);
for e in self.slice(Some(index), None)? {
for e in self.slice(Some(index), None)?.to_vec() {
update.push(e);
}
self.new_tree(update)
@ -866,11 +965,11 @@ impl MST {
let update = self.slice(Some(0), Some(index))?;
let mut update = update.to_vec();
if let Some(l) = left {
update.push(NodeEntry::MST(l.clone()));
update.push(NodeEntry::MST(l));
}
update.push(NodeEntry::Leaf(leaf));
if let Some(r) = right {
update.push(NodeEntry::MST(r.clone()));
update.push(NodeEntry::MST(r));
}
let remainder = self.slice(Some(index + 1), None)?;
let remainder = &mut remainder.to_vec();
@ -883,11 +982,11 @@ impl MST {
let entries = self.get_entries()?;
return if entries.len() == 1 {
match entries.into_iter().nth(0) {
Some(NodeEntry::MST(n)) => Ok(n.trim_top()?),
_ => Ok(self.clone()),
Some(NodeEntry::MST(n)) => Ok(n.clone().trim_top()?),
_ => Ok(self),
}
} else {
Ok(self.clone())
Ok(self)
};
}
@ -895,11 +994,17 @@ impl MST {
// -------------------
/// Recursively splits a subtree around a given key
pub fn split_around(&mut self, key: &String) -> Result<(Option<MST>, Option<MST>)> {
pub fn split_around(&mut self, key: &str) -> Result<(Option<MST>, Option<MST>)> {
let index = self.find_gt_or_equal_leaf_index(key)?;
// split tree around key
let left_data = self.slice(Some(0), Some(index))?;
let right_data = self.slice(Some(index), None)?;
let left_data = {
let tmp = self.slice(Some(0), Some(index))?;
tmp.to_vec()
};
let right_data = {
let tmp = self.slice(Some(index), None)?;
tmp.to_vec()
};
let mut left = self.new_tree(left_data.clone())?;
let mut right = self.new_tree(right_data)?;
@ -915,10 +1020,10 @@ impl MST {
left = left.remove_entry(left_len as isize - 1)?;
let split = last.split_around(key)?;
if let Some(s0) = split.0 {
left = left.append(NodeEntry::MST(s0.clone()))?;
left = left.append(NodeEntry::MST(s0))?;
}
if let Some(s1) = split.1 {
right = right.prepend(NodeEntry::MST(s1.clone()))?;
right = right.prepend(NodeEntry::MST(s1))?;
}
}
@ -943,26 +1048,27 @@ impl MST {
"Trying to merge two nodes from different layers of the MST"
));
}
let mut self_entries = self.get_entries()?;
let mut to_merge_entries = to_merge.get_entries()?;
let mut self_entries = self.get_entries()?.to_vec();
let mut to_merge_entries = to_merge.get_entries()?.to_vec();
let last_in_left = self_entries.last();
let first_in_right = to_merge_entries.first();
let mut new_tree_entries: Vec<NodeEntry> = Vec::new();
return match (last_in_left, first_in_right) {
match (last_in_left, first_in_right) {
(Some(NodeEntry::MST(l)), Some(NodeEntry::MST(r))) => {
let mut new_l = l.clone();
let merged = new_l.append_merge(r.clone())?;
new_tree_entries.append(&mut self_entries[0..self_entries.len() - 1].to_vec());
self_entries.pop();
new_tree_entries.append(&mut self_entries);
new_tree_entries.push(NodeEntry::MST(merged));
new_tree_entries.append(&mut to_merge_entries[1..].to_vec());
self.new_tree(new_tree_entries)
to_merge_entries.remove(0);
new_tree_entries.append(&mut to_merge_entries);
}
(_, _) => {
new_tree_entries.append(&mut self_entries);
new_tree_entries.append(&mut to_merge_entries);
self.new_tree(new_tree_entries)
}
};
self.new_tree(new_tree_entries)
}
// Create relatives
@ -977,7 +1083,7 @@ impl MST {
let layer = self.get_layer()?;
let mut parent = MST::create(
self.storage.clone(),
Some(vec![NodeEntry::MST(self.clone())]),
Some(vec![NodeEntry::MST(self)]),
Some(layer + 1),
)?;
parent.outdated_pointer = true;
@ -988,11 +1094,11 @@ impl MST {
// -------------------
/// finds index of first leaf node that is greater than or equal to the value
pub fn find_gt_or_equal_leaf_index(&mut self, key: &String) -> Result<isize> {
pub fn find_gt_or_equal_leaf_index(&mut self, key: &str) -> Result<isize> {
let entries = self.get_entries()?;
let maybe_index = entries.iter().position(|entry| match entry {
NodeEntry::MST(_) => false,
NodeEntry::Leaf(entry) => entry.key >= *key,
NodeEntry::Leaf(entry) => entry.key >= key.to_string(),
});
// if we can't find, we're on the end
if let Some(i) = maybe_index {
@ -1196,7 +1302,7 @@ impl MST {
bytes: found.bytes,
});
let node_data: NodeData = serde_cbor::value::from_value(found.obj)?;
let entries = util::deserialize_node_data(&self.storage, node_data.clone(), None)?;
let entries = util::deserialize_node_data(&self.storage, &node_data, None)?;
for entry in entries {
match entry {
@ -1274,9 +1380,11 @@ pub mod walker;
mod tests {
use super::util::*;
use super::*;
use crate::repo::data_diff::{DataAdd, DataDelete, DataDiff, DataUpdate};
use anyhow::Result;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::BTreeMap;
fn string_to_vec_u8(input: &str) -> Vec<u8> {
input.as_bytes().to_vec()
@ -1428,7 +1536,7 @@ mod tests {
fn saves_and_loads_from_blockstore() -> Result<()> {
let mut storage =
SqlRepoReader::new(None, "did:example:123456789abcdefghi".to_string(), None);
let mapping = generate_bulk_data_keys(50, Some(&mut storage))?;
let _mapping = generate_bulk_data_keys(50, Some(&mut storage))?;
let mut mst = MST::create(storage, None, None)?;
let mst_storage = mst.storage.clone();
@ -1443,6 +1551,104 @@ mod tests {
Ok(())
}
/*
@TODO: Get this working. Currently inconsistent and canonical test is failing as well
#[test]
fn diffs() -> Result<()> {
let mut storage =
SqlRepoReader::new(None, "did:example:123456789abcdefghi".to_string(), None);
let mapping = generate_bulk_data_keys(10, Some(&mut storage))?;
let mut mst = MST::create(storage.clone(), None, None)?;
let mut to_diff = mst.clone();
let entries = mapping
.iter()
.map(|e| (e.0.clone(), e.1.clone()))
.collect::<Vec<(String, Cid)>>();
for entry in &entries {
mst = mst.add(&entry.0, entry.1, None)?;
to_diff = to_diff.add(&entry.0, entry.1, None)?;
}
let to_add = generate_bulk_data_keys(2, Some(&mut storage))?
.into_iter()
.map(|e| (e.0, e.1))
.collect::<Vec<(String, Cid)>>();
println!(
"@TESTS: diffs to_add {:?}",
to_add
.iter()
.map(|e| (e.0.clone(), e.1.to_string()))
.collect::<Vec<(String, String)>>()
);
let to_edit = entries[2..3].to_vec();
let to_del = entries[3..4].to_vec();
let mut expected_adds: BTreeMap<String, DataAdd> = BTreeMap::new();
let mut expected_updates: BTreeMap<String, DataUpdate> = BTreeMap::new();
let mut expected_dels: BTreeMap<String, DataDelete> = BTreeMap::new();
for entry in &to_add {
to_diff = to_diff.add(&entry.0, entry.1, None)?;
expected_adds.insert(
entry.0.clone(),
DataAdd {
key: entry.0.clone(),
cid: entry.1.clone(),
},
);
}
for entry in &to_edit {
let updated = random_cid(&mut None)?;
to_diff = to_diff.update(&entry.0, updated)?;
expected_updates.insert(
entry.0.clone(),
DataUpdate {
key: entry.0.clone(),
prev: entry.1.clone(),
cid: updated,
},
);
}
for entry in &to_del {
to_diff = to_diff.delete(&entry.0)?;
expected_dels.insert(
entry.0.clone(),
DataDelete {
key: entry.0.clone(),
cid: entry.1.clone(),
},
);
}
let diff = DataDiff::of(&mut to_diff, Some(&mut mst))?;
assert_eq!(diff.add_list().len(), 2);
assert_eq!(diff.update_list().len(), 1);
assert_eq!(diff.delete_list().len(), 1);
assert_eq!(diff.adds, expected_adds);
assert_eq!(diff.updates, expected_updates);
assert_eq!(diff.deletes, expected_dels);
// ensure we correctly report all added CIDs
let mut blockstore = mst.storage.clone();
for mut entry in to_diff.walk() {
let cid: Cid = match entry {
NodeEntry::MST(ref mut entry) => entry.get_pointer()?,
NodeEntry::Leaf(ref entry) => entry.value.clone(),
};
let found =
blockstore.has(cid)? || diff.new_mst_blocks.has(cid) || diff.new_leaf_cids.has(cid);
assert!(found)
}
Ok(())
} */
#[test]
fn test_leading_zeros() -> Result<()> {
let msg = "MST 'depth' computation (SHA-256 leading zeros)";

View File

@ -25,7 +25,7 @@ fn is_valid_chars(input: &str) -> bool {
// alphanumeric (A-Za-z0-9), period, dash, underscore, colon, or tilde (.-_:~)
// * Must have at least 1 and at most 512 characters
// * The specific record key values . and .. are not allowed
pub fn is_valid_repo_mst_path(key: &String) -> Result<bool> {
pub fn is_valid_repo_mst_path(key: &str) -> Result<bool> {
let split: Vec<&str> = key.split("/").collect();
return if key.len() <= 256
@ -41,7 +41,7 @@ pub fn is_valid_repo_mst_path(key: &String) -> Result<bool> {
};
}
pub fn ensure_valid_mst_key(key: &String) -> Result<()> {
pub fn ensure_valid_mst_key(key: &str) -> Result<()> {
let result = is_valid_repo_mst_path(key)?;
match result {
true => Ok(()),
@ -49,7 +49,7 @@ pub fn ensure_valid_mst_key(key: &String) -> Result<()> {
}
}
pub fn cid_for_entries(entries: Vec<NodeEntry>) -> Result<Cid> {
pub fn cid_for_entries(entries: &[NodeEntry]) -> Result<Cid> {
let data = serialize_node_data(entries)?;
ipld::cid_for_cbor(&data)
}
@ -65,7 +65,7 @@ pub fn count_prefix_len(a: String, b: String) -> Result<usize> {
Ok(x)
}
pub fn serialize_node_data(entries: Vec<NodeEntry>) -> Result<NodeData> {
pub fn serialize_node_data(entries: &[NodeEntry]) -> Result<NodeData> {
let mut data = NodeData {
l: None,
e: Vec::new(),
@ -107,7 +107,7 @@ pub fn serialize_node_data(entries: Vec<NodeEntry>) -> Result<NodeData> {
pub fn deserialize_node_data(
storage: &SqlRepoReader,
data: NodeData,
data: &NodeData,
layer: Option<u32>,
) -> Result<Vec<NodeEntry>> {
let mut entries: Vec<NodeEntry> = Vec::new();
@ -123,7 +123,7 @@ pub fn deserialize_node_data(
entries.push(mst)
}
let mut last_key: String = "".to_owned();
for entry in data.e {
for entry in &data.e {
let key_str = str::from_utf8(entry.k.as_ref())?;
let p = usize::try_from(entry.p)?;
let key = format!("{}{}", &last_key[0..p], key_str);
@ -148,7 +148,7 @@ pub fn deserialize_node_data(
Ok(entries)
}
pub fn layer_for_entries(entries: Vec<NodeEntry>) -> Result<Option<u32>> {
pub fn layer_for_entries(entries: &[NodeEntry]) -> Result<Option<u32>> {
let first_leaf = entries.into_iter().find(|entry| entry.is_leaf());
if let Some(f) = first_leaf {
match f {
@ -160,7 +160,7 @@ pub fn layer_for_entries(entries: Vec<NodeEntry>) -> Result<Option<u32>> {
}
}
pub fn leading_zeros_on_hash(key: &Vec<u8>) -> Result<u32> {
pub fn leading_zeros_on_hash(key: &[u8]) -> Result<u32> {
let digest = Sha256::digest(&*key);
let hash: &[u8] = digest.as_ref();
let mut leading_zeros = 0;

View File

@ -4,7 +4,7 @@ use anyhow::{bail, Result};
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct WalkerStatusDone(bool);
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct WalkerStatusProgress {
pub done: bool,
pub curr: NodeEntry,
@ -12,13 +12,13 @@ pub struct WalkerStatusProgress {
pub index: usize,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum WalkerStatus {
WalkerStatusDone(WalkerStatusDone),
WalkerStatusProgress(WalkerStatusProgress),
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MstWalker {
pub stack: Vec<WalkerStatus>,
pub status: WalkerStatus,
@ -90,10 +90,10 @@ impl MstWalker {
WalkerStatus::WalkerStatusDone(_) => return Ok(()),
WalkerStatus::WalkerStatusProgress(ref mut p) => {
if let Some(_) = p.walking {
if let NodeEntry::MST(ref mut mst) = p.curr {
let next = mst.at_index(0)?;
if let NodeEntry::MST(ref mut curr) = p.curr {
let next = curr.at_index(0)?;
if let Some(next) = next {
p.walking = Some(mst.clone());
p.walking = Some(curr.clone());
self.stack
.push(WalkerStatus::WalkerStatusProgress(p.clone()));
p.curr = next.clone();
@ -112,7 +112,7 @@ impl MstWalker {
WalkerStatus::WalkerStatusProgress(WalkerStatusProgress {
done: false,
walking: Some(mst.clone()),
curr: next.clone(),
curr: next,
index: 0,
});
} else {