asynchronous error log due to axios limitations for issue#255

This commit is contained in:
akvlad
2022-11-21 17:30:20 +02:00
parent 6c9e975bc7
commit 272771005c
25 changed files with 106 additions and 54 deletions

View File

@ -79,6 +79,24 @@ module.exports.durationToNs = (durationStr) => {
throw new Error('Unsupported duration')
}
module.exports.asyncLogError = async (err, logger) => {
try {
const resp = err.response || err.err.response
if (resp) {
if (typeof resp.data === 'object') {
err.responseData = ''
err.response.data.on('data', data => { err.responseData += data })
await new Promise((resolve) => err.response.data.once('end', resolve))
} else {
err.responseData = err.response.data
}
logger.error(err)
}
} catch (e) {
logger.error(err)
}
}
module.exports.LineFmtOption = () => process.env.LINE_FMT || 'handlebars'
module.exports.errors = require('./lib/handlers/errors')

View File

@ -72,7 +72,7 @@ class AlertWatcher {
this.health = 'ok'
this.lastError = ''
} catch (err) {
logger.error({ err })
logger.error(err)
this.health = 'error'
this.lastError = err.message
}

View File

@ -195,7 +195,7 @@ const reloadFingerprints = function () {
rows.push(row)
})
stream.on('error', function (err) {
logger.error({ err }, 'Error reloading fingerprints')
logger.error(err, 'Error reloading fingerprints')
})
stream.on('end', function () {
rows.forEach(function (row) {
@ -207,7 +207,7 @@ const reloadFingerprints = function () {
labels.add('_LABELS_', key)
labels.add(key, JSONLabels[key])
}
} catch (err) { logger.error({ err }, 'error reloading fingerprints') }
} catch (err) { logger.error(err, 'error reloading fingerprints') }
})
})
}
@ -1010,7 +1010,7 @@ const scanMetricFingerprints = function (settings, client, params) {
})
resp.data.result.push(metrics)
})
} catch (err) { logger.error({ err }, 'Error scanning fingerprints') }
} catch (err) { logger.error(err, 'Error scanning fingerprints') }
}
logger.debug({ resp }, 'QRYN RESPONSE')
client.send(resp)
@ -1098,7 +1098,7 @@ const scanClickhouse = function (settings, client, params) {
})
stream.on('error', function (err) {
// TODO: handler error
logger.error({ err }, 'error scanning clickhouse')
logger.error(err, 'error scanning clickhouse')
resp.status = "error"
resp.data.result = []
client.send(resp)
@ -1126,7 +1126,7 @@ const scanClickhouse = function (settings, client, params) {
})
resp.data.result.push(metrics)
})
} catch (err) { logger.error({ err }, 'error scanning clickhouse') }
} catch (err) { logger.error(err, 'error scanning clickhouse') }
}
logger.debug({ resp }, 'QRYN RESPONSE')
client.send(resp)

View File

