feat: add node health status into config (#4758)

Author: Borys
Date: 2025-03-13 09:20:02 +02:00
Committed by: GitHub
Parent: 93bd52ceb8
Commit: 8d6a18465b
4 changed files with 114 additions and 51 deletions
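
In brief (as the diff below shows): cluster config node entries gain an optional "health" field, parsed case-insensitively into a new NodeHealth enum (NONE, FAIL, LOADING, ONLINE) on a ClusterExtendedNodeInfo that extends ClusterNodeInfo; ClusterShardInfo now stores its master and replicas as the extended type. A minimal node entry in the shape the new parser accepts (copied from the NodesHealth test below; values illustrative):

    "master": { "id": "id0", "ip": "localhost", "port": 3000, "health": "online" }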

View File

@@ -5,6 +5,7 @@
 #include "cluster_config.h"

 #include <absl/container/flat_hash_set.h>
+#include <absl/strings/match.h>

 #include <optional>
 #include <string_view>
@@ -161,13 +162,13 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
   return SlotRanges(ranges);
 }

-optional<ClusterNodeInfo> ParseClusterNode(const JsonType& json) {
+optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
   if (!json.is_object()) {
     LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
     return nullopt;
   }

-  ClusterNodeInfo node;
+  ClusterExtendedNodeInfo node;

   {
     auto id = json.at_or_null("id");
@@ -195,6 +196,26 @@ optional<ClusterNodeInfo> ParseClusterNode(const JsonType& json) {
     node.port = port.value();
   }

+  {
+    auto health = json.at_or_null("health");
+    if (!health.is_null()) {
+      if (!health.is_string()) {
+        LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node " << json;
+      } else {
+        auto health_str = std::move(health).as_string();
+        if (absl::EqualsIgnoreCase(health_str, "FAIL")) {
+          node.health = NodeHealth::FAIL;
+        } else if (absl::EqualsIgnoreCase(health_str, "LOADING")) {
+          node.health = NodeHealth::LOADING;
+        } else if (absl::EqualsIgnoreCase(health_str, "ONLINE")) {
+          node.health = NodeHealth::ONLINE;
+        } else {
+          LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
+        }
+      }
+    }
+  }
+
   return node;
 }
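Behaviorally, the added block is equivalent to this helper (a sketch for illustration only; ParseHealth is hypothetical and does not exist in the patch, which inlines the logic above):

    // Maps a config "health" string to NodeHealth; matching is case-insensitive
    // via absl::EqualsIgnoreCase. In the real code an unknown value also logs a
    // warning before falling through to the default.
    NodeHealth ParseHealth(std::string_view health_str) {
      if (absl::EqualsIgnoreCase(health_str, "FAIL"))
        return NodeHealth::FAIL;
      if (absl::EqualsIgnoreCase(health_str, "LOADING"))
        return NodeHealth::LOADING;
      if (absl::EqualsIgnoreCase(health_str, "ONLINE"))
        return NodeHealth::ONLINE;
      return NodeHealth::NONE;  // ClusterExtendedNodeInfo's default
    }

Note that a missing or null "health" field is skipped without a warning, so such nodes keep NodeHealth::NONE.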

View File

@@ -95,41 +95,47 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
 }

 TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
-  EXPECT_EQ(ClusterConfig::CreateFromConfig(
-                kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 16000}}),
-                         .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
-                         .replicas = {},
-                         .migrations = {}}}),
-            nullptr);
+  EXPECT_EQ(
+      ClusterConfig::CreateFromConfig(
+          kMyId,
+          {{.slot_ranges = SlotRanges({{.start = 0, .end = 16000}}),
+            .master = {{.id = "other", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
+            .replicas = {},
+            .migrations = {}}}),
+      nullptr);
 }

 TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
   EXPECT_EQ(ClusterConfig::CreateFromConfig(
-                kMyId,
-                ClusterShardInfos({{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
-                                    .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
-                                    .replicas = {},
-                                    .migrations = {}},
-                                   {.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
-                                    .master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
-                                    .replicas = {},
-                                    .migrations = {}}})),
+                kMyId, ClusterShardInfos(
+                           {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
+                             .master = {{.id = "other", .ip = "192.168.0.100", .port = 7000},
+                                        NodeHealth::ONLINE},
+                             .replicas = {},
+                             .migrations = {}},
+                            {.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
+                             .master = {{.id = "other2", .ip = "192.168.0.101", .port = 7001},
+                                        NodeHealth::ONLINE},
+                             .replicas = {},
+                             .migrations = {}}})),
             nullptr);
 }

 TEST_F(ClusterConfigTest, ConfigSetInvalidSlotId) {
-  EXPECT_EQ(ClusterConfig::CreateFromConfig(
-                kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF + 1}}),
-                         .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
-                         .replicas = {},
-                         .migrations = {}}}),
-            nullptr);
+  EXPECT_EQ(
+      ClusterConfig::CreateFromConfig(
+          kMyId,
+          {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF + 1}}),
+            .master = {{.id = "other", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
+            .replicas = {},
+            .migrations = {}}}),
+      nullptr);
 }

 TEST_F(ClusterConfigTest, ConfigSetOk) {
   auto config = ClusterConfig::CreateFromConfig(
       kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
-               .master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
+               .master = {{.id = "other", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
                .replicas = {},
                .migrations = {}}});
   EXPECT_NE(config, nullptr);
@@ -140,10 +146,12 @@ TEST_F(ClusterConfigTest, ConfigSetOk) {

 TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {
   auto config = ClusterConfig::CreateFromConfig(
-      kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
-               .master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
-               .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
-               .migrations = {}}});
+      kMyId,
+      {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
+        .master = {{.id = "other-master", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
+        .replicas = {{{.id = "other-replica", .ip = "192.168.0.101", .port = 7001},
+                      NodeHealth::ONLINE}},
+        .migrations = {}}});
   EXPECT_NE(config, nullptr);
   EXPECT_THAT(config->GetMasterNodeForSlot(0),
               NodeMatches(Node{.id = "other-master", .ip = "192.168.0.100", .port = 7000}));
@@ -151,19 +159,25 @@ TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {

 TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
   auto config = ClusterConfig::CreateFromConfig(
-      kMyId, ClusterShardInfos(
-                 {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
-                   .master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
-                   .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
-                   .migrations = {}},
-                  {.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
-                   .master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
-                   .replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
-                   .migrations = {}},
-                  {.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
-                   .master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
-                   .replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
-                   .migrations = {}}}));
+      kMyId,
+      ClusterShardInfos(
+          {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
+            .master = {{.id = "other-master", .ip = "192.168.0.100", .port = 7000},
+                       NodeHealth::ONLINE},
+            .replicas = {{{.id = "other-replica", .ip = "192.168.0.101", .port = 7001},
+                          NodeHealth::ONLINE}},
+            .migrations = {}},
+           {.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
+            .master = {{.id = kMyId, .ip = "192.168.0.102", .port = 7002}, NodeHealth::ONLINE},
+            .replicas = {{{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003},
+                          NodeHealth::ONLINE}},
+            .migrations = {}},
+           {.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
+            .master = {{.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
+                       NodeHealth::ONLINE},
+            .replicas = {{{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005},
+                          NodeHealth::ONLINE}},
+            .migrations = {}}}));
   EXPECT_NE(config, nullptr);

   SlotSet owned_slots = config->GetOwnedSlots();
   EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1);
@@ -705,4 +719,23 @@ TEST_F(ClusterConfigTest, ConfigComparison) {
   EXPECT_EQ(config5->GetConfig(), config5->GetConfig());
 }

+TEST_F(ClusterConfigTest, NodesHealth) {
+  auto config1 = ClusterConfig::CreateFromConfig("id0", R"json(
+  [
+    {
+      "slot_ranges": [ { "start": 0, "end": 16383 } ],
+      "master": { "id": "id0", "ip": "localhost", "port": 3000, "health" : "online" },
+      "replicas": [{ "id": "id1", "ip": "localhost", "port": 3001, "health" : "loading" },
+                   { "id": "id2", "ip": "localhost", "port": 3002, "health" : "fail" }],
+      "migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
+                     , "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
+    }
+  ])json");
+
+  EXPECT_EQ(config1->GetConfig().begin()->master.health, NodeHealth::ONLINE);
+  EXPECT_EQ(config1->GetConfig().begin()->replicas.front().health, NodeHealth::LOADING);
+  EXPECT_EQ(config1->GetConfig().begin()->replicas.back().health, NodeHealth::FAIL);
+}
+
 }  // namespace dfly::cluster

View File

@@ -94,6 +94,12 @@ struct ClusterNodeInfo {
   }
 };

+enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE };
+
+struct ClusterExtendedNodeInfo : ClusterNodeInfo {
+  NodeHealth health = NodeHealth::NONE;
+};
+
 struct MigrationInfo {
   SlotRanges slot_ranges;
   ClusterNodeInfo node_info;
@@ -111,8 +117,8 @@ struct MigrationInfo {

 struct ClusterShardInfo {
   SlotRanges slot_ranges;
-  ClusterNodeInfo master;
-  std::vector<ClusterNodeInfo> replicas;
+  ClusterExtendedNodeInfo master;
+  std::vector<ClusterExtendedNodeInfo> replicas;
   std::vector<MigrationInfo> migrations;

   bool operator==(const ClusterShardInfo& r) const;
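Since ClusterExtendedNodeInfo derives from the ClusterNodeInfo aggregate, initializers now take two brace groups: one for the base subobject and one for the new health member. That is why the tests and the cluster_family change below wrap the old designated initializers in an extra set of braces, e.g. (pattern taken from the tests above):

    // Base fields in the inner braces, the added health value after them.
    ClusterExtendedNodeInfo master{{.id = "other", .ip = "192.168.0.100", .port = 7000},
                                   NodeHealth::ONLINE};

MigrationInfo, by contrast, still holds a plain ClusterNodeInfo, so migration entries carry no health field.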

View File

@@ -149,21 +149,24 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
                                   ? static_cast<uint16_t>(absl::GetFlag(FLAGS_port))
                                   : cluster_announce_port;

-    info.master = {.id = id_, .ip = preferred_endpoint, .port = preferred_port};
+    info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE};

     if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
       for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
-        info.replicas.push_back({.id = replica.id,
-                                 .ip = replica.address,
-                                 .port = static_cast<uint16_t>(replica.listening_port)});
+        info.replicas.push_back({{.id = replica.id,
+                                  .ip = replica.address,
+                                  .port = static_cast<uint16_t>(replica.listening_port)},
+                                 NodeHealth::NONE});
       }
     }
   } else {
     // TODO: We currently don't save the master's ID in the replica
-    info.master = {.id = "", .ip = replication_info->host, .port = replication_info->port};
-    info.replicas.push_back({.id = id_,
-                             .ip = cntx->conn()->LocalBindAddress(),
-                             .port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
+    info.master = {{.id = "", .ip = replication_info->host, .port = replication_info->port},
+                   NodeHealth::ONLINE};
+    info.replicas.push_back({{.id = id_,
+                              .ip = cntx->conn()->LocalBindAddress(),
+                              .port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))},
+                             NodeHealth::NONE});
   }

   return info;
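As a reading aid for the emulated path above: only the remote master, as seen from a replica, is reported ONLINE; the node's own master and replica entries default to NodeHealth::NONE. A consumer rendering the new field might look like this sketch (ToHealthString is hypothetical, not part of the patch):

    // Renders NodeHealth for display; NONE means health is not tracked.
    std::string_view ToHealthString(NodeHealth health) {
      switch (health) {
        case NodeHealth::FAIL:
          return "fail";
        case NodeHealth::LOADING:
          return "loading";
        case NodeHealth::ONLINE:
          return "online";
        case NodeHealth::NONE:
          return "";
      }
      return "";
    }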