Fix testing; fix teardown; add overall traffic shaping

This commit is contained in:
akvlad
2022-02-01 09:26:47 +02:00
parent 2e77621711
commit 2cfd1ff6ce
4 changed files with 109 additions and 103 deletions

183
cloki.js
View File

@ -16,6 +16,7 @@ require('./plugins/engine')
const DATABASE = require('./lib/db/clickhouse')
const UTILS = require('./lib/utils')
const { EventEmitter } = require('events')
/* ProtoBuf Helpers */
const fs = require('fs')
@ -49,8 +50,83 @@ this.instantQueryScan = DATABASE.instantQueryScan
this.tempoQueryScan = DATABASE.tempoQueryScan
this.scanMetricFingerprints = DATABASE.scanMetricFingerprints
this.tempoQueryScan = DATABASE.tempoQueryScan
this.scanClickhouse = DATABASE.scanClickhouse;
let profiler = null;
this.scanClickhouse = DATABASE.scanClickhouse
let profiler = null
const shaper = {
  // Bytes admitted for parsing during the current 1-second window.
  onParse: 0,
  // Emits 'parsed' once per second when the window counter resets.
  onParsed: new EventEmitter(),
  // Reset the per-second byte counter and wake up any waiting requests.
  shapeInterval: setInterval(() => {
    shaper.onParse = 0
    shaper.onParsed.emit('parsed')
  }, 1000),
  /**
   * Waits until the payload fits into the current 5e10-byte/s parsing
   * window, then reserves `size` bytes of it.
   *
   * @param size {number}
   * @returns {Promise<void>}
   */
  register: async (size) => {
    while (shaper.onParse + size > 5e10) {
      // BUGFIX: pass `resolve` to `once` so the promise actually settles.
      // The previous code called `once('parsed')` with no listener, which
      // throws ERR_INVALID_ARG_TYPE and would never resolve the wait.
      await new Promise(resolve => shaper.onParsed.once('parsed', resolve))
    }
    shaper.onParse += size
  },
  // Clears the shaping interval so the process can exit cleanly on stop().
  stop: () => {
    shaper.shapeInterval && clearInterval(shaper.shapeInterval)
    shaper.shapeInterval = null
  }
}
/**
*
* @param req {FastifyRequest}
* @param limit {number}
* @returns {number}
*/
/**
 * Reads and validates the Content-Length header of a request.
 *
 * @param req {FastifyRequest}
 * @param limit {number} maximum allowed size in bytes; falsy disables the check
 * @returns {number} the parsed content length
 * @throws {CLokiError} 400 when the header is missing/non-numeric or over limit
 */
function getContentLength (req, limit) {
  // Parse once, with an explicit radix (the original parsed the header twice).
  const res = parseInt(req.headers['content-length'], 10)
  if (Number.isNaN(res)) {
    throw new CLokiError(400, 'Content-Length is required')
  }
  if (limit && res > limit) {
    throw new CLokiError(400, 'Request is too big')
  }
  return res
}
/**
*
* @param req {FastifyRequest}
* @returns {Promise<string>}
*/
/**
 * Buffers the raw request stream into a single string.
 *
 * @param req {FastifyRequest}
 * @returns {Promise<string>} the full request body
 */
async function getContentBody (req) {
  let body = ''
  req.raw.on('data', data => {
    body += data.toString()
  })
  // Reject on stream errors instead of waiting for an 'end' that never comes
  // (matches the error handling the old '*' content-type parser had).
  await new Promise((resolve, reject) => {
    req.raw.once('end', resolve)
    req.raw.once('error', reject)
  })
  return body
}
/**
*
* @param req {FastifyRequest}
* @returns {Promise<void>}
*/
/**
 * Generic JSON body parser with traffic shaping applied.
 *
 * @param req {FastifyRequest}
 * @returns {Promise<Object|undefined>} the parsed body, or undefined when an
 *   oversized /loki/api/v1/push payload is skipped
 */
