Mirror of https://github.com/metrico/qryn.git, synced 2025-03-14 10:07:18 +00:00
@@ -31,6 +31,8 @@ const { getClickhouseUrl } = require('./clickhouse_options')
 // External Storage Policy for Tables (S3, MINIO)
 const storagePolicy = process.env.STORAGE_POLICY || false
+// Clickhouse Distributed Engine setting to skip unavailable shards
+const skipUnavailableShards = process.env.SKIP_UNAVAILABLE_SHARDS || false
 
 const { StringStream, DataStream } = require('scramjet')
 
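Annotation (not part of the diff): the flag is read as process.env.SKIP_UNAVAILABLE_SHARDS || false, and environment variables are strings, so any non-empty value enables it, including "0". A minimal sketch of that behavior:

    // Sketch only: how the env flag evaluates (value illustrative).
    process.env.SKIP_UNAVAILABLE_SHARDS = '1'
    const skipUnavailableShards = process.env.SKIP_UNAVAILABLE_SHARDS || false
    console.log(Boolean(skipUnavailableShards)) // true
    // Caveat: SKIP_UNAVAILABLE_SHARDS=0 still yields the truthy string '0'.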
@@ -188,7 +190,7 @@ const initialize = async function (dbName) {
   }
   if (!isOmitTablesCreation()) {
     const maintain = require('./maintain/index')
-    await maintain.upgrade({ name: dbName, storage_policy: storagePolicy })
+    await maintain.upgrade({ name: dbName, storage_policy: storagePolicy, skip_unavailable_shards: skipUnavailableShards })
     await maintain.rotate([{
       db: dbName,
       samples_days: rotationSamples,
@@ -14,21 +14,21 @@ const getEnv = () => {
 
 /**
  *
- * @param db {{name: string, storage_policy: string}}
+ * @param db {{name: string, storage_policy: string, skip_unavailable_shards: boolean}}
  * @returns {Promise<void>}
  */
 module.exports.upgrade = async (db) => {
-  await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy)
-  await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy)
-  await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy)
+  await upgradeSingle(db.name, 1, scripts.overall, db.storage_policy, false)
+  await upgradeSingle(db.name, 2, scripts.traces, db.storage_policy, false)
+  await upgradeSingle(db.name, 5, scripts.profiles, db.storage_policy, false)
   if (db.storage_policy) {
     await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.name)
     await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.name)
   }
   if (clusterName) {
-    await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy)
-    await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy)
-    await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy)
+    await upgradeSingle(db.name, 3, scripts.overall_dist, db.storage_policy, db.skip_unavailable_shards)
+    await upgradeSingle(db.name, 4, scripts.traces_dist, db.storage_policy, db.skip_unavailable_shards)
+    await upgradeSingle(db.name, 6, scripts.profiles_dist, db.storage_policy, db.skip_unavailable_shards)
   }
 }
 
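Annotation: the non-distributed schema sets (overall, traces, profiles) always pass false, since skip_unavailable_shards is only meaningful for Distributed-engine tables; only the *_dist sets receive the caller's value. A hedged sketch of the upgraded call from the consumer side (database name and policy are illustrative, not from this diff):

    // Sketch only: invoking the upgraded maintain API inside an async context.
    const maintain = require('./maintain/index')
    await maintain.upgrade({
      name: 'qryn',                   // target database (illustrative)
      storage_policy: 's3_tiered',    // hypothetical storage policy name
      skip_unavailable_shards: true   // forwarded to the *_dist scripts only
    })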
@@ -39,10 +39,18 @@ let isDBCreated = false
  * @param key {number}
  * @param scripts {string[]}
  * @param storagePolicy {string}
+ * @param skipUnavailableShards {boolean}
  */
-const upgradeSingle = async (db, key, scripts, storagePolicy) => {
+const upgradeSingle = async (db, key, scripts, storagePolicy, skipUnavailableShards) => {
   const _upgradeRequest = (request, useDefaultDB, updateVer) => {
-    return upgradeRequest({ db, useDefaultDB, updateVer, storage_policy: storagePolicy }, request)
+    return upgradeRequest({
+      db,
+      useDefaultDB,
+      updateVer,
+      storage_policy: storagePolicy,
+      skip_unavailable_shards: skipUnavailableShards
+    }, request)
   }
   if (!isDBCreated) {
     isDBCreated = true
@@ -68,7 +76,13 @@ const upgradeSingle = async (db, key, scripts, storagePolicy) => {
 }
 
 /**
- * @param opts {{db: string, useDefaultDB: boolean, updateVer: {key: number, to: number}, storage_policy: string}}
+ * @param opts {{
+ *   db: string,
+ *   useDefaultDB: boolean,
+ *   updateVer: {key: number, to: number},
+ *   storage_policy: string,
+ *   skip_unavailable_shards: boolean
+ * }}
  * @param request {string} database to update
  * @returns {Promise<void>}
  */
@@ -83,7 +97,8 @@ const upgradeRequest = async (opts, request) => {
     MergeTree: clusterName ? 'ReplicatedMergeTree' : 'MergeTree',
     ReplacingMergeTree: clusterName ? 'ReplicatedReplacingMergeTree' : 'ReplacingMergeTree',
     AggregatingMergeTree: clusterName ? 'ReplicatedAggregatingMergeTree' : 'AggregatingMergeTree',
-    CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : ''
+    CREATE_SETTINGS: opts.storage_policy ? `SETTINGS storage_policy='${opts.storage_policy}'` : '',
+    DIST_CREATE_SETTINGS: opts.skip_unavailable_shards ? `SETTINGS skip_unavailable_shards=1` : ''
   })
   await client.rawRequest(request, null, opts.useDefaultDB ? opts.db : undefined)
   if (opts.updateVer) {
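Annotation: DIST_CREATE_SETTINGS joins the existing template variables and is spliced into the {{{DIST_CREATE_SETTINGS}}} placeholders added to the distributed DDL below. A minimal sketch of that substitution, assuming a simple replace-based renderer rather than qryn's actual templating code:

    // Sketch only: expand {{KEY}} / {{{KEY}}} placeholders in a DDL template.
    const render = (tpl, vars) =>
      tpl.replace(/\{\{\{?(\w+)\}\}\}?/g, (_, k) => vars[k] ?? '')

    const ddl = render(
      ") ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand()) {{{DIST_CREATE_SETTINGS}}};",
      { CLUSTER: 'qryn_cluster', DB: 'qryn', DIST_CREATE_SETTINGS: 'SETTINGS skip_unavailable_shards=1' }
    )
    // => ") ENGINE = Distributed('qryn_cluster','qryn', 'settings', rand()) SETTINGS skip_unavailable_shards=1;"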
@@ -207,7 +207,7 @@ module.exports.overall_dist = [
     \`count\` AggregateFunction(count),
     \`sum\` SimpleAggregateFunction(sum, Float64),
     \`bytes\` SimpleAggregateFunction(sum, Float64)
-  ) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint)`,
+  ) ENGINE = Distributed('{{CLUSTER}}', '{{DB}}', 'metrics_15s', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.samples_v3_dist {{{OnCluster}}} (
     \`fingerprint\` UInt64,
@@ -221,7 +221,7 @@ module.exports.overall_dist = [
     \`fingerprint\` UInt64,
     \`labels\` String,
     \`name\` String
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint);`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.settings_dist {{{OnCluster}}} (
     \`fingerprint\` UInt64,
@@ -229,14 +229,14 @@ module.exports.overall_dist = [
     \`name\` String,
     \`value\` String,
     \`inserted_at\` DateTime64(9, 'UTC')
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand());`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'settings', rand()) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.time_series_gin_dist {{{OnCluster}}} (
     date Date,
     key String,
     val String,
     fingerprint UInt64
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand());`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'time_series_gin', rand()) {{{DIST_CREATE_SETTINGS}}};`,
 
   'ALTER TABLE {{DB}}.metrics_15s_dist {{{OnCluster}}} ADD COLUMN IF NOT EXISTS \`type\` UInt8;',
 
@@ -254,7 +254,7 @@ module.exports.traces_dist = [
     key String,
     val_id String,
     val String
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key)) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_dist {{{OnCluster}}} (
     oid String,
@@ -267,7 +267,7 @@ module.exports.traces_dist = [
     service_name String,
     payload_type Int8,
     payload String,
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin_dist {{{OnCluster}}} (
     oid String,
@@ -278,7 +278,7 @@ module.exports.traces_dist = [
     span_id FixedString(8),
     timestamp_ns Int64,
     duration Int64
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id));`
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_attrs_gin', sipHash64(oid, trace_id)) {{{DIST_CREATE_SETTINGS}}};`
 ]
 
 module.exports.profiles = [
@@ -440,7 +440,7 @@ module.exports.profiles_dist = [
     payload_type LowCardinality(String),
     payload String,
     values_agg Array(Tuple(String, Int64, Int32))
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint);`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_dist {{{OnCluster}}} (
     date Date,
@@ -448,7 +448,7 @@ module.exports.profiles_dist = [
     service_name LowCardinality(String),
     fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)),
     tags Array(Tuple(String, String)) CODEC(ZSTD(1))
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint);`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_gin_dist {{{OnCluster}}} (
     date Date,
@@ -457,14 +457,14 @@ module.exports.profiles_dist = [
     type_id LowCardinality(String),
     service_name LowCardinality(String),
     fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1))
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint);`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint) {{{DIST_CREATE_SETTINGS}}};`,
 
   `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_keys_dist {{{OnCluster}}} (
     date Date,
     key String,
     val String,
     val_id UInt64
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand()) {{{DIST_CREATE_SETTINGS}}};`,
 
   `ALTER TABLE {{DB}}.profiles_dist {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
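Annotation: in ClickHouse, skip_unavailable_shards=1 makes queries through a Distributed table silently skip shards that cannot be reached instead of failing the whole query; when the option is off, DIST_CREATE_SETTINGS renders to an empty string and the default strict behavior applies. A sketch of one rendered statement with the flag enabled (cluster and database names illustrative, and the ON CLUSTER expansion assumed):

    // Sketch only: rendered DDL for one distributed table with the flag on.
    const rendered = `CREATE TABLE IF NOT EXISTS qryn.time_series_gin_dist ON CLUSTER qryn_cluster (
      date Date,
      key String,
      val String,
      fingerprint UInt64
    ) ENGINE = Distributed('qryn_cluster','qryn', 'time_series_gin', rand()) SETTINGS skip_unavailable_shards=1;`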