render-diff endpoint impl

This commit is contained in:
akvlad
2024-08-21 17:25:50 +03:00
parent decb26cb5e
commit fa3e67d983
12 changed files with 585 additions and 61 deletions

View File

@ -0,0 +1,45 @@
use crate::pattern::Pattern;
use uuid::Uuid;
pub struct PatternRegistry {
patterns: Vec<Pattern>,
}
impl PatternRegistry {
pub const fn new() -> PatternRegistry {
PatternRegistry { patterns: Vec::new() }
}
pub fn find_pattern(&mut self, str_text: &Vec<String>, i_text: &Vec<u64>, sample: String) -> &Pattern {
let mut idx: i32 = -1;
let mut mtc = 0;
for i in 0..self.patterns.len() {
mtc = self.patterns[i].match_text(&i_text);
if mtc == -1 || mtc > self.patterns[i].fluct {
continue;
}
idx = i as i32;
break;
}
if idx == -1 {
let pattern = Pattern::new(Uuid::new_v4().to_string(), &i_text, &str_text, sample);
self.patterns.push(pattern);
idx = (self.patterns.len() - 1) as i32;
} else if mtc != 0 {
self.patterns[idx as usize].adjust_pattern(&i_text);
}
return &self.patterns[idx as usize];
}
pub fn to_string(&self) -> String {
let mut s = String::new();
for i in 0..self.patterns.len() {
s += self.patterns[i].to_string().as_str();
s += "\n";
}
return s
}
}
pub static mut REGISTRY: PatternRegistry = PatternRegistry::new();

View File

@ -0,0 +1,45 @@
use regex::{Regex, CaptureMatches, Match};
/*pub fn tokenize(re: &Regex, text: &str) -> CaptureMatches {
return re.captures_iter(text);
}*/
pub struct Tokenizer<'a> {
text: String,
pos: usize,
re: Regex,
iter: Option<CaptureMatches<'a, 'a>>
}
impl Tokenizer<'_> {
pub fn new<'a>(text: &'a str) -> Tokenizer<'a> {
let mut res = Tokenizer {
text: text.to_string(),
pos: 0,
re: Regex::new(r"([\p{L}_]+|[\d.]+|[^\p{L}_\d.]+)\s*").unwrap(),
iter: None
};
res
}
}
impl Iterator for Tokenizer<'_> {
type Item = String;
fn next(&mut self) -> Option<Self::Item> {
None
/*let cap: Option<Match> = None;
if let Some(c) = cap {
self.pos += c.get(0).unwrap().end();
Some(c.get(0).unwrap().as_str().to_string())
} else {
None
}*/
}
}
#[test]
fn test_tokenizer() {
let text = "Hello, world! 123";
let mut tokenizer = Tokenizer::new(text);
}

View File

@ -52,3 +52,16 @@ export interface heatmap {
minDepth: uint64, minDepth: uint64,
maxDepth: uint64 maxDepth: uint64
} }
export interface level {
values: number[]
}
export interface flamegraphDiff {
name: string[],
levels: level[],
total: int64,
maxSelf: int64,
leftTicks: int64,
rightTicks: int64
}

View File

