render endpoint impl

akvlad
2024-08-05 17:49:53 +03:00
parent 505b6b5051
commit decb26cb5e
5 changed files with 344 additions and 201 deletions

View File

@@ -6,13 +6,14 @@ type units = string;
export interface Flamebearer {
version: number,
flamebearerProfileV1: flamebearerProfileV1
telemetry: {[key: string]: any}
telemetry?: {[key: string]: any}
}
export interface flamebearerProfileV1 {
flamebearer: flamebearerV1,
metadata: flamebearerMetadataV1,
timeline: flamebearerTimelineV1,
groups: {[key: string]: flamebearerTimelineV1}
heatmap: heatmap,
leftTicks: string,
rightTicks: string,
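
For orientation, a payload satisfying the widened interface could look like the sketch below (all values are illustrative; telemetry may now be omitted, and timeline/groups are what the render endpoint fills in):

/** @type {Flamebearer} (illustrative values only) */
const example = {
  version: 1,
  flamebearerProfileV1: {
    metadata: { format: 'single', units: 'samples', name: 'cpu', sampleRate: 100 },
    flamebearer: { names: ['total', 'main'], levels: [[0, 10, 0, 0]], maxSelf: 5, numTicks: 10 },
    timeline: { startTime: 1722866990, durationDelta: 15, samples: [0, 4, 6] },
    groups: { '*': { startTime: 1722866990, durationDelta: 15, samples: [0, 4, 6] } }
  }
}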

View File

@@ -9,10 +9,17 @@ const { QrynBadRequest } = require('../lib/handlers/errors')
const { clusterName } = require('../common')
const logger = require('../lib/logger')
const jsonParsers = require('./json_parsers')
const { parser, wrapResponse, parseTypeId, serviceNameSelectorQuery, labelSelectorQuery } = require('./shared')
const HISTORY_TIMESPAN = 1000 * 60 * 60 * 24 * 7
const {
parser,
wrapResponse,
parseTypeId,
serviceNameSelectorQuery,
labelSelectorQuery,
HISTORY_TIMESPAN
} = require('./shared')
const settings = require('./settings')
const { mergeStackTraces } = require('./merge_stack_traces')
const { selectSeriesImpl } = require('./select_series')
const render = require('./render')
const profileTypesHandler = async (req, res) => {
@@ -103,138 +110,13 @@ const selectMergeStacktracesV2 = async (req, res) => {
}
const selectSeries = async (req, res) => {
const _req = req.body
const fromTimeSec = Math.floor(req.getStart && req.getStart()
? parseInt(req.getStart()) / 1000
: (Date.now() - HISTORY_TIMESPAN) / 1000)
const toTimeSec = Math.floor(req.getEnd && req.getEnd()
? parseInt(req.getEnd()) / 1000
: Date.now() / 1000)
let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
if (!typeID) {
throw new QrynBadRequest('No type provided')
}
typeID = parseTypeId(typeID)
if (!typeID) {
throw new QrynBadRequest('Invalid type provided')
}
const dist = clusterName ? '_dist' : ''
const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
let groupBy = _req.getGroupByList && _req.getGroupByList()
groupBy = groupBy && groupBy.length ? groupBy : null
const step = _req.getStep && parseInt(_req.getStep())
if (!step || isNaN(step)) {
throw new QrynBadRequest('No step provided')
}
const aggregation = _req.getAggregation && _req.getAggregation()
const typeIdSelector = Sql.Eq(
'type_id',
Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
typeIdSelector,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq(new Sql.Raw(
`has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`),
1)
)
)
labelSelectorQuery(idxReq, labelSelector)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
let tagsReq = 'arraySort(p.tags)'
if (groupBy) {
tagsReq = `arraySort(arrayFilter(x -> x.1 in (${groupBy.map(g => Sql.quoteVal(g)).join(',')}), p.tags))`
}
const labelsReq = (new Sql.Select()).with(withIdxReq).select(
'fingerprint',
[new Sql.Raw(tagsReq), 'tags'],
[groupBy ? new Sql.Raw('cityHash64(tags)') : 'fingerprint', 'new_fingerprint']
).distinct(true).from([`${DATABASE_NAME()}.profiles_series`, 'p'])
.where(Sql.And(
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
typeIdSelector,
serviceNameSelector
))
const withLabelsReq = new Sql.With('labels', labelsReq, !!clusterName)
let valueCol = new Sql.Raw(
`sum(toFloat64(arrayFirst(x -> x.1 == ${Sql.quoteVal(sampleTypeId)}, p.values_agg).2))`)
if (aggregation === types.TimeSeriesAggregationType.TIME_SERIES_AGGREGATION_TYPE_AVERAGE) {
valueCol = new Sql.Raw(
`sum(toFloat64(arrayFirst(x -> x.1 == ${Sql.quoteVal(sampleTypeId)}).2, p.values_agg)) / ` +
`sum(toFloat64(arrayFirst(x -> x.1 == ${Sql.quoteVal(sampleTypeId)}).3, p.values_agg))`
)
}
const mainReq = (new Sql.Select()).with(withIdxReq, withLabelsReq).select(
[new Sql.Raw(`intDiv(p.timestamp_ns, 1000000000 * ${step}) * ${step} * 1000`), 'timestamp_ms'],
[new Sql.Raw('labels.new_fingerprint'), 'fingerprint'],
[new Sql.Raw('min(labels.tags)'), 'labels'],
[valueCol, 'value']
).from([`${DATABASE_NAME()}.profiles${dist}`, 'p']).join(
[new Sql.WithReference(withLabelsReq), 'labels'],
'ANY LEFT',
Sql.Eq(new Sql.Raw('p.fingerprint'), new Sql.Raw('labels.fingerprint'))
).where(
Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`)),
typeIdSelector,
serviceNameSelector
)
).groupBy('timestamp_ns', 'fingerprint')
.orderBy(['fingerprint', 'ASC'], ['timestamp_ns', 'ASC'])
const strMainReq = mainReq.toString()
const chRes = await clickhouse
.rawRequest(strMainReq + ' FORMAT JSON', null, DATABASE_NAME())
let lastFingerprint = null
const seriesList = []
let lastSeries = null
let lastPoints = []
for (let i = 0; i < chRes.data.data.length; i++) {
const e = chRes.data.data[i]
if (lastFingerprint !== e.fingerprint) {
lastFingerprint = e.fingerprint
lastSeries && lastSeries.setPointsList(lastPoints)
lastSeries && seriesList.push(lastSeries)
lastPoints = []
lastSeries = new types.Series()
lastSeries.setLabelsList(e.labels.map(l => {
const lp = new types.LabelPair()
lp.setName(l[0])
lp.setValue(l[1])
return lp
}))
}
const p = new types.Point()
p.setValue(e.value)
p.setTimestamp(e.timestamp_ms)
lastPoints.push(p)
}
lastSeries && lastSeries.setPointsList(lastPoints)
lastSeries && seriesList.push(lastSeries)
const resp = new messages.SelectSeriesResponse()
resp.setSeriesList(seriesList)
return resp
return selectSeriesImpl(fromTimeSec, toTimeSec, req.body)
}
const selectMergeProfile = async (req, res) => {

View File

@@ -1,5 +1,8 @@
const { parseTypeId } = require('./shared')
const { mergeStackTraces } = require('./merge_stack_traces')
const querierMessages = require('./querier_pb')
const { selectSeriesImpl } = require('./select_series')
const types = require('./types/v1/types_pb')
const render = async (req, res) => {
const query = req.query.query
@@ -8,12 +11,12 @@ const render = async (req, res) => {
? Math.floor(parseInt(req.query.from) / 1000)
: Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000)
const toTimeSec = req.query.until
? Math.floor(parseInt(req.query.from) / 1000)
? Math.floor(parseInt(req.query.until) / 1000)
: Math.floor(Date.now() / 1000)
if (!parsedQuery) {
return res.code(400).send('Invalid query')
}
const groupBy = req.query.groupBy
const groupBy = req.query.groupBy || []
let agg = ''
switch (req.query.aggregation) {
case 'sum':
@@ -26,80 +29,184 @@
if (req.query.format === 'dot') {
return res.code(400).send('Dot format is not supported')
}
const mergeStackTrace = mergeStackTraces(
const promises = []
promises.push(mergeStackTraces(
parsedQuery.typeDesc,
parsedQuery.labelSelector,
'{' + parsedQuery.labelSelector + '}',
fromTimeSec,
toTimeSec,
req.log)
//TODO
req.log))
const timelineStep = calcIntervalSec(fromTimeSec, toTimeSec)
promises.push(selectSeriesImpl(
fromTimeSec,
toTimeSec,
{
getProfileTypeid: () => parsedQuery.typeId,
getLabelSelector: () => `{${parsedQuery.labelSelector}}`,
getGroupByList: () => groupBy,
getStep: () => timelineStep,
getAggregation: () => agg
}
))
const [bMergeStackTrace, selectSeries] =
await Promise.all(promises)
const mergeStackTrace = querierMessages.SelectMergeStacktracesResponse.deserializeBinary(bMergeStackTrace)
let series = new types.Series()
if (selectSeries.getSeriesList().length === 1) {
series = selectSeries.getSeriesList()[0]
}
const fb = toFlamebearer(mergeStackTrace.getFlamegraph(), parsedQuery.profileType)
fb.flamebearerProfileV1.timeline = timeline(series,
fromTimeSec * 1000,
toTimeSec * 1000,
timelineStep)
if (groupBy.length > 0) {
fb.flamebearerProfileV1.groups = {}
selectSeries.getSeriesList().forEach((_series) => {
let key = '*'
_series.getLabelsList().forEach((label) => {
key = label.getName() === groupBy[0] ? label.getValue() : key
})
fb.flamebearerProfileV1.groups[key] = timeline(_series,
fromTimeSec * 1000,
toTimeSec * 1000,
timelineStep)
})
}
res.code(200)
res.headers({ 'Content-Type': 'application/json' })
return res.send(Buffer.from(JSON.stringify(fb.flamebearerProfileV1)))
}
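
As a usage sketch, assuming the handler is registered as GET /render (the route wiring is outside this diff) and a qryn instance on localhost:3100, the query string follows Pyroscope's render API:

// Hypothetical client call; host, port and route mounting are assumptions.
const qs = new URLSearchParams({
  query: 'process_cpu:cpu:nanoseconds:cpu:nanoseconds{service_name="my-app"}',
  from: String(Date.now() - 60 * 60 * 1000), // ms
  until: String(Date.now()), // ms
  format: 'json'
})
const fb = await (await fetch(`http://localhost:3100/render?${qs}`)).json()
// fb is the serialized flamebearerProfileV1, including the new timeline field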
/*
func (q *QueryHandlers) Render(w http.ResponseWriter, req *http.Request) {
var resFlame *connect.Response[querierv1.SelectMergeStacktracesResponse]
g, ctx := errgroup.WithContext(req.Context())
selectParamsClone := selectParams.CloneVT()
g.Go(func() error {
var err error
resFlame, err = q.client.SelectMergeStacktraces(ctx, connect.NewRequest(selectParamsClone))
return err
})
timelineStep := timeline.CalcPointInterval(selectParams.Start, selectParams.End)
var resSeries *connect.Response[querierv1.SelectSeriesResponse]
g.Go(func() error {
var err error
resSeries, err = q.client.SelectSeries(req.Context(),
connect.NewRequest(&querierv1.SelectSeriesRequest{
ProfileTypeID: selectParams.ProfileTypeID,
LabelSelector: selectParams.LabelSelector,
Start: selectParams.Start,
End: selectParams.End,
Step: timelineStep,
GroupBy: groupBy,
Aggregation: &aggregation,
}))
return err
})
err = g.Wait()
if err != nil {
httputil.Error(w, err)
return
}
seriesVal := &typesv1.Series{}
if len(resSeries.Msg.Series) == 1 {
seriesVal = resSeries.Msg.Series[0]
}
fb := phlaremodel.ExportToFlamebearer(resFlame.Msg.Flamegraph, profileType)
fb.Timeline = timeline.New(seriesVal, selectParams.Start, selectParams.End, int64(timelineStep))
if len(groupBy) > 0 {
fb.Groups = make(map[string]*flamebearer.FlamebearerTimelineV1)
for _, s := range resSeries.Msg.Series {
key := "*"
for _, l := range s.Labels {
// right now we only support one group by
if l.Name == groupBy[0] {
key = l.Value
break
}
}
fb.Groups[key] = timeline.New(s, selectParams.Start, selectParams.End, int64(timelineStep))
}
}
w.Header().Add("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(fb); err != nil {
httputil.Error(w, err)
return
}
}
*/
/**
*
* @param fg
* @param profileType
* @returns {Flamebearer}
*/
function toFlamebearer (fg, profileType) {
if (!fg) {
fg = new querierMessages.FlameGraph()
}
let unit = profileType.getSampleUnit()
let sampleRate = 100
switch (profileType.getSampleType()) {
case 'inuse_objects':
case 'alloc_objects':
case 'goroutine':
case 'samples':
unit = 'objects'
break
case 'cpu':
unit = 'samples'
sampleRate = 1000000000
}
/** @type {flamebearerV1} */
const flameBearer = {
levels: fg.getLevelsList().map(l => l.getValuesList().map(v => v)),
maxSelf: fg.getMaxSelf(),
names: fg.getNamesList(),
numTicks: fg.getTotal()
}
/** @type {flamebearerMetadataV1} */
const metadata = {
format: 'single',
units: unit,
name: profileType.getSampleType(),
sampleRate: sampleRate
}
return {
version: 1,
flamebearerProfileV1: {
metadata: metadata,
flamebearer: flameBearer
}
}
}
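
The unit and sampleRate mapping mirrors Pyroscope's ExportToFlamebearer; for instance a cpu profile reports nanosecond ticks, so a consumer can recover wall time as numTicks / sampleRate:

// cpu profile: units become 'samples' and sampleRate 1e9 (ticks are nanoseconds)
const fb = toFlamebearer(fg, cpuProfileType) // assumes cpuProfileType.getSampleType() === 'cpu'
const seconds = fb.flamebearerProfileV1.flamebearer.numTicks / fb.flamebearerProfileV1.metadata.sampleRate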
/**
*
* @param fromSec {number}
* @param toSec {number}
* @returns {number}
*/
function calcIntervalSec (fromSec, toSec) {
return Math.max(Math.ceil((toSec - fromSec) / 1500), 15)
}
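
The step therefore targets at most roughly 1500 timeline points and never drops below 15 seconds:

calcIntervalSec(0, 3600) // 1 h window: ceil(3600 / 1500) = 3 -> max(3, 15) = 15 s
calcIntervalSec(0, 86400) // 24 h window: ceil(86400 / 1500) = 58 -> max(58, 15) = 58 s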
/**
*
* @param series
* @param startMs
* @param endMs
* @param durationDeltaSec
* @returns {flamebearerTimelineV1}
*/
function timeline (series, startMs, endMs, durationDeltaSec) {
const durationDeltaMs = durationDeltaSec * 1000
startMs = Math.floor(startMs / durationDeltaMs) * durationDeltaMs
endMs = Math.floor(endMs / durationDeltaMs) * durationDeltaMs
const startS = Math.floor(startMs / 1000)
/** @type {flamebearerTimelineV1} */
const timeline = {
durationDelta: durationDeltaSec,
startTime: startS,
samples: []
}
if (startMs >= endMs) {
return timeline
}
const points = boundPointsToWindow(series.getPointsList(), startMs, endMs)
if (points.length < 1) {
const n = sizeToBackfill(startMs, endMs, durationDeltaSec)
if (n > 0) {
timeline.samples = new Array(n).fill(0)
}
return timeline
}
let n = sizeToBackfill(startMs, parseInt(points[0].getTimestamp()), durationDeltaSec)
const samples = n > 0 ? Array(n).fill(0) : []
let prev = points[0]
for (const p of points) {
n = sizeToBackfill(parseInt(prev.getTimestamp()), parseInt(p.getTimestamp()), durationDeltaSec)
Array.prototype.push.apply(samples, new Array(Math.max(0, n - 1)).fill(0))
samples.push(p.getValue())
prev = p
}
Array.prototype.push.apply(samples,
new Array(Math.max(0, sizeToBackfill(startMs, endMs, durationDeltaSec) - samples.length))
.fill(0)
)
timeline.samples = samples
return timeline
}
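
A worked example of the backfilling with made-up points (timestamps in ms):

// step 15 s, window [0 ms, 60000 ms), points at 15000 ms (value 5) and 45000 ms (value 7):
// leading gap: sizeToBackfill(0, 15000, 15) = 1, so one zero
// between points: sizeToBackfill(15000, 45000, 15) = 2, so one zero between the two values
// trailing gap: sizeToBackfill(0, 60000, 15) - 4 slots = 0, nothing to pad
// => timeline.samples === [0, 5, 0, 7], one slot per 15 s bucket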
/**
*
* @param points {[]}
* @param startMs {number}
* @param endMs {number}
*/
function boundPointsToWindow (points, startMs, endMs) {
const startIdx = points.findIndex((v) => v.getTimestamp() >= startMs)
const endIdx = points.findLastIndex((v) => v.getTimestamp() < endMs)
return points.slice(startIdx, endIdx + 1)
}
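
Note that Array.prototype.findLastIndex is only available from Node.js 18 onward; on older runtimes an explicit reverse scan is equivalent:

// Fallback sketch for Node < 18
let endIdx = points.length - 1
while (endIdx >= 0 && points[endIdx].getTimestamp() >= endMs) endIdx--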
/**
*
* @param startMs {number}
* @param endMs {number}
* @param stepSec {number}
* @returns {number}
*/
function sizeToBackfill (startMs, endMs, stepSec) {
return Math.floor((endMs - startMs) / (stepSec * 1000))
}
/**
*
@@ -120,11 +227,19 @@ const parseQuery = (query) => {
labels.push([m[2], m[3], m[4]])
}
}
const profileType = new types.ProfileType()
profileType.setId(typeId)
profileType.setName(typeDesc.type)
profileType.setSampleType(typeDesc.sampleType)
profileType.setSampleUnit(typeDesc.sampleUnit)
profileType.setPeriodType(typeDesc.periodType)
profileType.setPeriodUnit(typeDesc.periodUnit)
return {
typeId,
typeDesc,
labels,
labelSelector: strLabels
labelSelector: strLabels,
profileType
}
}
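
Illustratively, for a Pyroscope-style selector the parsed result now also carries the prebuilt ProfileType message (exact captures depend on the regular expressions elided from this hunk):

// parseQuery('process_cpu:cpu:nanoseconds:cpu:nanoseconds{service_name="my-app"}')
// => {
//   typeId: 'process_cpu:cpu:nanoseconds:cpu:nanoseconds',
//   typeDesc: { type: 'process_cpu', sampleType: 'cpu', sampleUnit: 'nanoseconds',
//               periodType: 'cpu', periodUnit: 'nanoseconds' },
//   labels: [...], labelSelector: 'service_name="my-app"',
//   profileType // types.ProfileType populated as above
// }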

pyroscope/select_series.js (new file, 142 lines)
View File

@@ -0,0 +1,142 @@
const { QrynBadRequest } = require('../lib/handlers/errors')
const { parseTypeId, serviceNameSelectorQuery, labelSelectorQuery } = require('./shared')
const { clusterName } = require('../common')
const Sql = require('@cloki/clickhouse-sql')
const { DATABASE_NAME } = require('../lib/utils')
const types = require('./types/v1/types_pb')
const clickhouse = require('../lib/db/clickhouse')
const messages = require('./querier_pb')
const selectSeriesImpl = async (fromTimeSec, toTimeSec, payload) => {
const _req = payload
let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
if (!typeID) {
throw new QrynBadRequest('No type provided')
}
typeID = parseTypeId(typeID)
if (!typeID) {
throw new QrynBadRequest('Invalid type provided')
}
const dist = clusterName ? '_dist' : ''
const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
let groupBy = _req.getGroupByList && _req.getGroupByList()
groupBy = groupBy && groupBy.length ? groupBy : null
const step = _req.getStep && parseInt(_req.getStep())
if (!step || isNaN(step)) {
throw new QrynBadRequest('No step provided')
}
const aggregation = _req.getAggregation && _req.getAggregation()
const typeIdSelector = Sql.Eq(
'type_id',
Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
typeIdSelector,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq(new Sql.Raw(
`has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`),
1)
)
)
labelSelectorQuery(idxReq, labelSelector)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
let tagsReq = 'arraySort(p.tags)'
if (groupBy) {
tagsReq = `arraySort(arrayFilter(x -> x.1 in (${groupBy.map(g => Sql.quoteVal(g)).join(',')}), p.tags))`
}
const labelsReq = (new Sql.Select()).with(withIdxReq).select(
'fingerprint',
[new Sql.Raw(tagsReq), 'tags'],
[groupBy ? new Sql.Raw('cityHash64(tags)') : 'fingerprint', 'new_fingerprint']
).distinct(true).from([`${DATABASE_NAME()}.profiles_series`, 'p'])
.where(Sql.And(
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
typeIdSelector,
serviceNameSelector
))
const withLabelsReq = new Sql.With('labels', labelsReq, !!clusterName)
let valueCol = new Sql.Raw(
`sum(toFloat64(arrayFirst(x -> x.1 == ${Sql.quoteVal(sampleTypeId)}, p.values_agg).2))`)
if (aggregation === types.TimeSeriesAggregationType.TIME_SERIES_AGGREGATION_TYPE_AVERAGE) {
valueCol = new Sql.Raw(
`sum(toFloat64(arrayFirst(x -> x.1 == ${Sql.quoteVal(sampleTypeId)}, p.values_agg).2)) / ` +
`sum(toFloat64(arrayFirst(x -> x.1 == ${Sql.quoteVal(sampleTypeId)}, p.values_agg).3))`
)
}
const mainReq = (new Sql.Select()).with(withIdxReq, withLabelsReq).select(
[new Sql.Raw(`intDiv(p.timestamp_ns, 1000000000 * ${step}) * ${step} * 1000`), 'timestamp_ms'],
[new Sql.Raw('labels.new_fingerprint'), 'fingerprint'],
[new Sql.Raw('min(labels.tags)'), 'labels'],
[valueCol, 'value']
).from([`${DATABASE_NAME()}.profiles${dist}`, 'p']).join(
[new Sql.WithReference(withLabelsReq), 'labels'],
'ANY LEFT',
Sql.Eq(new Sql.Raw('p.fingerprint'), new Sql.Raw('labels.fingerprint'))
).where(
Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`)),
typeIdSelector,
serviceNameSelector
)
).groupBy('timestamp_ms', 'fingerprint') // group on the step-bucketed alias, not the raw ns timestamp
.orderBy(['fingerprint', 'ASC'], ['timestamp_ms', 'ASC'])
const strMainReq = mainReq.toString()
const chRes = await clickhouse
.rawRequest(strMainReq + ' FORMAT JSON', null, DATABASE_NAME())
let lastFingerprint = null
const seriesList = []
let lastSeries = null
let lastPoints = []
for (let i = 0; i < chRes.data.data.length; i++) {
const e = chRes.data.data[i]
if (lastFingerprint !== e.fingerprint) {
lastFingerprint = e.fingerprint
lastSeries && lastSeries.setPointsList(lastPoints)
lastSeries && seriesList.push(lastSeries)
lastPoints = []
lastSeries = new types.Series()
lastSeries.setLabelsList(e.labels.map(l => {
const lp = new types.LabelPair()
lp.setName(l[0])
lp.setValue(l[1])
return lp
}))
}
const p = new types.Point()
p.setValue(e.value)
p.setTimestamp(e.timestamp_ms)
lastPoints.push(p)
}
lastSeries && lastSeries.setPointsList(lastPoints)
lastSeries && seriesList.push(lastSeries)
const resp = new messages.SelectSeriesResponse()
resp.setSeriesList(seriesList)
console.log(`Queried ${seriesList.length} series`)
return resp
}
module.exports = {
selectSeriesImpl
}
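
Because selectSeriesImpl only duck-types the gRPC request, any object exposing the five getters works, which is exactly how render.js calls it. A consumption sketch (type id and selector are examples):

const resp = await selectSeriesImpl(fromSec, toSec, {
  getProfileTypeid: () => 'process_cpu:cpu:nanoseconds:cpu:nanoseconds',
  getLabelSelector: () => '{service_name="my-app"}',
  getGroupByList: () => [],
  getStep: () => 15, // seconds
  getAggregation: () => undefined
})
for (const s of resp.getSeriesList()) {
  const labels = s.getLabelsList().map(l => `${l.getName()}=${l.getValue()}`)
  const points = s.getPointsList().map(p => [p.getTimestamp(), p.getValue()])
  console.log(labels.join(','), points.length, 'points')
}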

View File

@@ -171,6 +171,8 @@ const labelSelectorQuery = (query, labelSelector) => {
))
}
const HISTORY_TIMESPAN = 1000 * 60 * 60 * 24 * 7
module.exports = {
bufferize,
parser,
@@ -179,5 +181,6 @@ module.exports = {
parseTypeId,
serviceNameSelectorQuery,
parseLabelSelector,
labelSelectorQuery
labelSelectorQuery,
HISTORY_TIMESPAN
}
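
HISTORY_TIMESPAN is expressed in milliseconds (7 days = 604,800,000 ms), so second-based callers subtract it before dividing by 1000:

// default lower bound when a request carries no start time
const fromTimeSec = Math.floor((Date.now() - HISTORY_TIMESPAN) / 1000)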