diff --git a/promql/index.js b/promql/index.js
index f70caaf..8bda90a 100644
--- a/promql/index.js
+++ b/promql/index.js
@@ -117,8 +117,15 @@ const getIdxSubquery = (conds, fromMs, toMs) => {
   ).groupBy('fingerprint')
 }
 
-module.exports.getData = async (matchers, fromMs, toMs) => {
+module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
   const db = DATABASE_NAME()
+  const subq = (subqueries || {})[getMetricName(matchers)]
+  if (subq) {
+    console.log(subq)
+    const data = await rawRequest(subq + ' FORMAT RowBinary',
+      null, db, { responseType: 'arraybuffer' })
+    return new Uint8Array(data.data)
+  }
   const matches = getMatchersIdxCond(matchers)
   const idx = getIdxSubquery(matches, fromMs, toMs)
   const withIdx = new Sql.With('idx', idx, !!clusterName)
@@ -176,4 +183,12 @@ module.exports.getData = async (matchers, fromMs, toMs) => {
   return new Uint8Array(data.data)
 }
 
+function getMetricName(matchers) {
+  for (const matcher of matchers) {
+    if (matcher[0] === '__name__' && matcher[1] === '=') {
+      return matcher[2]
+    }
+  }
+}
+
 prometheus.getData = module.exports.getData
diff --git a/test/traceql_parser.test.js b/test/traceql_parser.test.js
index 7042cc6..0a6c09a 100644
--- a/test/traceql_parser.test.js
+++ b/test/traceql_parser.test.js
@@ -44,3 +44,8 @@ it('traceql: max duration', () => {
   const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
   expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | max(duration) > 8ms')
 })
+
+it('traceql: select', () => {
+  const res = parser.ParseScript('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+  expect(res.rootToken.value).toEqual('{.testId="12345" &&.spanN>=8.9} | select(a, b)')
+})
diff --git a/traceql/clickhouse_transpiler/index.js b/traceql/clickhouse_transpiler/index.js
index 44b07ea..b8338a0 100644
--- a/traceql/clickhouse_transpiler/index.js
+++ b/traceql/clickhouse_transpiler/index.js
@@ -292,6 +292,9 @@ module.exports.Planner = class Planner {
     if (!agg) {
       return
     }
+    if (['count', 'sum', 'min', 'max', 'avg'].indexOf(agg.Child('fn').value) < 0) {
+      return
+    }
     this.aggFn = agg.Child('fn').value
     const labelName = agg.Child('attr').Child('label_name')
     this.aggregatedAttr = labelName ? labelName.value : ''
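Note: getData now accepts an optional subqueries map, keyed by the value of the __name__ equality matcher. When an entry exists, that SQL is executed verbatim (RowBinary) and the usual fingerprint-index path is skipped. A minimal sketch of the new call shape; the metric name and SQL below are invented for illustration:

    const { getData } = require('./promql')

    // keys follow the fake_metric_<nanos> convention the wasm optimizer generates;
    // values are complete, ready-to-run ClickHouse SELECTs
    const subqueries = {
      fake_metric_1700000000000: 'SELECT fingerprint, timestamp_ms, value FROM metrics_15s'
    }

    async function demo () {
      // metrics without a subquery entry still go through getMatchersIdxCond
      return await getData([['__name__', '=', 'fake_metric_1700000000000']],
        Date.now() - 3600000, Date.now(), subqueries)
    }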
diff --git a/traceql/clickhouse_transpiler/traces_data.js b/traceql/clickhouse_transpiler/traces_data.js
index fc3ab19..2a263ea 100644
--- a/traceql/clickhouse_transpiler/traces_data.js
+++ b/traceql/clickhouse_transpiler/traces_data.js
@@ -24,7 +24,9 @@ const processFn = (sel, ctx) => {
     [new Sql.Raw(
       'toFloat64(max(traces.timestamp_ns + traces.duration_ns) - min(traces.timestamp_ns)) / 1000000'
     ), 'duration_ms'],
-    [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name']
+    [new Sql.Raw('argMin(traces.name, traces.timestamp_ns)', 'root_service_name'), 'root_service_name'],
+    [new Sql.Raw(`groupArrayIf(base64Encode(traces.payload), (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload'],
+    [new Sql.Raw(`groupArrayIf(traces.payload_type, (traces.trace_id, traces.span_id) IN ${new Sql.WithReference(withTraceIdsSpanIds)})`), 'payload_type']
   ).from([table, 'traces']).where(Sql.And(
     new Sql.In(new Sql.Raw('traces.trace_id'), 'in', new Sql.WithReference(withTraceIds))
   )).groupBy('traces.trace_id')
diff --git a/traceql/index.js b/traceql/index.js
index bec8b96..71690db 100644
--- a/traceql/index.js
+++ b/traceql/index.js
@@ -4,6 +4,7 @@ const logger = require('../lib/logger')
 const { DATABASE_NAME } = require('../lib/utils')
 const { clusterName } = require('../common')
 const { rawRequest } = require('../lib/db/clickhouse')
+const { postProcess } = require('./post_processor')
 
 /**
  *
@@ -39,12 +40,14 @@ const search = async (query, limit, from, to) => {
   } else {
     res = await processSmallResult(ctx, scrpit.rootToken)
   }
+  res = postProcess(res, scrpit.rootToken)
   res.forEach(t =>
     t.spanSets.forEach(
       ss => ss.spans.sort(
        (a, b) => b.startTimeUnixNano.localeCompare(a.startTimeUnixNano))
     )
   )
+  console.log(JSON.stringify(res, null, 2))
   return res
 }
 
@@ -70,11 +73,12 @@ const evaluateComplexity = async (ctx, script) => {
 
 async function processComplexResult (ctx, script, complexity) {
   const planner = ctx.planner.plan()
   const maxFilter = Math.floor(complexity / 10000000)
-  let traces = []
+  // let traces = []
+  let response = null
   for (let i = 0; i < maxFilter; i++) {
     ctx.randomFilter = [maxFilter, i]
     const sql = planner(ctx)
-    const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
+    response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
     if (response.data.data.length === parseInt(ctx.limit)) {
       const minStart = response.data.data.reduce((acc, row) => acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
@@ -88,7 +92,7 @@ async function processComplexResult (ctx, script, complexity) {
       ctx.randomFilter = [maxFilter, i]
     }
     ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
-    traces = response.data.data.map(row => ({
+    /*traces = response.data.data.map(row => ({
       traceID: row.trace_id,
       rootServiceName: row.root_service_name,
      rootTraceName: row.root_trace_name,
@@ -105,9 +109,9 @@ async function processComplexResult (ctx, script, complexity) {
         matched: row.span_id.length
       }
     ]
-    }))
+    }))*/
   }
-  return traces
+  return response.data.data
 }
 
 /**
@@ -119,7 +123,7 @@ async function processSmallResult (ctx, script) {
   const planner = ctx.planner.plan()
   const sql = planner(ctx)
   const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
-  const traces = response.data.data.map(row => ({
+  /*const traces = response.data.data.map(row => ({
     traceID: row.trace_id,
     rootServiceName: row.root_service_name,
     rootTraceName: row.root_trace_name,
@@ -146,8 +150,8 @@ async function processSmallResult (ctx, script) {
       matched: row.span_id.length
     }
   ]
-  }))
-  return traces
+  }))*/
+  return response.data.data
 }
 
 module.exports = {
diff --git a/traceql/post_processor/index.js b/traceql/post_processor/index.js
new file mode 100644
index 0000000..b3515af
--- /dev/null
+++ b/traceql/post_processor/index.js
@@ -0,0 +1,74 @@
+const Otlp = require('../../lib/db/otlp')
+const Zipkin = require('../../lib/db/zipkin')
+const protobufjs = require('protobufjs')
+const path = require('path')
+const OTLPSpan = protobufjs.loadSync(path.join(__dirname, '..', '..',
+  'lib', 'otlp.proto')).lookupType('Span')
+/**
+ *
+ * @param rows {Row[]}
+ * @param script {Token}
+ */
+function postProcess (rows, script) {
+  const selectAttrs = script.Children('aggregator')
+    .filter(x => x.Child('fn').value === 'select')
+    .map(x => x.Children('label_name'))
+    .reduce((acc, x) => {
+      // strip a leading `span.` / `resource.` scope so paths match tag keys
+      const attrs = x.map(y => ({
+        name: y.value,
+        path: y.value.split('.').filter(p => p)
+      })).map(attr => (attr.path[0] === 'span' || attr.path[0] === 'resource')
+        ? { ...attr, path: attr.path.slice(1) }
+        : attr)
+      return [...acc, ...attrs]
+    }, [])
+  rows = rows.map(row => ({
+    ...row,
+    objs: row.payload.map((payload, i) => {
+      let span = null
+      switch (row.payload_type[i]) {
+        case 1:
+          return new Zipkin(JSON.parse(Buffer.from(payload, 'base64').toString()))
+        case 2:
+          span = OTLPSpan.toObject(
+            OTLPSpan.decode(Buffer.from(payload, 'base64')), {
+              longs: String,
+              bytes: String
+            })
+          return new Otlp(span)
+      }
+      return null
+    })
+  }))
+  const spans = (row) => row.span_id.map((spanId, i) => ({
+    spanID: spanId,
+    startTimeUnixNano: row.timestamp_ns[i],
+    durationNanos: row.duration[i],
+    attributes: selectAttrs.map(attr => ({
+      key: attr.name,
+      value: {
+        stringValue: (row.objs[i].tags.find(t => t[0] === attr.path.join('.')) || [null, null])[1]
+      }
+    })).filter(x => x.value.stringValue)
+  }))
+  const traces = rows.map(row => ({
+    traceID: row.trace_id,
+    rootServiceName: row.root_service_name,
+    rootTraceName: row.root_trace_name,
+    startTimeUnixNano: row.start_time_unix_nano,
+    durationMs: row.duration_ms,
+    spanSet: { spans: spans(row) },
+    spanSets: [
+      {
+        spans: spans(row),
+        matched: row.span_id.length
+      }
+    ]
+  }))
+  return traces
+}
+
+module.exports = {
+  postProcess
+}
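Note: the row-to-trace mapping commented out above now lives in postProcess, which also decodes the raw span payloads collected by traces_data.js (payload_type 1 is Zipkin JSON, 2 is OTLP protobuf) so that select(...) attributes can be resolved from span tags. A rough sketch with one hand-built row; all values are invented and script stands for the parsed TraceQL token tree:

    const { postProcess } = require('./post_processor')

    const row = {
      trace_id: 'd6e9329d67b6146c',
      span_id: ['eee19b7ec3c1b174'],
      duration: ['1000000'],
      timestamp_ns: ['1700000000000000000'],
      start_time_unix_nano: '1700000000000000000',
      duration_ms: 1,
      root_service_name: 'shop-backend',
      payload: [Buffer.from('{"id":"eee19b7ec3c1b174"}').toString('base64')],
      payload_type: [1]
    }

    // with `| select(.name)` in the query, every span in the result carries
    // an attribute entry { key: '.name', value: { stringValue: ... } }
    const traces = postProcess([row], script)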
diff --git a/traceql/post_processor/types.d.ts b/traceql/post_processor/types.d.ts
new file mode 100644
index 0000000..5a92e85
--- /dev/null
+++ b/traceql/post_processor/types.d.ts
@@ -0,0 +1,11 @@
+export interface Row {
+  trace_id: string;
+  span_id: string[];
+  duration: string[];
+  timestamp_ns: string[];
+  start_time_unix_nano: string;
+  duration_ms: number;
+  root_service_name: string;
+  payload: string[];
+  payload_type: number[];
+}
\ No newline at end of file
diff --git a/traceql/traceql.bnf b/traceql/traceql.bnf
index 366698c..28e447b 100644
--- a/traceql/traceql.bnf
+++ b/traceql/traceql.bnf
@@ -6,14 +6,15 @@
 complex_head ::= "(" <attr_selector_exp> ")"
 tail ::= <and_or> <attr_selector_exp>
 and_or ::= "&&" | "||"
-aggregator ::= "|" <fn> <attr> <cmp> <cmp_val>
-fn ::= "count"|"sum"|"min"|"max"|"avg"
-attr ::= "(" [<label_name>] ")"
+aggregator ::= "|" <fn> <attr> [<cmp> <cmp_val>]
+fn ::= "count"|"sum"|"min"|"max"|"avg"|"select"
+attr ::= "(" [<label_names>] ")"
 cmp ::= "="|"!="|"<="|">="|"<"|">"
 cmp_val ::= <number> [<measurement>]
 measurement ::= "ns"|"us"|"ms"|"s"|"m"|"h"|"d"
 label_name ::= ("." | <ALPHA> | "-" | "_") *("." | <ALPHA> | "_" | "-" | <DIGIT>)
+label_names ::= <label_name> *("," <label_name>)
 number ::= ["-"] <DIGITS> ["." <DIGITS>]
 attr_selector ::= <label_name> <cmp> <cmp_val>
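Note: the grammar change is twofold: fn learns "select", and the trailing comparison becomes optional so a bare `| select(...)` is a valid pipeline stage; attr also moves from a single label_name to a comma-separated label_names list. The transpiler deliberately ignores select (see the indexOf guard in clickhouse_transpiler/index.js above); it is handled purely in the post-processor. Mirroring the existing tests, with an approximate require path:

    const parser = require('../traceql/parser')

    // still valid: aggregator with a comparison
    parser.ParseScript('{.testId="12345"} | max(duration) > 8ms')
    // new: bare select with several labels, span./resource. prefixes included
    parser.ParseScript('{.testId="12345"} | select(.name, span.name, resource.service.name)')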
diff --git a/wasm_parts/main.go b/wasm_parts/main.go
index 0dcfae3..7f3ad84 100644
--- a/wasm_parts/main.go
+++ b/wasm_parts/main.go
@@ -4,8 +4,6 @@ import (
 	"context"
 	"fmt"
 	gcContext "github.com/metrico/micro-gc/context"
-	"sync"
-
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
@@ -15,6 +13,8 @@ import (
 	"strings"
 	"time"
 	"unsafe"
+	promql2 "wasm_parts/promql"
+	shared2 "wasm_parts/promql/shared"
 	sql "wasm_parts/sql_select"
 	parser2 "wasm_parts/traceql/parser"
 	traceql_transpiler "wasm_parts/traceql/transpiler"
@@ -165,12 +165,12 @@ func stats() {
 }
 
 //export pqlRangeQuery
-func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
+func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64, optimizable uint32) uint32 {
 	ctxId := gcContext.GetContextID()
 	gcContext.SetContext(id)
 	defer gcContext.SetContext(ctxId)
 
-	return pql(id, data[id], func() (promql.Query, error) {
+	return pql(id, data[id], optimizable != 0, int64(fromMS), int64(toMS), int64(stepMS), func() (promql.Query, error) {
 		queriable := &TestQueryable{id: id, stepMs: int64(stepMS)}
 		return getEng().NewRangeQuery(
 			queriable,
@@ -184,19 +184,20 @@
 }
 
 //export pqlInstantQuery
-func pqlInstantQuery(id uint32, timeMS float64) uint32 {
+func pqlInstantQuery(id uint32, timeMS float64, optimizable uint32) uint32 {
 	ctxId := gcContext.GetContextID()
 	gcContext.SetContext(id)
 	defer gcContext.SetContext(ctxId)
 
-	return pql(id, data[id], func() (promql.Query, error) {
-		queriable := &TestQueryable{id: id, stepMs: 15000}
-		return getEng().NewInstantQuery(
-			queriable,
-			nil,
-			string(data[id].request),
-			time.Unix(0, int64(timeMS)*1000000))
-	})
+	return pql(id, data[id], optimizable != 0, int64(timeMS-300000), int64(timeMS), 15000,
+		func() (promql.Query, error) {
+			queriable := &TestQueryable{id: id, stepMs: 15000}
+			return getEng().NewInstantQuery(
+				queriable,
+				nil,
+				string(data[id].request),
+				time.Unix(0, int64(timeMS)*1000000))
+		})
 }
 
 //export pqlSeries
@@ -255,13 +256,16 @@ func wrapErrorStr(err error) string {
 	return err.Error()
 }
 
-func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
+func pql(id uint32, c *ctx, optimizable bool,
+	fromMs int64, toMs int64, stepMs int64,
+	query func() (promql.Query, error)) uint32 {
 	rq, err := query()
 	if err != nil {
 		c.response = wrapError(err)
 		return 1
 	}
+
 	var walk func(node parser.Node, i func(node parser.Node))
 	walk = func(node parser.Node, i func(node parser.Node)) {
 		i(node)
@@ -269,9 +273,39 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
 			walk(n, i)
 		}
 	}
+
+	maxDurationMs := getMaxDurationMs(rq.Statement())
+	fromMs -= maxDurationMs
+
+	subsels := strings.Builder{}
+	subsels.WriteString("{")
+	if optimizable {
+		var (
+			subselsMap map[string]string
+			err        error
+		)
+		subselsMap, rq, err = optimizeQuery(rq, fromMs, toMs, stepMs)
+		if err != nil {
+			c.response = wrapError(err)
+			return 1
+		}
+		i := 0
+		for k, v := range subselsMap {
+			if i != 0 {
+				subsels.WriteString(",")
+			}
+			subsels.WriteString(fmt.Sprintf(`%s:%s`, strconv.Quote(k), strconv.Quote(v)))
+			i++
+		}
+	}
+	subsels.WriteString("}")
+
 	matchersJSON := getmatchersJSON(rq)
 
-	c.response = []byte(matchersJSON)
+	c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s, "fromMs": %d}`,
+		subsels.String(),
+		matchersJSON,
+		fromMs))
 	c.onDataLoad = func(c *ctx) {
 		ctxId := gcContext.GetContextID()
 		gcContext.SetContext(id)
@@ -284,6 +318,83 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
 	return 0
 }
 
+func getMaxDurationMs(q parser.Node) int64 {
+	maxDurationMs := int64(0)
+	for _, c := range parser.Children(q) {
+		_m := getMaxDurationMs(c)
+		if _m > maxDurationMs {
+			maxDurationMs = _m
+		}
+	}
+	ms, _ := q.(*parser.MatrixSelector)
+	if ms != nil && maxDurationMs < ms.Range.Milliseconds() {
+		return ms.Range.Milliseconds()
+	}
+	return maxDurationMs
+}
+
+func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) {
+	appliableNodes := findAppliableNodes(q.Statement(), nil)
+	var err error
+	subsels := make(map[string]string)
+	for _, m := range appliableNodes {
+		fmt.Println(m)
+		opt := m.optimizer
+		opt = &promql2.FinalizerOptimizer{
+			SubOptimizer: opt,
+		}
+		opt, err = promql2.PlanOptimize(m.node, opt)
+		if err != nil {
+			return nil, nil, err
+		}
+		planner, err := opt.Optimize(m.node)
+		if err != nil {
+			return nil, nil, err
+		}
+		fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
+		swapChild(m.parent, m.node, &parser.VectorSelector{
+			Name:           fakeMetric,
+			OriginalOffset: 0,
+			Offset:         0,
+			Timestamp:      nil,
+			StartOrEnd:     0,
+			LabelMatchers: []*labels.Matcher{
+				{
+					Type:  labels.MatchEqual,
+					Name:  "__name__",
+					Value: fakeMetric,
+				},
+			},
+			UnexpandedSeriesSet: nil,
+			Series:              nil,
+			PosRange:            parser.PositionRange{},
+		})
+		sel, err := planner.Process(&shared2.PlannerContext{
+			IsCluster:           false,
+			From:                time.Unix(0, fromMs*1000000),
+			To:                  time.Unix(0, toMs*1000000),
+			Step:                time.Millisecond * 15000, /*time.Duration(stepMs)*/
+			TimeSeriesTable:     "time_series",
+			TimeSeriesDistTable: "time_series_dist",
+			TimeSeriesGinTable:  "time_series_gin",
+			MetricsTable:        "metrics_15s",
+			MetricsDistTable:    "metrics_15s_dist",
+		})
+		if err != nil {
+			return nil, nil, err
+		}
+		strSel, err := sel.String(&sql.Ctx{
+			Params: map[string]sql.SQLObject{},
+			Result: map[string]sql.SQLObject{},
+		})
+		if err != nil {
+			return nil, nil, err
+		}
+		subsels[fakeMetric] = strSel
+	}
+	return subsels, q, nil
+}
+
 //export onDataLoad
 func onDataLoad(idx uint32) {
 	data[idx].onDataLoad(data[idx])
@@ -358,9 +469,101 @@ func writeVector(v promql.Vector) string {
 }
 
 func main() {
-	p := sync.Pool{}
-	a := p.Get()
-	_ = a
+}
+
+func getOptimizer(n parser.Node) promql2.IOptimizer {
+	for _, f := range promql2.Optimizers {
+		opt := f()
+		if opt.IsAppliable(n) {
+			return opt
+		}
+	}
+	return nil
+}
+
+func isRate(node parser.Node) (bool, bool) {
+	opt := getOptimizer(node)
+	if opt == nil {
+		return false, true
+	}
+	return true, false
+}
+
+type MatchNode struct {
+	node      parser.Node
+	parent    parser.Node
+	optimizer promql2.IOptimizer
+}
+
+func findAppliableNodes(root parser.Node, parent parser.Node) []MatchNode {
+	var res []MatchNode
+	optimizer := getOptimizer(root)
+	if optimizer != nil {
+		res = append(res, MatchNode{
+			node:      root,
+			parent:    parent,
+			optimizer: optimizer,
+		})
+		return res
+	}
+	for _, n := range parser.Children(root) {
+		res = append(res, findAppliableNodes(n, root)...)
+	}
+	return res
+}
+
+func swapChild(node parser.Node, child parser.Node, newChild parser.Expr) {
+	// For some reason these switches have significantly better performance than interfaces
+	switch n := node.(type) {
+	case *parser.EvalStmt:
+		n.Expr = newChild
+	case parser.Expressions:
+		for i, e := range n {
+			if e.String() == child.String() {
+				n[i] = newChild
+			}
+		}
+	case *parser.AggregateExpr:
+		if n.Expr == nil && n.Param == nil {
+			return
+		} else if n.Expr == nil {
+			n.Param = newChild
+		} else if n.Param == nil {
+			n.Expr = newChild
+		} else {
+			if n.Expr.String() == child.String() {
+				n.Expr = newChild
+			} else {
+				n.Param = newChild
+			}
+		}
+	case *parser.BinaryExpr:
+		if n.LHS.String() == child.String() {
+			n.LHS = newChild
+		} else if n.RHS.String() == child.String() {
+			n.RHS = newChild
+		}
+	case *parser.Call:
+		for i, e := range n.Args {
+			if e.String() == child.String() {
+				n.Args[i] = newChild
+			}
+		}
+	case *parser.SubqueryExpr:
+		n.Expr = newChild
+	case *parser.ParenExpr:
+		n.Expr = newChild
+	case *parser.UnaryExpr:
+		n.Expr = newChild
+	case *parser.MatrixSelector:
+		n.VectorSelector = newChild
+	case *parser.StepInvariantExpr:
+		n.Expr = newChild
+	}
+}
+
+func getChildren(e parser.Node) []parser.Node {
+	return parser.Children(e)
 }
 
 type TestLogger struct{}
@@ -444,10 +647,11 @@ type TestSeries struct {
 	data   []byte
 	stepMs int64
 
-	labels labels.Labels
-	tsMs   int64
-	val    float64
-	i      int
+	labels    labels.Labels
+	tsMs      int64
+	val       float64
+	lastValTs int64
+	i         int
 
 	state int
 }
@@ -469,11 +673,14 @@ func (t *TestSeries) Next() bool {
 		t.tsMs += t.stepMs
 		if t.tsMs >= ts {
 			t.state = 0
+		} else if t.lastValTs+300000 < t.tsMs {
+			t.state = 0
 		}
 	}
 	if t.state == 0 {
 		t.tsMs = ts
 		t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
+		t.lastValTs = t.tsMs
 		t.i++
 		t.state = 1
 	}
@@ -585,3 +792,17 @@ func matchers2Str(labelMatchers []*labels.Matcher) string {
 	matchersJson.WriteString("]")
 	return matchersJson.String()
 }
+
+type pqlRequest struct {
+	optimizable bool
+	body        string
+}
+
+func (p *pqlRequest) Read(body []byte) {
+	r := BinaryReader{buffer: body}
+	p.optimizable = r.ReadULeb32() != 0
+	p.body = r.ReadString()
+	if !p.optimizable {
+		return
+	}
+}
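Note: pql now answers with a JSON envelope instead of a bare matcher list, and fromMs is pre-widened by the largest matrix-selector range in the statement (getMaxDurationMs), so rate(x[5m]) fetches five extra minutes of history. The whole path stays opt-in through the EXPERIMENTAL_PROMQL_OPTIMIZE flag wired up in main.js below. Roughly what the JS side reads back; the values are invented:

    // parsed from ctx.read() in wasm_parts/main.js
    const matchersObj = {
      subqueries: {
        // present only when the flag is set and an optimizable node was found;
        // the fake metric name is the placeholder swapped into the PromQL AST
        fake_metric_1700000000000: 'WITH fp_sel AS (...) SELECT ...'
      },
      matchers: [[['__name__', '=', 'fake_metric_1700000000000']]],
      fromMs: 1699999700000 // requested start minus maxDurationMs
    }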
diff --git a/wasm_parts/main.js b/wasm_parts/main.js
index bd15ac7..0ef187a 100644
--- a/wasm_parts/main.js
+++ b/wasm_parts/main.js
@@ -61,8 +61,8 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
   const end = endMs || Date.now()
   const step = stepMs || 15000
   return await pql(query,
-    (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step),
-    (matchers) => getData(matchers, start, end))
+    (ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
+    (matchers, subq, startMs) => getData(matchers, startMs, end, subq))
 }
 
 /**
@@ -75,9 +75,10 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
 module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
   const time = timeMs || Date.now()
   const _wasm = getWasm()
+  const start = time - 300000
   return await pql(query,
-    (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time),
-    (matchers) => getData(matchers, time - 300000, time))
+    (ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
+    (matchers, subq, start) => getData(matchers, start, time, subq))
 }
 
 module.exports.pqlMatchers = (query) => {
@@ -162,8 +163,8 @@ const pql = async (query, wasmCall, getData) => {
   const matchersObj = JSON.parse(ctx.read())
 
   const matchersResults = await Promise.all(
-    matchersObj.map(async (matchers, i) => {
-      const data = await getData(matchers)
+    matchersObj.matchers.map(async (matchers, i) => {
+      const data = await getData(matchers, matchersObj.subqueries, matchersObj.fromMs)
       return { matchers, data }
     }))
diff --git a/wasm_parts/main.wasm.gz b/wasm_parts/main.wasm.gz
index 469487e..eba7be7 100644
Binary files a/wasm_parts/main.wasm.gz and b/wasm_parts/main.wasm.gz differ
diff --git a/wasm_parts/promql/aggregate.go b/wasm_parts/promql/aggregate.go
new file mode 100644
index 0000000..0fd2ed1
--- /dev/null
+++ b/wasm_parts/promql/aggregate.go
@@ -0,0 +1,71 @@
+package promql
+
+import (
+	"fmt"
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/planners"
+	"wasm_parts/promql/shared"
+)
+
+type AggregateOptimizer struct {
+	WithLabelsIn  string
+	WithLabelsOut string
+
+	subOptimizer IOptimizer
+}
+
+func (a *AggregateOptimizer) IsAppliable(node parser.Node) bool {
+	aggExpr, ok := node.(*parser.AggregateExpr)
+	if !ok {
+		return false
+	}
+	if aggExpr.Op != parser.SUM {
+		return false
+	}
+	return GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory)) != nil
+}
+
+func (a *AggregateOptimizer) PlanOptimize(node parser.Node) error {
+	aggExpr := node.(*parser.AggregateExpr)
+	a.subOptimizer = GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory))
+	return a.subOptimizer.PlanOptimize(node)
+}
+
+func (a *AggregateOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
+	aggExpr := node.(*parser.AggregateExpr)
+	planner, err := a.subOptimizer.Optimize(aggExpr.Expr)
+	if err != nil {
+		return nil, err
+	}
+	withLabelsIn := a.WithLabelsIn
+	if withLabelsIn == "" {
+		planner = &planners.LabelsInitPlanner{
+			Main:              planner,
+			FingerprintsAlias: "fp_sel",
+		}
+		withLabelsIn = "labels"
+	}
+	if a.WithLabelsOut == "" {
+		return nil, fmt.Errorf("AggregateOptimizer.WithLabelsOut is empty")
+	}
+	byWithout := "by"
+	if aggExpr.Without {
+		byWithout = "without"
+	}
+	planner = &planners.ByWithoutPlanner{
+		Main:                planner,
+		FingerprintWithName: withLabelsIn,
+		FingerprintsOutName: a.WithLabelsOut,
+		ByWithout:           byWithout,
+		Labels:              aggExpr.Grouping,
+	}
+	planner = &planners.SumPlanner{
+		Main:        planner,
+		LabelsAlias: a.WithLabelsOut,
+	}
+	return planner, nil
+}
+
+func (a *AggregateOptimizer) Children() []IOptimizer {
+	return []IOptimizer{a.subOptimizer}
+}
diff --git a/wasm_parts/promql/finalize.go b/wasm_parts/promql/finalize.go
new file mode 100644
index 0000000..d15bcf2
--- /dev/null
+++ b/wasm_parts/promql/finalize.go
@@ -0,0 +1,45 @@
+package promql
+
+import (
+	"github.com/prometheus/prometheus/promql/parser"
+	"wasm_parts/promql/planners"
+	"wasm_parts/promql/shared"
+)
+
+type FinalizerOptimizer struct {
+	LabelsIn     string
+	SubOptimizer IOptimizer
+}
+
+func (f *FinalizerOptimizer) IsAppliable(node parser.Node) bool {
+	return false
+}
+
+func (f *FinalizerOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
+	planner, err := f.SubOptimizer.Optimize(node)
+	if err != nil {
+		return nil, err
+	}
+	labelsIn := f.LabelsIn
+	if labelsIn == "" {
+		planner = &planners.LabelsInitPlanner{
+			Main:              planner,
+			FingerprintsAlias: "fp_sel",
+		}
+		labelsIn = "labels"
+	}
+
+	planner = &planners.FinalizePlanner{
+		LabelsAlias: labelsIn,
+		Main:        planner,
+	}
+	return planner, nil
+}
+
+func (f *FinalizerOptimizer) PlanOptimize(node parser.Node) error {
+	return f.SubOptimizer.PlanOptimize(node)
+}
+
+func (f *FinalizerOptimizer) Children() []IOptimizer {
+	return []IOptimizer{f.SubOptimizer}
+}
diff --git a/wasm_parts/promql/optimize.go b/wasm_parts/promql/optimize.go
new file mode 100644
index 0000000..2ea382e
--- /dev/null
+++ b/wasm_parts/promql/optimize.go
@@ -0,0 +1,37 @@
+package promql
+
+import (
+	"fmt"
+	"github.com/prometheus/prometheus/promql/parser"
+)
+
+func PlanOptimize(node parser.Node, optimizer IOptimizer) (IOptimizer, error) {
+	err := optimizer.PlanOptimize(node)
+	if err != nil {
+		return nil, err
+	}
+
+	var checkLabelAliases func(opt IOptimizer, i int) int
+	checkLabelAliases = func(opt IOptimizer, i int) int {
+		var _i int
+		for _, c := range opt.Children() {
+			_i = checkLabelAliases(c, i)
+		}
+		switch opt.(type) {
+		case *AggregateOptimizer:
+			if _i != 0 {
+				opt.(*AggregateOptimizer).WithLabelsIn = fmt.Sprintf("labels_%d", _i)
+			}
+			opt.(*AggregateOptimizer).WithLabelsOut = fmt.Sprintf("labels_%d", _i+1)
+			_i++
+		case *FinalizerOptimizer:
+			if _i != 0 {
+				opt.(*FinalizerOptimizer).LabelsIn = fmt.Sprintf("labels_%d", _i)
+			}
+			_i++
+		}
+		return _i
+	}
+	checkLabelAliases(optimizer, 0)
+	return optimizer, nil
+}
diff --git a/wasm_parts/promql/planners/aggregate.go b/wasm_parts/promql/planners/aggregate.go
new file mode 100644
index 0000000..a1f6cf0
--- /dev/null
+++ b/wasm_parts/promql/planners/aggregate.go
@@ -0,0 +1,48 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type SumPlanner struct {
+	Main        shared.RequestPlanner
+	LabelsAlias string
+}
+
+func (s *SumPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := s.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var withLabels *sql.With
+	for _, w := range main.GetWith() {
+		if w.GetAlias() == s.LabelsAlias {
+			withLabels = w
+			break
+		}
+	}
+	if withLabels == nil {
+		return nil, fmt.Errorf("labels subrequest not found")
+	}
+	withMain := sql.NewWith(main, "pre_sum")
+
+	res := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol(withLabels.GetAlias()+".new_fingerprint", "fingerprint"),
+			sql.NewSimpleCol("pre_sum.timestamp_ms", "timestamp_ms"),
+			sql.NewSimpleCol("sum(pre_sum.value)", "value")).
+		From(sql.NewWithRef(withMain)).
+		Join(sql.NewJoin(
+			"ANY LEFT",
+			sql.NewWithRef(withLabels),
+			sql.Eq(
+				sql.NewRawObject("pre_sum.fingerprint"),
+				sql.NewRawObject(withLabels.GetAlias()+".fingerprint")))).
+		GroupBy(
+			sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint"),
+			sql.NewRawObject("pre_sum.timestamp_ms"))
+	return res, nil
+}
diff --git a/wasm_parts/promql/planners/by_without.go b/wasm_parts/promql/planners/by_without.go
new file mode 100644
index 0000000..de38b83
--- /dev/null
+++ b/wasm_parts/promql/planners/by_without.go
@@ -0,0 +1,59 @@
+package planners
+
+import (
+	"fmt"
+	"strings"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type ByWithoutPlanner struct {
+	Main                shared.RequestPlanner
+	FingerprintWithName string
+	FingerprintsOutName string
+	ByWithout           string
+	Labels              []string
+}
+
+func (b *ByWithoutPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := b.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	var fp *sql.With
+	withs := main.GetWith()
+	for _, w := range withs {
+		if w.GetAlias() == b.FingerprintWithName {
+			fp = w
+			break
+		}
+	}
+	if fp == nil {
+		return nil, fmt.Errorf("fingerprints subrequest not found")
+	}
+	labelsCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		cond := "IN"
+		if b.ByWithout == "without" {
+			cond = "NOT IN"
+		}
+		values := make([]string, len(b.Labels))
+		var err error
+		for i, l := range b.Labels {
+			values[i], err = sql.NewStringVal(l).String(ctx, options...)
+			if err != nil {
+				return "", err
+			}
+		}
+		return fmt.Sprintf("mapFilter((k,v) -> k %s (%s), labels)", cond, strings.Join(values, ",")), nil
+	})
+	newFpCol := "cityHash64(arraySort(arrayZip(mapKeys(labels), mapValues(labels))))"
+	newFp := sql.NewSelect().
+		Select(
+			sql.NewSimpleCol(fp.GetAlias()+".new_fingerprint", "fingerprint"),
+			sql.NewCol(labelsCol, "labels"),
+			sql.NewSimpleCol(newFpCol, "new_fingerprint"),
+		).
+		From(sql.NewWithRef(fp))
+	withNewFp := sql.NewWith(newFp, b.FingerprintsOutName)
+	return main.AddWith(withNewFp), nil
+}
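Note: ByWithoutPlanner implements by/without grouping entirely on the labels CTE: it filters the label map and derives a new fingerprint, so the raw samples never have to be re-read. For sum by (job) (...) the appended CTE looks roughly like this (illustrative SQL shape, not the exact output):

    const approxCte = `labels_1 AS (
      SELECT labels.new_fingerprint AS fingerprint,
             mapFilter((k, v) -> k IN ('job'), labels) AS labels,
             cityHash64(arraySort(arrayZip(mapKeys(labels), mapValues(labels)))) AS new_fingerprint
      FROM labels
    )`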
diff --git a/wasm_parts/promql/planners/finalize.go b/wasm_parts/promql/planners/finalize.go
new file mode 100644
index 0000000..f3064f2
--- /dev/null
+++ b/wasm_parts/promql/planners/finalize.go
@@ -0,0 +1,47 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type FinalizePlanner struct {
+	LabelsAlias string
+	Main        shared.RequestPlanner
+}
+
+func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := f.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var withLabels *sql.With
+	for _, w := range main.GetWith() {
+		if w.GetAlias() == f.LabelsAlias {
+			withLabels = w
+			break
+		}
+	}
+
+	if withLabels == nil {
+		return nil, fmt.Errorf("FinalizePlanner.Process: %s CTE not found", f.LabelsAlias)
+	}
+
+	withMain := sql.NewWith(main, "pre_final")
+	res := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"),
+			sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"),
+		).From(sql.NewWithRef(withMain)).
+		//AndWhere(sql.Neq(sql.NewRawObject("pre_final.value"), sql.NewIntVal(0))).
+		Join(sql.NewJoin(
+			"ANY LEFT",
+			sql.NewWithRef(withLabels),
+			sql.Eq(
+				sql.NewRawObject("pre_final.fingerprint"),
+				sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))).
+		GroupBy(sql.NewRawObject(withLabels.GetAlias() + ".labels"))
+	return res, nil
+}
diff --git a/wasm_parts/promql/planners/labels_init.go b/wasm_parts/promql/planners/labels_init.go
new file mode 100644
index 0000000..35c060f
--- /dev/null
+++ b/wasm_parts/promql/planners/labels_init.go
@@ -0,0 +1,48 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type LabelsInitPlanner struct {
+	Main              shared.RequestPlanner
+	FingerprintsAlias string
+}
+
+func (l *LabelsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := l.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	var withFp *sql.With
+	for _, w := range main.GetWith() {
+		if w.GetAlias() == l.FingerprintsAlias {
+			withFp = w
+			break
+		}
+	}
+
+	if withFp == nil {
+		return nil, fmt.Errorf("fingerprints subrequest not found")
+	}
+
+	labelsCol := "mapFromArrays(" +
+		"arrayMap(x -> x.1, JSONExtractKeysAndValues(time_series.labels, 'String') as ts_kv), " +
+		"arrayMap(x -> x.2, ts_kv))"
+
+	labelsSubSel := sql.NewSelect().Select(
+		sql.NewSimpleCol("fingerprint", "fingerprint"),
+		sql.NewSimpleCol(labelsCol, "labels"),
+		sql.NewSimpleCol("fingerprint", "new_fingerprint")).
+		From(sql.NewSimpleCol(ctx.TimeSeriesTable, "time_series")).
+		AndWhere(
+			sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))),
+			sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))),
+			sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFp)))
+	withLabelsSubSel := sql.NewWith(labelsSubSel, "labels")
+
+	return main.AddWith(withLabelsSubSel), nil
+}
diff --git a/wasm_parts/promql/planners/metrics_extend.go b/wasm_parts/promql/planners/metrics_extend.go
new file mode 100644
index 0000000..52e1916
--- /dev/null
+++ b/wasm_parts/promql/planners/metrics_extend.go
@@ -0,0 +1,46 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type MetricsExtendPlanner struct {
+	Main shared.RequestPlanner
+}
+
+func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := m.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	extendCnt := 300000 / ctx.Step.Milliseconds()
+	if extendCnt < 1 {
+		return main, nil
+	}
+	withMain := sql.NewWith(main, "pre_extend")
+	extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"argMaxIf(value, timestamp_ms, pre_extend.original = 1) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", extendCnt), nil
+	})
+	origCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"max(original) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", extendCnt), nil
+	})
+	extend := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(extendedCol, "value"),
+			sql.NewCol(origCol, "original")).
+		From(sql.NewWithRef(withMain))
+	withExtend := sql.NewWith(extend, "extend")
+	return sql.NewSelect().With(withExtend).Select(sql.NewRawObject("*")).
+		From(sql.NewWithRef(withExtend)).
+		AndWhere(sql.Eq(sql.NewRawObject("original"), sql.NewIntVal(1))), nil
+}
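Note: MetricsExtendPlanner emulates Prometheus' five-minute staleness handling in SQL: argMaxIf(..., original = 1) carries each series' last real sample forward across a window of 300000 / step rows, and max(original) over the same window keeps only the slots within that horizon of a real sample. For the default step:

    const stepMs = 15000
    const extendCnt = Math.floor(300000 / stepMs) // 20 rows, i.e. a 5m lookback
    // a real sample at slot t therefore fills gap slots t+1 .. t+20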
diff --git a/wasm_parts/promql/planners/metrics_rate.go b/wasm_parts/promql/planners/metrics_rate.go
new file mode 100644
index 0000000..4a472c4
--- /dev/null
+++ b/wasm_parts/promql/planners/metrics_rate.go
@@ -0,0 +1,77 @@
+package planners
+
+import (
+	"fmt"
+	"time"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type RatePlanner struct {
+	Main     shared.RequestPlanner
+	Duration time.Duration
+}
+
+func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := m.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	rateCnt := m.Duration.Milliseconds() / ctx.Step.Milliseconds()
+	if rateCnt < 1 {
+		rateCnt = 1
+	}
+	withMain := sql.NewWith(main, "pre_rate")
+	lastCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"argMax(value, timestamp_ms) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", rateCnt), nil
+	})
+	firstCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf(
+			"argMin(value, timestamp_ms) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", rateCnt), nil
+	})
+	resetCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		return "if(value < (any(value) OVER (" +
+			"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" +
+			") as lastValue), lastValue, 0)", nil
+	})
+	reset := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(resetCol, "reset"),
+			sql.NewSimpleCol("value", "value")).
+		From(sql.NewWithRef(withMain))
+	withReset := sql.NewWith(reset, "pre_reset")
+	resetColSum := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
+		_rateCnt := rateCnt - 1
+		if rateCnt <= 1 {
+			_rateCnt = 1
+		}
+		return fmt.Sprintf(
+			"sum(reset) OVER ("+
+				"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
+				")", _rateCnt), nil
+	})
+	extend := sql.NewSelect().With(withReset).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewCol(lastCol, "last"),
+			sql.NewCol(firstCol, "first"),
+			sql.NewCol(resetColSum, "reset"),
+			sql.NewSimpleCol(fmt.Sprintf("(last - first + reset) / %f", m.Duration.Seconds()), "_value")).
+		From(sql.NewWithRef(withReset))
+	withExtend := sql.NewWith(extend, "rate")
+	return sql.NewSelect().
+		With(withExtend).
+		Select(sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewSimpleCol("_value", "value")).
+		From(sql.NewWithRef(withExtend)), nil
+}
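Note: RatePlanner computes rate() in three windowed passes: per-row counter-reset detection against the previous row, a running sum of those resets, then (last - first + reset) divided by the range in seconds. Schematically, for rate(x[5m]) at a 15 s step (illustrative, not the exact emitted SQL):

    const rateCnt = 300000 / 15000 // 20 rows per 5m window
    const approxValue =
      '(argMax(value, timestamp_ms) OVER w - argMin(value, timestamp_ms) OVER w' +
      ' + sum(reset) OVER w) / 300'
    // w: PARTITION BY fingerprint ORDER BY timestamp_ms
    //    ROWS BETWEEN 19 PRECEDING AND CURRENT ROW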
diff --git a/wasm_parts/promql/planners/metrics_raw_init.go b/wasm_parts/promql/planners/metrics_raw_init.go
new file mode 100644
index 0000000..4cc233c
--- /dev/null
+++ b/wasm_parts/promql/planners/metrics_raw_init.go
@@ -0,0 +1,37 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type MetricsInitPlanner struct {
+	ValueCol    sql.SQLObject
+	Fingerprint shared.RequestPlanner
+}
+
+func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	fpReq, err := m.Fingerprint.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	withFpReq := sql.NewWith(fpReq, "fp_sel")
+	if m.ValueCol == nil {
+		m.ValueCol = sql.NewRawObject("argMaxMerge(last)")
+	}
+	tsNsCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("intDiv(timestamp_ns, %d) * %d", ctx.Step.Nanoseconds(), ctx.Step.Milliseconds()), nil
+	})
+	return sql.NewSelect().With(withFpReq).Select(
+		sql.NewSimpleCol("fingerprint", "fingerprint"),
+		sql.NewCol(tsNsCol, "timestamp_ms"),
+		sql.NewCol(m.ValueCol, "value"),
+		sql.NewSimpleCol("1::UInt8", "original")).
+		From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")).
+		AndWhere(
+			sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())),
+			sql.Le(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.To.UnixNano())),
+			sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFpReq))).
+		GroupBy(sql.NewRawObject("fingerprint"), sql.NewRawObject("timestamp_ms")), nil
+}
diff --git a/wasm_parts/promql/planners/metrics_zerofill.go b/wasm_parts/promql/planners/metrics_zerofill.go
new file mode 100644
index 0000000..4f8fc70
--- /dev/null
+++ b/wasm_parts/promql/planners/metrics_zerofill.go
@@ -0,0 +1,50 @@
+package planners
+
+import (
+	"fmt"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type MetricsZeroFillPlanner struct {
+	Main shared.RequestPlanner
+}
+
+func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := m.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	main.OrderBy(sql.NewRawObject("fingerprint"), sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("timestamp_ms WITH FILL FROM %d TO %d STEP %d",
+			ctx.From.UnixMilli(), ctx.To.UnixMilli(), ctx.Step.Milliseconds()), nil
+	}))
+	return main, nil
+	/*withMain := sql.NewWith(main, "prezerofill")
+	arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1
+	zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))",
+			arrLen, ctx.From.UnixMilli(), ctx.Step.Milliseconds()), nil
+	})
+	zeroFill := sql.NewSelect().With(withMain).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewCol(zeroFillCol, "values")).
+		From(sql.NewWithRef(withMain)).
+		GroupBy(sql.NewRawObject("fingerprint"))
+	withZeroFill := sql.NewWith(zeroFill, "zerofill")
+
+	joinZeroFillStmt := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
+		return fmt.Sprintf("arrayMap((x,y) -> (y * %d + %d, x), values, range(%d))",
+			ctx.Step.Milliseconds(), ctx.From.UnixMilli(), arrLen), nil
+	})
+
+	postZeroFill := sql.NewSelect().With(withZeroFill).
+		Select(
+			sql.NewSimpleCol("fingerprint", "fingerprint"),
+			sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
+			sql.NewSimpleCol("val.2", "value")).
+		From(sql.NewWithRef(withMain)).
+		Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil))
+	return postZeroFill, nil*/
+}
diff --git a/wasm_parts/promql/planners/stream_select_planner.go b/wasm_parts/promql/planners/stream_select_planner.go
new file mode 100644
index 0000000..af095b1
--- /dev/null
+++ b/wasm_parts/promql/planners/stream_select_planner.go
@@ -0,0 +1,102 @@
+package planners
+
+import (
+	"fmt"
+	"github.com/prometheus/prometheus/model/labels"
+	"strings"
+	"wasm_parts/promql/shared"
+	sql "wasm_parts/sql_select"
+)
+
+type StreamSelectPlanner struct {
+	Main     shared.RequestPlanner
+	Matchers []*labels.Matcher
+}
+
+func (s *StreamSelectPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
+	main, err := s.Main.Process(ctx)
+	if err != nil {
+		return nil, err
+	}
+	conds := make([]sql.SQLCondition, len(s.Matchers))
+	for i, m := range s.Matchers {
+		conds[i], err = s.getCond(m)
+		if err != nil {
+			return nil, err
+		}
+	}
+	main.AndWhere(sql.Or(conds...))
+
+	bitSetEntries := make([]*bitSetEntry, len(conds))
+	for i, c := range conds {
+		bitSetEntries[i] = &bitSetEntry{c, i}
+	}
+	main.AndHaving(sql.Eq(&bitSet{entries: bitSetEntries}, sql.NewIntVal((int64(1)<