@ -22,7 +22,9 @@ const sqlWithReference = (ref) => {
let ctxIdx = 0 let ctxIdx = 0
const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => { const newCtxIdx = () => ++ctxIdx
const importStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log, _ctxIdx, save) => {
const dist = clusterName ? '_dist' : '' const dist = clusterName ? '_dist' : ''
const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000) const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000)
const serviceNameSelector = serviceNameSelectorQuery(sel) const serviceNameSelector = serviceNameSelectorQuery(sel)
@ -127,30 +129,36 @@ const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) =>
const binData = Uint8Array.from(profiles.data) const binData = Uint8Array.from(profiles.data)
log.debug(`selectMergeStacktraces: profiles downloaded: ${binData.length / 1025}kB in ${Date.now() - start}ms`) log.debug(`selectMergeStacktraces: profiles downloaded: ${binData.length / 1025}kB in ${Date.now() - start}ms`)
require('./pprof-bin/pkg/pprof_bin').init_panic_hook() require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
const _ctxIdx = ++ctxIdx
const [legacyLen, shift] = readULeb32(binData, 0) const [legacyLen, shift] = readULeb32(binData, 0)
let ofs = shift let ofs = shift
let mergePprofLat = BigInt(0)
for (let i = 0; i < legacyLen; i++) {
const [profLen, shift] = readULeb32(binData, ofs)
ofs += shift
start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
pprofBin.merge_prof(_ctxIdx,
Uint8Array.from(profiles.data.slice(ofs, ofs + profLen)),
`${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
mergePprofLat += (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
ofs += profLen
}
start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
save && require('fs').writeFileSync(`/home/hromozeka/QXIP/qryn/data.${Date.now()}.bin`,
Buffer.from(Uint8Array.from(profiles.data.slice(ofs))))
pprofBin.merge_tree(_ctxIdx, Uint8Array.from(profiles.data.slice(ofs)),
typeRegex.sampleType + ':' + typeRegex.sampleUnit)
const mergeTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
log.debug(`merge_pprof: ${mergePprofLat / BigInt(1000000)}ms`)
log.debug(`merge_tree: ${mergeTreeLat / BigInt(1000000)}ms`)
}
const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) => {
const _ctxIdx = newCtxIdx()
try { try {
let mergePprofLat = BigInt(0) await importStackTraces(typeRegex, sel, fromTimeSec, toTimeSec, log, _ctxIdx)
for (let i = 0; i < legacyLen; i++) { const start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
const [profLen, shift] = readULeb32(binData, ofs)
ofs += shift
start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
pprofBin.merge_prof(_ctxIdx,
Uint8Array.from(profiles.data.slice(ofs, ofs + profLen)),
`${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
mergePprofLat += (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
ofs += profLen
}
start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
pprofBin.merge_tree(_ctxIdx, Uint8Array.from(profiles.data.slice(ofs)),
typeRegex.sampleType + ':' + typeRegex.sampleUnit)
const mergeTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
const resp = pprofBin.export_tree(_ctxIdx, typeRegex.sampleType + ':' + typeRegex.sampleUnit) const resp = pprofBin.export_tree(_ctxIdx, typeRegex.sampleType + ':' + typeRegex.sampleUnit)
const exportTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start const exportTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
log.debug(`merge_pprof: ${mergePprofLat / BigInt(1000000)}ms`)
log.debug(`merge_tree: ${mergeTreeLat / BigInt(1000000)}ms`)
log.debug(`export_tree: ${exportTreeLat / BigInt(1000000)}ms`) log.debug(`export_tree: ${exportTreeLat / BigInt(1000000)}ms`)
return Buffer.from(resp) return Buffer.from(resp)
} finally { } finally {
@ -159,5 +167,7 @@ const mergeStackTraces = async (typeRegex, sel, fromTimeSec, toTimeSec, log) =>
} }
module.exports = { module.exports = {
mergeStackTraces mergeStackTraces,
} importStackTraces,
newCtxIdx
}

View File

@ -13,6 +13,13 @@ export function merge_prof(id: number, bytes: Uint8Array, sample_type: string):
*/ */
export function merge_tree(id: number, bytes: Uint8Array, sample_type: string): void; export function merge_tree(id: number, bytes: Uint8Array, sample_type: string): void;
/** /**
* @param {number} id1
* @param {number} id2
* @param {string} sample_type
* @returns {Uint8Array}
*/
export function diff_tree(id1: number, id2: number, sample_type: string): Uint8Array;
/**
* @param {number} id * @param {number} id
* @param {string} sample_type * @param {string} sample_type
* @returns {Uint8Array} * @returns {Uint8Array}

View File

@ -133,6 +133,28 @@ function getArrayU8FromWasm0(ptr, len) {
ptr = ptr >>> 0; ptr = ptr >>> 0;
return getUint8Memory0().subarray(ptr / 1, ptr / 1 + len); return getUint8Memory0().subarray(ptr / 1, ptr / 1 + len);
} }
/**
* @param {number} id1
* @param {number} id2
* @param {string} sample_type
* @returns {Uint8Array}
*/
module.exports.diff_tree = function(id1, id2, sample_type) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passStringToWasm0(sample_type, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len0 = WASM_VECTOR_LEN;
wasm.diff_tree(retptr, id1, id2, ptr0, len0);
var r0 = getInt32Memory0()[retptr / 4 + 0];
var r1 = getInt32Memory0()[retptr / 4 + 1];
var v2 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1, 1);
return v2;
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
}
};
/** /**
* @param {number} id * @param {number} id
* @param {string} sample_type * @param {string} sample_type

View File

@ -3,6 +3,7 @@
export const memory: WebAssembly.Memory; export const memory: WebAssembly.Memory;
export function merge_prof(a: number, b: number, c: number, d: number, e: number): void; export function merge_prof(a: number, b: number, c: number, d: number, e: number): void;
export function merge_tree(a: number, b: number, c: number, d: number, e: number): void; export function merge_tree(a: number, b: number, c: number, d: number, e: number): void;
export function diff_tree(a: number, b: number, c: number, d: number, e: number): void;
export function export_tree(a: number, b: number, c: number, d: number): void; export function export_tree(a: number, b: number, c: number, d: number): void;
export function export_trees_pprof(a: number, b: number, c: number): void; export function export_trees_pprof(a: number, b: number, c: number): void;
export function drop_tree(a: number): void; export function drop_tree(a: number): void;

View File

@ -2,6 +2,7 @@
mod ch64; mod ch64;
mod merge; mod merge;
use std::cmp::Ordering;
use ch64::city_hash_64; use ch64::city_hash_64;
use ch64::read_uint64_le; use ch64::read_uint64_le;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@ -12,6 +13,7 @@ use pprof_pb::google::v1::Sample;
use pprof_pb::querier::v1::FlameGraph; use pprof_pb::querier::v1::FlameGraph;
use pprof_pb::querier::v1::Level; use pprof_pb::querier::v1::Level;
use pprof_pb::querier::v1::SelectMergeStacktracesResponse; use pprof_pb::querier::v1::SelectMergeStacktracesResponse;
use pprof_pb::querier::v1::FlameGraphDiff;
use prost::Message; use prost::Message;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::Read; use std::io::Read;
@ -19,6 +21,8 @@ use std::panic;
use std::sync::Mutex; use std::sync::Mutex;
use std::vec::Vec; use std::vec::Vec;
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use std::sync::Arc;
pub mod pprof_pb { pub mod pprof_pb {
@ -47,16 +51,51 @@ struct TreeNodeV2 {
total: Vec<i64>, total: Vec<i64>,
} }
impl TreeNodeV2 {
pub fn clone(&self) -> TreeNodeV2 {
TreeNodeV2 {
fn_id: self.fn_id,
node_id: self.node_id,
slf: self.slf.clone(),
total: self.total.clone(),
}
}
pub fn set_total_and_self(&self, slf: Vec<i64>, total: Vec<i64>) -> TreeNodeV2 {
let mut res = self.clone();
res.slf = slf;
res.total = total;
return res;
}
}
struct Tree { struct Tree {
names: Vec<String>, names: Vec<String>,
names_map: HashMap<u64, usize>, names_map: HashMap<u64, usize>,
nodes: HashMap<u64, Vec<TreeNodeV2>>, nodes: HashMap<u64, Vec<Arc<TreeNodeV2>>>,
sample_types: Vec<String>, sample_types: Vec<String>,
max_self: Vec<i64>, max_self: Vec<i64>,
nodes_num: i32, nodes_num: i32,
} }
fn find_node(id: u64, nodes: &Vec<TreeNodeV2>) -> i32 { impl Tree {
pub fn total(&self) -> i64 {
let mut total: i64 = 0;
for c in 0..self.nodes.get(&0).unwrap().len() {
let _c = &self.nodes.get(&0).unwrap()[c];
total += _c.total[0];
}
total
}
pub fn add_name(&mut self, name: String, name_hash: u64) {
if self.names_map.contains_key(&name_hash) {
return;
}
self.names.push(name);
self.names_map.insert(name_hash, self.names.len() - 1);
}
}
fn find_node(id: u64, nodes: &Vec<Arc<TreeNodeV2>>) -> i32 {
let mut n: i32 = -1; let mut n: i32 = -1;
for c in 0..nodes.len() { for c in 0..nodes.len() {
let _c = &nodes[c]; let _c = &nodes[c];
@ -109,19 +148,25 @@ impl MergeTotalsProcessor {
fn merge_totals( fn merge_totals(
&self, &self,
node: &mut TreeNodeV2, node: Arc<TreeNodeV2>,
_max_self: &Vec<i64>, _max_self: &Vec<i64>,
sample: &Sample, sample: &Sample,
merge_self: bool, merge_self: bool,
) -> Vec<i64> { ) -> (TreeNodeV2, Vec<i64>) {
let mut max_self = _max_self.clone(); let mut max_self = _max_self.clone();
let mut res: TreeNodeV2 = TreeNodeV2 {
fn_id: node.fn_id,
node_id: node.node_id,
slf: vec![0; node.slf.len()],
total: vec![0; node.slf.len()],
};
for i in 0..self.from_idx.len() { for i in 0..self.from_idx.len() {
if self.from_idx[i] == -1 { if self.from_idx[i] == -1 {
continue; continue;
} }
node.total[i] += sample.value[self.from_idx[i] as usize]; res.total[i] += sample.value[self.from_idx[i] as usize];
if merge_self { if merge_self {
node.slf[i] += sample.value[self.from_idx[i] as usize]; res.slf[i] += sample.value[self.from_idx[i] as usize];
for i in 0..max_self.len() { for i in 0..max_self.len() {
if max_self[i] < node.slf[i] { if max_self[i] < node.slf[i] {
max_self[i] = node.slf[i]; max_self[i] = node.slf[i];
@ -129,7 +174,7 @@ impl MergeTotalsProcessor {
} }
} }
} }
max_self (res, max_self)
} }
} }
@ -164,29 +209,30 @@ fn merge(tree: &mut Tree, p: &Profile) {
if !tree.nodes.contains_key(&parent_id) && tree.nodes_num < 2000000 { if !tree.nodes.contains_key(&parent_id) && tree.nodes_num < 2000000 {
tree.nodes.insert(parent_id, Vec::new()); tree.nodes.insert(parent_id, Vec::new());
} }
let mut fake_children: Vec<TreeNodeV2> = Vec::new(); let mut fake_children: Vec<Arc<TreeNodeV2>> = Vec::new();
let children = tree.nodes.get_mut(&parent_id).unwrap_or(&mut fake_children); let children = tree.nodes.get_mut(&parent_id).unwrap_or(&mut fake_children);
let mut n = find_node(node_id, children); let mut n = find_node(node_id, children);
if n == -1 { if n == -1 {
children.push(TreeNodeV2 { children.push(Arc::new(TreeNodeV2 {
//parent_id, //parent_id,
fn_id: name_hash, fn_id: name_hash,
node_id, node_id,
slf: vec![0; tree.sample_types.len()], slf: vec![0; tree.sample_types.len()],
total: vec![0; tree.sample_types.len()], total: vec![0; tree.sample_types.len()],
}); }));
let idx = children.len().clone() - 1; let idx = children.len().clone() - 1;
let max_self = m.merge_totals( let new_node_and_max_self = m.merge_totals(
children.get_mut(idx).unwrap(), children.get(idx).unwrap().clone(),
tree.max_self.as_ref(), tree.max_self.as_ref(),
s, s,
i == 0, i == 0,
); );
tree.max_self = max_self; children[idx] = Arc::new(new_node_and_max_self.0);
tree.max_self = new_node_and_max_self.1;
n = idx as i32; n = idx as i32;
} else if tree.nodes_num < 2000000 { } else if tree.nodes_num < 2000000 {
m.merge_totals( m.merge_totals(
children.get_mut(n as usize).unwrap(), children.get_mut(n as usize).unwrap().clone(),
&tree.max_self, &tree.max_self,
s, s,
i == 0, i == 0,
@ -214,7 +260,7 @@ fn read_uleb128(bytes: &[u8]) -> (usize, usize) {
fn bfs(t: &Tree, res: &mut Vec<Level>, sample_type: String) { fn bfs(t: &Tree, res: &mut Vec<Level>, sample_type: String) {
let mut total: i64 = 0; let mut total: i64 = 0;
let mut root_children: &Vec<TreeNodeV2> = &Vec::new(); let mut root_children: &Vec<Arc<TreeNodeV2>> = &Vec::new();
if t.nodes.contains_key(&(0u64)) { if t.nodes.contains_key(&(0u64)) {
root_children = t.nodes.get(&(0u64)).unwrap(); root_children = t.nodes.get(&(0u64)).unwrap();
} }
@ -291,22 +337,22 @@ fn bfs(t: &Tree, res: &mut Vec<Level>, sample_type: String) {
} }
lazy_static! { lazy_static! {
static ref CTX: Mutex<HashMap<u32, Tree>> = Mutex::new(HashMap::new()); static ref CTX: Mutex<HashMap<u32, Mutex<Tree>>> = Mutex::new(HashMap::new());
} }
fn upsert_tree(ctx: &mut HashMap<u32, Tree>, id: u32, sample_types: Vec<String>) { fn upsert_tree(ctx: &mut HashMap<u32, Mutex<Tree>>, id: u32, sample_types: Vec<String>) {
if !ctx.contains_key(&id) { if !ctx.contains_key(&id) {
let _len = sample_types.len().clone(); let _len = sample_types.len().clone();
ctx.insert( ctx.insert(
id, id,
Tree { Mutex::new(Tree {
names: vec!["total".to_string(), "n/a".to_string()], names: vec!["total".to_string(), "n/a".to_string()],
names_map: HashMap::new(), names_map: HashMap::new(),
nodes: HashMap::new(), nodes: HashMap::new(),
sample_types, sample_types,
max_self: vec![0; _len], max_self: vec![0; _len],
nodes_num: 1, nodes_num: 1,
}, }),
); );
} }
} }
@ -413,18 +459,11 @@ fn merge_trie(tree: &mut Tree, bytes: &[u8], samples_type: &String) {
n = find_node(node_id, tree.nodes.get(&parent_id).unwrap()); n = find_node(node_id, tree.nodes.get(&parent_id).unwrap());
} }
if n != -1 { if n != -1 {
tree.nodes let mut __node = tree.nodes.get_mut(&parent_id).unwrap().get_mut(n as usize).unwrap().clone();
.get_mut(&parent_id) let mut _node = __node.as_ref().clone();
.unwrap() _node.total[sample_type_index] += total[sample_type_index];
.get_mut(n as usize) _node.slf[sample_type_index] += slf[sample_type_index];
.unwrap() tree.nodes.get_mut(&parent_id).unwrap()[n as usize] = Arc::new(_node);
.total[sample_type_index] += total[sample_type_index];
tree.nodes
.get_mut(&parent_id)
.unwrap()
.get_mut(n as usize)
.unwrap()
.slf[sample_type_index] += slf[sample_type_index];
} }
if tree.nodes_num >= 2000000 { if tree.nodes_num >= 2000000 {
return; return;
@ -432,13 +471,13 @@ fn merge_trie(tree: &mut Tree, bytes: &[u8], samples_type: &String) {
if !tree.nodes.contains_key(&parent_id) { if !tree.nodes.contains_key(&parent_id) {
tree.nodes.insert(parent_id, Vec::new()); tree.nodes.insert(parent_id, Vec::new());
} }
tree.nodes.get_mut(&parent_id).unwrap().push(TreeNodeV2 { tree.nodes.get_mut(&parent_id).unwrap().push(Arc::new(TreeNodeV2 {
fn_id, fn_id,
//parent_id, //parent_id,
node_id, node_id,
slf, slf,
total, total,
}); }));
tree.nodes_num += 1; tree.nodes_num += 1;
} }
} }
@ -551,12 +590,25 @@ fn merge_trie(tree: &mut Tree, bytes: &[u8], samples_type: &String) {
inject_functions(prof, tree, 0, vec![], type_idx); inject_functions(prof, tree, 0, vec![], type_idx);
}*/ }*/
fn assert_positive(t: &Tree) -> bool{
for n in t.nodes.keys() {
for _n in 0..t.nodes.get(&n).unwrap().len() {
for __n in 0..t.nodes.get(&n).unwrap()[_n].slf.len() {
if t.nodes.get(&n).unwrap()[_n].slf[__n] < 0 {
return false;
}
}
}
}
true
}
#[wasm_bindgen] #[wasm_bindgen]
pub fn merge_prof(id: u32, bytes: &[u8], sample_type: String) { pub fn merge_prof(id: u32, bytes: &[u8], sample_type: String) {
let p = panic::catch_unwind(|| { let p = panic::catch_unwind(|| {
let mut ctx = CTX.lock().unwrap(); let mut ctx = CTX.lock().unwrap();
upsert_tree(&mut ctx, id, vec![sample_type]); upsert_tree(&mut ctx, id, vec![sample_type]);
let mut tree = ctx.get_mut(&id).unwrap(); let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap();
let prof = Profile::decode(bytes).unwrap(); let prof = Profile::decode(bytes).unwrap();
merge(&mut tree, &prof); merge(&mut tree, &prof);
}); });
@ -571,7 +623,7 @@ pub fn merge_tree(id: u32, bytes: &[u8], sample_type: String) {
let result = panic::catch_unwind(|| { let result = panic::catch_unwind(|| {
let mut ctx = CTX.lock().unwrap(); let mut ctx = CTX.lock().unwrap();
upsert_tree(&mut ctx, id, vec![sample_type.clone()]); upsert_tree(&mut ctx, id, vec![sample_type.clone()]);
let mut tree = ctx.get_mut(&id).unwrap(); let mut tree = ctx.get_mut(&id).unwrap().lock().unwrap();
merge_trie(&mut tree, bytes, &sample_type); merge_trie(&mut tree, bytes, &sample_type);
0 0
}); });
@ -581,25 +633,275 @@ pub fn merge_tree(id: u32, bytes: &[u8], sample_type: String) {
} }
} }
#[wasm_bindgen]
pub fn diff_tree(id1: u32, id2: u32, sample_type: String) -> Vec<u8> {
let mut ctx = CTX.lock().unwrap();
let _ctx = &mut ctx;
upsert_tree(_ctx, id1, vec![sample_type.clone()]);
upsert_tree(_ctx, id2, vec![sample_type.clone()]);
let mut t1 = _ctx.get(&id1).unwrap().lock().unwrap();
let mut t2 = _ctx.get(&id2).unwrap().lock().unwrap();
let mut is_positive = assert_positive(&t1);
if !is_positive {
panic!("Tree 1 is not positive");
}
is_positive = assert_positive(&t2);
if!is_positive {
panic!("Tree 2 is not positive");
}
for n in t1.names_map.keys() {
if !t2.names_map.contains_key(&n) {
t2.names.push(t1.names[*t1.names_map.get(&n).unwrap()].clone());
let idx = t2.names.len() - 1;
t2.names_map.insert(*n, idx);
}
}
for n in t2.names_map.keys() {
if !t1.names_map.contains_key(&n) {
let idx = t2.names_map.get(&n).unwrap().clone();
t1.names.push(t2.names[idx].clone());
let idx2 = t1.names.len() - 1;
t1.names_map.insert(*n, idx2);
}
}
let keys = t1.nodes.keys().map(|x| (*x).clone()).collect::<Vec<_>>();
for n in keys {
if !t2.nodes.contains_key(&n) {
t2.nodes.insert(n, vec![]);
}
let lnodes = t1.nodes.get_mut(&n).unwrap();
let rnodes = t2.nodes.get_mut(&n).unwrap();
lnodes.sort_by(|x, y|
if x.node_id < y.node_id { Ordering::Less } else { Ordering::Greater });
rnodes.sort_by(|x, y|
if x.node_id < y.node_id { Ordering::Less } else { Ordering::Greater });
let mut i = 0;
let mut j = 0;
let mut new_t1_nodes: Vec<Arc<TreeNodeV2>> = vec![];
let mut new_t2_nodes: Vec<Arc<TreeNodeV2>> = vec![];
let t1_nodes = t1.nodes.get(&n).unwrap();
let t2_nodes = t2.nodes.get(&n).unwrap();
while i < t1_nodes.len() && j < t2_nodes.len() {
if n == 0 {
println!("{:?}:{:?} - {:?}:{:?}",
t1_nodes[i].node_id,
t1.names[*t1.names_map.get(&t1_nodes[i].fn_id).unwrap() as usize],
t2_nodes[j].node_id,
t2.names[*t2.names_map.get(&t2_nodes[j].fn_id).unwrap() as usize]
)
}
if t1_nodes[i].node_id == t2_nodes[j].node_id {
new_t1_nodes.push(t1_nodes[i].clone());
new_t2_nodes.push(t2_nodes[j].clone());
i += 1;
j += 1;
continue;
}
if t1_nodes[i].node_id < t2_nodes[j].node_id {
new_t1_nodes.push(t1_nodes[i].clone());
new_t2_nodes.push(Arc::new(TreeNodeV2{
node_id: t1_nodes[i].node_id,
fn_id: t1_nodes[i].fn_id,
slf: vec![0],
total: vec![0],
}));
i += 1;
} else {
new_t2_nodes.push(t2_nodes[j].clone());
new_t1_nodes.push(Arc::new(TreeNodeV2{
node_id: t2_nodes[j].node_id,
fn_id: t2_nodes[j].fn_id,
slf: vec![0],
total: vec![0],
}));
j += 1;
}
}
while i < t1_nodes.len() {
new_t1_nodes.push(t1_nodes[i].clone());
new_t2_nodes.push(Arc::new(TreeNodeV2{
node_id: t1_nodes[i].node_id,
fn_id: t1_nodes[i].fn_id,
slf: vec![0],
total: vec![0],
}));
i += 1;
}
while j < t2_nodes.len() {
new_t2_nodes.push(t2_nodes[j].clone());
new_t1_nodes.push(Arc::new(TreeNodeV2{
node_id: t2_nodes[j].node_id,
fn_id: t2_nodes[j].fn_id,
slf: vec![0],
total: vec![0],
}));
j+=1;
}
t1.nodes.insert(n, new_t1_nodes);
t2.nodes.insert(n, new_t2_nodes);
}
for n in t2.nodes.keys().clone() {
if!t1.nodes.contains_key(&n) {
let mut new_t1_nodes: Vec<Arc<TreeNodeV2>> = vec![];
for _n in t2.nodes.get(&n).unwrap() {
new_t1_nodes.push(Arc::new(TreeNodeV2{
node_id: _n.node_id,
fn_id: _n.fn_id,
slf: vec![0],
total: vec![0],
}))
}
t1.nodes.insert(*n, new_t1_nodes);
}
}
let total_left = t1.total();
let total_right = t2.total();
let mut min_val = 0 as i64;
let tn = Arc::new(TreeNodeV2{
fn_id: 0,
node_id: 0,
slf: vec![0],
total: vec![total_left],
});
let mut left_nodes = vec![tn];
let tn2 = Arc::new(TreeNodeV2{
fn_id: 0,
node_id: 0,
slf: vec![0],
total: vec![total_right],
});
let mut right_nodes = vec![tn2];
let mut x_left_offsets = vec![0 as i64];
let mut x_right_offsets = vec![0 as i64];
let mut levels = vec![0 as i64];
let mut name_location_cache: HashMap<String, i64> = HashMap::new();
let mut res = FlameGraphDiff::default();
res.left_ticks = total_left;
res.right_ticks = total_right;
res.total = total_left + total_right;
while left_nodes.len() > 0 {
let left = left_nodes.pop().unwrap();
let right = right_nodes.pop().unwrap();
let mut x_left_offset = x_left_offsets.pop().unwrap();
let mut x_right_offset = x_right_offsets.pop().unwrap();
let level = levels.pop().unwrap();
let mut name: String = "total".to_string();
if left.fn_id != 0 {
name = t1.names[t1.names_map.get(&left.fn_id).unwrap().clone() as usize].clone();
}
if left.total[0] >= min_val || right.total[0] >= min_val || name == "other" {
let mut i = 0 as i64;
if !name_location_cache.contains_key(&name) {
res.names.push(name.clone().to_string());
name_location_cache.insert(name, (res.names.len() - 1) as i64);
i = res.names.len() as i64 - 1;
} else {
i = *name_location_cache.get(name.as_str()).unwrap();
}
if level == res.levels.len() as i64 {
res.levels.push(Level::default())
}
if res.max_self < left.slf[0] {
res.max_self = left.slf[0];
}
if res.max_self < right.slf[0] {
res.max_self = right.slf[0];
}
let mut values = vec![x_left_offset, left.total[0], left.slf[0],
x_right_offset, right.total[0], right.slf[0], i];
res.levels[level as usize].values.extend(values);
let mut other_left_total = 0 as i64;
let mut other_right_total = 0 as i64;
let mut nodes_len = 0;
if t1.nodes.contains_key(&left.node_id) {
nodes_len = t1.nodes.get(&left.node_id).unwrap().len().clone();
}
for j in 0..nodes_len {
let _left = t1.nodes.get(&left.node_id).unwrap()[j].clone();
let _right = t2.nodes.get(&left.node_id).unwrap()[j].clone();
if _left.total[0] >= min_val || _right.total[0] >= min_val {
levels.insert(0, level + 1);
x_left_offsets.insert(0, x_left_offset);
x_right_offsets.insert(0, x_right_offset);
x_left_offset += _left.total[0].clone() as i64;
x_right_offset += _right.total[0].clone() as i64;
left_nodes.insert(0, _left.clone());
right_nodes.insert(0, _right.clone());
} else {
other_left_total += _left.total[0] as i64;
other_right_total += _right.total[0] as i64;
}
if other_left_total > 0 || other_right_total > 0 {
levels.insert(0, level + 1);
t1.add_name("other".to_string(), 1);
x_left_offsets.insert(0, x_left_offset);
left_nodes.insert(0, Arc::new(TreeNodeV2{
fn_id: 1,
node_id: 1,
slf: vec![other_left_total as i64],
total: vec![other_left_total as i64],
}));
t2.add_name("other".to_string(), 1);
x_right_offsets.insert(0, x_right_offset);
right_nodes.insert(0, Arc::new(TreeNodeV2{
fn_id: 1,
node_id: 1,
slf: vec![other_right_total as i64],
total: vec![other_right_total as i64],
}));
}
}
}
}
for i in 0..res.levels.len() {
let mut j = 0;
let mut prev = 0 as i64;
while j < res.levels[i].values.len() {
res.levels[i].values[j] -= prev;
prev += res.levels[i].values[j] + res.levels[i].values[j+1];
j += 7;
}
prev = 0;
j = 3;
while j < res.levels[i].values.len() {
res.levels[i].values[j] -= prev;
prev += res.levels[i].values[j] + res.levels[i].values[j+1];
j += 7;
}
}
res.encode_to_vec()
}
#[wasm_bindgen] #[wasm_bindgen]
pub fn export_tree(id: u32, sample_type: String) -> Vec<u8> { pub fn export_tree(id: u32, sample_type: String) -> Vec<u8> {
let p = panic::catch_unwind(|| { let p = panic::catch_unwind(|| {
let mut ctx = CTX.lock().unwrap(); let mut ctx = CTX.lock().unwrap();
let mut res = SelectMergeStacktracesResponse::default(); let mut res = SelectMergeStacktracesResponse::default();
upsert_tree(&mut ctx, id, vec![sample_type.clone()]); upsert_tree(&mut ctx, id, vec![sample_type.clone()]);
let tree = ctx.get_mut(&id).unwrap(); let tree = ctx.get_mut(&id).unwrap().lock().unwrap();
let mut fg = FlameGraph::default(); let mut fg = FlameGraph::default();
fg.names = tree.names.clone(); fg.names = tree.names.clone();
fg.max_self = tree.max_self[0 /* TODO */]; fg.max_self = tree.max_self[0 /* TODO */];
fg.total = 0; fg.total = 0;
let mut root_children: &Vec<TreeNodeV2> = &vec![]; let mut root_children: &Vec<Arc<TreeNodeV2>> = &vec![];
if tree.nodes.contains_key(&(0u64)) { if tree.nodes.contains_key(&(0u64)) {
root_children = tree.nodes.get(&(0u64)).unwrap(); root_children = tree.nodes.get(&(0u64)).unwrap();
} }
for n in root_children.iter() { for n in root_children.iter() {
fg.total += n.total[0 /*TODO*/] as i64; fg.total += n.total[0 /*TODO*/] as i64;
} }
bfs(tree, &mut fg.levels, sample_type.clone()); bfs(&tree, &mut fg.levels, sample_type.clone());
res.flamegraph = Some(fg); res.flamegraph = Some(fg);
return res.encode_to_vec(); return res.encode_to_vec();
}); });

View File

@ -9,6 +9,7 @@ const { QrynBadRequest } = require('../lib/handlers/errors')
const { clusterName } = require('../common') const { clusterName } = require('../common')
const logger = require('../lib/logger') const logger = require('../lib/logger')
const jsonParsers = require('./json_parsers') const jsonParsers = require('./json_parsers')
const renderDiff = require('./render_diff')
const { const {
parser, parser,
wrapResponse, wrapResponse,
@ -444,4 +445,5 @@ module.exports.init = (fastify) => {
} }
settings.init(fastify) settings.init(fastify)
render.init(fastify) render.init(fastify)
renderDiff.init(fastify)
} }

View File

@ -248,5 +248,7 @@ const init = (fastify) => {
} }
module.exports = { module.exports = {
init init,
parseQuery,
toFlamebearer
} }

