mirror of
https://github.com/metrico/qryn.git
synced 2025-03-14 10:07:18 +00:00
correct processing of {a=""} and {a!=""} cases
This commit is contained in:
119
promql/index.js
119
promql/index.js
@ -45,8 +45,7 @@ module.exports.series = async (query, fromMs, toMs) => {
|
||||
const fromS = Math.floor(fromMs / 1000)
|
||||
const toS = Math.floor(toMs / 1000)
|
||||
const matchers = prometheus.pqlMatchers(query)
|
||||
const conds = getMatchersIdxCond(matchers[0])
|
||||
const idx = getIdxSubquery(conds, fromMs, toMs)
|
||||
const idx = getIdxSubqueryV2(matchers[0], fromMs, toMs)
|
||||
const withIdx = new Sql.With('idx', idx, !!clusterName)
|
||||
const req = (new Sql.Select())
|
||||
.with(withIdx)
|
||||
@ -70,51 +69,90 @@ module.exports.series = async (query, fromMs, toMs) => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param matcher {[string]}
|
||||
*/
|
||||
const getMatcherIdxCond = (matcher) => {
|
||||
const res = [
|
||||
Sql.Eq('key', matcher[0])
|
||||
]
|
||||
switch (matcher[1]) {
|
||||
case '=':
|
||||
res.push(Sql.Eq('val', matcher[2]))
|
||||
break
|
||||
case '!=':
|
||||
res.push(Sql.Ne('val', matcher[2]))
|
||||
break
|
||||
case '=~':
|
||||
res.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
|
||||
break
|
||||
case '!~':
|
||||
res.push(Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param matchers {[[string]]}
|
||||
*/
|
||||
const getMatchersIdxCond = (matchers) => {
|
||||
const matchesCond = []
|
||||
for (const matcher of matchers) {
|
||||
const _matcher = [
|
||||
Sql.Eq('key', matcher[0])
|
||||
]
|
||||
switch (matcher[1]) {
|
||||
case '=':
|
||||
_matcher.push(Sql.Eq('val', matcher[2]))
|
||||
break
|
||||
case '!=':
|
||||
_matcher.push(Sql.Ne('val', matcher[2]))
|
||||
break
|
||||
case '=~':
|
||||
_matcher.push(Sql.Eq(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
|
||||
break
|
||||
case '!~':
|
||||
_matcher.push(Sql.Ne(new Sql.Raw(`match(val, ${Sql.quoteVal(matcher[2])})`), 1))
|
||||
}
|
||||
matchesCond.push(Sql.And(..._matcher))
|
||||
}
|
||||
return matchesCond
|
||||
return matchers.map(matcher => Sql.And(...getMatcherIdxCond(matcher)))
|
||||
}
|
||||
|
||||
const getIdxSubquery = (conds, fromMs, toMs) => {
|
||||
const getIdxSubqueryV2 = (matchers, fromMs, toMs) => {
|
||||
const fromS = Math.floor(fromMs / 1000)
|
||||
const toS = Math.floor(toMs / 1000)
|
||||
return (new Sql.Select())
|
||||
.select('fingerprint')
|
||||
.from([DATABASE_NAME() + '.time_series_gin', 'time_series_gin'])
|
||||
.where(Sql.And(
|
||||
Sql.Or(...conds),
|
||||
Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
|
||||
Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`)),
|
||||
new Sql.In('type', 'in', [bothType, metricType])))
|
||||
.having(
|
||||
Sql.Eq(
|
||||
new Sql.Raw('groupBitOr(' + conds.map(
|
||||
(m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
|
||||
).join('+') + ')'), (1 << conds.length) - 1)
|
||||
).groupBy('fingerprint')
|
||||
const nonEmptyMatchers = matchers.filter(m => m[2] !== '')
|
||||
const emptyMatchers = matchers.filter(m => m[2] === '' && ['=', '!='].includes(m[1]))
|
||||
let req = null
|
||||
if (nonEmptyMatchers.length) {
|
||||
const nonEmptyConds = getMatchersIdxCond(nonEmptyMatchers)
|
||||
req = (new Sql.Select())
|
||||
.select('fingerprint')
|
||||
.from([DATABASE_NAME() + '.time_series_gin', 'time_series_gin'])
|
||||
.where(Sql.And(
|
||||
Sql.Or(...nonEmptyConds),
|
||||
Sql.Gte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${fromS}))`)),
|
||||
Sql.Lte('date', new Sql.Raw(`toDate(fromUnixTimestamp(${toS}))`)),
|
||||
new Sql.In('type', 'in', [bothType, metricType])))
|
||||
.having(
|
||||
Sql.Eq(
|
||||
new Sql.Raw('groupBitOr(' + nonEmptyConds.map(
|
||||
(m, i) => new Sql.Raw(`bitShiftLeft((${m})::UInt64, ${i})`)
|
||||
).join('+') + ')'), (1 << nonEmptyConds.length) - 1)
|
||||
).groupBy('fingerprint')
|
||||
}
|
||||
if (emptyMatchers.length) {
|
||||
const emptyConds = emptyMatchers.map(m => {
|
||||
const visitParamHas = new Sql.Raw('')
|
||||
visitParamHas.toString = function () {
|
||||
return `visitParamHas(labels, ${Sql.quoteVal(m[0])})`
|
||||
}
|
||||
switch (m[1]) {
|
||||
case '=':
|
||||
return Sql.Eq(visitParamHas, new Sql.Raw('0'))
|
||||
case '!=':
|
||||
return Sql.Ne(visitParamHas, new Sql.Raw('1'))
|
||||
default:
|
||||
return null
|
||||
}
|
||||
}).filter(m => !!m)
|
||||
const emptyReq = (new Sql.Select())
|
||||
.select('fingerprint')
|
||||
.from(`time_series${_dist}`)
|
||||
.where(Sql.And(...emptyConds))
|
||||
if (nonEmptyMatchers.length) {
|
||||
const withNonEmptyIdx = new Sql.With('nonEmptyIdx', req, !!clusterName)
|
||||
emptyReq.with(withNonEmptyIdx)
|
||||
.where(
|
||||
new Sql.In('fingerprint', 'in', new Sql.WithReference(withNonEmptyIdx))
|
||||
)
|
||||
}
|
||||
req = emptyReq
|
||||
}
|
||||
return req
|
||||
}
|
||||
|
||||
module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
|
||||
@ -126,8 +164,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
|
||||
null, db, { responseType: 'arraybuffer' })
|
||||
return new Uint8Array(data.data)
|
||||
}
|
||||
const matches = getMatchersIdxCond(matchers)
|
||||
const idx = getIdxSubquery(matches, fromMs, toMs)
|
||||
const idx = getIdxSubqueryV2(matchers, fromMs, toMs)
|
||||
const withIdx = new Sql.With('idx', idx, !!clusterName)
|
||||
const timeSeries = (new Sql.Select())
|
||||
.select(
|
||||
@ -136,7 +173,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
|
||||
).from(DATABASE_NAME() + '.time_series')
|
||||
.where(Sql.And(
|
||||
new Sql.In('fingerprint', 'in', new Sql.WithReference(withIdx)),
|
||||
new Sql.In('type', 'in', [bothType,metricType])))
|
||||
new Sql.In('type', 'in', [bothType, metricType])))
|
||||
const withTimeSeries = new Sql.With('timeSeries', timeSeries, !!clusterName)
|
||||
const raw = (new Sql.Select())
|
||||
.with(withIdx)
|
||||
|
Reference in New Issue
Block a user