@ -3,6 +3,23 @@ const axios = require('axios')
const { getClickhouseUrl, samplesTableName } = require('./clickhouse')
const clickhouseOptions = require('./clickhouse').databaseOptions
const logger = require('../logger')
const axiosError = async (err) => {
try {
const resp = err.response
if (resp) {
if (typeof resp.data === 'object') {
err.responseData = ''
err.response.data.on('data', data => { err.responseData += data })
await new Promise((resolve) => err.response.data.once('end', resolve))
}
return err
}
} catch (e) {
return err
}
}
class TimeoutThrottler {
constructor (statement) {
this.statement = statement
@ -16,7 +33,7 @@ class TimeoutThrottler {
await this._flush()
this.resolvers.forEach(r => r())
} catch (err) {
logger.error(err, 'AXIOS ERROR')
logger.error(await axiosError(err), 'AXIOS ERROR')
this.rejects.forEach(r => r(err))
}
this.resolvers = []
@ -69,7 +86,7 @@ if (isMainThread) {
await samplesThrottler.flush()
await tracesThottler.flush()
} catch (err) {
logger.error({ err }, 'AXIOS ERROR')
logger.error(await axiosError(err), 'AXIOS ERROR')
}
const p = Date.now() - ts
if (p < 100) {

View File

@ -6,6 +6,7 @@ const { queryFingerprintsScan, createLiveView, watchLiveView } = require('./clic
const { capabilities, samplesTableName } = require('./clickhouse')
const logger = require('../logger')
const compiler = require('../../parser/bnf')
const { asyncLogError } = require('../../common')
/**
*
* @type {Object<string, { w: Watcher, c: number }>}
@ -63,7 +64,7 @@ class Watcher extends EventEmitter {
await promise
}
} catch (err) {
logger.error({ err })
asyncLogError(err, logger)
throw err
}
}

View File

@ -22,7 +22,7 @@ async function handler (req, res) {
}))
)
} catch (err) {
logger.error({ err }, 'SEND ERROR')
logger.error(err, 'SEND ERROR')
throw err
}
res.send('ok')

View File

@ -6,18 +6,19 @@
*/
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
function handler (req, res) {
const self = this
req.log.debug('ELASTIC Bulk Request')
if (!req.body) {
req.log.error('No Request Body or Target!'+ req.body)
asyncLogError('No Request Body or Target!' + req.body, req.log)
res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
return
}
@ -41,7 +42,7 @@ function handler (req, res) {
streams.forEach(function (stream) {
try {
stream = JSON.parse(stream)
} catch (err) { req.log.error({ err }); return };
} catch (err) { asyncLogError(err, req.log); return };
// Allow Index, Create. Discard Delete, Update.
if (stream.delete||stream.update) { last_tags = false; return; }
@ -60,7 +61,7 @@ function handler (req, res) {
if (doc_target) JSONLabels._index = doc_target
JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort())
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
return
}
// Calculate Fingerprint
@ -80,7 +81,7 @@ function handler (req, res) {
self.labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err }, 'failed ingesting elastic doc')
asyncLogError(err, req.log)
}
// Store Elastic Doc Object

View File

@ -9,18 +9,19 @@
*/
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
function handler (req, res) {
const self = this
req.log.debug('ELASTIC Index Request')
if (!req.body || !req.params.target) {
req.log.error('No Request Body or Target!')
asyncLogError('No Request Body or Target!', req.log)
res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
return
}
@ -52,7 +53,7 @@ function handler (req, res) {
if (doc_id) JSONLabels._id = doc_id
JSONLabels = Object.fromEntries(Object.entries(JSONLabels).sort())
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
return
}
// Calculate Fingerprint
@ -71,12 +72,12 @@ function handler (req, res) {
self.labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err }, 'failed ingesting elastic doc')
asyncLogError(err, req.log)
}
try {
stream = JSON.parse(stream);
} catch (err) { req.log.error({ err }); return };
} catch (err) { asyncLogError(err, req.log); return };
// Store Elastic Doc Object
const values = [
finger,

View File

@ -1,3 +1,5 @@
const { asyncLogError } = require('../../common')
class QrynError extends Error {
constructor (code, name, message) {
super(message)
@ -31,7 +33,7 @@ const handler = (err, req, res) => {
// throw err
}*/
req.log.error({ err })
asyncLogError(err, req.log)
res.status(500).send({
statusCode: 500,
error: 'Internal Server Error',

View File

@ -38,16 +38,16 @@
const stringify = require('../utils').stringify
const lineToJSON = require('@qxip/influx-line-protocol-parser')
const { asyncLogError } = require('../../common')
function handler (req, res) {
const self = this
req.log.error('POST INFLUX /write')
if (!req.body && !req.body.metrics) {
req.log.error('No Request Body!')
asyncLogError('No Request Body!', req.log)
return
}
if (self.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.send(500)
return
}
@ -98,7 +98,7 @@ function handler (req, res) {
self.labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
}
const timestamp = stream.timestamp || JSONFields.timestamp
/* metrics */
@ -110,7 +110,7 @@ function handler (req, res) {
!timestamp &&
!value
) {
req.log.error({ key }, 'no bulkable data')
asyncLogError('no bulkable data', req.log)
return
}
const values = [

View File

@ -18,6 +18,7 @@
*/
const { Transform } = require('stream')
const { asyncLogError } = require('../../common')
function handleOne (req, streams, promises) {
const self = this
@ -31,12 +32,12 @@ function handleOne (req, streams, promises) {
async function handler (req, res) {
req.log.debug('POST /tempo/api/push')
if (!req.body) {
req.log.error('No Request Body!')
asyncLogError('No Request Body!', req.log)
res.code(500).send()
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.code(500).send()
return
}

View File

@ -11,18 +11,19 @@
}
*/
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
async function handler (req, res) {
const self = this
req.log.debug('POST /api/v1/prom/remote/write')
if (!req.body) {
req.log.error('No Request Body!')
asyncLogError('No Request Body!', req.log)
res.code(500).send()
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.code(500).send()
return
}
@ -42,7 +43,7 @@ async function handler (req, res) {
return sum
}, {})
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
return
}
// Calculate Fingerprint
@ -62,7 +63,7 @@ async function handler (req, res) {
self.labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
}
if (stream.samples) {
@ -73,7 +74,7 @@ async function handler (req, res) {
!entry.timestamp &&
!entry.value
) {
req.log.error({ entry }, 'no bulkable data')
asyncLogError({ entry }, req.log)
return
}
const values = [

View File

@ -1,6 +1,7 @@
/* Emulated PromQL Query Handler */
const { p2l } = require('@qxip/promql2logql');
const { asyncLogError } = require('../../common')
const empty = '{"status" : "success", "data" : {"resultType" : "scalar", "result" : []}}'; // to be removed
const test = () => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, "2"]}}`; // to be removed
@ -20,7 +21,7 @@ async function handler (req, res) {
try {
req.query.query = p2l(req.query.query);
} catch(e) {
req.log.error({ err })
asyncLogError({ e }, req.log)
res.send(empty)
}
/* scan fingerprints */
@ -31,7 +32,7 @@ async function handler (req, res) {
{ res: res.raw }
)
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
res.send(empty)
}
}

View File

@ -10,6 +10,7 @@
*/
const { p2l } = require('@qxip/promql2logql')
const { asyncLogError } = require('../../common')
async function handler (req, res) {
req.log.debug('GET /api/v1/query_range')
@ -37,7 +38,7 @@ async function handler (req, res) {
)
res.hijack()
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
res.send(resp)
}
}

View File

@ -21,6 +21,7 @@ const FilterBase = require('stream-json/filters/FilterBase')
const StreamValues = require('stream-json/streamers/StreamValues')
const logger = require('../logger')
const UTILS = require('../utils')
const { asyncLogError } = require('../../common')
const stringify = UTILS.stringify
function processStream (stream, labels, bulkLabels, bulk, toJSON, fingerPrint) {
@ -98,7 +99,7 @@ async function handler (req, res) {
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.code(500).send()
return
}

View File

@ -1,4 +1,6 @@
// Query Handler
const { asyncLogError } = require('../../common')
async function handler (req, res) {
req.log.debug('GET /loki/api/v1/query')
const resp = { streams: [] }
@ -16,7 +18,7 @@ async function handler (req, res) {
{ res: res.raw }
)
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
throw err
}
}

View File

@ -11,6 +11,7 @@
const { parseCliQL } = require('../cliql')
const { checkCustomPlugins } = require('./common')
const { asyncLogError } = require('../../common')
async function handler (req, res) {
req.log.debug('GET /loki/api/v1/query_range')
@ -43,7 +44,7 @@ async function handler (req, res) {
)
res.hijack()
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
throw err
}
}

View File

@ -20,6 +20,6 @@ module.exports = function handler (connection, res) {
w.destroy()
})
} catch (err) {
logger.error({ err })
logger.error(err)
}
}

View File

@ -8,16 +8,16 @@
*/
const { asyncLogError } = require('../../common')
const stringify = require('../utils').stringify
function handler (req, res) {
req.log.error('POST /telegraf')
if (!req.body && !req.body.metrics) {
req.log.error('No Request Body!')
asyncLogError('No Request Body!', req.log)
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.send(500)
return
}
@ -50,7 +50,7 @@ function handler (req, res) {
this.labels.add(key, JSONLabels[key])
}
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
}
if (stream.fields) {
Object.keys(stream.fields).forEach(function (entry) {
@ -60,7 +60,7 @@ function handler (req, res) {
!stream.timestamp &&
(!entry.value || !entry.line)
) {
req.log.error({ entry }, 'no bulkable data')
asyncLogError('no bulkable data', req.log)
return
}
const values = [

View File

@ -18,6 +18,7 @@
*/
const { Transform } = require('stream')
const { asyncLogError } = require('../../common')
function handleOne (req, streams, promises) {
const self = this
@ -31,12 +32,12 @@ function handleOne (req, streams, promises) {
async function handler (req, res) {
req.log.debug('POST /tempo/api/push')
if (!req.body) {
req.log.error('No Request Body!')
asyncLogError('No Request Body!', req.log)
res.code(500).send()
return
}
if (this.readonly) {
req.log.error('Readonly! No push support.')
asyncLogError('Readonly! No push support.', req.log)
res.code(500).send()
return
}

View File

@ -15,6 +15,7 @@
const logfmt = require('logfmt')
const common = require('../../common')
const { asyncLogError } = require('../../common')
async function handler (req, res) {
req.log.debug('GET /api/search')
@ -48,7 +49,7 @@ async function handler (req, res) {
traces: resp
})
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
res.send(resp)
}
}

View File

@ -1,10 +1,11 @@
const { asyncLogError } = require('../../common')
async function handler (req, res) {
try {
const resp = await this.queryTempoTags()
res.send(resp.map(e => e.key))
} catch (e) {
req.log.error(e)
asyncLogError(e, req.log)
res.code(500)
}
}

View File

@ -15,6 +15,7 @@ const TraceDataType = protoBuff.loadSync(__dirname + '/../opentelemetry/proto/tr
.lookupType('opentelemetry.proto.trace.v1.TracesData')
const {stringify} = require('logfmt')
const { flatOTLPAttrs, OTLPgetServiceNames } = require('../utils')
const { asyncLogError } = require('../../common')
function pad (pad, str, padLeft) {
if (typeof str === 'undefined') {
@ -115,7 +116,7 @@ async function handler (req, res) {
res.send(proto)
}
} catch (err) {
req.log.error({ err })
asyncLogError(err, req.log)
res.headers({ 'content-type': 'application/json' }).send(resp)
}
}

View File

@ -10,6 +10,8 @@
]
}
*/
const { asyncLogError } = require('../../common')
async function handler (req, res) {
req.log.debug(`GET /api/search/tag/${req.params.name}/values`)
if (!req.params.name) {
@ -19,7 +21,7 @@ async function handler (req, res) {
const vals = (await this.queryTempoValues(req.params.name)).map(e => e.val)
res.send({ tagValues: vals })
} catch (e) {
req.log.error(e)
asyncLogError(e, req.log)
res.code(500)
}
};

View File

@ -13,10 +13,8 @@ const logger = pino({
serializers: {
err: pino.stdSerializers.wrapErrorSerializer((err) => {
if (err.response) {
err.responseData = err.response.data
err.responseStatus = err.response.status
const strData = typeof err.response.data === 'object' ? JSON.stringify(err.response.data) : err.response.data
const res = new Error(`${err.message}\nResponse: [${err.response.status}] ${strData}`)
const res = new Error(`${err.message}\nResponse: [${err.response.status}] ${err.responseData}`)
res.stack = err.stack
return res.toString() + '\n' + res.stack.toString()
}