mirror of
https://github.com/metrico/qryn.git
synced 2025-03-14 10:07:18 +00:00
pql optimization init
This commit is contained in:
@ -117,8 +117,14 @@ const getIdxSubquery = (conds, fromMs, toMs) => {
|
||||
).groupBy('fingerprint')
|
||||
}
|
||||
|
||||
module.exports.getData = async (matchers, fromMs, toMs) => {
|
||||
module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
|
||||
const db = DATABASE_NAME()
|
||||
const subq = (subqueries || {})[getMetricName(matchers)]
|
||||
if (subq) {
|
||||
const data = await rawRequest(subq + ' FORMAT RowBinary',
|
||||
null, db, { responseType: 'arraybuffer' })
|
||||
return new Uint8Array(data.data)
|
||||
}
|
||||
const matches = getMatchersIdxCond(matchers)
|
||||
const idx = getIdxSubquery(matches, fromMs, toMs)
|
||||
const withIdx = new Sql.With('idx', idx, !!clusterName)
|
||||
@ -176,4 +182,12 @@ module.exports.getData = async (matchers, fromMs, toMs) => {
|
||||
return new Uint8Array(data.data)
|
||||
}
|
||||
|
||||
function getMetricName(matchers) {
|
||||
for (const matcher of matchers) {
|
||||
if (matcher[0] === '__name__' && matcher[1] === '=') {
|
||||
return matcher[2]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
prometheus.getData = module.exports.getData
|
||||
|
@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
gcContext "github.com/metrico/micro-gc/context"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
@ -15,6 +13,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
"unsafe"
|
||||
promql2 "wasm_parts/promql"
|
||||
shared2 "wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
parser2 "wasm_parts/traceql/parser"
|
||||
traceql_transpiler "wasm_parts/traceql/transpiler"
|
||||
@ -165,12 +165,12 @@ func stats() {
|
||||
}
|
||||
|
||||
//export pqlRangeQuery
|
||||
func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint32 {
|
||||
func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64, optimizable uint32) uint32 {
|
||||
ctxId := gcContext.GetContextID()
|
||||
gcContext.SetContext(id)
|
||||
defer gcContext.SetContext(ctxId)
|
||||
|
||||
return pql(id, data[id], func() (promql.Query, error) {
|
||||
return pql(id, data[id], optimizable != 0, int64(fromMS), int64(toMS), int64(stepMS), func() (promql.Query, error) {
|
||||
queriable := &TestQueryable{id: id, stepMs: int64(stepMS)}
|
||||
return getEng().NewRangeQuery(
|
||||
queriable,
|
||||
@ -184,19 +184,20 @@ func pqlRangeQuery(id uint32, fromMS float64, toMS float64, stepMS float64) uint
|
||||
}
|
||||
|
||||
//export pqlInstantQuery
|
||||
func pqlInstantQuery(id uint32, timeMS float64) uint32 {
|
||||
func pqlInstantQuery(id uint32, timeMS float64, optimizable uint32) uint32 {
|
||||
ctxId := gcContext.GetContextID()
|
||||
gcContext.SetContext(id)
|
||||
defer gcContext.SetContext(ctxId)
|
||||
|
||||
return pql(id, data[id], func() (promql.Query, error) {
|
||||
queriable := &TestQueryable{id: id, stepMs: 15000}
|
||||
return getEng().NewInstantQuery(
|
||||
queriable,
|
||||
nil,
|
||||
string(data[id].request),
|
||||
time.Unix(0, int64(timeMS)*1000000))
|
||||
})
|
||||
return pql(id, data[id], optimizable != 0, int64(timeMS-300000), int64(timeMS), 15000,
|
||||
func() (promql.Query, error) {
|
||||
queriable := &TestQueryable{id: id, stepMs: 15000}
|
||||
return getEng().NewInstantQuery(
|
||||
queriable,
|
||||
nil,
|
||||
string(data[id].request),
|
||||
time.Unix(0, int64(timeMS)*1000000))
|
||||
})
|
||||
}
|
||||
|
||||
//export pqlSeries
|
||||
@ -255,13 +256,16 @@ func wrapErrorStr(err error) string {
|
||||
return err.Error()
|
||||
}
|
||||
|
||||
func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
|
||||
func pql(id uint32, c *ctx, optimizable bool,
|
||||
fromMs int64, toMs int64, stepMs int64,
|
||||
query func() (promql.Query, error)) uint32 {
|
||||
rq, err := query()
|
||||
|
||||
if err != nil {
|
||||
c.response = wrapError(err)
|
||||
return 1
|
||||
}
|
||||
|
||||
var walk func(node parser.Node, i func(node parser.Node))
|
||||
walk = func(node parser.Node, i func(node parser.Node)) {
|
||||
i(node)
|
||||
@ -269,9 +273,33 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
|
||||
walk(n, i)
|
||||
}
|
||||
}
|
||||
|
||||
subsels := strings.Builder{}
|
||||
subsels.WriteString("{")
|
||||
if optimizable {
|
||||
var (
|
||||
subselsMap map[string]string
|
||||
err error
|
||||
)
|
||||
subselsMap, rq, err = optimizeQuery(rq, fromMs, toMs, stepMs)
|
||||
if err != nil {
|
||||
c.response = wrapError(err)
|
||||
return 1
|
||||
}
|
||||
i := 0
|
||||
for k, v := range subselsMap {
|
||||
if i != 0 {
|
||||
subsels.WriteString(",")
|
||||
}
|
||||
subsels.WriteString(fmt.Sprintf(`"%s":"%s"`, strconv.Quote(k), strconv.Quote(v)))
|
||||
i++
|
||||
}
|
||||
}
|
||||
subsels.WriteString("}")
|
||||
|
||||
matchersJSON := getmatchersJSON(rq)
|
||||
|
||||
c.response = []byte(matchersJSON)
|
||||
c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s}`, subsels.String(), matchersJSON))
|
||||
c.onDataLoad = func(c *ctx) {
|
||||
ctxId := gcContext.GetContextID()
|
||||
gcContext.SetContext(id)
|
||||
@ -284,6 +312,68 @@ func pql(id uint32, c *ctx, query func() (promql.Query, error)) uint32 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) {
|
||||
appliableNodes := findAppliableNodes(q.Statement(), nil)
|
||||
var err error
|
||||
subsels := make(map[string]string)
|
||||
for _, m := range appliableNodes {
|
||||
fmt.Println(m)
|
||||
opt := m.optimizer
|
||||
opt = &promql2.FinalizerOptimizer{
|
||||
SubOptimizer: opt,
|
||||
}
|
||||
opt, err = promql2.PlanOptimize(m.node, opt)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
planner, err := opt.Optimize(m.node)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
|
||||
swapChild(m.parent, m.node, &parser.VectorSelector{
|
||||
Name: fakeMetric,
|
||||
OriginalOffset: 0,
|
||||
Offset: 0,
|
||||
Timestamp: nil,
|
||||
StartOrEnd: 0,
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
{
|
||||
Type: labels.MatchEqual,
|
||||
Name: "__name__",
|
||||
Value: fakeMetric,
|
||||
},
|
||||
},
|
||||
UnexpandedSeriesSet: nil,
|
||||
Series: nil,
|
||||
PosRange: parser.PositionRange{},
|
||||
})
|
||||
sel, err := planner.Process(&shared2.PlannerContext{
|
||||
IsCluster: false,
|
||||
From: time.Unix(0, fromMs*1000000),
|
||||
To: time.Unix(0, toMs*1000000),
|
||||
Step: time.Millisecond * time.Duration(stepMs),
|
||||
TimeSeriesTable: "time_series",
|
||||
TimeSeriesDistTable: "time_series_dist",
|
||||
TimeSeriesGinTable: "time_series_gin",
|
||||
MetricsTable: "metrics_15s",
|
||||
MetricsDistTable: "metrics_15s_dist",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
strSel, err := sel.String(&sql.Ctx{
|
||||
Params: map[string]sql.SQLObject{},
|
||||
Result: map[string]sql.SQLObject{},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
subsels[fakeMetric] = strSel
|
||||
}
|
||||
return subsels, q, nil
|
||||
}
|
||||
|
||||
//export onDataLoad
|
||||
func onDataLoad(idx uint32) {
|
||||
data[idx].onDataLoad(data[idx])
|
||||
@ -358,9 +448,171 @@ func writeVector(v promql.Vector) string {
|
||||
}
|
||||
|
||||
func main() {
|
||||
p := sync.Pool{}
|
||||
a := p.Get()
|
||||
_ = a
|
||||
queriable := &TestQueryable{id: 0, stepMs: 15000}
|
||||
rq, err := getEng().NewRangeQuery(
|
||||
queriable,
|
||||
nil,
|
||||
"histogram_quantile(0.5, sum by (container) (rate(network_usage{container=~\"awesome\"}[5m])))",
|
||||
time.Now().Add(time.Hour*-24),
|
||||
time.Now(),
|
||||
time.Millisecond*time.Duration(15000))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
matchers := findAppliableNodes(rq.Statement(), nil)
|
||||
for _, m := range matchers {
|
||||
fmt.Println(m)
|
||||
opt := m.optimizer
|
||||
opt = &promql2.FinalizerOptimizer{
|
||||
SubOptimizer: opt,
|
||||
}
|
||||
opt, err = promql2.PlanOptimize(m.node, opt)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
planner, err := opt.Optimize(m.node)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
|
||||
fmt.Println(rq.Statement())
|
||||
swapChild(m.parent, m.node, &parser.VectorSelector{
|
||||
Name: fakeMetric,
|
||||
OriginalOffset: 0,
|
||||
Offset: 0,
|
||||
Timestamp: nil,
|
||||
StartOrEnd: 0,
|
||||
LabelMatchers: []*labels.Matcher{
|
||||
{
|
||||
Type: labels.MatchEqual,
|
||||
Name: "__name__",
|
||||
Value: fakeMetric,
|
||||
},
|
||||
},
|
||||
UnexpandedSeriesSet: nil,
|
||||
Series: nil,
|
||||
PosRange: parser.PositionRange{},
|
||||
})
|
||||
fmt.Println(rq.Statement())
|
||||
sel, err := planner.Process(&shared2.PlannerContext{
|
||||
IsCluster: false,
|
||||
From: time.Now().Add(time.Hour * -24),
|
||||
To: time.Now(),
|
||||
Step: time.Millisecond * time.Duration(15000),
|
||||
TimeSeriesTable: "time_series",
|
||||
TimeSeriesDistTable: "time_series_dist",
|
||||
TimeSeriesGinTable: "time_series_gin",
|
||||
MetricsTable: "metrics_15s",
|
||||
MetricsDistTable: "metrics_15s_dist",
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
strSel, err := sel.String(&sql.Ctx{
|
||||
Params: map[string]sql.SQLObject{},
|
||||
Result: map[string]sql.SQLObject{},
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
println(strSel)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func getOptimizer(n parser.Node) promql2.IOptimizer {
|
||||
for _, f := range promql2.Optimizers {
|
||||
opt := f()
|
||||
if opt.IsAppliable(n) {
|
||||
return opt
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isRate(node parser.Node) (bool, bool) {
|
||||
opt := getOptimizer(node)
|
||||
if opt == nil {
|
||||
return false, true
|
||||
}
|
||||
return true, false
|
||||
}
|
||||
|
||||
type MatchNode struct {
|
||||
node parser.Node
|
||||
parent parser.Node
|
||||
optimizer promql2.IOptimizer
|
||||
}
|
||||
|
||||
func findAppliableNodes(root parser.Node, parent parser.Node) []MatchNode {
|
||||
var res []MatchNode
|
||||
optimizer := getOptimizer(root)
|
||||
if optimizer != nil {
|
||||
res = append(res, MatchNode{
|
||||
node: root,
|
||||
parent: parent,
|
||||
optimizer: optimizer,
|
||||
})
|
||||
return res
|
||||
}
|
||||
for _, n := range parser.Children(root) {
|
||||
res = append(res, findAppliableNodes(n, root)...)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func swapChild(node parser.Node, child parser.Node, newChild parser.Expr) {
|
||||
// For some reasons these switches have significantly better performance than interfaces
|
||||
switch n := node.(type) {
|
||||
case *parser.EvalStmt:
|
||||
n.Expr = newChild
|
||||
case parser.Expressions:
|
||||
for i, e := range n {
|
||||
if e.String() == child.String() {
|
||||
n[i] = newChild
|
||||
}
|
||||
}
|
||||
case *parser.AggregateExpr:
|
||||
if n.Expr == nil && n.Param == nil {
|
||||
return
|
||||
} else if n.Expr == nil {
|
||||
n.Param = newChild
|
||||
} else if n.Param == nil {
|
||||
n.Expr = newChild
|
||||
} else {
|
||||
if n.Expr.String() == child.String() {
|
||||
n.Expr = newChild
|
||||
} else {
|
||||
n.Param = newChild
|
||||
}
|
||||
}
|
||||
case *parser.BinaryExpr:
|
||||
if n.LHS.String() == child.String() {
|
||||
n.LHS = newChild
|
||||
} else if n.RHS.String() == child.String() {
|
||||
n.RHS = newChild
|
||||
}
|
||||
case *parser.Call:
|
||||
for i, e := range n.Args {
|
||||
if e.String() == child.String() {
|
||||
n.Args[i] = newChild
|
||||
}
|
||||
}
|
||||
case *parser.SubqueryExpr:
|
||||
n.Expr = newChild
|
||||
case *parser.ParenExpr:
|
||||
n.Expr = newChild
|
||||
case *parser.UnaryExpr:
|
||||
n.Expr = newChild
|
||||
case *parser.MatrixSelector:
|
||||
n.VectorSelector = newChild
|
||||
case *parser.StepInvariantExpr:
|
||||
n.Expr = newChild
|
||||
}
|
||||
}
|
||||
|
||||
func getChildren(e parser.Node) []parser.Node {
|
||||
return parser.Children(e)
|
||||
}
|
||||
|
||||
type TestLogger struct{}
|
||||
@ -585,3 +837,17 @@ func matchers2Str(labelMatchers []*labels.Matcher) string {
|
||||
matchersJson.WriteString("]")
|
||||
return matchersJson.String()
|
||||
}
|
||||
|
||||
type pqlRequest struct {
|
||||
optimizable bool
|
||||
body string
|
||||
}
|
||||
|
||||
func (p *pqlRequest) Read(body []byte) {
|
||||
r := BinaryReader{buffer: body}
|
||||
p.optimizable = r.ReadULeb32() != 0
|
||||
p.body = r.ReadString()
|
||||
if !p.optimizable {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -61,8 +61,8 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
|
||||
const end = endMs || Date.now()
|
||||
const step = stepMs || 15000
|
||||
return await pql(query,
|
||||
(ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step),
|
||||
(matchers) => getData(matchers, start, end))
|
||||
(ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
|
||||
(matchers, subq) => getData(matchers, start, end, subq))
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,8 +76,8 @@ module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
|
||||
const time = timeMs || Date.now()
|
||||
const _wasm = getWasm()
|
||||
return await pql(query,
|
||||
(ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time),
|
||||
(matchers) => getData(matchers, time - 300000, time))
|
||||
(ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
|
||||
(matchers, subq) => getData(matchers, time - 300000, time, subq))
|
||||
}
|
||||
|
||||
module.exports.pqlMatchers = (query) => {
|
||||
@ -163,7 +163,7 @@ const pql = async (query, wasmCall, getData) => {
|
||||
|
||||
const matchersResults = await Promise.all(
|
||||
matchersObj.map(async (matchers, i) => {
|
||||
const data = await getData(matchers)
|
||||
const data = await getData(matchers.matchers, matchers.subqueries)
|
||||
return { matchers, data }
|
||||
}))
|
||||
|
||||
|
71
wasm_parts/promql/aggregate.go
Normal file
71
wasm_parts/promql/aggregate.go
Normal file
@ -0,0 +1,71 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"wasm_parts/promql/planners"
|
||||
"wasm_parts/promql/shared"
|
||||
)
|
||||
|
||||
type AggregateOptimizer struct {
|
||||
WithLabelsIn string
|
||||
WithLabelsOut string
|
||||
|
||||
subOptimizer IOptimizer
|
||||
}
|
||||
|
||||
func (a *AggregateOptimizer) IsAppliable(node parser.Node) bool {
|
||||
aggExpr, ok := node.(*parser.AggregateExpr)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if aggExpr.Op != parser.SUM {
|
||||
return false
|
||||
}
|
||||
return GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory)) != nil
|
||||
}
|
||||
|
||||
func (a *AggregateOptimizer) PlanOptimize(node parser.Node) error {
|
||||
aggExpr := node.(*parser.AggregateExpr)
|
||||
a.subOptimizer = GetAppliableOptimizer(aggExpr.Expr, append(Optimizers, VectorSelectorOptimizerFactory))
|
||||
return a.subOptimizer.PlanOptimize(node)
|
||||
}
|
||||
|
||||
func (a *AggregateOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
|
||||
aggExpr := node.(*parser.AggregateExpr)
|
||||
planner, err := a.subOptimizer.Optimize(aggExpr.Expr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
withLabelsIn := a.WithLabelsIn
|
||||
if withLabelsIn == "" {
|
||||
planner = &planners.LabelsInitPlanner{
|
||||
Main: planner,
|
||||
FingerprintsAlias: "fp_sel",
|
||||
}
|
||||
withLabelsIn = "labels"
|
||||
}
|
||||
if a.WithLabelsOut == "" {
|
||||
return nil, fmt.Errorf("AggregateOptimizer.WithLabelsOut is empty")
|
||||
}
|
||||
byWithout := "by"
|
||||
if aggExpr.Without {
|
||||
byWithout = "without"
|
||||
}
|
||||
planner = &planners.ByWithoutPlanner{
|
||||
Main: planner,
|
||||
FingerprintWithName: withLabelsIn,
|
||||
FingerprintsOutName: a.WithLabelsOut,
|
||||
ByWithout: byWithout,
|
||||
Labels: aggExpr.Grouping,
|
||||
}
|
||||
planner = &planners.SumPlanner{
|
||||
Main: planner,
|
||||
LabelsAlias: a.WithLabelsOut,
|
||||
}
|
||||
return planner, nil
|
||||
}
|
||||
|
||||
func (a *AggregateOptimizer) Children() []IOptimizer {
|
||||
return []IOptimizer{a.subOptimizer}
|
||||
}
|
45
wasm_parts/promql/finalize.go
Normal file
45
wasm_parts/promql/finalize.go
Normal file
@ -0,0 +1,45 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"wasm_parts/promql/planners"
|
||||
"wasm_parts/promql/shared"
|
||||
)
|
||||
|
||||
type FinalizerOptimizer struct {
|
||||
LabelsIn string
|
||||
SubOptimizer IOptimizer
|
||||
}
|
||||
|
||||
func (f *FinalizerOptimizer) IsAppliable(node parser.Node) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (f *FinalizerOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
|
||||
planner, err := f.SubOptimizer.Optimize(node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
labelsIn := f.LabelsIn
|
||||
if labelsIn == "" {
|
||||
planner = &planners.LabelsInitPlanner{
|
||||
Main: planner,
|
||||
FingerprintsAlias: "fp_sel",
|
||||
}
|
||||
labelsIn = "labels"
|
||||
}
|
||||
|
||||
planner = &planners.FinalizePlanner{
|
||||
LabelsAlias: labelsIn,
|
||||
Main: planner,
|
||||
}
|
||||
return planner, nil
|
||||
}
|
||||
|
||||
func (f *FinalizerOptimizer) PlanOptimize(node parser.Node) error {
|
||||
return f.SubOptimizer.PlanOptimize(node)
|
||||
}
|
||||
|
||||
func (f *FinalizerOptimizer) Children() []IOptimizer {
|
||||
return []IOptimizer{f.SubOptimizer}
|
||||
}
|
37
wasm_parts/promql/optimize.go
Normal file
37
wasm_parts/promql/optimize.go
Normal file
@ -0,0 +1,37 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
)
|
||||
|
||||
func PlanOptimize(node parser.Node, optimizer IOptimizer) (IOptimizer, error) {
|
||||
err := optimizer.PlanOptimize(node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var checkLabelAliases func(opt IOptimizer, i int) int
|
||||
checkLabelAliases = func(opt IOptimizer, i int) int {
|
||||
var _i int
|
||||
for _, c := range opt.Children() {
|
||||
_i = checkLabelAliases(c, i)
|
||||
}
|
||||
switch opt.(type) {
|
||||
case *AggregateOptimizer:
|
||||
if _i != 0 {
|
||||
opt.(*AggregateOptimizer).WithLabelsIn = fmt.Sprintf("labels_", _i)
|
||||
}
|
||||
opt.(*AggregateOptimizer).WithLabelsOut = fmt.Sprintf("labels_%d", _i+1)
|
||||
_i++
|
||||
case *FinalizerOptimizer:
|
||||
if _i != 0 {
|
||||
opt.(*FinalizerOptimizer).LabelsIn = fmt.Sprintf("labels_%d", _i)
|
||||
}
|
||||
_i++
|
||||
}
|
||||
return _i
|
||||
}
|
||||
checkLabelAliases(optimizer, 0)
|
||||
return optimizer, nil
|
||||
}
|
48
wasm_parts/promql/planners/aggregate.go
Normal file
48
wasm_parts/promql/planners/aggregate.go
Normal file
@ -0,0 +1,48 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type SumPlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
LabelsAlias string
|
||||
}
|
||||
|
||||
func (s *SumPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := s.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var withLabels *sql.With
|
||||
for _, w := range main.GetWith() {
|
||||
if w.GetAlias() == s.LabelsAlias {
|
||||
withLabels = w
|
||||
break
|
||||
}
|
||||
}
|
||||
if withLabels == nil {
|
||||
return nil, fmt.Errorf("labels subrequest not found")
|
||||
}
|
||||
withMain := sql.NewWith(main, "pre_sum")
|
||||
|
||||
res := sql.NewSelect().With(withMain).
|
||||
Select(
|
||||
sql.NewSimpleCol(withLabels.GetAlias()+".new_fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol("pre_sum.timestamp_ms", "timestamp_ms"),
|
||||
sql.NewSimpleCol("sum(pre_sum.value)", "value")).
|
||||
From(sql.NewWithRef(withMain)).
|
||||
Join(sql.NewJoin(
|
||||
"ANY LEFT",
|
||||
sql.NewWithRef(withLabels),
|
||||
sql.Eq(
|
||||
sql.NewRawObject("pre_sum.fingerprint"),
|
||||
sql.NewRawObject(withLabels.GetAlias()+".fingerprint")))).
|
||||
GroupBy(
|
||||
sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint"),
|
||||
sql.NewRawObject("pre_sum.timestamp_ms"))
|
||||
return res, nil
|
||||
}
|
59
wasm_parts/promql/planners/by_without.go
Normal file
59
wasm_parts/promql/planners/by_without.go
Normal file
@ -0,0 +1,59 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type ByWithoutPlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
FingerprintWithName string
|
||||
FingerprintsOutName string
|
||||
ByWithout string
|
||||
Labels []string
|
||||
}
|
||||
|
||||
func (b *ByWithoutPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := b.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var fp *sql.With
|
||||
withs := main.GetWith()
|
||||
for _, w := range withs {
|
||||
if w.GetAlias() == b.FingerprintWithName {
|
||||
fp = w
|
||||
break
|
||||
}
|
||||
}
|
||||
if fp == nil {
|
||||
return nil, fmt.Errorf("fingerprints subrequest not found")
|
||||
}
|
||||
labelsCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
cond := "IN"
|
||||
if b.ByWithout == "without" {
|
||||
cond = "NOT IN"
|
||||
}
|
||||
values := make([]string, len(b.Labels))
|
||||
var err error
|
||||
for i, l := range b.Labels {
|
||||
values[i], err = sql.NewStringVal(l).String(ctx, options...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("mapFilter((k,v) -> k %s (%s), labels)", cond, strings.Join(values, ",")), nil
|
||||
})
|
||||
newFpCol := "cityHash64(arraySort(arrayZip(mapKeys(labels), mapValues(labels))))"
|
||||
newFp := sql.NewSelect().
|
||||
Select(
|
||||
sql.NewSimpleCol(fp.GetAlias()+".new_fingerprint", "fingerprint"),
|
||||
sql.NewCol(labelsCol, "labels"),
|
||||
sql.NewSimpleCol(newFpCol, "new_fingerprint"),
|
||||
).
|
||||
From(sql.NewWithRef(fp))
|
||||
withNewFp := sql.NewWith(newFp, b.FingerprintsOutName)
|
||||
return main.AddWith(withNewFp), nil
|
||||
}
|
47
wasm_parts/promql/planners/finalize.go
Normal file
47
wasm_parts/promql/planners/finalize.go
Normal file
@ -0,0 +1,47 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type FinalizePlanner struct {
|
||||
LabelsAlias string
|
||||
Main shared.RequestPlanner
|
||||
}
|
||||
|
||||
func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := f.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var withLabels *sql.With
|
||||
for _, w := range main.GetWith() {
|
||||
if w.GetAlias() == f.LabelsAlias {
|
||||
withLabels = w
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if withLabels == nil {
|
||||
return nil, fmt.Errorf("FinalizePlanner.Process: %s CTE not found", f.LabelsAlias)
|
||||
}
|
||||
|
||||
withMain := sql.NewWith(main, "pre_final")
|
||||
res := sql.NewSelect().With(withMain).Select(withMain).
|
||||
Select(
|
||||
sql.NewSimpleCol("pre_final.fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"),
|
||||
sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"),
|
||||
).From(sql.NewWithRef(withMain)).
|
||||
Join(sql.NewJoin(
|
||||
"ANY LEFT",
|
||||
sql.NewWithRef(withLabels),
|
||||
sql.Eq(
|
||||
sql.NewRawObject("pre_final.fingerprint"),
|
||||
sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))).
|
||||
GroupBy(sql.NewRawObject("pre_final.fingerprint"), sql.NewRawObject(withLabels.GetAlias()+".labels"))
|
||||
return res, nil
|
||||
}
|
48
wasm_parts/promql/planners/labels_init.go
Normal file
48
wasm_parts/promql/planners/labels_init.go
Normal file
@ -0,0 +1,48 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type LabelsInitPlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
FingerprintsAlias string
|
||||
}
|
||||
|
||||
func (l *LabelsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := l.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var withFp *sql.With
|
||||
for _, w := range main.GetWith() {
|
||||
if w.GetAlias() == l.FingerprintsAlias {
|
||||
withFp = w
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if withFp == nil {
|
||||
return nil, fmt.Errorf("fingerprints subrequest not found")
|
||||
}
|
||||
|
||||
labelsCol := "mapFromArrays(" +
|
||||
"arrayMap(x -> x.1, JSONExtractKeysAndValues(time_series.labels, 'String') as ts_kv), " +
|
||||
"arrayMap(x -> x.2, ts_kv))"
|
||||
|
||||
labelsSubSel := sql.NewSelect().Select(
|
||||
sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol(labelsCol, "labels"),
|
||||
sql.NewSimpleCol("fingerprint", "new_fingerprint")).
|
||||
From(sql.NewSimpleCol(ctx.TimeSeriesTable, "time_series")).
|
||||
AndWhere(
|
||||
sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))),
|
||||
sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))),
|
||||
sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFp)))
|
||||
withLabelsSubSel := sql.NewWith(labelsSubSel, "labels")
|
||||
|
||||
return main.AddWith(withLabelsSubSel), nil
|
||||
}
|
36
wasm_parts/promql/planners/metrics_extend.go
Normal file
36
wasm_parts/promql/planners/metrics_extend.go
Normal file
@ -0,0 +1,36 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type MetricsExtendPlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
}
|
||||
|
||||
func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := m.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
extendCnt := 300000 / ctx.Step.Milliseconds()
|
||||
if extendCnt < 1 {
|
||||
return main, nil
|
||||
}
|
||||
withMain := sql.NewWith(main, "pre_extend")
|
||||
extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf(
|
||||
"argMaxIf(value, timestamp_ms, isNaN(value) = 0) OVER ("+
|
||||
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
|
||||
")", extendCnt), nil
|
||||
})
|
||||
extend := sql.NewSelect().With(withMain).
|
||||
Select(
|
||||
sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
|
||||
sql.NewCol(extendedCol, "value")).
|
||||
From(sql.NewWithRef(withMain))
|
||||
return extend, nil
|
||||
}
|
56
wasm_parts/promql/planners/metrics_rate.go
Normal file
56
wasm_parts/promql/planners/metrics_rate.go
Normal file
@ -0,0 +1,56 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type RatePlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := m.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rateCnt := m.Duration.Milliseconds() / ctx.Step.Milliseconds()
|
||||
if rateCnt < 1 {
|
||||
rateCnt = 1
|
||||
}
|
||||
withMain := sql.NewWith(main, "pre_rate")
|
||||
lastCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf(
|
||||
"argMax(value, timestamp_ms) OVER ("+
|
||||
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
|
||||
")", rateCnt), nil
|
||||
})
|
||||
firstCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf(
|
||||
"argMin(value, timestamp_ms) OVER ("+
|
||||
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
|
||||
")", rateCnt), nil
|
||||
})
|
||||
valueCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf(
|
||||
"if(last > first, last - first, last) / %f", m.Duration.Seconds()), nil
|
||||
})
|
||||
extend := sql.NewSelect().With(withMain).
|
||||
Select(
|
||||
sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
|
||||
sql.NewCol(lastCol, "last"),
|
||||
sql.NewCol(firstCol, "first"),
|
||||
sql.NewCol(valueCol, "_value")).
|
||||
From(sql.NewWithRef(withMain))
|
||||
withExtend := sql.NewWith(extend, "rate")
|
||||
return sql.NewSelect().
|
||||
With(withExtend).
|
||||
Select(sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
|
||||
sql.NewSimpleCol("_value", "value")).
|
||||
From(sql.NewWithRef(withExtend)), nil
|
||||
}
|
36
wasm_parts/promql/planners/metrics_raw_init.go
Normal file
36
wasm_parts/promql/planners/metrics_raw_init.go
Normal file
@ -0,0 +1,36 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type MetricsInitPlanner struct {
|
||||
ValueCol sql.SQLObject
|
||||
Fingerprint shared.RequestPlanner
|
||||
}
|
||||
|
||||
func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
fpReq, err := m.Fingerprint.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
withFpReq := sql.NewWith(fpReq, "fp_sel")
|
||||
if m.ValueCol == nil {
|
||||
m.ValueCol = sql.NewRawObject("argMaxMerge(last)")
|
||||
}
|
||||
tsNsCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf("intDiv(timestamp_ns, %d) * %d", ctx.Step.Nanoseconds(), ctx.Step.Milliseconds()), nil
|
||||
})
|
||||
return sql.NewSelect().With(withFpReq).Select(
|
||||
sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewCol(tsNsCol, "timestamp_ms"),
|
||||
sql.NewCol(m.ValueCol, "value")).
|
||||
From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")).
|
||||
AndWhere(
|
||||
sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())),
|
||||
sql.Le(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.To.UnixNano())),
|
||||
sql.NewIn(sql.NewRawObject("fingerprint"), sql.NewWithRef(withFpReq))).
|
||||
GroupBy(sql.NewRawObject("fingerprint"), sql.NewRawObject("timestamp_ms")), nil
|
||||
}
|
45
wasm_parts/promql/planners/metrics_zerofill.go
Normal file
45
wasm_parts/promql/planners/metrics_zerofill.go
Normal file
@ -0,0 +1,45 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type MetricsZeroFillPlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
}
|
||||
|
||||
func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := m.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
withMain := sql.NewWith(main, "prezerofill")
|
||||
arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1
|
||||
zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))",
|
||||
arrLen, ctx.From.UnixMilli(), ctx.Step.Milliseconds()), nil
|
||||
})
|
||||
zeroFill := sql.NewSelect().With(withMain).
|
||||
Select(
|
||||
sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewCol(zeroFillCol, "values")).
|
||||
From(sql.NewWithRef(withMain)).
|
||||
GroupBy(sql.NewRawObject("fingerprint"))
|
||||
withZeroFill := sql.NewWith(zeroFill, "zerofill")
|
||||
|
||||
joinZeroFillStmt := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
|
||||
return fmt.Sprintf("arrayMap((x,y) -> (y * %d + %d, x), values, range(%d))",
|
||||
ctx.Step.Milliseconds(), ctx.From.UnixMilli(), arrLen), nil
|
||||
})
|
||||
|
||||
postZeroFill := sql.NewSelect().With(withZeroFill).
|
||||
Select(
|
||||
sql.NewSimpleCol("fingerprint", "fingerprint"),
|
||||
sql.NewSimpleCol("val.1", "timestamp_ms"),
|
||||
sql.NewSimpleCol("val.2", "value")).
|
||||
From(sql.NewWithRef(withZeroFill)).
|
||||
Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil))
|
||||
return postZeroFill, nil
|
||||
}
|
102
wasm_parts/promql/planners/stream_select_planner.go
Normal file
102
wasm_parts/promql/planners/stream_select_planner.go
Normal file
@ -0,0 +1,102 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"strings"
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type StreamSelectPlanner struct {
|
||||
Main shared.RequestPlanner
|
||||
Matchers []*labels.Matcher
|
||||
}
|
||||
|
||||
func (s *StreamSelectPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
main, err := s.Main.Process(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conds := make([]sql.SQLCondition, len(s.Matchers))
|
||||
for i, m := range s.Matchers {
|
||||
conds[i], err = s.getCond(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
main.AndWhere(sql.Or(conds...))
|
||||
|
||||
bitSetEntries := make([]*bitSetEntry, len(conds))
|
||||
for i, c := range conds {
|
||||
bitSetEntries[i] = &bitSetEntry{c, i}
|
||||
}
|
||||
main.AndHaving(sql.Eq(&bitSet{entries: bitSetEntries}, sql.NewIntVal((int64(1)<<uint(len(conds)))-1)))
|
||||
return main, nil
|
||||
}
|
||||
|
||||
func (s *StreamSelectPlanner) getCond(m *labels.Matcher) (sql.SQLCondition, error) {
|
||||
keyCond := sql.Eq(sql.NewRawObject("key"), sql.NewStringVal(m.Name))
|
||||
var valCond sql.SQLCondition
|
||||
switch m.Type {
|
||||
case labels.MatchEqual:
|
||||
valCond = sql.Eq(sql.NewRawObject("val"), sql.NewStringVal(m.Value))
|
||||
case labels.MatchNotEqual:
|
||||
valCond = sql.Neq(sql.NewRawObject("val"), sql.NewStringVal(m.Value))
|
||||
case labels.MatchRegexp:
|
||||
valCond = sql.Eq(&pregMatch{sql.NewRawObject("val"), sql.NewStringVal(m.Value)},
|
||||
sql.NewIntVal(1))
|
||||
case labels.MatchNotRegexp:
|
||||
valCond = sql.Eq(&pregMatch{sql.NewRawObject("val"), sql.NewStringVal(m.Value)},
|
||||
sql.NewIntVal(0))
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown matcher type: %v", m.Type)
|
||||
}
|
||||
return sql.And(keyCond, valCond), nil
|
||||
}
|
||||
|
||||
type pregMatch struct {
|
||||
key sql.SQLObject
|
||||
val sql.SQLObject
|
||||
}
|
||||
|
||||
func (p *pregMatch) String(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
strK, err := p.key.String(ctx, options...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
strV, err := p.val.String(ctx, options...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("match(%s, %s)", strK, strV), nil
|
||||
}
|
||||
|
||||
type bitSetEntry struct {
|
||||
cond sql.SQLCondition
|
||||
idx int
|
||||
}
|
||||
|
||||
func (b bitSetEntry) String(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
strCond, err := b.cond.String(ctx, options...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("bitShiftLeft(toUInt64(%s), %d)", strCond, b.idx), nil
|
||||
}
|
||||
|
||||
type bitSet struct {
|
||||
entries []*bitSetEntry
|
||||
}
|
||||
|
||||
func (b bitSet) String(ctx *sql.Ctx, options ...int) (string, error) {
|
||||
strEntries := make([]string, len(b.entries))
|
||||
var err error
|
||||
for i, e := range b.entries {
|
||||
strEntries[i], err = e.String(ctx, options...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("groupBitOr(%s)", strings.Join(strEntries, "+")), nil
|
||||
}
|
20
wasm_parts/promql/planners/time_series_gin_init.go
Normal file
20
wasm_parts/promql/planners/time_series_gin_init.go
Normal file
@ -0,0 +1,20 @@
|
||||
package planners
|
||||
|
||||
import (
|
||||
"wasm_parts/promql/shared"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type TimeSeriesGinInitPlanner struct {
|
||||
}
|
||||
|
||||
func (t *TimeSeriesGinInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
|
||||
return sql.NewSelect().
|
||||
Select(sql.NewSimpleCol("fingerprint", "fingerprint")).
|
||||
From(sql.NewSimpleCol(ctx.TimeSeriesGinTable, "ts_gin")).
|
||||
AndWhere(
|
||||
sql.Ge(sql.NewRawObject("date"), sql.NewStringVal(ctx.From.Format("2006-01-02"))),
|
||||
sql.Le(sql.NewRawObject("date"), sql.NewStringVal(ctx.To.Format("2006-01-02"))),
|
||||
sql.NewIn(sql.NewRawObject("type"), sql.NewIntVal(0), sql.NewIntVal(2))).
|
||||
GroupBy(sql.NewRawObject("fingerprint")), nil
|
||||
}
|
62
wasm_parts/promql/rate.go
Normal file
62
wasm_parts/promql/rate.go
Normal file
@ -0,0 +1,62 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"wasm_parts/promql/planners"
|
||||
"wasm_parts/promql/shared"
|
||||
)
|
||||
|
||||
type RateOptimizer struct {
|
||||
vectorSelectorOptimizer *VectorSelectorOptimizer
|
||||
}
|
||||
|
||||
func (r *RateOptimizer) IsAppliable(node parser.Node) bool {
|
||||
_node, ok := node.(*parser.Call)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
vectorSelector := r.getVectorSelector(_node)
|
||||
return vectorSelector != nil && (&VectorSelectorOptimizer{}).IsAppliable(vectorSelector)
|
||||
}
|
||||
|
||||
func (r *RateOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
|
||||
_node, ok := node.(*parser.Call)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
vectorSelector := r.getVectorSelector(_node)
|
||||
matrixSelector := _node.Args[0].(*parser.MatrixSelector)
|
||||
res, err := (&VectorSelectorOptimizer{}).Optimize(vectorSelector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = &planners.RatePlanner{
|
||||
Main: res,
|
||||
Duration: matrixSelector.Range,
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (v *RateOptimizer) PlanOptimize(node parser.Node) error {
|
||||
v.vectorSelectorOptimizer = &VectorSelectorOptimizer{}
|
||||
return v.vectorSelectorOptimizer.PlanOptimize(node)
|
||||
}
|
||||
|
||||
func (r *RateOptimizer) getVectorSelector(node *parser.Call) *parser.VectorSelector {
|
||||
if node.Func.Name != "rate" || len(node.Args) != 1 {
|
||||
return nil
|
||||
}
|
||||
_matrixSelector, ok := node.Args[0].(*parser.MatrixSelector)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
vectorSelector, ok := _matrixSelector.VectorSelector.(*parser.VectorSelector)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return vectorSelector
|
||||
}
|
||||
|
||||
func (r *RateOptimizer) Children() []IOptimizer {
|
||||
return []IOptimizer{r.vectorSelectorOptimizer}
|
||||
}
|
22
wasm_parts/promql/shared/types.go
Normal file
22
wasm_parts/promql/shared/types.go
Normal file
@ -0,0 +1,22 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"time"
|
||||
sql "wasm_parts/sql_select"
|
||||
)
|
||||
|
||||
type RequestPlanner interface {
|
||||
Process(ctx *PlannerContext) (sql.ISelect, error)
|
||||
}
|
||||
|
||||
type PlannerContext struct {
|
||||
IsCluster bool
|
||||
From time.Time
|
||||
To time.Time
|
||||
Step time.Duration
|
||||
TimeSeriesTable string
|
||||
TimeSeriesDistTable string
|
||||
TimeSeriesGinTable string
|
||||
MetricsTable string
|
||||
MetricsDistTable string
|
||||
}
|
45
wasm_parts/promql/smart_optimizers.go
Normal file
45
wasm_parts/promql/smart_optimizers.go
Normal file
@ -0,0 +1,45 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"wasm_parts/promql/shared"
|
||||
)
|
||||
|
||||
type IOptimizer interface {
|
||||
IsAppliable(node parser.Node) bool
|
||||
Optimize(node parser.Node) (shared.RequestPlanner, error)
|
||||
PlanOptimize(node parser.Node) error
|
||||
Children() []IOptimizer
|
||||
}
|
||||
|
||||
type OptimizerFactory func() IOptimizer
|
||||
|
||||
var VectorSelectorOptimizerFactory OptimizerFactory = func() IOptimizer {
|
||||
return &VectorSelectorOptimizer{}
|
||||
}
|
||||
|
||||
var FinalizerOptimizerFactory OptimizerFactory = func() IOptimizer {
|
||||
return &FinalizerOptimizer{}
|
||||
}
|
||||
|
||||
var Optimizers = []OptimizerFactory{
|
||||
func() IOptimizer {
|
||||
return &RateOptimizer{}
|
||||
},
|
||||
func() IOptimizer {
|
||||
return &AggregateOptimizer{}
|
||||
},
|
||||
}
|
||||
|
||||
func GetAppliableOptimizer(node parser.Node, factories []OptimizerFactory) IOptimizer {
|
||||
if factories == nil {
|
||||
factories = Optimizers
|
||||
}
|
||||
for _, factory := range factories {
|
||||
opt := factory()
|
||||
if opt.IsAppliable(node) {
|
||||
return opt
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
39
wasm_parts/promql/vector.go
Normal file
39
wasm_parts/promql/vector.go
Normal file
@ -0,0 +1,39 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"wasm_parts/promql/planners"
|
||||
"wasm_parts/promql/shared"
|
||||
)
|
||||
|
||||
type VectorSelectorOptimizer struct {
|
||||
}
|
||||
|
||||
func (v *VectorSelectorOptimizer) IsAppliable(node parser.Node) bool {
|
||||
_, ok := node.(*parser.VectorSelector)
|
||||
return ok
|
||||
}
|
||||
|
||||
func (v *VectorSelectorOptimizer) PlanOptimize(node parser.Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *VectorSelectorOptimizer) Optimize(node parser.Node) (shared.RequestPlanner, error) {
|
||||
_node := node.(*parser.VectorSelector)
|
||||
var res shared.RequestPlanner = &planners.TimeSeriesGinInitPlanner{}
|
||||
res = &planners.StreamSelectPlanner{
|
||||
Main: res,
|
||||
Matchers: _node.LabelMatchers,
|
||||
}
|
||||
res = &planners.MetricsInitPlanner{
|
||||
ValueCol: nil,
|
||||
Fingerprint: res,
|
||||
}
|
||||
res = &planners.MetricsZeroFillPlanner{Main: res}
|
||||
res = &planners.MetricsExtendPlanner{Main: res}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (v *VectorSelectorOptimizer) Children() []IOptimizer {
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user