mirror of https://github.com/metrico/qryn.git
synced 2025-03-15 10:14:19 +00:00
Merge branch 'master' into lmangani-golang-xnet
@@ -23,7 +23,7 @@
 * **Secure**: Retain total control of data, using **ClickHouse**, **DuckDB** or **InfluxDB** IOx with **S3** object storage
 * **Independent**: Opensource, Community powered, Anti lock-in alternative to Vendor controlled stacks

 <!--  -->

 <br>
@@ -1,4 +1,7 @@
 const path = require('path')
 module.exports = {
-  setupFilesAfterEnv: [path.join(__dirname, '/test/jest.setup.js')]
+  setupFilesAfterEnv: [path.join(__dirname, '/test/jest.setup.js')],
+  moduleNameMapper: {
+    '^axios$': 'axios/dist/node/axios.cjs'
+  }
 }
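Presumably this mapping is the standard Jest workaround for the axios 1.x upgrade further down in this diff: axios 1.x exposes an ESM entry point through its export map, which Jest's CommonJS module loader cannot parse, so every `require('axios')` inside the test suite is redirected to the bundled CommonJS build at `axios/dist/node/axios.cjs`.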
@@ -1363,12 +1363,27 @@ const samplesReadTable = {
  */
 const rawRequest = async (query, data, database, config) => {
   try {
     if (data && !(Buffer.isBuffer(data) || data instanceof Uint8Array || typeof data === 'string')) {
       throw new Error('data must be Buffer, Uint8Array or String: currently the data is: ' + typeof data)
     }
     if (typeof data === 'string') {
       data = Buffer.from(data, 'utf8')
     }
     if (typeof query !== 'string') {
       throw new Error('query must be String: currently the query is: ' + typeof query)
     }
     const getParams = [
       (database ? `database=${encodeURIComponent(database)}` : null),
       (data ? `query=${encodeURIComponent(query)}` : null)
     ].filter(p => p)
     const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}`
-    return await axios.post(url, data || query, config)
+    config = {
+      ...(config || {}),
+      method: 'post',
+      url: url,
+      data: data || query
+    }
+    return await axios(config)
   } catch (e) {
     logger.error('rawRequest error: ' + query)
     e.response?.data && logger.error(e.response.data.toString())
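A minimal sketch of the calling-convention change (the helper name is mine, not from the commit): instead of the `axios.post(url, data, config)` shorthand, the request is assembled as a single config object and dispatched through `axios(config)`, keeping every request parameter in one explicit, inspectable structure when merging caller-supplied options:

const axios = require('axios')

// hypothetical helper mirroring the new call shape inside rawRequest
async function post (url, body, config) {
  const request = {
    ...(config || {}), // caller-supplied options: headers, auth, timeouts, ...
    method: 'post',
    url: url,
    data: body
  }
  return axios(request) // axios(config) is equivalent to axios.request(config)
}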
package-lock.json (generated, 10 changes)
@@ -25,7 +25,7 @@
   "@qxip/plugnplay": "^3.3.1",
   "@stricjs/router": "^5.0.6",
   "@stricjs/utils": "^1.6.1",
-  "axios": "^0.28.0",
+  "axios": "^1.6.8",
   "basic-auth": "^2.0.1",
   "bnf": "^1.0.1",
   "csv-writer": "^1.6.0",
@@ -3574,11 +3574,11 @@
     }
   },
   "node_modules/axios": {
-    "version": "0.28.0",
-    "resolved": "https://registry.npmjs.org/axios/-/axios-0.28.0.tgz",
-    "integrity": "sha512-Tu7NYoGY4Yoc7I+Npf9HhUMtEEpV7ZiLH9yndTCoNhcpBH0kwcvFbzYN9/u5QKI5A6uefjsNNWaz5olJVYS62Q==",
+    "version": "1.6.8",
+    "resolved": "https://registry.npmjs.org/axios/-/axios-1.6.8.tgz",
+    "integrity": "sha512-v/ZHtJDU39mDpyBoFVkETcd/uNdxrWRrg3bKpOKzXFA6Bvqopts6ALSMU3y6ijYxbw2B+wPrIv46egTzJXCLGQ==",
     "dependencies": {
-      "follow-redirects": "^1.15.0",
+      "follow-redirects": "^1.15.6",
       "form-data": "^4.0.0",
       "proxy-from-env": "^1.1.0"
     }
@@ -44,7 +44,7 @@
   "@qxip/influx-line-protocol-parser": "^0.2.1",
   "@qxip/plugnplay": "^3.3.1",
   "@stricjs/router": "^5.0.6",
-  "axios": "^0.28.0",
+  "axios": "^1.6.8",
   "bnf": "^1.0.1",
   "csv-writer": "^1.6.0",
   "date-fns": "^2.27.0",
@@ -260,7 +260,7 @@ const parsers = {
   const parser = find(parsers._parsers, [contentType, req.routeOptions.method, req.routeOptions.url]) ||
     find(parsers._parsers, ['*', req.routeOptions.method, req.routeOptions.url])
   if (!parser) {
-    throw new Error('undefined parser')
+    throw new Error(`undefined parser for ${contentType} ${req.routeOptions.method} ${req.routeOptions.url}`)
   }
   return await parser(req, payload)
 },
Submodule test/e2e updated: 85e344c83e...8847ca00a0
@@ -1,4 +1,4 @@
-const { getCompareFn, durationToNs, unquote } = require('./shared')
+const { getCompareFn, durationToNs, unquote, bitSet } = require('./shared')
 const Sql = require('@cloki/clickhouse-sql')
 module.exports = class Builder {
   constructor () {
@@ -71,6 +71,18 @@ module.exports = class Builder {
     const having = self.getCond(self.conds)
     self.aggregator(sel)
     sel.conditions = Sql.And(sel.conditions, Sql.Or(...self.where))
+    if (Array.isArray(ctx.randomFilter) && Array.isArray(ctx.cachedTraceIds) && ctx.cachedTraceIds.length > 0) {
+      sel.conditions = Sql.And(
+        sel.conditions,
+        Sql.Or(
+          Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])),
+          new Sql.In('trace_id', 'in', ctx.cachedTraceIds.map(traceId => new Sql.Raw(`unhex('${traceId}')`)))
+        ))
+    } else if (Array.isArray(ctx.randomFilter)) {
+      sel.conditions = Sql.And(
+        sel.conditions,
+        Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])))
+    }
     sel.having(having)
     return sel
   }
@@ -248,20 +260,6 @@ function groupBitOr (left, alias) {
   return res
 }

-/**
- *
- * @param terms
- * @returns {SQLObject}
- */
-function bitSet (terms) {
-  const res = new Sql.Raw('')
-  res.terms = terms
-  res.toString = () => {
-    return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+')
-  }
-  return res
-}
-
 /**
  *
  * @param attr {string}
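The sampling condition added here is deterministic: `cityHash64(trace_id) % randomFilter[0]` assigns every trace to one of N stable buckets, and the query scans only bucket `randomFilter[1]`, optionally OR-ing back ids already collected in `cachedTraceIds`. A standalone sketch of the bucketing idea (the hash function and names are stand-ins, not from the commit):

const crypto = require('crypto')

// Bucket an id into one of `shards` stable groups, the way the generated SQL
// does with cityHash64(trace_id) % shards. Any uniform 64-bit hash works.
function shardOf (traceId, shards) {
  const digest = crypto.createHash('md5').update(traceId).digest()
  return Number(digest.readBigUInt64BE(0) % BigInt(shards))
}

// shardOf(id, 4) === 1 selects ~25% of traces, and the same 25% on every
// run — which is what lets later passes OR cached trace ids back in
// without double-counting.
console.log(shardOf('deadbeef', 4))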
traceql/clickhouse_transpiler/attr_condition_eval.js (new file, 19 lines)
@@ -0,0 +1,19 @@
+const attrCondition = require('./attr_condition')
+const {bitSet} = require('./shared')
+const Sql = require('@cloki/clickhouse-sql')
+module.exports = class Builder extends attrCondition {
+  build () {
+    const self = this
+    const superBuild = super.build()
+    /** @type {BuiltProcessFn} */
+    const res = (ctx) => {
+      const sel = superBuild(ctx)
+      sel.having_conditions = []
+      sel.aggregations = [bitSet(self.sqlConditions)]
+      sel.select_list = [[new Sql.Raw('count()'), 'count']]
+      sel.order_expressions = []
+      return sel
+    }
+    return res
+  }
+}
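How the eval planner appears to work (an interpretation, not spelled out in the diff): `super.build()` produces the normal filtered select, then the subclass strips the HAVING and ORDER BY clauses, folds the per-condition booleans into a single `bitSet` aggregation, and reduces the select list to a bare `count()`. The result is a query with the same WHERE clause as the real search that only counts candidate rows — cheap enough to run as a pre-flight complexity probe (see `evaluateComplexity` later in this diff).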
@@ -7,9 +7,7 @@ module.exports = standardBuilder((sel, ctx) => {
     .with(withMain)
     .select(
       ['trace_id', 'trace_id'],
-      [new Sql.Raw('groupArray(span_id)'), 'span_id'],
-      [new Sql.Raw('groupArray(duration)'), 'duration'],
-      [new Sql.Raw('groupArray(timestamp_ns)'), 'timestamp_ns']
+      [new Sql.Raw('groupArray(100)(span_id)'), 'span_id']
     ).from(new Sql.WithReference(withMain))
     .groupBy('trace_id')
     .orderBy([new Sql.Raw('max(index_search.timestamp_ns)'), 'desc'])
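`groupArray(100)(span_id)` is ClickHouse's parameterized form of `groupArray`, which retains at most 100 elements per group, so the per-trace span-id array is now capped instead of unbounded. The duration and timestamp arrays dropped here appear to be reassembled downstream by the reworked `traces_data` builder in this same diff.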
@@ -1,4 +1,5 @@
 const AttrConditionPlanner = require('./attr_condition')
+const AttrConditionEvalPlanner = require('./attr_condition_eval')
 const InitIndexPlanner = require('./init')
 const IndexGroupByPlanner = require('./group_by')
 const AggregatorPlanner = require('./aggregator')
@@ -8,10 +9,17 @@ const TracesDataPlanner = require('./traces_data')
 /**
  * @param script {Token}
  */
-module.exports = (script) => {
+module.exports.transpile = (script) => {
   return new Planner(script).plan()
 }

+/**
+ * @param script {Token}
+ */
+module.exports.evaluateCmpl = (script) => {
+  return new Planner(script).planEval()
+}
+
 class Planner {
   /**
    *
@@ -53,6 +61,19 @@ class Planner {
     return res
   }

+  planEval () {
+    this.check()
+    this.analyze()
+    const res = (new AttrConditionEvalPlanner())
+      .withTerms(this.termIdx)
+      .withConditions(this.cond)
+      .withAggregatedAttr(this.aggregatedAttr)
+      .withMain((new InitIndexPlanner()).build())
+      .build()
+
+    return res
+  }
+
   check () {
     if (this.script.Children('SYNTAX').length > 1) {
       throw new Error('more than one selector is not supported')
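A hedged usage sketch of the two new entry points; the `ctx` field names follow the `Context` typedef elsewhere in this diff, and the values (including table names and the sample query) are purely illustrative:

const parser = require('./parser')
const { transpile, evaluateCmpl } = require('./clickhouse_transpiler')

// illustrative context; field names from the Context typedef in this diff
const ctx = {
  from: Date.now() - 3600 * 1000,
  to: Date.now(),
  limit: 20,
  tracesTable: 'qryn.tempo_traces',                // assumed table name
  tracesAttrsTable: 'qryn.tempo_traces_attrs_gin', // assumed table name
  randomFilter: null
}

const script = parser.ParseScript('{ .service.name = "api" }')
const probeSql = evaluateCmpl(script.rootToken)(ctx) // cheap count() probe
const searchSql = transpile(script.rootToken)(ctx)   // full search query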
@@ -9,7 +9,9 @@ const { standardBuilder } = require('./shared')
  * limit: number,
  * isCluster: boolean,
  * tracesTable: string,
- * tracesDistTable: string
+ * tracesDistTable: string,
+ * randomFilter: number[]|undefined,
+ * cachedTraceIds: string[]|undefined,
  * }} Context
  */
 /**
@@ -21,7 +23,7 @@ const { standardBuilder } = require('./shared')
  */
 module.exports = standardBuilder((sel, ctx) => {
   return (new Sql.Select()).select(['trace_id', 'trace_id'],
-    [new Sql.Raw('lower(hex(span_id))'), 'span_id'],
+    [new Sql.Raw('span_id'), 'span_id'],
     [new Sql.Raw('any(duration)'), 'duration'],
     [new Sql.Raw('any(timestamp_ns)'), 'timestamp_ns'])
     .from([ctx.tracesAttrsTable, 'traces_idx'])
@@ -82,3 +82,17 @@ module.exports.standardBuilder = (fn) => {
     }
   }
 }
+
+/**
+ *
+ * @param terms {SQLObject[]}
+ * @returns {SQLObject}
+ */
+module.exports.bitSet = (terms) => {
+  const res = new Sql.Raw('')
+  res.terms = terms
+  res.toString = () => {
+    return res.terms.map((t, i) => `bitShiftLeft(toUInt64(${t.toString()}), ${i})`).join('+')
+  }
+  return res
+}
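What `bitSet` renders, traced by hand with string stand-ins for the `SQLObject` terms: term i is coerced to 0/1 by `toUInt64` and shifted left by i, so every row yields an integer whose bit i records whether condition i held.

// string stand-ins for the terms bitSet receives
const terms = ['attr = 1', 'duration > 1000']

const rendered = terms
  .map((t, i) => `bitShiftLeft(toUInt64(${t}), ${i})`)
  .join('+')

console.log(rendered)
// bitShiftLeft(toUInt64(attr = 1), 0)+bitShiftLeft(toUInt64(duration > 1000), 1)
// A row matching only the second condition evaluates to 0b10 = 2; aggregating
// with groupBitOr then reports which conditions matched at least once.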
@@ -9,23 +9,23 @@ const processFn = (sel, ctx) => {
   const withTraceIds = new Sql.With('trace_ids', (new Sql.Select())
     .select('trace_id')
     .from(new Sql.WithReference(withMain)))
+  const withTraceIdsSpanIds = new Sql.With('trace_span_ids', (new Sql.Select())
+    .select('trace_id', 'span_id')
+    .from(new Sql.WithReference(withMain))
+    .join('span_id', 'array'))
   return (new Sql.Select())
-    .with(withMain, withTraceIds)
+    .with(withMain, withTraceIds, withTraceIdsSpanIds)
     .select(
       [new Sql.Raw('lower(hex(traces.trace_id))'), 'trace_id'],
-      [new Sql.Raw('any(index_grouped.span_id)'), 'span_id'],
-      [new Sql.Raw('any(index_grouped.duration)'), 'duration'],
-      [new Sql.Raw('any(index_grouped.timestamp_ns)'), 'timestamp_ns'],
+      [new Sql.Raw(`arrayMap(x -> lower(hex(x)), groupArrayIf(traces.span_id, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)}))`), 'span_id'],
+      [new Sql.Raw(`groupArrayIf(traces.duration_ns, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'duration'],
+      [new Sql.Raw(`groupArrayIf(traces.timestamp_ns, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'timestamp_ns'],
       [new Sql.Raw('min(traces.timestamp_ns)'), 'start_time_unix_nano'],
       [new Sql.Raw(
         'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
       ), 'duration_ms'],
       [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
-    ).from([table, 'traces']).join(
-      new Sql.WithReference(withMain),
-      'left any',
-      Sql.Eq(new Sql.Raw('traces.trace_id'), new Sql.Raw('index_grouped.trace_id'))
-    ).where(Sql.And(
+    ).from([table, 'traces']).where(Sql.And(
       new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
     )).groupBy('traces.trace_id')
     .orderBy(['start_time_unix_nano', 'desc'])
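A reading of this rewrite (my interpretation of the hunk): the `left any` join of the raw traces table against the grouped index is replaced by conditional aggregation. The new `trace_span_ids` CTE flattens the per-trace span-id arrays (the `.join('span_id', 'array')` call appears to render an ARRAY JOIN), and each span-level column is then collected with `groupArrayIf(..., (traces.trace_id, traces.span_id) IN trace_span_ids)` during a single scan of `traces` filtered by `trace_ids`, avoiding the join entirely.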
@@ -1,5 +1,5 @@
 const parser = require('./parser')
-const transpiler = require('./clickhouse_transpiler')
+const { transpile, evaluateCmpl } = require('./clickhouse_transpiler')
 const logger = require('../lib/logger')
 const { DATABASE_NAME } = require('../lib/utils')
 const { clusterName } = require('../common')
@@ -23,10 +23,94 @@ const search = async (query, limit, from, to) => {
     tracesAttrsTable: `${_dbname}.tempo_traces_attrs_gin`,
     from: from,
     to: to,
-    limit: limit
+    limit: limit,
+    randomFilter: null
   }
   const script = parser.ParseScript(query)
-  const planner = transpiler(script.rootToken)
+  const complexity = await evaluateComplexity(ctx, script.rootToken)
+  let res = []
+  if (complexity > 10000000) {
+    res = await processComplexResult(ctx, script.rootToken, complexity)
+  } else {
+    res = await processSmallResult(ctx, script.rootToken)
+  }
+  res.forEach(t =>
+    t.spanSets.forEach(
+      ss => ss.spans.sort(
+        (a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
+    )
+  )
+  return res
+}
+
+/**
+ *
+ * @param ctx {Context}
+ * @param script {Token}
+ */
+const evaluateComplexity = async (ctx, script) => {
+  const evaluator = evaluateCmpl(script)
+  const sql = evaluator(ctx)
+  const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+  return response.data.data.reduce((acc, row) => Math.max(acc, row.count), 0)
+}
+
+/**
+ *
+ * @param ctx {Context}
+ * @param script {Token}
+ * @param complexity {number}
+ */
+async function processComplexResult (ctx, script, complexity) {
+  const planner = transpile(script)
+  const maxFilter = Math.floor(complexity / 10000000)
+  let traces = []
+  for (let i = 0; i < maxFilter; i++) {
+    ctx.randomFilter = [maxFilter, i]
+    const sql = planner(ctx)
+    const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+    if (response.data.data.length === parseInt(ctx.limit)) {
+      const minStart = response.data.data.reduce((acc, row) =>
+        acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
+      )
+      ctx.from = new Date(Math.floor(minStart / 1000000))
+      ctx.randomFilter = null
+      complexity = await evaluateComplexity(ctx, script)
+      if (complexity <= 10000000) {
+        return await processSmallResult(ctx, script)
+      }
+      ctx.randomFilter = [maxFilter, i]
+    }
+    ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
+    traces = response.data.data.map(row => ({
+      traceID: row.trace_id,
+      rootServiceName: row.root_service_name,
+      rootTraceName: row.root_trace_name,
+      startTimeUnixNano: row.start_time_unix_nano,
+      durationMs: row.duration_ms,
+      spanSets: [
+        {
+          spans: row.span_id.map((spanId, i) => ({
+            spanID: spanId,
+            startTimeUnixNano: row.timestamp_ns[i],
+            durationNanos: row.duration[i],
+            attributes: []
+          })),
+          matched: row.span_id.length
+        }
+      ]
+    }))
+  }
+  return traces
+}
+
+/**
+ *
+ * @param ctx {Context}
+ * @param script {Token}
+ */
+async function processSmallResult (ctx, script) {
+  const planner = transpile(script)
+  const sql = planner(ctx)
+  const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+  const traces = response.data.data.map(row => ({
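The arithmetic behind `processComplexResult`, worked through on hypothetical numbers to make the sharding strategy concrete:

// 10M is the complexity threshold hard-coded in search() above
const THRESHOLD = 10000000

// suppose the count() probe reported 45M candidate rows
const complexity = 45000000
const maxFilter = Math.floor(complexity / THRESHOLD) // => 4 shards

// Pass i (0..3) sets ctx.randomFilter = [4, i], adding
// cityHash64(trace_id) % 4 = i to the WHERE clause, so each pass scans
// roughly a quarter of the candidates. When a pass fills ctx.limit, ctx.from
// is advanced to the oldest returned timestamp and the probe re-runs; if the
// narrowed window drops to <= 10M rows, the search falls through to
// processSmallResult().
console.log(maxFilter) // 4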
@@ -13,7 +13,7 @@ cmp ::= "="|"!="|"<="|">="|"<"|">"
 cmp_val ::= <number> [<measurement>]
 measurement ::= "ns"|"us"|"ms"|"s"|"m"|"h"|"d"

-label_name ::= ("." | <ALPHA> | "_") *("." | <ALPHA> | "_" | <DIGITS>)
+label_name ::= ("." | <ALPHA> | "-" | "_") *("." | <ALPHA> | "_" | "-" | <DIGITS>)
 number ::= ["-"] <DIGITS> ["." <DIGITS>]

 attr_selector ::= <label_name> <OWSP> <op> <OWSP> <value>
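The widened `label_name` rule admits `-` both as a leading and as a subsequent character, so attribute names containing hyphens (for example `.service-name`) now parse instead of failing at the first `-`.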