75
pyroscope/render_diff.js Normal file
View File

@ -0,0 +1,75 @@
const { parseQuery, toFlamebearer } = require('./render')
const { importStackTraces, newCtxIdx } = require('./merge_stack_traces')
const pprofBin = require('./pprof-bin/pkg')
const querierMessages = require('./querier_pb')
const types = require('./types/v1/types_pb')
const renderDiff = async (req, res) => {
const [leftQuery, leftFromTimeSec, leftToTimeSec] =
parseParams(req.query.leftQuery, req.query.leftFrom, req.query.leftUntil);
const [rightQuery, rightFromTimeSec, rightToTimeSec] =
parseParams(req.query.rightQuery, req.query.rightFrom, req.query.rightUntil);
if (leftQuery.typeId != rightQuery.typeId) {
res.code(400).send('Different type IDs')
return
}
const leftCtxIdx = newCtxIdx()
await importStackTraces(leftQuery.typeDesc, '{' + leftQuery.labelSelector + '}', leftFromTimeSec, leftToTimeSec, req.log, leftCtxIdx, true)
const rightCtxIdx = newCtxIdx()
await importStackTraces(rightQuery.typeDesc, '{' + rightQuery.labelSelector + '}', rightFromTimeSec, rightToTimeSec, req.log, rightCtxIdx, true)
const flamegraphDiffBin = pprofBin.diff_tree(leftCtxIdx, rightCtxIdx,
`${leftQuery.typeDesc.sampleType}:${leftQuery.typeDesc.sampleUnit}`)
const profileType = new types.ProfileType()
profileType.setId(leftQuery.typeId)
profileType.setName(leftQuery.typeDesc.type)
profileType.setSampleType(leftQuery.typeDesc.sampleType)
profileType.setSampleUnit(leftQuery.typeDesc.sampleUnit)
profileType.setPeriodType(leftQuery.typeDesc.periodType)
profileType.setPeriodUnit(leftQuery.typeDesc.periodUnit)
const diff = querierMessages.FlameGraphDiff.deserializeBinary(flamegraphDiffBin)
return res.code(200).send(diffToFlamegraph(diff, profileType).flamebearerProfileV1)
}
/**
*
* @param diff
* @param type
*/
const diffToFlamegraph = (diff, type) => {
const fg = new querierMessages.FlameGraph()
fg.setNamesList(diff.getNamesList())
fg.setLevelsList(diff.getLevelsList())
fg.setTotal(diff.getTotal())
fg.setMaxSelf(diff.getMaxSelf())
const fb = toFlamebearer(fg, type)
fb.flamebearerProfileV1.leftTicks = diff.getLeftticks()
fb.flamebearerProfileV1.rightTicks = diff.getRightticks()
fb.flamebearerProfileV1.metadata = {
...(fb.flamebearerProfileV1.metadata || {}),
format: 'double'
}
return fb
}
const parseParams = (query, from, until) => {
const parsedQuery = parseQuery(query)
const fromTimeSec = from
? Math.floor(parseInt(from) / 1000)
: Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000)
const toTimeSec = until
? Math.floor(parseInt(until) / 1000)
: Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000)
if (!parsedQuery) {
throw new Error('Invalid query')
}
return [parsedQuery, fromTimeSec, toTimeSec]
}
const init = (fastify) => {
fastify.get('/pyroscope/render-diff', renderDiff)
}
module.exports = {
renderDiff,
init
}