mirror of
https://github.com/metrico/qryn.git
synced 2025-03-14 10:07:18 +00:00
otlp logs push init
This commit is contained in:
106
lib/handlers/otlp_log_push.js
Normal file
106
lib/handlers/otlp_log_push.js
Normal file
@ -0,0 +1,106 @@
|
||||
const DATABASE = require('../db/clickhouse')
|
||||
const { asyncLogError, logType, metricType, bothType, readonly } = require('../../common')
|
||||
const UTILS = require('../utils')
|
||||
const stringify = UTILS.stringify
|
||||
const fingerPrint = UTILS.fingerPrint
|
||||
const { bulk_labels, bulk, labels } = DATABASE.cache
|
||||
|
||||
async function handle (req, res) {
|
||||
if (readonly) {
|
||||
asyncLogError('Readonly! No push support.', req.log)
|
||||
return res.code(500).send()
|
||||
}
|
||||
try {
|
||||
const promises = []
|
||||
const fingerprints = {}
|
||||
for (const resourceLogsEntry of req.body.resourceLogs) {
|
||||
const resAttrs = resource2Attrs(resourceLogsEntry.resource)
|
||||
for (const scopeLogsEntry of resourceLogsEntry.scopeLogs) {
|
||||
const scopeAttrs = {
|
||||
...resAttrs,
|
||||
...resource2Attrs(scopeLogsEntry.scope)
|
||||
}
|
||||
for (const logRecord of scopeLogsEntry.logRecords) {
|
||||
const logAttrs = {
|
||||
...scopeAttrs,
|
||||
...resource2Attrs(logRecord)
|
||||
}
|
||||
if (logRecord.severityText) {
|
||||
logAttrs.level = logRecord.severityText
|
||||
}
|
||||
const labels = stringify(logAttrs)
|
||||
const fingerprint = fingerPrint(labels)
|
||||
const ts = BigInt(logRecord.timeUnixNano)
|
||||
promises.push(bulk.add([[
|
||||
fingerprint,
|
||||
ts,
|
||||
null,
|
||||
anyValueToString(logRecord.body),
|
||||
logType
|
||||
]]))
|
||||
const date = new Date(Number(ts / BigInt(1000000))).toISOString().split('T')[0]
|
||||
!fingerprints[fingerprint] && promises.push(bulk_labels.add([[
|
||||
date,
|
||||
fingerprint,
|
||||
labels,
|
||||
labels.name || '',
|
||||
logType
|
||||
]]))
|
||||
fingerprints[fingerprint] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
await Promise.all(promises)
|
||||
} catch (error) {
|
||||
await asyncLogError(error)
|
||||
res.status(500).send({ error: 'Internal Server Error' })
|
||||
}
|
||||
}
|
||||
|
||||
function resource2Attrs (resource) {
|
||||
if (!resource || !resource.attributes) {
|
||||
return {}
|
||||
}
|
||||
const attrs = {}
|
||||
for (const attribute of resource.attributes) {
|
||||
attrs[normalizeAttrName(attribute.key)] = anyValueToString(attribute.value)
|
||||
}
|
||||
return attrs
|
||||
}
|
||||
|
||||
function normalizeAttrName (name) {
|
||||
return name.replaceAll(/[^a-zA-Z0-9_]/g, '_')
|
||||
}
|
||||
|
||||
function anyValueToString (value) {
|
||||
if (!value) {
|
||||
return ''
|
||||
}
|
||||
if (value.stringValue) {
|
||||
return value.stringValue
|
||||
}
|
||||
if (value.boolValue) {
|
||||
return value.boolValue ? 'true' : 'false'
|
||||
}
|
||||
if (value.intValue) {
|
||||
return value.intValue.toString()
|
||||
}
|
||||
if (value.doubleValue) {
|
||||
return value.doubleValue.toString()
|
||||
}
|
||||
if (value.bytesValue) {
|
||||
return Buffer.from(value.bytesValue).toString('base64')
|
||||
}
|
||||
if (value.arrayValue) {
|
||||
return JSON.stringify(value.arrayValue.values.map(anyValueToString))
|
||||
}
|
||||
if (value.kvlistValue) {
|
||||
return JSON.stringify(value.kvlistValue.values.reduce((agg, pair) => ({
|
||||
...agg,
|
||||
[pair.key]: anyValueToString(pair.value)
|
||||
})))
|
||||
}
|
||||
return ''
|
||||
}
|
||||
|
||||
module.exports = handle
|
177
lib/otlp.proto
177
lib/otlp.proto
@ -340,3 +340,180 @@ message Status {
|
||||
// The status code.
|
||||
StatusCode code = 3;
|
||||
}
|
||||
|
||||
// Recource logs definition
|
||||
|
||||
message LogsData {
|
||||
// An array of ResourceLogs.
|
||||
// For data coming from a single resource this array will typically contain
|
||||
// one element. Intermediary nodes that receive data from multiple origins
|
||||
// typically batch the data before forwarding further and in that case this
|
||||
// array will contain multiple elements.
|
||||
repeated ResourceLogs resource_logs = 1;
|
||||
}
|
||||
|
||||
// A collection of ScopeLogs from a Resource.
|
||||
message ResourceLogs {
|
||||
reserved 1000;
|
||||
|
||||
// The resource for the logs in this message.
|
||||
// If this field is not set then resource info is unknown.
|
||||
Resource resource = 1;
|
||||
|
||||
// A list of ScopeLogs that originate from a resource.
|
||||
repeated ScopeLogs scope_logs = 2;
|
||||
|
||||
// The Schema URL, if known. This is the identifier of the Schema that the resource data
|
||||
// is recorded in. To learn more about Schema URL see
|
||||
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
|
||||
// This schema_url applies to the data in the "resource" field. It does not apply
|
||||
// to the data in the "scope_logs" field which have their own schema_url field.
|
||||
string schema_url = 3;
|
||||
}
|
||||
|
||||
// A collection of Logs produced by a Scope.
|
||||
message ScopeLogs {
|
||||
// The instrumentation scope information for the logs in this message.
|
||||
// Semantically when InstrumentationScope isn't set, it is equivalent with
|
||||
// an empty instrumentation scope name (unknown).
|
||||
InstrumentationScope scope = 1;
|
||||
|
||||
// A list of log records.
|
||||
repeated LogRecord log_records = 2;
|
||||
|
||||
// The Schema URL, if known. This is the identifier of the Schema that the log data
|
||||
// is recorded in. To learn more about Schema URL see
|
||||
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
|
||||
// This schema_url applies to all logs in the "logs" field.
|
||||
string schema_url = 3;
|
||||
}
|
||||
|
||||
// Possible values for LogRecord.SeverityNumber.
|
||||
enum SeverityNumber {
|
||||
// UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
|
||||
SEVERITY_NUMBER_UNSPECIFIED = 0;
|
||||
SEVERITY_NUMBER_TRACE = 1;
|
||||
SEVERITY_NUMBER_TRACE2 = 2;
|
||||
SEVERITY_NUMBER_TRACE3 = 3;
|
||||
SEVERITY_NUMBER_TRACE4 = 4;
|
||||
SEVERITY_NUMBER_DEBUG = 5;
|
||||
SEVERITY_NUMBER_DEBUG2 = 6;
|
||||
SEVERITY_NUMBER_DEBUG3 = 7;
|
||||
SEVERITY_NUMBER_DEBUG4 = 8;
|
||||
SEVERITY_NUMBER_INFO = 9;
|
||||
SEVERITY_NUMBER_INFO2 = 10;
|
||||
SEVERITY_NUMBER_INFO3 = 11;
|
||||
SEVERITY_NUMBER_INFO4 = 12;
|
||||
SEVERITY_NUMBER_WARN = 13;
|
||||
SEVERITY_NUMBER_WARN2 = 14;
|
||||
SEVERITY_NUMBER_WARN3 = 15;
|
||||
SEVERITY_NUMBER_WARN4 = 16;
|
||||
SEVERITY_NUMBER_ERROR = 17;
|
||||
SEVERITY_NUMBER_ERROR2 = 18;
|
||||
SEVERITY_NUMBER_ERROR3 = 19;
|
||||
SEVERITY_NUMBER_ERROR4 = 20;
|
||||
SEVERITY_NUMBER_FATAL = 21;
|
||||
SEVERITY_NUMBER_FATAL2 = 22;
|
||||
SEVERITY_NUMBER_FATAL3 = 23;
|
||||
SEVERITY_NUMBER_FATAL4 = 24;
|
||||
}
|
||||
|
||||
// LogRecordFlags represents constants used to interpret the
|
||||
// LogRecord.flags field, which is protobuf 'fixed32' type and is to
|
||||
// be used as bit-fields. Each non-zero value defined in this enum is
|
||||
// a bit-mask. To extract the bit-field, for example, use an
|
||||
// expression like:
|
||||
//
|
||||
// (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
|
||||
//
|
||||
enum LogRecordFlags {
|
||||
// The zero value for the enum. Should not be used for comparisons.
|
||||
// Instead use bitwise "and" with the appropriate mask as shown above.
|
||||
LOG_RECORD_FLAGS_DO_NOT_USE = 0;
|
||||
|
||||
// Bits 0-7 are used for trace flags.
|
||||
LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF;
|
||||
|
||||
// Bits 8-31 are reserved for future use.
|
||||
}
|
||||
|
||||
// A log record according to OpenTelemetry Log Data Model:
|
||||
// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
|
||||
message LogRecord {
|
||||
reserved 4;
|
||||
|
||||
// time_unix_nano is the time when the event occurred.
|
||||
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
|
||||
// Value of 0 indicates unknown or missing timestamp.
|
||||
fixed64 time_unix_nano = 1;
|
||||
|
||||
// Time when the event was observed by the collection system.
|
||||
// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
|
||||
// this timestamp is typically set at the generation time and is equal to Timestamp.
|
||||
// For events originating externally and collected by OpenTelemetry (e.g. using
|
||||
// Collector) this is the time when OpenTelemetry's code observed the event measured
|
||||
// by the clock of the OpenTelemetry code. This field MUST be set once the event is
|
||||
// observed by OpenTelemetry.
|
||||
//
|
||||
// For converting OpenTelemetry log data to formats that support only one timestamp or
|
||||
// when receiving OpenTelemetry log data by recipients that support only one timestamp
|
||||
// internally the following logic is recommended:
|
||||
// - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
|
||||
//
|
||||
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
|
||||
// Value of 0 indicates unknown or missing timestamp.
|
||||
fixed64 observed_time_unix_nano = 11;
|
||||
|
||||
// Numerical value of the severity, normalized to values described in Log Data Model.
|
||||
// [Optional].
|
||||
SeverityNumber severity_number = 2;
|
||||
|
||||
// The severity text (also known as log level). The original string representation as
|
||||
// it is known at the source. [Optional].
|
||||
string severity_text = 3;
|
||||
|
||||
// A value containing the body of the log record. Can be for example a human-readable
|
||||
// string message (including multi-line) describing the event in a free form or it can
|
||||
// be a structured data composed of arrays and maps of other values. [Optional].
|
||||
AnyValue body = 5;
|
||||
|
||||
// Additional attributes that describe the specific event occurrence. [Optional].
|
||||
// Attribute keys MUST be unique (it is not allowed to have more than one
|
||||
// attribute with the same key).
|
||||
repeated KeyValue attributes = 6;
|
||||
uint32 dropped_attributes_count = 7;
|
||||
|
||||
// Flags, a bit field. 8 least significant bits are the trace flags as
|
||||
// defined in W3C Trace Context specification. 24 most significant bits are reserved
|
||||
// and must be set to 0. Readers must not assume that 24 most significant bits
|
||||
// will be zero and must correctly mask the bits when reading 8-bit trace flag (use
|
||||
// flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional].
|
||||
fixed32 flags = 8;
|
||||
|
||||
// A unique identifier for a trace. All logs from the same trace share
|
||||
// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
|
||||
// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
|
||||
// is zero-length and thus is also invalid).
|
||||
//
|
||||
// This field is optional.
|
||||
//
|
||||
// The receivers SHOULD assume that the log record is not associated with a
|
||||
// trace if any of the following is true:
|
||||
// - the field is not present,
|
||||
// - the field contains an invalid value.
|
||||
bytes trace_id = 9;
|
||||
|
||||
// A unique identifier for a span within a trace, assigned when the span
|
||||
// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
|
||||
// other than 8 bytes is considered invalid (empty string in OTLP/JSON
|
||||
// is zero-length and thus is also invalid).
|
||||
//
|
||||
// This field is optional. If the sender specifies a valid span_id then it SHOULD also
|
||||
// specify a valid trace_id.
|
||||
//
|
||||
// The receivers SHOULD assume that the log record is not associated with a
|
||||
// span if any of the following is true:
|
||||
// - the field is not present,
|
||||
// - the field contains an invalid value.
|
||||
bytes span_id = 10;
|
||||
}
|
||||
|
28
parsers.js
28
parsers.js
@ -13,7 +13,9 @@ const path = require('path')
|
||||
const WriteRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'prompb.proto')).lookupType('WriteRequest')
|
||||
const PushRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'loki.proto')).lookupType('PushRequest')
|
||||
const OTLPTraceData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('TracesData')
|
||||
const OTLPLogsData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('LogsData')
|
||||
const { parse: queryParser } = require('fast-querystring')
|
||||
const fs = require('fs')
|
||||
|
||||
/**
|
||||
*
|
||||
@ -202,6 +204,29 @@ function tempoNDJsonParser (req, payload) {
|
||||
return parser
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param req {FastifyRequest}
|
||||
* @param payload {Stream}
|
||||
* @returns {*}
|
||||
*/
|
||||
async function otlpLogsDataParser (req, payload) {
|
||||
const length = getContentLength(req, 5e6)
|
||||
await shaper.register(length)
|
||||
let body = []
|
||||
const otelStream = stream.Readable.from(payload)
|
||||
otelStream.on('data', data => {
|
||||
body.push(data)
|
||||
})
|
||||
await new Promise(resolve => otelStream.once('end', resolve))
|
||||
body = Buffer.concat(body)
|
||||
body = OTLPLogsData.toObject(OTLPLogsData.decode(body), {
|
||||
longs: String,
|
||||
bytes: String
|
||||
})
|
||||
return body
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param subparsers {function(FastifyRequest): Promise<*|undefined>}
|
||||
@ -363,5 +388,6 @@ module.exports = {
|
||||
tempoNDJsonParser,
|
||||
otlpPushProtoParser,
|
||||
wwwFormParser,
|
||||
parsers
|
||||
parsers,
|
||||
otlpLogsDataParser
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ import {
|
||||
combinedParser,
|
||||
jsonParser,
|
||||
lokiPushJSONParser,
|
||||
lokiPushProtoParser, otlpPushProtoParser, prometheusPushProtoParser,
|
||||
lokiPushProtoParser, otlpLogsDataParser, otlpPushProtoParser, prometheusPushProtoParser,
|
||||
rawStringParser,
|
||||
tempoPushNDJSONParser,
|
||||
tempoPushParser, wwwFormParser, yamlParser
|
||||
@ -56,6 +56,7 @@ import handlerPromGetRules from './lib/handlers/alerts/prom_get_rules.js'
|
||||
import handlerTail from './lib/handlers/tail.js'
|
||||
import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js'
|
||||
import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js'
|
||||
import handlerOtlpLogsPush from './lib/handlers/otlp_log_push.js'
|
||||
import {init as pyroscopeInit } from './pyroscope/pyroscope.js'
|
||||
|
||||
import { boolEnv, readonly, readerMode, writerMode } from './common.js'
|
||||
@ -332,6 +333,10 @@ export default async() => {
|
||||
|
||||
readerMode && pyroscopeInit(fastify)
|
||||
|
||||
writerMode && fastify.post('/v1/logs', handlerOtlpLogsPush, {
|
||||
'*': otlpLogsDataParser
|
||||
})
|
||||
|
||||
const serveView = fs.existsSync(path.join(__dirname, 'view/index.html'))
|
||||
if (serveView) {
|
||||
app.plug(group(path.join(__dirname, 'view')));
|
||||
|
@ -59,13 +59,11 @@ const {
|
||||
shaper,
|
||||
parsers,
|
||||
lokiPushJSONParser, lokiPushProtoParser, jsonParser, rawStringParser, tempoPushParser, tempoPushNDJSONParser,
|
||||
yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser
|
||||
yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser, otlpLogsDataParser
|
||||
} = require('./parsers')
|
||||
|
||||
const fastifyPlugin = require('fastify-plugin')
|
||||
|
||||
|
||||
|
||||
let fastify = require('fastify')({
|
||||
logger,
|
||||
bodyLimit: parseInt(process.env.FASTIFY_BODYLIMIT) || 5242880,
|
||||
@ -460,6 +458,11 @@ let fastify = require('fastify')({
|
||||
|
||||
readerMode && require('./pyroscope/pyroscope').init(fastify)
|
||||
|
||||
const handleOTLPLogs = require('./lib/handlers/otlp_log_push').bind(this)
|
||||
writerMode && fastify.post('/v1/logs', handleOTLPLogs, {
|
||||
'*': otlpLogsDataParser
|
||||
})
|
||||
|
||||
// Run API Service
|
||||
fastify.listen(
|
||||
{
|
||||
|
Reference in New Issue
Block a user