Mirror of https://github.com/metrico/qryn.git (synced 2025-03-15 10:14:19 +00:00)
@@ -117,8 +117,15 @@ const getIdxSubquery = (conds, fromMs, toMs) => {
   ).groupBy('fingerprint')
 }
 
-module.exports.getData = async (matchers, fromMs, toMs) => {
+module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
   const db = DATABASE_NAME()
+  const subq = (subqueries || {})[getMetricName(matchers)]
+  if (subq) {
+    console.log(subq)
+    const data = await rawRequest(subq + ' FORMAT RowBinary',
+      null, db, { responseType: 'arraybuffer' })
+    return new Uint8Array(data.data)
+  }
   const matches = getMatchersIdxCond(matchers)
   const idx = getIdxSubquery(matches, fromMs, toMs)
   const withIdx = new Sql.With('idx', idx, !!clusterName)
@@ -176,4 +183,12 @@ module.exports.getData = async (matchers, fromMs, toMs) => {
   return new Uint8Array(data.data)
 }
 
+function getMetricName(matchers) {
+  for (const matcher of matchers) {
+    if (matcher[0] === '__name__' && matcher[1] === '=') {
+      return matcher[2]
+    }
+  }
+}
+
+prometheus.getData = module.exports.getData
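Note: the new subqueries argument is a map keyed by metric name. When the WASM optimizer has replaced a PromQL subtree with a fake metric, getData answers that matcher set from the prepared ClickHouse SQL instead of the generic fingerprint path. A minimal Go sketch of the same lookup, assuming the [name, op, value] matcher layout used above; the map contents are illustrative, not real output:

package main

import "fmt"

// pickSubquery mirrors getMetricName plus the subq short-circuit above.
func pickSubquery(matchers [][3]string, subqueries map[string]string) (string, bool) {
    for _, m := range matchers {
        if m[0] == "__name__" && m[1] == "=" {
            sql, ok := subqueries[m[2]]
            return sql, ok
        }
    }
    return "", false
}

func main() {
    subq := map[string]string{ // illustrative content
        "fake_metric_1700000000": "SELECT fingerprint, timestamp_ms, value FROM metrics_15s",
    }
    m := [][3]string{{"__name__", "=", "fake_metric_1700000000"}}
    if sql, ok := pickSubquery(m, subq); ok {
        fmt.Println(sql + " FORMAT RowBinary") // what rawRequest would receive
    }
}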
@@ -44,3 +44,8 @@ it('traceql: max duration', () => {
   const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
   expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
 })
+
+it('traceql: select', () => {
+  const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+  expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+})
@@ -292,6 +292,9 @@ module.exports.Planner = class Planner {
     if (!agg) {
       return
     }
+    if (['count', 'sum', 'min', 'max', 'avg'].indexOf(agg.Child('fn').value) < 0) {
+      return
+    }
     this.aggFn = agg.Child('fn').value
     const labelName = agg.Child('attr').Child('label_name')
     this.aggregatedAttr = labelName ? labelName.value : ''
@@ -24,7 +24,9 @@ const processFn = (sel, ctx) => {
     [new Sql.Raw(
       'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
     ), 'duration_ms'],
-    [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
+    [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name'],
+    [new Sql.Raw(`groupArrayIf(base64Encode(traces.payload), (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload'],
+    [new Sql.Raw(`groupArrayIf(traces.payload_type, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload_type']
   ).from([table, 'traces']).where(Sql.And(
     new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
   )).groupBy('traces.trace_id')
@@ -4,6 +4,7 @@ const logger = require('../lib/logger')
 const { DATABASE_NAME } = require('../lib/utils')
 const { clusterName } = require('../common')
 const { rawRequest } = require('../lib/db/clickhouse')
+const { postProcess } = require('./post_processor')
 
 /**
  *
@@ -39,12 +40,14 @@ const search = async (query, limit, from, to) => {
   } else {
     res = await processSmallResult(ctx, scrpit.rootToken)
   }
+  res = postProcess(res, scrpit.rootToken)
+  res.forEach(t =>
+    t.spanSets.forEach(
+      ss => ss.spans.sort(
+        (a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
+    )
+  )
   console.log(JSON.stringify(res, 2))
   return res
 }
@@ -70,11 +73,12 @@ const evaluateComplexity = async (ctx, script) => {
 async function processComplexResult (ctx, script, complexity) {
   const planner = ctx.planner.plan()
   const maxFilter = Math.floor(complexity / 10000000)
-  let traces = []
+  //let traces = []
+  let response = null
   for (let i = 0; i < maxFilter; i++) {
     ctx.randomFilter = [maxFilter, i]
     const sql = planner(ctx)
-    const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+    response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
     if (response.data.data.length === parseInt(ctx.limit)) {
       const minStart = response.data.data.reduce((acc, row) =>
         acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
@@ -88,7 +92,7 @@ async function processComplexResult (ctx, script, complexity) {
       ctx.randomFilter = [maxFilter, i]
     }
     ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
-    traces = response.data.data.map(row => ({
+    /*traces = response.data.data.map(row => ({
       traceID: row.trace_id,
       rootServiceName: row.root_service_name,
       rootTraceName: row.root_trace_name,
@@ -105,9 +109,9 @@ async function processComplexResult (ctx, script, complexity) {
         matched: row.span_id.length
      }
    ]
-    }))
+    }))*/
   }
-  return traces
+  return response.data.data
 }
 
 /**
@@ -119,7 +123,7 @@ async function processSmallResult (ctx, script) {
   const planner = ctx.planner.plan()
   const sql = planner(ctx)
   const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
-  const traces = response.data.data.map(row => ({
+  /*const traces = response.data.data.map(row => ({
     traceID: row.trace_id,
     rootServiceName: row.root_service_name,
    rootTraceName: row.root_trace_name,
@@ -146,8 +150,8 @@ async function processSmallResult (ctx, script) {
       matched: row.span_id.length
     }
  ]
-  }))
-  return traces
+  }))*/
+  return response.data.data
 }
 
 module.exports = {
traceql/post_processor/index.js (new file, 74 lines)
@@ -0,0 +1,74 @@
+const Otlp = require('../../lib/db/otlp')
+const Zipkin = require('../../lib/db/zipkin')
+const protobufjs = require('protobufjs')
+const path = require('path')
+const OTLPSpan = protobufjs.loadSync(path.join(__dirname, '..', '..',
+  'lib', 'otlp.proto')).lookupType('Span')
+/**
+ *
+ * @param rows {Row[]}
+ * @param script {Token}
+ */
+function postProcess (rows, script) {
+  const selectAttrs = script.Children('aggregator')
+    .filter(x => x.Child('fn').value === 'select')
+    .map(x => x.Children('label_name'))
+    .reduce((acc, x) => {
+      let attrs = x.map(y => ({
+        name: y.value,
+        path: y.value.split('.').filter(y => y)
+      }))
+      if (attrs[0] === 'span' || attrs[0] === 'resource') {
+        attrs = attrs.slice(1)
+      }
+      return [...acc, ...attrs]
+    }, [])
+  rows = rows.map(row => ({
+    ...row,
+    objs: row.payload.map((payload, i) => {
+      let span = null
+      switch (row.payload_type[i]) {
+        case 1:
+          return new Zipkin(JSON.parse(Buffer.from(payload, 'base64').toString()))
+        case 2:
+          span = OTLPSpan.toObject(
+            OTLPSpan.decode(Buffer.from(payload, 'base64')), {
+              longs: String,
+              bytes: String
+            })
+          return new Otlp(span)
+      }
+      return null
+    })
+  }))
+  const spans = (row) => row.span_id.map((spanId, i) => ({
+    spanID: spanId,
+    startTimeUnixNano: row.timestamp_ns[i],
+    durationNanos: row.duration[i],
+    attributes: selectAttrs.map(attr => ({
+      key: attr.name,
+      value: {
+        stringValue: (row.objs[i].tags.find(t => t[0] === attr.path.join('.')) || [null, null])[1]
+      }
+    })).filter(x => x.value.stringValue)
+  }))
+  const traces = rows.map(row => ({
+    traceID: row.trace_id,
+    rootServiceName: row.root_service_name,
+    rootTraceName: row.root_trace_name,
+    startTimeUnixNano: row.start_time_unix_nano,
+    durationMs: row.duration_ms,
+    spanSet: { spans: spans(row) },
+    spanSets: [
+      {
+        spans: spans(row),
+        matched: row.span_id.length
+      }
+    ]
+  }))
+  return traces
+}
+
+module.exports = {
+  postProcess
+}
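Note: each element of row.payload is a base64 string whose decoding depends on the parallel payload_type entry: 1 is a Zipkin JSON span, 2 is an OTLP protobuf Span (decoded above via lib/otlp.proto). A self-contained Go sketch of that rule; the helper name and the raw OTLP passthrough are assumptions for illustration:

package main

import (
    "encoding/base64"
    "encoding/json"
    "fmt"
)

// decodePayload mirrors the switch in postProcess: type 1 is base64 Zipkin
// JSON, type 2 is base64 OTLP protobuf (left raw here; the JS decodes it).
func decodePayload(payloadType int, payload string) (any, error) {
    raw, err := base64.StdEncoding.DecodeString(payload)
    if err != nil {
        return nil, err
    }
    switch payloadType {
    case 1:
        var span map[string]any
        return span, json.Unmarshal(raw, &span)
    case 2:
        return raw, nil
    }
    return nil, nil
}

func main() {
    span, _ := decodePayload(1, base64.StdEncoding.EncodeToString([]byte(`{"id":"a1"}`)))
    fmt.Println(span) // map[id:a1]
}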
traceql/post_processor/types.d.ts (new file, vendored, 11 lines)
@@ -0,0 +1,11 @@
+export interface Row {
+  trace_id: string;
+  span_id: string[];
+  duration: string[];
+  timestamp_ns: string[];
+  start_time_unix_nano: string;
+  duration_ms: number;
+  root_service_name: string;
+  payload: string[];
+  payload_type: number[];
+}
@@ -6,14 +6,15 @@ complex_head ::= "(" <OWSP> <attr_selector_exp> <OWSP> ")"
 tail ::= <attr_selector_exp>
 and_or ::= "&&" | "||"
 
-aggregator ::= "|" <OWSP> <fn> <OWSP> <attr> <OWSP> <cmp> <OWSP> <cmp_val>
-fn ::= "count"|"sum"|"min"|"max"|"avg"
-attr ::= "(" <OWSP> [ <label_name> ] <OWSP> ")"
+aggregator ::= "|" <OWSP> <fn> <OWSP> <attr> <OWSP> [ <cmp> <OWSP> <cmp_val> ]
+fn ::= "count"|"sum"|"min"|"max"|"avg"|"select"
+attr ::= "(" <OWSP> [ <label_names> ] <OWSP> ")"
 cmp ::= "="|"!="|"<="|">="|"<"|">"
 cmp_val ::= <number> [<measurement>]
 measurement ::= "ns"|"us"|"ms"|"s"|"m"|"h"|"d"
 
 label_name ::= ("." | <ALPHA> | "-" | "_") *("." | <ALPHA> | "_" | "-" | <DIGITS>)
+label_names ::= <label_name> *(<OWSP> "," <OWSP> <label_name>)
 number ::= ["-"] <DIGITS> ["." <DIGITS>]
 
 attr_selector ::= <label_name> <OWSP> <op> <OWSP> <value>
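Note: the grammar change makes the comparison tail optional and adds select over a comma-separated label_names list. Example scripts, the first two taken from the parser tests above and the third an assumed-valid variant exercising the now-optional tail:

package traceql_examples

// Aggregator clauses accepted by the revised grammar.
var examples = []string{
    `{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms`, // fn attr cmp cmp_val
    `{.testId="12345" &&.spanN>=8.9} | select(a, b)`,        // select with label_names
    `{.testId="12345"} | avg(duration)`,                     // comparison tail omitted
}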
@@ -4,8 +4,6 @@ import (
 	"context"
 	"fmt"
 	gcContext "github.com/metrico/micro-gc/context"
 	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
@@ -15,6 +13,8 @@ import (
 	"strings"
 	"time"
 	"unsafe"
+	promql2 "wasm_parts/promql"
+	shared2 "wasm_parts/promql/shared"
 	sql "wasm_parts/sql_select"
 	parser2 "wasm_parts/traceql/parser"
 	traceql_transpiler "wasm_parts/traceql/transpiler"
@@ -165,12 +165,12 @@ func stats() {
 }
 
 //export pqlRangeQuery
-func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
+func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64, optimizable uint32) uint32 {
 	ctxId := gcContext.GetContextID()
 	gcContext.SetContext(id)
 	defer gcContext.SetContext(ctxId)
 
-	return pql(id, data[id], func() (promql.Query, error) {
+	return pql(id, data[id], optimizable != 0, int64(fromMS), int64(toMS), int64(stepMS), func() (promql.Query, error) {
 		queriable := &TestQueryable{id: id, stepMs: int64(stepMS)}
 		return getEng().NewRangeQuery(
 			queriable,
@@ -184,19 +184,20 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint
 }
 
 //export pqlInstantQuery
-func pqlInstantQuery(id uint32, timeMS float64) uint32 {
+func pqlInstantQuery(id uint32, timeMS float64, optimizable uint32) uint32 {
 	ctxId := gcContext.GetContextID()
 	gcContext.SetContext(id)
 	defer gcContext.SetContext(ctxId)
 
-	return pql(id, data[id], func() (promql.Query, error) {
-		queriable := &TestQueryable{id: id, stepMs: 15000}
-		return getEng().NewInstantQuery(
-			queriable,
-			nil,
-			string(data[id].request),
-			time.Unix(0, int64(timeMS)*1000000))
-	})
+	return pql(id, data[id], optimizable != 0, int64(timeMS-300000), int64(timeMS), 15000,
+		func() (promql.Query, error) {
+			queriable := &TestQueryable{id: id, stepMs: 15000}
+			return getEng().NewInstantQuery(
+				queriable,
+				nil,
+				string(data[id].request),
+				time.Unix(0, int64(timeMS)*1000000))
+		})
 }
 
 //export pqlSeries
@@ -255,13 +256,16 @@ func wrapErrorStr(err error) string {
 	return err.Error()
 }
 
-func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
+func pql(id uint32, c *ctx, optimizable bool,
+	fromMs int64, toMs int64, stepMs int64,
+	query func() (promql.Query, error)) uint32 {
 	rq, err := query()
 
 	if err != nil {
 		c.response = wrapError(err)
 		return 1
 	}
 
 	var walk func(node parser.Node, i func(node parser.Node))
 	walk = func(node parser.Node, i func(node parser.Node)) {
 		i(node)
@@ -269,9 +273,39 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
 			walk(n, i)
 		}
 	}
 
+	maxDurationMs := getMaxDurationMs(rq.Statement())
+	fromMs -= maxDurationMs
+
+	subsels := strings.Builder{}
+	subsels.WriteString("{")
+	if optimizable {
+		var (
+			subselsMap map[string]string
+			err        error
+		)
+		subselsMap, rq, err = optimizeQuery(rq, fromMs, toMs, stepMs)
+		if err != nil {
+			c.response = wrapError(err)
+			return 1
+		}
+		i := 0
+		for k, v := range subselsMap {
+			if i != 0 {
+				subsels.WriteString(",")
+			}
+			subsels.WriteString(fmt.Sprintf(`%s:%s`, strconv.Quote(k), strconv.Quote(v)))
+			i++
+		}
+	}
+	subsels.WriteString("}")
+
 	matchersJSON := getmatchersJSON(rq)
 
-	c.response = []byte(matchersJSON)
+	c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s, "fromMs": %d}`,
+		subsels.String(),
+		matchersJSON,
+		fromMs))
 	c.onDataLoad = func(c *ctx) {
 		ctxId := gcContext.GetContextID()
 		gcContext.SetContext(id)
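Note: this changes the WASM-to-JS contract. Instead of a bare matchers array, pql now returns one JSON object; the JS wrapper further down reads matchersObj.matchers, matchersObj.subqueries and matchersObj.fromMs. An illustrative payload, with assumed values and abbreviated SQL:

package main

import "fmt"

func main() {
    fmt.Println(`{
  "subqueries": {"fake_metric_1700000000000000000": "WITH fp_sel AS (...) SELECT ..."},
  "matchers": [[["__name__", "=", "fake_metric_1700000000000000000"]]],
  "fromMs": 1699999700000
}`)
}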
@@ -284,6 +318,83 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
 	return 0
 }
 
+func getMaxDurationMs(q parser.Node) int64 {
+	maxDurationMs := int64(0)
+	for _, c := range parser.Children(q) {
+		_m := getMaxDurationMs(c)
+		if _m > maxDurationMs {
+			maxDurationMs = _m
+		}
+	}
+	ms, _ := q.(*parser.MatrixSelector)
+	if ms != nil && maxDurationMs < ms.Range.Milliseconds() {
+		return ms.Range.Milliseconds()
+	}
+	return maxDurationMs
+}
+
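Note: getMaxDurationMs widens the fetch window by the largest matrix-selector range so that, for example, a rate over [5m] still has a full window at the left edge of the query range. A runnable sketch of the same walk against the Prometheus parser:

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/promql/parser"
)

func maxRangeMs(n parser.Node) int64 {
    res := int64(0)
    for _, c := range parser.Children(n) {
        if m := maxRangeMs(c); m > res {
            res = m
        }
    }
    if ms, ok := n.(*parser.MatrixSelector); ok && res < ms.Range.Milliseconds() {
        return ms.Range.Milliseconds()
    }
    return res
}

func main() {
    expr, _ := parser.ParseExpr(`sum(rate(http_requests_total[5m]))`)
    fmt.Println(maxRangeMs(expr)) // 300000, so fromMs is pushed back by 5 minutes
}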
+func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) {
+	appliableNodes := findAppliableNodes(q.Statement(), nil)
+	var err error
+	subsels := make(map[string]string)
+	for _, m := range appliableNodes {
+		fmt.Println(m)
+		opt := m.optimizer
+		opt = &promql2.FinalizerOptimizer{
+			SubOptimizer: opt,
+		}
+		opt, err = promql2.PlanOptimize(m.node, opt)
+		if err != nil {
+			return nil, nil, err
+		}
+		planner, err := opt.Optimize(m.node)
+		if err != nil {
+			return nil, nil, err
+		}
+		fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
+		swapChild(m.parent, m.node, &parser.VectorSelector{
+			Name:           fakeMetric,
+			OriginalOffset: 0,
+			Offset:         0,
+			Timestamp:      nil,
+			StartOrEnd:     0,
+			LabelMatchers: []*labels.Matcher{
+				{
+					Type:  labels.MatchEqual,
+					Name:  "__name__",
+					Value: fakeMetric,
+				},
+			},
+			UnexpandedSeriesSet: nil,
+			Series:              nil,
+			PosRange:            parser.PositionRange{},
+		})
+		sel, err := planner.Process(&shared2.PlannerContext{
+			IsCluster:           false,
+			From:                time.Unix(0, fromMs*1000000),
+			To:                  time.Unix(0, toMs*1000000),
+			Step:                time.Millisecond * 15000, /*time.Duration(stepMs)*/
+			TimeSeriesTable:     "time_series",
+			TimeSeriesDistTable: "time_series_dist",
+			TimeSeriesGinTable:  "time_series_gin",
+			MetricsTable:        "metrics_15s",
+			MetricsDistTable:    "metrics_15s_dist",
+		})
+		if err != nil {
+			return nil, nil, err
+		}
+		strSel, err := sel.String(&sql.Ctx{
+			Params: map[string]sql.SQLObject{},
+			Result: map[string]sql.SQLObject{},
+		})
+		if err != nil {
+			return nil, nil, err
+		}
+		subsels[fakeMetric] = strSel
+	}
+	return subsels, q, nil
+}
+
 //export onDataLoad
 func onDataLoad(idx uint32) {
 	data[idx].onDataLoad(data[idx])
@@ -358,9 +469,101 @@ func writeVector(v promql.Vector) string {
 }
 
 func main() {
 	p := sync.Pool{}
 	a := p.Get()
 	_ = a
 }
 
+func getOptimizer(n parser.Node) promql2.IOptimizer {
+	for _, f := range promql2.Optimizers {
+		opt := f()
+		if opt.IsAppliable(n) {
+			return opt
+		}
+	}
+	return nil
+}
+
+func isRate(node parser.Node) (bool, bool) {
+	opt := getOptimizer(node)
+	if opt == nil {
+		return false, true
+	}
+	return true, false
+}
+
+type MatchNode struct {
+	node      parser.Node
+	parent    parser.Node
+	optimizer promql2.IOptimizer
+}
+
+func findAppliableNodes(root parser.Node, parent parser.Node) []MatchNode {
+	var res []MatchNode
+	optimizer := getOptimizer(root)
+	if optimizer != nil {
+		res = append(res, MatchNode{
+			node:      root,
+			parent:    parent,
+			optimizer: optimizer,
+		})
+		return res
+	}
+	for _, n := range parser.Children(root) {
+		res = append(res, findAppliableNodes(n, root)...)
+	}
+	return res
+}
+
+func swapChild(node parser.Node, child parser.Node, newChild parser.Expr) {
+	// For some reason these switches have significantly better performance than interfaces
+	switch n := node.(type) {
+	case *parser.EvalStmt:
+		n.Expr = newChild
+	case parser.Expressions:
+		for i, e := range n {
+			if e.String() == child.String() {
+				n[i] = newChild
+			}
+		}
+	case *parser.AggregateExpr:
+		if n.Expr == nil && n.Param == nil {
+			return
+		} else if n.Expr == nil {
+			n.Param = newChild
+		} else if n.Param == nil {
+			n.Expr = newChild
+		} else {
+			if n.Expr.String() == child.String() {
+				n.Expr = newChild
+			} else {
+				n.Param = newChild
+			}
+		}
+	case *parser.BinaryExpr:
+		if n.LHS.String() == child.String() {
+			n.LHS = newChild
+		} else if n.RHS.String() == child.String() {
+			n.RHS = newChild
+		}
+	case *parser.Call:
+		for i, e := range n.Args {
+			if e.String() == child.String() {
+				n.Args[i] = newChild
+			}
+		}
+	case *parser.SubqueryExpr:
+		n.Expr = newChild
+	case *parser.ParenExpr:
+		n.Expr = newChild
+	case *parser.UnaryExpr:
+		n.Expr = newChild
+	case *parser.MatrixSelector:
+		n.VectorSelector = newChild
+	case *parser.StepInvariantExpr:
+		n.Expr = newChild
+	}
+}
+
+func getChildren(e parser.Node) []parser.Node {
+	return parser.Children(e)
+}
+
 type TestLogger struct{}
@@ -444,10 +647,11 @@ type TestSeries struct {
 	data   []byte
 	stepMs int64
 
-	labels labels.Labels
-	tsMs   int64
-	val    float64
-	i      int
+	labels    labels.Labels
+	tsMs      int64
+	val       float64
+	lastValTs int64
+	i         int
 
 	state int
 }
@@ -469,11 +673,14 @@ func (t *TestSeries) Next() bool {
 			t.tsMs += t.stepMs
 			if t.tsMs >= ts {
 				t.state = 0
+			} else if t.lastValTs+300000 < t.tsMs {
+				t.state = 0
 			}
 		}
 		if t.state == 0 {
 			t.tsMs = ts
 			t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
+			t.lastValTs = t.tsMs
 			t.i++
 			t.state = 1
 		}
@@ -585,3 +792,17 @@ func matchers2Str(labelMatchers []*labels.Matcher) string {
 	matchersJson.WriteString("]")
 	return matchersJson.String()
 }
+
+type pqlRequest struct {
+	optimizable bool
+	body        string
+}
+
+func (p *pqlRequest) Read(body []byte) {
+	r := BinaryReader{buffer: body}
+	p.optimizable = r.ReadULeb32() != 0
+	p.body = r.ReadString()
+	if !p.optimizable {
+		return
+	}
+}
@@ -61,8 +61,8 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
   const end = endMs || Date.now()
   const step = stepMs || 15000
   return await pql(query,
-    (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step),
-    (matchers) => getData(matchers, start, end))
+    (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
+    (matchers, subq, startMs) => getData(matchers, startMs, end, subq))
 }
 
 /**
@@ -75,9 +75,10 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
 module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
   const time = timeMs || Date.now()
   const _wasm = getWasm()
+  const start = time - 300000
   return await pql(query,
-    (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time),
-    (matchers) => getData(matchers, time - 300000, time))
+    (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
+    (matchers, subq, start) => getData(matchers, start, time, subq))
 }
 
 module.exports.pqlMatchers = (query) => {
@@ -162,8 +163,8 @@ const pql = async (query, wasmCall, getData) => {
   const matchersObj = JSON.parse(ctx.read())
 
   const matchersResults = await Promise.all(
-    matchersObj.map(async (matchers, i) => {
-      const data = await getData(matchers)
+    matchersObj.matchers.map(async (matchers, i) => {
+      const data = await getData(matchers, matchersObj.subqueries, matchersObj.fromMs)
       return { matchers, data }
     }))
 
Binary file not shown.
wasm_parts/promql/aggregate.go (new file, 71 lines)
@@ -0,0 +1,71 @@
+package promql
+
+import (
+	"fmt"
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/planners"
+	"wasm_parts/promql/shared"
+)
+
+type AggregateOptimizer struct {
+	WithLabelsIn  string
+	WithLabelsOut string
+
+	subOptimizer IOptimizer
+}
+
+func (a *AggregateOptimizer) IsAppliable(node parser.Node) bool {
+	aggExpr, ok := node.(*parser.AggregateExpr)
+	if !ok {
+		return false
+	}
+	if aggExpr.Op != parser.SUM {
+		return false
+	}
+	return GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory)) != nil
+}
+
+func (a *AggregateOptimizer) PlanOptimize(node parser.Node) error {
+	aggExpr := node.(*parser.AggregateExpr)
+	a.subOptimizer = GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory))
+	return a.subOptimizer.PlanOptimize(node)
+}
+
+func (a *AggregateOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
+	aggExpr := node.(*parser.AggregateExpr)
+	planner, err := a.subOptimizer.Optimize(aggExpr.Expr)
+	if err != nil {
+		return nil, err
+	}
+	withLabelsIn := a.WithLabelsIn
+	if withLabelsIn == "" {
+		planner = &planners.LabelsInitPlanner{
+			Main:              planner,
+			FingerprintsAlias: "fp_sel",
+		}
+		withLabelsIn = "labels"
+	}
+	if a.WithLabelsOut == "" {
+		return nil, fmt.Errorf("AggregateOptimizer.WithLabelsOut is empty")
+	}
+	byWithout := "by"
+	if aggExpr.Without {
+		byWithout = "without"
+	}
+	planner = &planners.ByWithoutPlanner{
+		Main:                planner,
+		FingerprintWithName: withLabelsIn,
+		FingerprintsOutName: a.WithLabelsOut,
+		ByWithout:           byWithout,
+		Labels:              aggExpr.Grouping,
+	}
+	planner = &planners.SumPlanner{
+		Main:        planner,
+		LabelsAlias: a.WithLabelsOut,
+	}
+	return planner, nil
+}
+
+func (a *AggregateOptimizer) Children() []IOptimizer {
+	return []IOptimizer{a.subOptimizer}
+}
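Note: for sum by (...) the optimizer composes three of the new planners. An illustrative helper, not part of the PR, spelling out the chain AggregateOptimizer.Optimize assembles for sum by (job) (<inner>); the alias values follow the numbering PlanOptimize assigns in optimize.go:

package promql

import (
    "wasm_parts/promql/planners"
    "wasm_parts/promql/shared"
)

// buildSumBy shows the planner chain only; it is a sketch, not the API.
func buildSumBy(inner shared.RequestPlanner) shared.RequestPlanner {
    var p shared.RequestPlanner = &planners.LabelsInitPlanner{
        Main:              inner,
        FingerprintsAlias: "fp_sel", // CTE created by MetricsInitPlanner
    }
    p = &planners.ByWithoutPlanner{
        Main:                p,
        FingerprintWithName: "labels",   // CTE added by LabelsInitPlanner
        FingerprintsOutName: "labels_1", // WithLabelsOut assigned by PlanOptimize
        ByWithout:           "by",
        Labels:              []string{"job"},
    }
    return &planners.SumPlanner{Main: p, LabelsAlias: "labels_1"}
}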
wasm_parts/promql/finalize.go (new file, 45 lines)
@@ -0,0 +1,45 @@
+package promql
+
+import (
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/planners"
+	"wasm_parts/promql/shared"
+)
+
+type FinalizerOptimizer struct {
+	LabelsIn     string
+	SubOptimizer IOptimizer
+}
+
+func (f *FinalizerOptimizer) IsAppliable(node parser.Node) bool {
+	return false
+}
+
+func (f *FinalizerOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
+	planner, err := f.SubOptimizer.Optimize(node)
+	if err != nil {
+		return nil, err
+	}
+	labelsIn := f.LabelsIn
+	if labelsIn == "" {
+		planner = &planners.LabelsInitPlanner{
+			Main:              planner,
+			FingerprintsAlias: "fp_sel",
+		}
+		labelsIn = "labels"
+	}
+
+	planner = &planners.FinalizePlanner{
+		LabelsAlias: labelsIn,
+		Main:        planner,
+	}
+	return planner, nil
+}
+
+func (f *FinalizerOptimizer) PlanOptimize(node parser.Node) error {
+	return f.SubOptimizer.PlanOptimize(node)
+}
+
+func (f *FinalizerOptimizer) Children() []IOptimizer {
+	return []IOptimizer{f.SubOptimizer}
+}
wasm_parts/promql/optimize.go (new file, 37 lines)
@@ -0,0 +1,37 @@
+package promql
+
+import (
+	"fmt"
+	"github.com/prometheus/prometheus/promql/parser"
+)
+
+func PlanOptimize(node parser.Node, optimizer IOptimizer) (IOptimizer, error) {
+	err := optimizer.PlanOptimize(node)
+	if err != nil {
+		return nil, err
+	}
+
+	var checkLabelAliases func(opt IOptimizer, i int) int
+	checkLabelAliases = func(opt IOptimizer, i int) int {
+		var _i int
+		for _, c := range opt.Children() {
+			_i = checkLabelAliases(c, i)
+		}
+		switch opt.(type) {
+		case *AggregateOptimizer:
+			if _i != 0 {
+				opt.(*AggregateOptimizer).WithLabelsIn = fmt.Sprintf("labels_%d", _i)
+			}
+			opt.(*AggregateOptimizer).WithLabelsOut = fmt.Sprintf("labels_%d", _i+1)
+			_i++
+		case *FinalizerOptimizer:
+			if _i != 0 {
+				opt.(*FinalizerOptimizer).LabelsIn = fmt.Sprintf("labels_%d", _i)
+			}
+			_i++
+		}
+		return _i
+	}
+	checkLabelAliases(optimizer, 0)
+	return optimizer, nil
+}
wasm_parts/promql/planners/aggregate.go (new file, 48 lines)
@@ -0,0 +1,48 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type SumPlanner struct {
+	Main        shared.RequestPlanner
+	LabelsAlias string
+}
+
+func (s *SumPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := s.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var withLabels *sql.With
+	for _, w := range main.GetWith() {
+		if w.GetAlias() == s.LabelsAlias {
+			withLabels = w
+			break
+		}
+	}
+	if withLabels == nil {
+		return nil, fmt.Errorf("labels subrequest not found")
+	}
+	withMain := sql.NewWith(main, "pre_sum")
+
+	res := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol(withLabels.GetAlias()+".new_fingerprint", "fingerprint"),
+			sql.NewSimpleCol("pre_sum.timestamp_ms", "timestamp_ms"),
+			sql.NewSimpleCol("sum(pre_sum.value)", "value")).
+		From(sql.NewWithRef(withMain)).
+		Join(sql.NewJoin(
+			"ANY LEFT",
+			sql.NewWithRef(withLabels),
+			sql.Eq(
+				sql.NewRawObject("pre_sum.fingerprint"),
+				sql.NewRawObject(withLabels.GetAlias()+".fingerprint")))).
+		GroupBy(
+			sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint"),
+			sql.NewRawObject("pre_sum.timestamp_ms"))
+	return res, nil
+}
wasm_parts/promql/planners/by_without.go (new file, 59 lines)
@@ -0,0 +1,59 @@
+package planners
+
+import (
+	"fmt"
+	"strings"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type ByWithoutPlanner struct {
+	Main                shared.RequestPlanner
+	FingerprintWithName string
+	FingerprintsOutName string
+	ByWithout           string
+	Labels              []string
+}
+
+func (b *ByWithoutPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := b.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	var fp *sql.With
+	withs := main.GetWith()
+	for _, w := range withs {
+		if w.GetAlias() == b.FingerprintWithName {
+			fp = w
+			break
+		}
+	}
+	if fp == nil {
+		return nil, fmt.Errorf("fingerprints subrequest not found")
+	}
+	labelsCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		cond := "IN"
+		if b.ByWithout == "without" {
+			cond = "NOT IN"
+		}
+		values := make([]string, len(b.Labels))
+		var err error
+		for i, l := range b.Labels {
+			values[i], err = sql.NewStringVal(l).String(ctx, options...)
+			if err != nil {
+				return "", err
+			}
+		}
+		return fmt.Sprintf("mapFilter((k,v) -> k %s (%s), labels)", cond, strings.Join(values, ",")), nil
+	})
+	newFpCol := "cityHash64(arraySort(arrayZip(mapKeys(labels), mapValues(labels))))"
+	newFp := sql.NewSelect().
+		Select(
+			sql.NewSimpleCol(fp.GetAlias()+".new_fingerprint", "fingerprint"),
+			sql.NewCol(labelsCol, "labels"),
+			sql.NewSimpleCol(newFpCol, "new_fingerprint"),
+		).
+		From(sql.NewWithRef(fp))
+	withNewFp := sql.NewWith(newFp, b.FingerprintsOutName)
+	return main.AddWith(withNewFp), nil
+}
wasm_parts/promql/planners/finalize.go (new file, 47 lines)
@@ -0,0 +1,47 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type FinalizePlanner struct {
+	LabelsAlias string
+	Main        shared.RequestPlanner
+}
+
+func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := f.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var withLabels *sql.With
+	for _, w := range main.GetWith() {
+		if w.GetAlias() == f.LabelsAlias {
+			withLabels = w
+			break
+		}
+	}
+
+	if withLabels == nil {
+		return nil, fmt.Errorf("FinalizePlanner.Process: %s CTE not found", f.LabelsAlias)
+	}
+
+	withMain := sql.NewWith(main, "pre_final")
+	res := sql.NewSelect().With(withMain).Select(withMain).
+		Select(
+			sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"),
+			sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"),
+		).From(sql.NewWithRef(withMain)).
+		//AndWhere(sql.Neq(sql.NewRawObject("pre_final.value"), sql.NewIntVal(0))).
+		Join(sql.NewJoin(
+			"ANY LEFT",
+			sql.NewWithRef(withLabels),
+			sql.Eq(
+				sql.NewRawObject("pre_final.fingerprint"),
+				sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))).
+		GroupBy(sql.NewRawObject(withLabels.GetAlias() + ".labels"))
+	return res, nil
+}
wasm_parts/promql/planners/labels_init.go (new file, 48 lines)
@@ -0,0 +1,48 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type LabelsInitPlanner struct {
+	Main              shared.RequestPlanner
+	FingerprintsAlias string
+}
+
+func (l *LabelsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := l.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var withFp *sql.With
+	for _, w := range main.GetWith() {
+		if w.GetAlias() == l.FingerprintsAlias {
+			withFp = w
+			break
+		}
+	}
+
+	if withFp == nil {
+		return nil, fmt.Errorf("fingerprints subrequest not found")
+	}
+
+	labelsCol := "mapFromArrays(" +
+		"arrayMap(x -> x.1, JSONExtractKeysAndValues(time_series.labels, 'String') as ts_kv), " +
+		"arrayMap(x -> x.2, ts_kv))"
+
+	labelsSubSel := sql.NewSelect().Select(
+		sql.NewSimpleCol("fingerprint", "fingerprint"),
+		sql.NewSimpleCol(labelsCol, "labels"),
+		sql.NewSimpleCol("fingerprint", "new_fingerprint")).
+		From(sql.NewSimpleCol(ctx.TimeSeriesTable, "time_series")).
+		AndWhere(
+			sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))),
+			sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))),
+			sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFp)))
+	withLabelsSubSel := sql.NewWith(labelsSubSel, "labels")
+
+	return main.AddWith(withLabelsSubSel), nil
+}
wasm_parts/promql/planners/metrics_extend.go (new file, 46 lines)
@@ -0,0 +1,46 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type MetricsExtendPlanner struct {
+	Main shared.RequestPlanner
+}
+
+func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := m.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	extendCnt := 300000 / ctx.Step.Milliseconds()
+	if extendCnt < 1 {
+		return main, nil
+	}
+	withMain := sql.NewWith(main, "pre_extend")
+	extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"argMaxIf(value, timestamp_ms, pre_extend.original = 1) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", extendCnt), nil
+	})
+	origCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"max(original) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", extendCnt), nil
+	})
+	extend := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(extendedCol, "value"),
+			sql.NewCol(origCol, "original")).
+		From(sql.NewWithRef(withMain))
+	withExtend := sql.NewWith(extend, "extend")
+	return sql.NewSelect().With(withExtend).Select(sql.NewRawObject("*")).
+		From(sql.NewWithRef(withExtend)).
+		AndWhere(sql.Eq(sql.NewRawObject("original"), sql.NewIntVal(1))), nil
+}
wasm_parts/promql/planners/metrics_rate.go (new file, 77 lines)
@@ -0,0 +1,77 @@
+package planners
+
+import (
+	"fmt"
+	"time"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type RatePlanner struct {
+	Main     shared.RequestPlanner
+	Duration time.Duration
+}
+
+func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := m.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	rateCnt := m.Duration.Milliseconds() / ctx.Step.Milliseconds()
+	if rateCnt < 1 {
+		rateCnt = 1
+	}
+	withMain := sql.NewWith(main, "pre_rate")
+	lastCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"argMax(value, timestamp_ms) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", rateCnt), nil
+	})
+	firstCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"argMin(value, timestamp_ms) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", rateCnt), nil
+	})
+	resetCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"if(value < (any(value) OVER (" +
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" +
+				") as lastValue), lastValue, 0)"), nil
+	})
+	reset := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(resetCol, "reset"),
+			sql.NewSimpleCol("value", "value")).
+		From(sql.NewWithRef(withMain))
+	withReset := sql.NewWith(reset, "pre_reset")
+	resetColSum := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		_rateCnt := rateCnt - 1
+		if rateCnt <= 1 {
+			_rateCnt = 1
+		}
+		return fmt.Sprintf(
+			"sum(reset) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", _rateCnt), nil
+	})
+	extend := sql.NewSelect().With(withReset).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(lastCol, "last"),
+			sql.NewCol(firstCol, "first"),
+			sql.NewCol(resetColSum, "reset"),
+			sql.NewSimpleCol(fmt.Sprintf("(last - first + reset) / %f", m.Duration.Seconds()), "_value")).
+		From(sql.NewWithRef(withReset))
+	withExtend := sql.NewWith(extend, "rate")
+	return sql.NewSelect().
+		With(withExtend).
+		Select(sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewSimpleCol("_value", "value")).
+		From(sql.NewWithRef(withExtend)), nil
+}
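Note: the generated SQL computes rate per window as (last - first + reset) / range-seconds, where the reset column re-adds the value that a counter restart would otherwise subtract. A self-contained numeric check of that formula, with assumed sample values:

package main

import "fmt"

func main() {
    // Samples inside one 60s window at 15s step; the counter resets at 13 -> 2.
    vals := []float64{10, 12, 13, 2, 4}
    first, last, reset := vals[0], vals[len(vals)-1], 0.0
    for i := 1; i < len(vals); i++ {
        if vals[i] < vals[i-1] {
            reset += vals[i-1] // same correction RatePlanner's reset column applies
        }
    }
    fmt.Println((last - first + reset) / 60) // 0.11666..., increases per second
}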
wasm_parts/promql/planners/metrics_raw_init.go (new file, 37 lines)
@@ -0,0 +1,37 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type MetricsInitPlanner struct {
+	ValueCol    sql.SQLObject
+	Fingerprint shared.RequestPlanner
+}
+
+func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	fpReq, err := m.Fingerprint.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	withFpReq := sql.NewWith(fpReq, "fp_sel")
+	if m.ValueCol == nil {
+		m.ValueCol = sql.NewRawObject("argMaxMerge(last)")
+	}
+	tsNsCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("intDiv(timestamp_ns, %d) * %d", ctx.Step.Nanoseconds(), ctx.Step.Milliseconds()), nil
+	})
+	return sql.NewSelect().With(withFpReq).Select(
+		sql.NewSimpleCol("fingerprint", "fingerprint"),
+		sql.NewCol(tsNsCol, "timestamp_ms"),
+		sql.NewCol(m.ValueCol, "value"),
+		sql.NewSimpleCol("1::UInt8", "original")).
+		From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")).
+		AndWhere(
+			sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())),
+			sql.Le(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.To.UnixNano())),
+			sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFpReq))).
+		GroupBy(sql.NewRawObject("fingerprint"), sql.NewRawObject("timestamp_ms")), nil
+}
wasm_parts/promql/planners/metrics_zerofill.go (new file, 50 lines)
@@ -0,0 +1,50 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type MetricsZeroFillPlanner struct {
+	Main shared.RequestPlanner
+}
+
+func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := m.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	main.OrderBy(sql.NewRawObject("fingerprint"), sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("timestamp_ms WITH FILL FROM %d TO %d STEP %d",
+			ctx.From.UnixMilli(), ctx.To.UnixMilli(), ctx.Step.Milliseconds()), nil
+	}))
+	return main, nil
+	/*withMain := sql.NewWith(main, "prezerofill")
+	arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1
+	zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))",
+			arrLen, ctx.From.UnixMilli(), ctx.Step.Milliseconds()), nil
+	})
+	zeroFill := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewCol(zeroFillCol, "values")).
+		From(sql.NewWithRef(withMain)).
+		GroupBy(sql.NewRawObject("fingerprint"))
+	withZeroFill := sql.NewWith(zeroFill, "zerofill")
+
+	joinZeroFillStmt := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("arrayMap((x,y) -> (y * %d + %d, x), values, range(%d))",
+			ctx.Step.Milliseconds(), ctx.From.UnixMilli(), arrLen), nil
+	})
+
+	postZeroFill := sql.NewSelect().With(withZeroFill).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewSimpleCol("val.2", "value")).
+		From(sql.NewWithRef(withMain)).
+		Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil))
+	return postZeroFill, nil*/
+}
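Note: gap filling is delegated to ClickHouse rather than done in a wrapping select (the commented-out groupArrayInsertAt variant above). The planner only appends an ORDER BY ... WITH FILL clause; with assumed bounds it renders as shown by this sketch:

package main

import "fmt"

func main() {
    // Assumed values: from/to in ms, 15s step.
    from, to, step := int64(1700000000000), int64(1700000060000), int64(15000)
    fmt.Printf("ORDER BY fingerprint, timestamp_ms WITH FILL FROM %d TO %d STEP %d\n",
        from, to, step)
}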
wasm_parts/promql/planners/stream_select_planner.go (new file, 102 lines)
@@ -0,0 +1,102 @@
+package planners
+
+import (
+	"fmt"
+	"github.com/prometheus/prometheus/model/labels"
+	"strings"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type StreamSelectPlanner struct {
+	Main     shared.RequestPlanner
+	Matchers []*labels.Matcher
+}
+
+func (s *StreamSelectPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := s.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	conds := make([]sql.SQLCondition, len(s.Matchers))
+	for i, m := range s.Matchers {
+		conds[i], err = s.getCond(m)
+		if err != nil {
+			return nil, err
+		}
+	}
+	main.AndWhere(sql.Or(conds...))
+
+	bitSetEntries := make([]*bitSetEntry, len(conds))
+	for i, c := range conds {
+		bitSetEntries[i] = &bitSetEntry{c, i}
+	}
+	main.AndHaving(sql.Eq(&bitSet{entries: bitSetEntries}, sql.NewIntVal((int64(1)<<uint(len(conds)))-1)))
+	return main, nil
+}
+
+func (s *StreamSelectPlanner) getCond(m *labels.Matcher) (sql.SQLCondition, error) {
+	keyCond := sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(m.Name))
+	var valCond sql.SQLCondition
+	switch m.Type {
+	case labels.MatchEqual:
+		valCond = sql.Eq(sql.NewRawObject("val"), sql.NewStringVal(m.Value))
+	case labels.MatchNotEqual:
+		valCond = sql.Neq(sql.NewRawObject("val"), sql.NewStringVal(m.Value))
+	case labels.MatchRegexp:
+		valCond = sql.Eq(&pregMatch{sql.NewRawObject("val"), sql.NewStringVal(m.Value)},
+			sql.NewIntVal(1))
+	case labels.MatchNotRegexp:
+		valCond = sql.Eq(&pregMatch{sql.NewRawObject("val"), sql.NewStringVal(m.Value)},
+			sql.NewIntVal(0))
+	default:
+		return nil, fmt.Errorf("unknown matcher type: %v", m.Type)
+	}
+	return sql.And(keyCond, valCond), nil
+}
+
+type pregMatch struct {
+	key sql.SQLObject
+	val sql.SQLObject
+}
+
+func (p *pregMatch) String(ctx *sql.Ctx, options ...int) (string, error) {
+	strK, err := p.key.String(ctx, options...)
+	if err != nil {
+		return "", err
+	}
+	strV, err := p.val.String(ctx, options...)
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("match(%s, %s)", strK, strV), nil
+}
+
+type bitSetEntry struct {
+	cond sql.SQLCondition
+	idx  int
+}
+
+func (b bitSetEntry) String(ctx *sql.Ctx, options ...int) (string, error) {
+	strCond, err := b.cond.String(ctx, options...)
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprintf("bitShiftLeft(toUInt64(%s), %d)", strCond, b.idx), nil
+}
+
+type bitSet struct {
+	entries []*bitSetEntry
+}
+
+func (b bitSet) String(ctx *sql.Ctx, options ...int) (string, error) {
+	strEntries := make([]string, len(b.entries))
+	var err error
+	for i, e := range b.entries {
+		strEntries[i], err = e.String(ctx, options...)
+		if err != nil {
+			return "", err
+		}
+	}
+	return fmt.Sprintf("groupBitOr(%s)", strings.Join(strEntries, "+")), nil
+}
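Note: the HAVING clause encodes "every matcher matched" as a bit set. Each gin row satisfies at most one (key, val) matcher, so groupBitOr collects one bit per satisfied matcher per fingerprint, and the fingerprint passes only when all bits are set. A worked example with three matchers assumed:

package main

import "fmt"

func main() {
    rowsMatched := []int{0, 2, 1} // matcher index satisfied by each gin row
    acc := int64(0)
    for _, i := range rowsMatched {
        acc |= int64(1) << uint(i) // bitShiftLeft(toUInt64(cond), idx), OR-folded
    }
    fmt.Println(acc == (int64(1)<<3)-1) // true -> fingerprint passes HAVING
}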
wasm_parts/promql/planners/time_series_gin_init.go (new file, 20 lines)
@@ -0,0 +1,20 @@
+package planners
+
+import (
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type TimeSeriesGinInitPlanner struct {
+}
+
+func (t *TimeSeriesGinInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	return sql.NewSelect().
+		Select(sql.NewSimpleCol("fingerprint", "fingerprint")).
+		From(sql.NewSimpleCol(ctx.TimeSeriesGinTable, "ts_gin")).
+		AndWhere(
+			sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))),
+			sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))),
+			sql.NewIn(sql.NewRawObject("type"), sql.NewIntVal(0), sql.NewIntVal(2))).
+		GroupBy(sql.NewRawObject("fingerprint")), nil
+}
wasm_parts/promql/rate.go (new file, 62 lines)
@@ -0,0 +1,62 @@
+package promql
+
+import (
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/planners"
+	"wasm_parts/promql/shared"
+)
+
+type RateOptimizer struct {
+	vectorSelectorOptimizer *VectorSelectorOptimizer
+}
+
+func (r *RateOptimizer) IsAppliable(node parser.Node) bool {
+	_node, ok := node.(*parser.Call)
+	if !ok {
+		return false
+	}
+	vectorSelector := r.getVectorSelector(_node)
+	return vectorSelector != nil && (&VectorSelectorOptimizer{}).IsAppliable(vectorSelector)
+}
+
+func (r *RateOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
+	_node, ok := node.(*parser.Call)
+	if !ok {
+		return nil, nil
+	}
+	vectorSelector := r.getVectorSelector(_node)
+	matrixSelector := _node.Args[0].(*parser.MatrixSelector)
+	res, err := (&VectorSelectorOptimizer{}).Optimize(vectorSelector)
+	if err != nil {
+		return nil, err
+	}
+	res = &planners.RatePlanner{
+		Main:     res,
+		Duration: matrixSelector.Range,
+	}
+	return res, nil
+}
+
+func (v *RateOptimizer) PlanOptimize(node parser.Node) error {
+	v.vectorSelectorOptimizer = &VectorSelectorOptimizer{}
+	return v.vectorSelectorOptimizer.PlanOptimize(node)
+}
+
+func (r *RateOptimizer) getVectorSelector(node *parser.Call) *parser.VectorSelector {
+	if node.Func.Name != "rate" || len(node.Args) != 1 {
+		return nil
+	}
+	_matrixSelector, ok := node.Args[0].(*parser.MatrixSelector)
+	if !ok {
+		return nil
+	}
+	vectorSelector, ok := _matrixSelector.VectorSelector.(*parser.VectorSelector)
+	if !ok {
+		return nil
+	}
+	return vectorSelector
+}
+
+func (r *RateOptimizer) Children() []IOptimizer {
+	return []IOptimizer{r.vectorSelectorOptimizer}
+}
wasm_parts/promql/shared/types.go (new file, 22 lines)
@@ -0,0 +1,22 @@
+package shared
+
+import (
+	"time"
+	sql "wasm_parts/sql_select"
+)
+
+type RequestPlanner interface {
+	Process(ctx *PlannerContext) (sql.ISelect, error)
+}
+
+type PlannerContext struct {
+	IsCluster           bool
+	From                time.Time
+	To                  time.Time
+	Step                time.Duration
+	TimeSeriesTable     string
+	TimeSeriesDistTable string
+	TimeSeriesGinTable  string
+	MetricsTable        string
+	MetricsDistTable    string
+}
wasm_parts/promql/smart_optimizers.go (new file, 45 lines)
@@ -0,0 +1,45 @@
+package promql
+
+import (
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/shared"
+)
+
+type IOptimizer interface {
+	IsAppliable(node parser.Node) bool
+	Optimize(node parser.Node) (shared.RequestPlanner, error)
+	PlanOptimize(node parser.Node) error
+	Children() []IOptimizer
+}
+
+type OptimizerFactory func() IOptimizer
+
+var VectorSelectorOptimizerFactory OptimizerFactory = func() IOptimizer {
+	return &VectorSelectorOptimizer{}
+}
+
+var FinalizerOptimizerFactory OptimizerFactory = func() IOptimizer {
+	return &FinalizerOptimizer{}
+}
+
+var Optimizers = []OptimizerFactory{
+	func() IOptimizer {
+		return &RateOptimizer{}
+	},
+	func() IOptimizer {
+		return &AggregateOptimizer{}
+	},
+}
+
+func GetAppliableOptimizer(node parser.Node, factories []OptimizerFactory) IOptimizer {
+	if factories == nil {
+		factories = Optimizers
+	}
+	for _, factory := range factories {
+		opt := factory()
+		if opt.IsAppliable(node) {
+			return opt
+		}
+	}
+	return nil
+}
wasm_parts/promql/vector.go (new file, 39 lines)
@@ -0,0 +1,39 @@
+package promql
+
+import (
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/planners"
+	"wasm_parts/promql/shared"
+)
+
+type VectorSelectorOptimizer struct {
+}
+
+func (v *VectorSelectorOptimizer) IsAppliable(node parser.Node) bool {
+	_, ok := node.(*parser.VectorSelector)
+	return ok
+}
+
+func (v *VectorSelectorOptimizer) PlanOptimize(node parser.Node) error {
+	return nil
+}
+
+func (v *VectorSelectorOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
+	_node := node.(*parser.VectorSelector)
+	var res shared.RequestPlanner = &planners.TimeSeriesGinInitPlanner{}
+	res = &planners.StreamSelectPlanner{
+		Main:     res,
+		Matchers: _node.LabelMatchers,
+	}
+	res = &planners.MetricsInitPlanner{
+		ValueCol:    nil,
+		Fingerprint: res,
+	}
+	res = &planners.MetricsZeroFillPlanner{Main: res}
+	res = &planners.MetricsExtendPlanner{Main: res}
+	return res, nil
+}
+
+func (v *VectorSelectorOptimizer) Children() []IOptimizer {
+	return nil
+}
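Note: putting the chain together, a plain selector such as foo{job="bar"} compiles to roughly the following ClickHouse query. This skeleton is inferred from the planners above and heavily abbreviated; dates, timestamps and the staleness-extension CTE are omitted:

package main

import "fmt"

func main() {
    fmt.Print(`WITH fp_sel AS (
  SELECT fingerprint FROM time_series_gin
  WHERE date >= '...' AND date <= '...' AND type IN (0, 2)
    AND ((key = '__name__') AND (val = 'foo') OR (key = 'job') AND (val = 'bar'))
  GROUP BY fingerprint
  HAVING groupBitOr(...) = 3
)
SELECT fingerprint, intDiv(timestamp_ns, ...) * ... AS timestamp_ms,
       argMaxMerge(last) AS value, 1::UInt8 AS original
FROM metrics_15s
WHERE fingerprint IN fp_sel AND timestamp_ns BETWEEN ... AND ...
GROUP BY fingerprint, timestamp_ms
ORDER BY fingerprint, timestamp_ms WITH FILL FROM ... TO ... STEP ...
`)
}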