From 1f948a8dfd5a47dae6a7bf922d817a830ec73c24 Mon Sep 17 00:00:00 2001
From: akvlad <akvlad90@gmail.com>
Date: Mon, 15 Jul 2024 15:54:46 +0300
Subject: [PATCH] otlp logs push init

---
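Notes (below the fold, not part of the commit message): a minimal sketch of how
the new POST /v1/logs endpoint can be exercised with a protobuf-encoded LogsData
payload. The script name, the localhost:3100 address and the Node 18+ global
fetch are assumptions for local testing only, not part of this patch.

    // push-otlp-log.js (hypothetical test helper, run from the qryn repo root)
    const protobufjs = require('protobufjs')
    const path = require('path')

    // reuse the same proto definition the server loads in parsers.js
    const LogsData = protobufjs
      .loadSync(path.join(__dirname, 'lib', 'otlp.proto'))
      .lookupType('LogsData')

    const message = LogsData.fromObject({
      resourceLogs: [{
        resource: { attributes: [{ key: 'service.name', value: { stringValue: 'demo' } }] },
        scopeLogs: [{
          logRecords: [{
            timeUnixNano: (BigInt(Date.now()) * 1000000n).toString(),
            severityText: 'INFO',
            body: { stringValue: 'hello from OTLP logs' }
          }]
        }]
      }]
    })

    fetch('http://localhost:3100/v1/logs', {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-protobuf' },
      body: LogsData.encode(message).finish()
    }).then(r => console.log('status:', r.status))
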
 lib/handlers/otlp_log_push.js | 113 ++++++++++++++++++++
 lib/otlp.proto                | 177 ++++++++++++++++++++++++++++++++++
 parsers.js                    |  28 +++++-
 qryn_bun.mjs                  |   7 +-
 qryn_node.js                  |   9 +-
 5 files changed, 329 insertions(+), 5 deletions(-)
 create mode 100644 lib/handlers/otlp_log_push.js

diff --git a/lib/handlers/otlp_log_push.js b/lib/handlers/otlp_log_push.js
new file mode 100644
index 0000000..4334714
--- /dev/null
+++ b/lib/handlers/otlp_log_push.js
@@ -0,0 +1,113 @@
+const DATABASE = require('../db/clickhouse')
+const { asyncLogError, logType, metricType, bothType, readonly } = require('../../common')
+const UTILS = require('../utils')
+const stringify = UTILS.stringify
+const fingerPrint = UTILS.fingerPrint
+const { bulk_labels, bulk } = DATABASE.cache
+
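+// POST /v1/logs handler: ingests an OTLP LogsData payload decoded by otlpLogsDataParser (see parsers.js)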
+async function handle (req, res) {
+  if (readonly) {
+    asyncLogError('Readonly! No push support.', req.log)
+    return res.code(500).send()
+  }
+  try {
+    const promises = []
+    const fingerprints = {}
+    for (const resourceLogsEntry of req.body.resourceLogs || []) {
+      const resAttrs = resource2Attrs(resourceLogsEntry.resource)
+      for (const scopeLogsEntry of resourceLogsEntry.scopeLogs || []) {
+        const scopeAttrs = {
+          ...resAttrs,
+          ...resource2Attrs(scopeLogsEntry.scope)
+        }
+        for (const logRecord of scopeLogsEntry.logRecords || []) {
+          const logAttrs = {
+            ...scopeAttrs,
+            ...resource2Attrs(logRecord)
+          }
+          if (logRecord.severityText) {
+            logAttrs.level = logRecord.severityText
+          }
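+          // the merged resource/scope/record attributes become the stream labels and its fingerprint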
+          const labels = stringify(logAttrs)
+          const fingerprint = fingerPrint(labels)
+          const ts = BigInt(logRecord.timeUnixNano || logRecord.observedTimeUnixNano || 0)
+          promises.push(bulk.add([[
+            fingerprint,
+            ts,
+            null,
+            anyValueToString(logRecord.body),
+            logType
+          ]]))
+          const date = new Date(Number(ts / BigInt(1000000))).toISOString().split('T')[0]
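+          // write the labels row to the bulk_labels buffer only once per fingerprint within this request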
+          !fingerprints[fingerprint] && promises.push(bulk_labels.add([[
+            date,
+            fingerprint,
+            labels,
+            logAttrs.name || '',
+            logType
+          ]]))
+          fingerprints[fingerprint] = true
+        }
+      }
+    }
+    await Promise.all(promises)
+    // acknowledge the export; without a reply the OTLP client would hang waiting
+    return res.code(200).send()
+  } catch (error) {
+    await asyncLogError(error, req.log)
+    res.code(500).send({ error: 'Internal Server Error' })
+  }
+}
+
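+// Converts an OTLP attributes list ([{ key, value }]) into a flat map of stringified values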
+function resource2Attrs (resource) {
+  if (!resource || !resource.attributes) {
+    return {}
+  }
+  const attrs = {}
+  for (const attribute of resource.attributes) {
+    attrs[normalizeAttrName(attribute.key)] = anyValueToString(attribute.value)
+  }
+  return attrs
+}
+
+function normalizeAttrName (name) {
+  return name.replaceAll(/[^a-zA-Z0-9_]/g, '_')
+}
+
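+// Renders an OTLP AnyValue (scalar, bytes, array or kvlist) as a string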
+function anyValueToString (value) {
+  if (!value) {
+    return ''
+  }
+  if (value.stringValue !== undefined) {
+    return value.stringValue
+  }
+  if (value.boolValue !== undefined) {
+    return value.boolValue ? 'true' : 'false'
+  }
+  if (value.intValue !== undefined) {
+    return value.intValue.toString()
+  }
+  if (value.doubleValue !== undefined) {
+    return value.doubleValue.toString()
+  }
+  if (value.bytesValue) {
+    return typeof value.bytesValue === 'string' ? value.bytesValue : Buffer.from(value.bytesValue).toString('base64')
+  }
+  if (value.arrayValue) {
+    return JSON.stringify((value.arrayValue.values || []).map(anyValueToString))
+  }
+  if (value.kvlistValue) {
+    return JSON.stringify((value.kvlistValue.values || []).reduce((agg, pair) => ({
+      ...agg,
+      [pair.key]: anyValueToString(pair.value)
+    }), {}))
+  }
+  return ''
+}
+
+module.exports = handle
diff --git a/lib/otlp.proto b/lib/otlp.proto
index 81a15fd..cee0cf0 100644
--- a/lib/otlp.proto
+++ b/lib/otlp.proto
@@ -340,3 +340,180 @@ message Status {
   // The status code.
   StatusCode code = 3;
 }
+
+// Resource logs definitions
+
+message LogsData {
+  // An array of ResourceLogs.
+  // For data coming from a single resource this array will typically contain
+  // one element. Intermediary nodes that receive data from multiple origins
+  // typically batch the data before forwarding further and in that case this
+  // array will contain multiple elements.
+  repeated ResourceLogs resource_logs = 1;
+}
+
+// A collection of ScopeLogs from a Resource.
+message ResourceLogs {
+  reserved 1000;
+
+  // The resource for the logs in this message.
+  // If this field is not set then resource info is unknown.
+  Resource resource = 1;
+
+  // A list of ScopeLogs that originate from a resource.
+  repeated ScopeLogs scope_logs = 2;
+
+  // The Schema URL, if known. This is the identifier of the Schema that the resource data
+  // is recorded in. To learn more about Schema URL see
+  // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
+  // This schema_url applies to the data in the "resource" field. It does not apply
+  // to the data in the "scope_logs" field which have their own schema_url field.
+  string schema_url = 3;
+}
+
+// A collection of Logs produced by a Scope.
+message ScopeLogs {
+  // The instrumentation scope information for the logs in this message.
+  // Semantically when InstrumentationScope isn't set, it is equivalent with
+  // an empty instrumentation scope name (unknown).
+  InstrumentationScope scope = 1;
+
+  // A list of log records.
+  repeated LogRecord log_records = 2;
+
+  // The Schema URL, if known. This is the identifier of the Schema that the log data
+  // is recorded in. To learn more about Schema URL see
+  // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
+  // This schema_url applies to all logs in the "logs" field.
+  string schema_url = 3;
+}
+
+// Possible values for LogRecord.SeverityNumber.
+enum SeverityNumber {
+  // UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
+  SEVERITY_NUMBER_UNSPECIFIED = 0;
+  SEVERITY_NUMBER_TRACE  = 1;
+  SEVERITY_NUMBER_TRACE2 = 2;
+  SEVERITY_NUMBER_TRACE3 = 3;
+  SEVERITY_NUMBER_TRACE4 = 4;
+  SEVERITY_NUMBER_DEBUG  = 5;
+  SEVERITY_NUMBER_DEBUG2 = 6;
+  SEVERITY_NUMBER_DEBUG3 = 7;
+  SEVERITY_NUMBER_DEBUG4 = 8;
+  SEVERITY_NUMBER_INFO   = 9;
+  SEVERITY_NUMBER_INFO2  = 10;
+  SEVERITY_NUMBER_INFO3  = 11;
+  SEVERITY_NUMBER_INFO4  = 12;
+  SEVERITY_NUMBER_WARN   = 13;
+  SEVERITY_NUMBER_WARN2  = 14;
+  SEVERITY_NUMBER_WARN3  = 15;
+  SEVERITY_NUMBER_WARN4  = 16;
+  SEVERITY_NUMBER_ERROR  = 17;
+  SEVERITY_NUMBER_ERROR2 = 18;
+  SEVERITY_NUMBER_ERROR3 = 19;
+  SEVERITY_NUMBER_ERROR4 = 20;
+  SEVERITY_NUMBER_FATAL  = 21;
+  SEVERITY_NUMBER_FATAL2 = 22;
+  SEVERITY_NUMBER_FATAL3 = 23;
+  SEVERITY_NUMBER_FATAL4 = 24;
+}
+
+// LogRecordFlags represents constants used to interpret the
+// LogRecord.flags field, which is protobuf 'fixed32' type and is to
+// be used as bit-fields. Each non-zero value defined in this enum is
+// a bit-mask.  To extract the bit-field, for example, use an
+// expression like:
+//
+//   (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
+//
+enum LogRecordFlags {
+  // The zero value for the enum. Should not be used for comparisons.
+  // Instead use bitwise "and" with the appropriate mask as shown above.
+  LOG_RECORD_FLAGS_DO_NOT_USE = 0;
+
+  // Bits 0-7 are used for trace flags.
+  LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF;
+
+  // Bits 8-31 are reserved for future use.
+}
+
+// A log record according to OpenTelemetry Log Data Model:
+// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
+message LogRecord {
+  reserved 4;
+
+  // time_unix_nano is the time when the event occurred.
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
+  // Value of 0 indicates unknown or missing timestamp.
+  fixed64 time_unix_nano = 1;
+
+  // Time when the event was observed by the collection system.
+  // For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
+  // this timestamp is typically set at the generation time and is equal to Timestamp.
+  // For events originating externally and collected by OpenTelemetry (e.g. using
+  // Collector) this is the time when OpenTelemetry's code observed the event measured
+  // by the clock of the OpenTelemetry code. This field MUST be set once the event is
+  // observed by OpenTelemetry.
+  //
+  // For converting OpenTelemetry log data to formats that support only one timestamp or
+  // when receiving OpenTelemetry log data by recipients that support only one timestamp
+  // internally the following logic is recommended:
+  //   - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
+  //
+  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
+  // Value of 0 indicates unknown or missing timestamp.
+  fixed64 observed_time_unix_nano = 11;
+
+  // Numerical value of the severity, normalized to values described in Log Data Model.
+  // [Optional].
+  SeverityNumber severity_number = 2;
+
+  // The severity text (also known as log level). The original string representation as
+  // it is known at the source. [Optional].
+  string severity_text = 3;
+
+  // A value containing the body of the log record. Can be for example a human-readable
+  // string message (including multi-line) describing the event in a free form or it can
+  // be a structured data composed of arrays and maps of other values. [Optional].
+  AnyValue body = 5;
+
+  // Additional attributes that describe the specific event occurrence. [Optional].
+  // Attribute keys MUST be unique (it is not allowed to have more than one
+  // attribute with the same key).
+  repeated KeyValue attributes = 6;
+  uint32 dropped_attributes_count = 7;
+
+  // Flags, a bit field. 8 least significant bits are the trace flags as
+  // defined in W3C Trace Context specification. 24 most significant bits are reserved
+  // and must be set to 0. Readers must not assume that 24 most significant bits
+  // will be zero and must correctly mask the bits when reading 8-bit trace flag (use
+  // flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional].
+  fixed32 flags = 8;
+
+  // A unique identifier for a trace. All logs from the same trace share
+  // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
+  // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
+  // is zero-length and thus is also invalid).
+  //
+  // This field is optional.
+  //
+  // The receivers SHOULD assume that the log record is not associated with a
+  // trace if any of the following is true:
+  //   - the field is not present,
+  //   - the field contains an invalid value.
+  bytes trace_id = 9;
+
+  // A unique identifier for a span within a trace, assigned when the span
+  // is created. The ID is an 8-byte array. An ID with all zeroes OR of length
+  // other than 8 bytes is considered invalid (empty string in OTLP/JSON
+  // is zero-length and thus is also invalid).
+  //
+  // This field is optional. If the sender specifies a valid span_id then it SHOULD also
+  // specify a valid trace_id.
+  //
+  // The receivers SHOULD assume that the log record is not associated with a
+  // span if any of the following is true:
+  //   - the field is not present,
+  //   - the field contains an invalid value.
+  bytes span_id = 10;
+}
diff --git a/parsers.js b/parsers.js
index 6cd4da0..64caf0c 100644
--- a/parsers.js
+++ b/parsers.js
@@ -13,7 +13,9 @@ const path = require('path')
 const WriteRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'prompb.proto')).lookupType('WriteRequest')
 const PushRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'loki.proto')).lookupType('PushRequest')
 const OTLPTraceData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('TracesData')
+const OTLPLogsData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('LogsData')
 const { parse: queryParser } = require('fast-querystring')
+const fs = require('fs')
 
 /**
  *
@@ -202,6 +204,29 @@ function tempoNDJsonParser (req, payload) {
   return parser
 }
 
+/**
+ *
+ * @param req {FastifyRequest}
+ * @param payload {Stream}
+ * @returns {*}
+ */
+async function otlpLogsDataParser (req, payload) {
+  const length = getContentLength(req, 5e6)
+  await shaper.register(length)
+  let body = []
+  const otelStream = stream.Readable.from(payload)
+  otelStream.on('data', data => {
+    body.push(data)
+  })
+  await new Promise(resolve => otelStream.once('end', resolve))
+  body = Buffer.concat(body)
+  body = OTLPLogsData.toObject(OTLPLogsData.decode(body), {
+    longs: String,
+    bytes: String
+  })
+  return body
+}
+
 /**
  *
  * @param subparsers {function(FastifyRequest): Promise<*|undefined>}
@@ -363,5 +388,6 @@ module.exports = {
   tempoNDJsonParser,
   otlpPushProtoParser,
   wwwFormParser,
-  parsers
+  parsers,
+  otlpLogsDataParser
 }
diff --git a/qryn_bun.mjs b/qryn_bun.mjs
index 992ed4e..b972e45 100644
--- a/qryn_bun.mjs
+++ b/qryn_bun.mjs
@@ -12,7 +12,7 @@ import {
   combinedParser,
   jsonParser,
   lokiPushJSONParser,
-  lokiPushProtoParser, otlpPushProtoParser, prometheusPushProtoParser,
+  lokiPushProtoParser, otlpLogsDataParser, otlpPushProtoParser, prometheusPushProtoParser,
   rawStringParser,
   tempoPushNDJSONParser,
   tempoPushParser, wwwFormParser, yamlParser
@@ -56,6 +56,7 @@ import handlerPromGetRules from './lib/handlers/alerts/prom_get_rules.js'
 import handlerTail from './lib/handlers/tail.js'
 import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js'
 import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js'
+import handlerOtlpLogsPush from './lib/handlers/otlp_log_push.js'
 import {init as pyroscopeInit } from './pyroscope/pyroscope.js'
 
 import { boolEnv, readonly, readerMode, writerMode } from './common.js'
@@ -332,6 +333,10 @@ export default async() => {
 
   readerMode && pyroscopeInit(fastify)
 
+  writerMode && fastify.post('/v1/logs', handlerOtlpLogsPush, {
+    '*': otlpLogsDataParser
+  })
+
   const serveView = fs.existsSync(path.join(__dirname, 'view/index.html'))
   if (serveView) {
     app.plug(group(path.join(__dirname, 'view')));
diff --git a/qryn_node.js b/qryn_node.js
index 1b863b8..a42c156 100755
--- a/qryn_node.js
+++ b/qryn_node.js
@@ -59,13 +59,11 @@ const {
   shaper,
   parsers,
   lokiPushJSONParser, lokiPushProtoParser, jsonParser, rawStringParser, tempoPushParser, tempoPushNDJSONParser,
-  yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser
+  yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser, otlpLogsDataParser
 } = require('./parsers')
 
 const fastifyPlugin = require('fastify-plugin')
 
-
-
 let fastify = require('fastify')({
   logger,
   bodyLimit: parseInt(process.env.FASTIFY_BODYLIMIT) || 5242880,
@@ -460,6 +458,11 @@ let fastify = require('fastify')({
 
   readerMode && require('./pyroscope/pyroscope').init(fastify)
 
+  const handleOTLPLogs = require('./lib/handlers/otlp_log_push').bind(this)
+  writerMode && fastify.post('/v1/logs', handleOTLPLogs, {
+    '*': otlpLogsDataParser
+  })
+
   // Run API Service
   fastify.listen(
     {