async function genericJSONParser (req) {
  try {
    const length = getContentLength(req, 1e9)
    // Oversized push payloads are dropped silently (handled as a no-op push).
    const oversizedPush = req.routerPath === '/loki/api/v1/push' && length > 5e6
    if (oversizedPush) {
      return
    }
    await shaper.register(length)
    const body = await getContentBody(req)
    return JSON.parse(body)
  } catch (err) {
    err.statusCode = 400
    throw err
  }
}
(async () => {
if (!this.readonly) {
await init(process.env.CLICKHOUSE_DB || 'cloki')
@ -69,8 +145,6 @@ let profiler = null;
process.exit(1)
})
/* Fastify Helper */
const fastify = require('fastify')({
logger: false,
@ -104,77 +178,27 @@ if (this.http_user && this.http_password) {
})
}
let onParse = 0
const { EventEmitter } = require('events')
const { database } = require('./lib/db/clickhouse')
const { fingerPrint } = require('./lib/utils')
const onParsed = new EventEmitter()
setInterval(() => {
onParse = 0
onParsed.emit('parsed')
}, 1000)
fastify.addContentTypeParser('application/json',
fastify.addContentTypeParser('application/yaml', {},
async function (req, body, done) {
try {
if (!req.headers['content-length'] || isNaN(parseInt(req.headers['content-length']))) {
throw new CLokiError(400, 'Content-Length is required')
}
const length = parseInt(req.headers['content-length'])
if (length > 1e9) {
throw new CLokiError(400, 'Request is too big')
}
if (req.routerPath === '/loki/api/v1/push' && length > 5e6) {
return
}
while (onParse >= 50000000) {
await new Promise(f => onParsed.once('parsed', f))
}
onParse += length
let body = ''
req.raw.on('data', data => {
body += data.toString()
})
await new Promise(f => req.raw.once('end', f))
const json = JSON.parse(body)
const length = getContentLength(req, 5e6)
await shaper.register(length)
const json = yaml.parse(await getContentBody(req))
return json
// done(null, json)
} catch (err) {
err.statusCode = 400
// done(err, undefined)
throw err
}
})
fastify.addContentTypeParser('text/plain', {
parseAs: 'string'
}, function (req, body, done) {
try {
const json = JSON.parse(body)
done(null, json)
} catch (err) {
err.statusCode = 400
done(err, undefined)
}
})
fastify.addContentTypeParser('application/yaml', {
parseAs: 'string'
}, function (req, body, done) {
try {
const json = yaml.parse(body)
done(null, json)
} catch (err) {
err.statusCode = 400
done(err, undefined)
}
})
})
try {
const snappy = require('snappyjs')
/* Protobuf Handler */
fastify.addContentTypeParser('application/x-protobuf', { parseAs: 'buffer' },
async function (req, body, done) {
fastify.addContentTypeParser('application/x-protobuf', {},
async function (req) {
const length = getContentLength(req, 5e6)
await shaper.register(length)
const body = await getContentBody(req)
// Prometheus Protobuf Write Handler
if (req.url === '/api/v1/prom/remote/write') {
let _data = await snappy.uncompress(body)
@ -213,31 +237,10 @@ try {
}
/* Null content-type handler for CH-MV HTTP PUSH */
fastify.addContentTypeParser('*', async function (req, body, done) {
try {
if (!req.headers['content-length'] || isNaN(parseInt(req.headers['content-length']))) {
throw new CLokiError(400, 'Content-Length is required')
}
const length = parseInt(req.headers['content-length'])
if (length > 1e9) {
throw new CLokiError(400, 'Request is too big')
}
if (req.routerPath === '/loki/api/v1/push' && length > 5e6) {
return
}
let _body = ''
req.raw.on('data', data => { _body += data.toString() })
await new Promise((resolve, reject) => {
req.raw.once('end', resolve)
req.raw.once('error', reject)
})
const json = JSON.parse(_body)
return json
} catch (err) {
err.statusCode = 400
throw err
}
})
fastify.addContentTypeParser('*', {},
async function (req, body, done) {
return await genericJSONParser(req)
})
/* 404 Handler */
const handler404 = require('./lib/handlers/404.js').bind(this)
@ -326,6 +329,8 @@ fastify.listen(
)
module.exports.stop = () => {
shaper.stop()
profiler && clearInterval(profiler)
fastify.close()
DATABASE.stop()
require('./parser/transpiler').stop()

View File

@ -17,13 +17,14 @@ const clickhouseOptions = {
readonly: !!process.env.READONLY,
queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
}
const CORS = process.env.CORS_ALLOW_ORIGIN || '*';
const CORS = process.env.CORS_ALLOW_ORIGIN || '*'
const transpiler = require('../../parser/transpiler')
const rotationLabels = process.env.LABELS_DAYS || 7
const rotationSamples = process.env.SAMPLES_DAYS || 7
const axios = require('axios')
const { samplesTableName, samplesReadTableName } = UTILS
const path = require('path')
const protocol = process.env.CLICKHOUSE_PROTO || 'http'
@ -46,7 +47,7 @@ let throttler = null
const resolvers = {}
const rejectors = {}
if (isMainThread) {
throttler = new Worker(__dirname + '/throttler.js')
throttler = new Worker(path.join(__dirname, '/throttler.js'))
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
@ -330,10 +331,10 @@ const instantQueryScan = async function (query, res) {
}
const tempoQueryScan = async function (query, res, traceId) {
if (debug) console.log('Scanning Tempo Fingerprints...',traceId)
if (debug) console.log('Scanning Tempo Fingerprints...', traceId)
const time = parseMs(query.time, Date.now())
/* Tempo does not seem to pass start/stop parameters. Use ENV or default 24h */
var hours = this.tempo_span || 24;
var hours = this.tempo_span || 24
if (!query.start) query.start = (time - (hours * 60 * 60 * 1000)) * 1000000
if (!query.end) query.end = Date.now() * 1000000
const _query = transpiler.transpile(query)
@ -346,7 +347,7 @@ const tempoQueryScan = async function (query, res, traceId) {
}
)
const dataStream = preprocessStream(_stream, _query.stream || [])
console.log('debug tempo', query);
console.log('debug tempo', query)
return await (outputTempoSpans(dataStream, res, traceId))
}
@ -574,7 +575,8 @@ const outputTempoSpans = async (dataStream, res, traceId) => {
let i = 0
let lastLabels = null
let lastStream = []
let response = '{"total": 0, "limit": 0, "offset": 0, "errors": null, "processes" : { "p1": {} }, "data": [ { "traceID": "'+traceId+'", '
let response = '{"total": 0, "limit": 0, "offset": 0, "errors": null, "processes" : { "p1": {} }, "data": [ { "traceID": "' +
traceId + '", '
response += '"spans":['
for await (const item of gen()) {
if (!item) {
@ -585,7 +587,7 @@ const outputTempoSpans = async (dataStream, res, traceId) => {
continue
}
response += (i ? ',' : '')
response += JSON.stringify(lastStream[0]);
response += JSON.stringify(lastStream[0])
/*
res.res.write(JSON.stringify({
traceID: lastLabels.traceId,
@ -605,7 +607,7 @@ const outputTempoSpans = async (dataStream, res, traceId) => {
}
if (lastLabels) {
response += (i ? ',' : '')
response += (JSON.stringify(lastStream[0]));
response += (JSON.stringify(lastStream[0]))
/*
res.res.write(JSON.stringify({
traceID: lastLabels.traceId,
@ -618,7 +620,7 @@ const outputTempoSpans = async (dataStream, res, traceId) => {
lastStream = ts ? [JSON.parse(item.string)] : []
}
response += (']}]}')
return response;
return response
}
/**
@ -977,9 +979,7 @@ module.exports.preprocessStream = preprocessStream
module.exports.capabilities = capabilities
module.exports.ping = ping
module.exports.stop = () => {
/*samplesThrottler.stop()
timeSeriesThrottler.stop()*/
throttler.postMessage({ type: 'stop' })
throttler.postMessage({ type: 'end' })
}
module.exports.ready = () => state === 'READY'
module.exports.scanSeries = getSeries

View File

@ -83,6 +83,7 @@ if (isMainThread) {
case 'end':
samplesThrottler.stop()
timeSeriesThrottler.stop()
parentPort.removeAllListeners('message')
break
case 'values':
samplesThrottler.queue.push(message.data)

View File

@ -86,7 +86,7 @@ function processStream (stream, labels, bulkLabels, bulkValues, toJSON, fingerPr
async function handler (req, res) {
const self = this
console.log('POST /loki/api/v1/push')
if (this.debug) console.log('POST /loki/api/v1/push')
if (this.debug) console.log('QUERY: ', req.query)
if (this.debug) console.log('BODY: ', req.body)
if (!req.body) {
@ -172,7 +172,7 @@ class ConditilnalStreamValues extends StreamValues {
}
_transform (chunk, encoding, callback) {
this.__transform(chunk, encoding, callback)
this.__transform(chunk, encoding, callback)
}
pass (chunk, encoding) {