fix: a rare deadlock with blmove and multi-db (#4568)

The bug requires several conditions to hold in order to reproduce:
1. blocking operations on multiple databases
2. use of lua scripts that wake blocking transactions

The bug was discovered via a deadlock in BLMOVE, but it could also manifest with other commands that would
"disappear", causing local starvation on the connections sending them.
With BLMOVE it causes a global deadlock in Dragonfly's transaction queue.
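
For illustration, here is a condensed sketch of the problematic flow using redis-py's asyncio client. The key names, db numbers and timings are illustrative assumptions only; the actual reproduction is the test_blocking_multiple_dbs regression test added in this commit.

import asyncio
from redis import asyncio as aioredis

async def sketch(port=6379):
    # Condition 1: a connection parked on a blocking command in one database.
    consumer = aioredis.Redis(port=port, db=1)
    # Condition 2: a Lua script issued from another connection wakes it up.
    producer = aioredis.Redis(port=port, db=1)
    # Meanwhile, timing-out blocking calls on a *different* database keep a
    # continuation transaction alive in the shard queue.
    churner = aioredis.Redis(port=port, db=0)

    async def consume():
        await consumer.blmove("list", "sink", 0, "LEFT", "LEFT")

    async def produce():
        await asyncio.sleep(0.05)
        await producer.eval("redis.call('LPUSH', KEYS[1], 'val')", 1, "list")

    async def churn():
        for _ in range(20):
            await churner.blmove("{x}from", "{x}to", 0.1, "LEFT", "LEFT")

    await asyncio.gather(consume(), produce(), churn())

On an affected build, running many such flows concurrently (as the regression test below does) could leave the woken BLMOVE stuck in the transaction queue.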

The fix simply deletes a few lines of code introduced by #3260 about 6 months ago,
so this is actually a long-lived regression.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

@@ -632,17 +632,11 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
trans = nullptr;
if ((is_self && disarmed) || continuation_trans_->DisarmInShard(sid)) {
auto bc = continuation_trans_->GetNamespace().GetBlockingController(shard_id_);
if (bool keep = run(continuation_trans_, false); !keep) {
// if this holds, we can remove this check altogether.
DCHECK(continuation_trans_ == nullptr);
continuation_trans_ = nullptr;
}
if (bc && bc->HasAwakedTransaction()) {
// Break if there are any awakened transactions, as we must give way to them
// before continuing to handle regular transactions from the queue.
return;
}
}
}
@@ -684,7 +678,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// If we disarmed, but didn't find ourselves in the loop, run now.
if (trans && disarmed) {
DCHECK(trans != head);
DCHECK(trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q));
bool is_ooo = trans_mask & Transaction::OUT_OF_ORDER;

@@ -1307,5 +1307,35 @@ TEST_F(ListFamilyTest, LMPopWrongType) {
EXPECT_THAT(resp, RespArray(ElementsAre("l1", RespArray(ElementsAre("e1")))));
}

// Reproduce a flow that triggered a wrong DCHECK in the transaction flow.
TEST_F(ListFamilyTest, AwakeMulti) {
  // Consumer: repeatedly blocks on BLMOVE until the producer wakes it.
  auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] {
    for (unsigned i = 0; i < 100; ++i) {
      Run("CONSUMER", {"blmove", "src", "dest", "LEFT", "LEFT", "0"});
    }
  });

  // Producer: wakes the consumer by pushing to "src".
  auto f2 = pp_->at(1)->LaunchFiber([&] {
    for (unsigned i = 0; i < 100; ++i) {
      Run("PROD", {"lpush", "src", "a"});
      ThisFiber::SleepFor(50us);
    }
  });

  // A separate thread runs multi/exec transactions that interleave with the
  // blocking flow above.
  auto f3 = pp_->at(2)->LaunchFiber([&] {
    for (unsigned i = 0; i < 100; ++i) {
      Run({"multi"});
      for (unsigned j = 0; j < 8; ++j) {
        Run({"get", StrCat("key", j)});
      }
      Run({"exec"});
    }
  });

  f1.Join();
  f2.Join();
  f3.Join();
}
#pragma GCC diagnostic pop
} // namespace dfly

@@ -1,4 +1,3 @@
import os
import logging
import pytest
import redis
@@ -6,7 +5,7 @@ import asyncio
from redis import asyncio as aioredis
from . import dfly_multi_test_args, dfly_args
from .instance import DflyStartException
from .instance import DflyInstance, DflyStartException
from .utility import batch_fill_data, gen_test_data, EnvironCntx
from .seeder import StaticSeeder
@@ -81,6 +80,92 @@ async def test_txq_ooo(async_client: aioredis.Redis, df_server):
)

@dfly_args({"proactor_threads": 2, "num_shards": 2})
async def test_blocking_multiple_dbs(async_client: aioredis.Redis, df_server: DflyInstance):
    active = True

    # A task that triggers the flow that eventually loses a transaction.
    # blmove is used to trigger a global deadlock, but we could use any
    # command - the effect would then be that the connection sending it deadlocks locally.
    async def blmove_task_loose(num):
        async def run(id):
            c = df_server.client()
            await c.lpush(f"key{id}", "val")
            while active:
                await c.blmove(f"key{id}", f"key{id}", 0, "LEFT", "LEFT")
                await asyncio.sleep(0.01)

        tasks = []
        for i in range(num):
            tasks.append(run(i))
        await asyncio.gather(*tasks)

    # A task that creates continuation_trans_ by constantly timing out on
    # an empty set. We could probably use any 2-hop operation like rename.
    async def task_blocking(num):
        async def block(id):
            c = df_server.client()
            while active:
                await c.blmove(f"{{{id}}}from", f"{{{id}}}to", 0.1, "LEFT", "LEFT")

        tasks = []
        for i in range(num):
            tasks.append(block(i))
        await asyncio.gather(*tasks)

    # The producer constantly wakes up the consumers. It is used to trigger the
    # flow that creates wake-ups on a different database in the middle of a
    # continuation transaction.
    async def tasks_produce(num, iters):
        LPUSH_SCRIPT = """
        redis.call('LPUSH', KEYS[1], "val")
        """

        async def produce(id):
            c = df_server.client(db=1)  # important to be on a different db
            for i in range(iters):
                # Must be a lua script and not multi-exec for some reason.
                await c.eval(LPUSH_SCRIPT, 1, f"list{{{id}}}")

        tasks = []
        for i in range(num):
            task = asyncio.create_task(produce(i))
            tasks.append(task)

        await asyncio.gather(*tasks)
        logging.info("Finished producing")

    # Works with the producer to constantly block and wake up.
    async def tasks_consume(num, iters):
        async def drain(id, iters):
            client = df_server.client(db=1)
            for _ in range(iters):
                await client.blmove(f"list{{{id}}}", f"sink{{{id}}}", 0, "LEFT", "LEFT")

        tasks = []
        for i in range(num):
            task = asyncio.create_task(drain(i, iters))
            tasks.append(task)

        await asyncio.gather(*tasks)
        logging.info("Finished consuming")

    num_keys = 32
    num_iters = 200
    async_task1 = asyncio.create_task(blmove_task_loose(num_keys))
    async_task2 = asyncio.create_task(task_blocking(num_keys))

    logging.info("Starting tasks")
    await asyncio.gather(
        tasks_consume(num_keys, num_iters),
        tasks_produce(num_keys, num_iters),
    )
    logging.info("Finishing tasks")

    active = False
    await asyncio.gather(async_task1, async_task2)

async def test_arg_from_environ_overwritten_by_cli(df_factory):
with EnvironCntx(DFLY_port="6378"):
with df_factory.create(port=6377):