Перейти к основному содержимому

Блокировки и конкурентный доступ в PostgreSQL: deadlocks и их предотвращение

Введение

Блокировки (locks) — это фундаментальный механизм управления конкурентным доступом к данным в многопользовательских системах. PostgreSQL использует сложную систему блокировок, которая обеспечивает баланс между производительностью и целостностью данных. Понимание того, как работают блокировки, критически важно для создания высокопроизводительных приложений.

Зачем изучать блокировки?

В высоконагруженных системах неправильное использование блокировок может привести к деградации производительности, deadlocks и зависанию приложения. Знание механизмов блокировок позволяет избежать этих проблем.

Типы блокировок в PostgreSQL

PostgreSQL использует несколько типов блокировок для управления конкурентным доступом:

  1. Table-level locks (блокировки таблиц)
  2. Row-level locks (блокировки строк)
  3. Page-level locks (блокировки страниц)
  4. Advisory locks (рекомендательные блокировки)
  5. Deadlock detection (обнаружение взаимных блокировок)

Table-Level Locks (Блокировки таблиц)

Режимы блокировок таблиц

PostgreSQL поддерживает 8 режимов блокировок таблиц, от самых слабых до самых сильных:

Режим блокировкиОписаниеТипичное использование
ACCESS SHAREСамая слабая, только чтениеSELECT
ROW SHAREБлокирует EXCLUSIVE и вышеSELECT FOR UPDATE
ROW EXCLUSIVEБлокирует SHARE и вышеINSERT, UPDATE, DELETE
SHARE UPDATE EXCLUSIVEЗащищает от одновременных изменений структурыVACUUM, CREATE INDEX CONCURRENTLY
SHAREБлокирует изменения данныхCREATE INDEX
SHARE ROW EXCLUSIVEЗащищает от изменений данных и структурыРедко используется напрямую
EXCLUSIVEБлокирует все кроме ACCESS SHAREREFRESH MATERIALIZED VIEW
ACCESS EXCLUSIVEПолная блокировкаALTER TABLE, DROP TABLE, TRUNCATE

Матрица конфликтов блокировок

-- Создадим таблицу для демонстрации
CREATE TABLE lock_demo (
id SERIAL PRIMARY KEY,
value INTEGER,
description TEXT
);

INSERT INTO lock_demo (value, description) VALUES (1, 'Test row');

Матрица конфликтов:

ACCESS SHAREROW SHAREROW EXCLUSIVESHARE UPDATE EXCLUSIVESHARESHARE ROW EXCLUSIVEEXCLUSIVEACCESS EXCLUSIVE
ACCESS SHARE
ROW SHARE
ROW EXCLUSIVE
SHARE UPDATE EXCLUSIVE
SHARE
SHARE ROW EXCLUSIVE
EXCLUSIVE
ACCESS EXCLUSIVE

✅ = совместимы, ❌ = конфликт

Явное получение блокировок таблиц

-- ACCESS SHARE (автоматически при SELECT)
BEGIN;
SELECT * FROM lock_demo;
-- Устанавливается ACCESS SHARE lock
COMMIT;

-- ROW SHARE (автоматически при SELECT FOR UPDATE)
BEGIN;
SELECT * FROM lock_demo FOR UPDATE;
-- Устанавливается ROW SHARE lock на таблицу
-- и блокируются выбранные строки
COMMIT;

-- ROW EXCLUSIVE (автоматически при изменениях)
BEGIN;
UPDATE lock_demo SET value = 2 WHERE id = 1;
-- Устанавливается ROW EXCLUSIVE lock
COMMIT;

-- Явное получение блокировки
BEGIN;
LOCK TABLE lock_demo IN SHARE MODE;
-- Теперь другие транзакции могут читать,
-- но не могут изменять таблицу

-- Выполняем операции...
SELECT * FROM lock_demo;
COMMIT;

-- Exclusive блокировка
BEGIN;
LOCK TABLE lock_demo IN EXCLUSIVE MODE;
-- Только эта транзакция может изменять таблицу
-- Другие могут только читать
COMMIT;

-- Полная блокировка
BEGIN;
LOCK TABLE lock_demo IN ACCESS EXCLUSIVE MODE;
-- Никто не может работать с таблицей
-- Используется при ALTER TABLE, DROP TABLE
COMMIT;

Демонстрация конфликтов блокировок

-- Сессия 1: Начинаем транзакцию с SHARE lock
BEGIN;
LOCK TABLE lock_demo IN SHARE MODE;
SELECT pg_backend_pid(); -- Узнаем PID процесса
-- Держим транзакцию открытой...

-- Сессия 2: Пытаемся изменить данные
BEGIN;
UPDATE lock_demo SET value = 3 WHERE id = 1;
-- БЛОКИРУЕТСЯ! Ждет освобождения SHARE lock
-- Сессия 1 имеет SHARE, которая конфликтует с ROW EXCLUSIVE

-- Сессия 3: Пытаемся читать (это работает)
BEGIN;
SELECT * FROM lock_demo;
-- УСПЕШНО! ACCESS SHARE совместима с SHARE
COMMIT;

-- Сессия 1: Завершаем транзакцию
COMMIT;
-- Теперь Сессия 2 разблокируется

Row-Level Locks (Блокировки строк)

Режимы блокировок строк

PostgreSQL поддерживает 4 режима блокировок строк:

  1. FOR UPDATE — эксклюзивная блокировка строки
  2. FOR NO KEY UPDATE — эксклюзивная без блокировки ключей
  3. FOR SHARE — разделяемая блокировка
  4. FOR KEY SHARE — слабая разделяемая блокировка

FOR UPDATE

-- Подготовка
CREATE TABLE accounts (
account_id SERIAL PRIMARY KEY,
balance NUMERIC(12, 2),
account_holder VARCHAR(100)
);

INSERT INTO accounts (balance, account_holder) VALUES
(1000.00, 'Alice'),
(500.00, 'Bob'),
(750.00, 'Charlie');

-- Сессия 1: Блокируем строку для обновления
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR UPDATE;

-- Строка заблокирована эксклюзивно
-- Другие транзакции не могут:
-- - UPDATE/DELETE эту строку
-- - SELECT FOR UPDATE эту строку
-- Но могут: SELECT без FOR UPDATE

-- Выполняем изменения
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
COMMIT;

-- Сессия 2 (параллельно): Пытаемся заблокировать ту же строку
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR UPDATE;
-- БЛОКИРУЕТСЯ! Ждет завершения Сессии 1
COMMIT;

-- Сессия 3 (параллельно): Обычное чтение работает
BEGIN;
SELECT * FROM accounts WHERE account_id = 1;
-- УСПЕШНО! Читает данные без блокировки
COMMIT;

FOR NO KEY UPDATE

-- FOR NO KEY UPDATE позволяет другим транзакциям создавать
-- внешние ключи на эту строку

CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
account_id INTEGER REFERENCES accounts(account_id),
amount NUMERIC(12, 2)
);

-- Сессия 1: Блокируем с FOR NO KEY UPDATE
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR NO KEY UPDATE;

-- Можем изменять неключевые поля
UPDATE accounts SET balance = balance - 50 WHERE account_id = 1;

-- Сессия 2 (параллельно): Можем создать заказ с FK
BEGIN;
INSERT INTO orders (account_id, amount)
VALUES (1, 100.00);
-- УСПЕШНО! FOR NO KEY UPDATE позволяет это
COMMIT;

-- Сессия 1: Завершаем
COMMIT;

-- Если бы использовали FOR UPDATE вместо FOR NO KEY UPDATE,
-- Сессия 2 была бы заблокирована

FOR SHARE

-- FOR SHARE позволяет нескольким транзакциям
-- читать и блокировать строку одновременно

-- Сессия 1: Блокируем с FOR SHARE
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR SHARE;

-- Строка заблокирована для чтения

-- Сессия 2 (параллельно): Тоже можем получить FOR SHARE
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR SHARE;
-- УСПЕШНО! Несколько FOR SHARE совместимы
COMMIT;

-- Сессия 3 (параллельно): Попытка UPDATE
BEGIN;
UPDATE accounts SET balance = 600 WHERE account_id = 1;
-- БЛОКИРУЕТСЯ! FOR SHARE предотвращает изменения
ROLLBACK;

-- Сессия 1: Завершаем
COMMIT;

FOR KEY SHARE

-- FOR KEY SHARE — самая слабая блокировка строки
-- Используется для проверки существования строки для FK

-- Сессия 1: Блокируем с FOR KEY SHARE
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR KEY SHARE;

-- Сессия 2 (параллельно): Можем изменять неключевые поля
BEGIN;
UPDATE accounts
SET balance = balance + 100
WHERE account_id = 1;
-- УСПЕШНО! FOR KEY SHARE не блокирует обычные UPDATE
COMMIT;

-- Сессия 3 (параллельно): Но не можем удалить
BEGIN;
DELETE FROM accounts WHERE account_id = 1;
-- БЛОКИРУЕТСЯ! FOR KEY SHARE предотвращает DELETE
ROLLBACK;

-- Сессия 1: Завершаем
COMMIT;

Матрица конфликтов блокировок строк

FOR KEY SHAREFOR SHAREFOR NO KEY UPDATEFOR UPDATE
FOR KEY SHARE
FOR SHARE
FOR NO KEY UPDATE
FOR UPDATE

Блокировка нескольких строк

-- Блокируем несколько строк одновременно
BEGIN;
SELECT * FROM accounts
WHERE balance > 500
FOR UPDATE;

-- Все строки с балансом > 500 заблокированы

UPDATE accounts
SET balance = balance * 1.1
WHERE balance > 500;
COMMIT;

-- Блокировка с сортировкой (важно для избежания deadlock!)
BEGIN;
SELECT * FROM accounts
WHERE account_id IN (1, 2, 3, 4, 5)
ORDER BY account_id -- Всегда блокируем в одном порядке!
FOR UPDATE;

-- Обрабатываем строки...
COMMIT;

NOWAIT и SKIP LOCKED

-- NOWAIT: не ждать блокировки, сразу вернуть ошибку
BEGIN;
SELECT * FROM accounts
WHERE account_id = 1
FOR UPDATE NOWAIT;
-- Если строка заблокирована, сразу получим ошибку:
-- ERROR: could not obtain lock on row in relation "accounts"
COMMIT;

-- Использование в приложении с retry
DO $$
DECLARE
v_account RECORD;
v_retries INTEGER := 3;
v_success BOOLEAN := FALSE;
BEGIN
WHILE v_retries > 0 AND NOT v_success LOOP
BEGIN
SELECT * INTO v_account
FROM accounts
WHERE account_id = 1
FOR UPDATE NOWAIT;

-- Получили блокировку, обрабатываем
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
v_success := TRUE;

EXCEPTION WHEN lock_not_available THEN
v_retries := v_retries - 1;
IF v_retries > 0 THEN
RAISE NOTICE 'Lock not available, retrying... (% attempts left)', v_retries;
PERFORM pg_sleep(0.1); -- Небольшая пауза
ELSE
RAISE EXCEPTION 'Could not acquire lock after multiple retries';
END IF;
END;
END LOOP;
END $$;

-- SKIP LOCKED: пропустить заблокированные строки
-- Полезно для обработки очередей задач
CREATE TABLE job_queue (
job_id SERIAL PRIMARY KEY,
status VARCHAR(20) DEFAULT 'pending',
task_data JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_by INTEGER
);

INSERT INTO job_queue (task_data)
SELECT jsonb_build_object('task', 'Task ' || i)
FROM generate_series(1, 100) i;

-- Worker 1: Берет первую доступную задачу
BEGIN;
SELECT * FROM job_queue
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- Если все задачи заблокированы другими workers,
-- запрос вернет 0 строк (не будет ждать)

-- Обрабатываем задачу...
UPDATE job_queue
SET status = 'processing', processed_by = pg_backend_pid()
WHERE job_id = 1;
COMMIT;

-- Worker 2 (параллельно): Берет следующую доступную задачу
BEGIN;
SELECT * FROM job_queue
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- Получит job_id = 2, т.к. job_id = 1 заблокирован Worker 1
COMMIT;

Deadlocks (Взаимные блокировки)

Что такое Deadlock?

Deadlock возникает когда две или более транзакций ждут друг друга, создавая циклическую зависимость блокировок.

Транзакция A блокирует ресурс X, ждет ресурс Y
Транзакция B блокирует ресурс Y, ждет ресурс X
→ Deadlock!

Классический пример Deadlock

-- Подготовка
CREATE TABLE resources (
resource_id INTEGER PRIMARY KEY,
resource_name VARCHAR(50),
value INTEGER
);

INSERT INTO resources VALUES
(1, 'Resource A', 100),
(2, 'Resource B', 200);

-- Сессия 1
BEGIN;
-- Шаг 1: Блокируем ресурс A
UPDATE resources SET value = value + 10 WHERE resource_id = 1;
SELECT pg_sleep(2); -- Имитация обработки

-- Шаг 2: Пытаемся заблокировать ресурс B
UPDATE resources SET value = value + 20 WHERE resource_id = 2;
-- ЖДЕТ освобождения от Сессии 2...
COMMIT;

-- Сессия 2 (запускается одновременно с Сессией 1)
BEGIN;
-- Шаг 1: Блокируем ресурс B
UPDATE resources SET value = value + 30 WHERE resource_id = 2;
SELECT pg_sleep(2); -- Имитация обработки

-- Шаг 2: Пытаемся заблокировать ресурс A
UPDATE resources SET value = value + 40 WHERE resource_id = 1;
-- ЖДЕТ освобождения от Сессии 1...

-- DEADLOCK DETECTED!
-- ERROR: deadlock detected
-- DETAIL: Process 1234 waits for ShareLock on transaction 567;
-- blocked by process 5678.
-- Process 5678 waits for ShareLock on transaction 890;
-- blocked by process 1234.
ROLLBACK;

-- PostgreSQL автоматически обнаруживает deadlock
-- (по умолчанию через 1 секунду) и откатывает одну транзакцию

Deadlock с блокировкой строк

-- Более реалистичный пример: перевод денег

-- Сессия 1: Перевод от Alice к Bob
BEGIN;
-- Блокируем счет Alice
UPDATE accounts SET balance = balance - 100 WHERE account_holder = 'Alice';
SELECT pg_sleep(1);

-- Пытаемся заблокировать счет Bob
UPDATE accounts SET balance = balance + 100 WHERE account_holder = 'Bob';
COMMIT;

-- Сессия 2 (одновременно): Перевод от Bob к Alice
BEGIN;
-- Блокируем счет Bob
UPDATE accounts SET balance = balance - 50 WHERE account_holder = 'Bob';
SELECT pg_sleep(1);

-- Пытаемся заблокировать счет Alice
UPDATE accounts SET balance = balance + 50 WHERE account_holder = 'Alice';
-- DEADLOCK!
ROLLBACK;

Обнаружение Deadlock в логах

-- Настройка логирования deadlock в postgresql.conf:
-- deadlock_timeout = 1s (время ожидания перед проверкой deadlock)
-- log_lock_waits = on (логировать долгие ожидания блокировок)

-- Пример записи в логе:
/*
ERROR: deadlock detected
DETAIL: Process 12345 waits for ShareLock on transaction 67890;
blocked by process 54321.
Process 54321 waits for ShareLock on transaction 98765;
blocked by process 12345.
HINT: See server log for query details.
CONTEXT: while updating tuple (0,1) in relation "accounts"
*/

Предотвращение Deadlocks

Стратегия 1: Упорядочивание доступа к ресурсам

-- ПЛОХО: Разный порядок блокировки
-- Сессия 1: блокирует A, потом B
-- Сессия 2: блокирует B, потом A
-- → Возможен deadlock

-- ХОРОШО: Одинаковый порядок блокировки
-- Обе сессии: всегда блокируют сначала A, потом B

-- Пример: всегда блокируем счета в порядке ID
CREATE OR REPLACE FUNCTION transfer_money_safe(
from_account_id INTEGER,
to_account_id INTEGER,
amount NUMERIC
) RETURNS BOOLEAN AS $$
DECLARE
first_id INTEGER;
second_id INTEGER;
BEGIN
-- Определяем порядок блокировки
IF from_account_id < to_account_id THEN
first_id := from_account_id;
second_id := to_account_id;
ELSE
first_id := to_account_id;
second_id := from_account_id;
END IF;

-- Блокируем в правильном порядке
PERFORM * FROM accounts WHERE account_id = first_id FOR UPDATE;
PERFORM * FROM accounts WHERE account_id = second_id FOR UPDATE;

-- Проверяем баланс
IF (SELECT balance FROM accounts WHERE account_id = from_account_id) < amount THEN
RAISE EXCEPTION 'Insufficient funds';
END IF;

-- Выполняем перевод
UPDATE accounts SET balance = balance - amount WHERE account_id = from_account_id;
UPDATE accounts SET balance = balance + amount WHERE account_id = to_account_id;

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

-- Использование
BEGIN;
SELECT transfer_money_safe(1, 2, 100); -- Alice → Bob
COMMIT;

BEGIN;
SELECT transfer_money_safe(2, 1, 50); -- Bob → Alice
COMMIT;
-- Теперь deadlock невозможен!

Стратегия 2: Минимизация времени удержания блокировок

-- ПЛОХО: Долгие операции внутри транзакции
BEGIN;
SELECT * FROM accounts WHERE account_id = 1 FOR UPDATE;

-- Долгие вычисления (опасно!)
PERFORM pg_sleep(10);

-- Вызов внешнего API
-- ... медленная операция ...

UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
COMMIT;

-- ХОРОШО: Минимальное время в транзакции
-- Вычисления ДО транзакции
-- ... выполняем вычисления ...

BEGIN;
SELECT * FROM accounts WHERE account_id = 1 FOR UPDATE;
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
COMMIT;

-- Вызов API ПОСЛЕ транзакции
-- ... вызываем API ...

Стратегия 3: Использование LOCK TABLE

-- Получаем все необходимые блокировки сразу
BEGIN;
-- Блокируем обе таблицы в начале
LOCK TABLE accounts IN EXCLUSIVE MODE;
LOCK TABLE transactions IN ROW EXCLUSIVE MODE;

-- Теперь безопасно работаем с обеими таблицами
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
INSERT INTO transactions (account_id, amount) VALUES (1, -100);

UPDATE accounts SET balance = balance + 100 WHERE account_id = 2;
INSERT INTO transactions (account_id, amount) VALUES (2, 100);
COMMIT;

Стратегия 4: Retry логика

-- Автоматический повтор при deadlock
CREATE OR REPLACE FUNCTION execute_with_retry(
operation_func TEXT,
max_retries INTEGER DEFAULT 3
) RETURNS BOOLEAN AS $$
DECLARE
retry_count INTEGER := 0;
last_error TEXT;
BEGIN
WHILE retry_count < max_retries LOOP
BEGIN
-- Выполняем операцию
EXECUTE operation_func;
RETURN TRUE;

EXCEPTION
WHEN deadlock_detected THEN
retry_count := retry_count + 1;
last_error := SQLERRM;

IF retry_count < max_retries THEN
RAISE NOTICE 'Deadlock detected, retry % of %',
retry_count, max_retries;
-- Небольшая случайная задержка
PERFORM pg_sleep(random() * 0.1);
ELSE
RAISE EXCEPTION 'Max retries exceeded. Last error: %', last_error;
END IF;

WHEN serialization_failure THEN
retry_count := retry_count + 1;

IF retry_count < max_retries THEN
RAISE NOTICE 'Serialization failure, retry % of %',
retry_count, max_retries;
PERFORM pg_sleep(random() * 0.1);
ELSE
RAISE EXCEPTION 'Max retries exceeded';
END IF;
END;
END LOOP;

RETURN FALSE;
END;
$$ LANGUAGE plpgsql;

-- Пример функции переноса
CREATE OR REPLACE FUNCTION transfer_with_retry()
RETURNS VOID AS $$
BEGIN
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE account_id = 2;
END;
$$ LANGUAGE plpgsql;

-- Использование
SELECT execute_with_retry('SELECT transfer_with_retry()');

Стратегия 5: Использование уровня изоляции Serializable

-- Serializable level автоматически обнаруживает конфликты
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE account_id = 2;
COMMIT;

-- При конфликте получим ошибку сериализации вместо deadlock
-- ERROR: could not serialize access due to concurrent update

Стратегия 6: SELECT FOR UPDATE с NOWAIT

-- Не ждем блокировки, сразу возвращаем ошибку
CREATE OR REPLACE FUNCTION transfer_nowait(
from_id INTEGER,
to_id INTEGER,
amount NUMERIC
) RETURNS BOOLEAN AS $$
DECLARE
v_balance NUMERIC;
BEGIN
-- Пытаемся получить блокировки с NOWAIT
BEGIN
SELECT balance INTO v_balance
FROM accounts
WHERE account_id = from_id
FOR UPDATE NOWAIT;
EXCEPTION WHEN lock_not_available THEN
RAISE NOTICE 'Account % is locked, aborting', from_id;
RETURN FALSE;
END;

BEGIN
PERFORM * FROM accounts
WHERE account_id = to_id
FOR UPDATE NOWAIT;
EXCEPTION WHEN lock_not_available THEN
RAISE NOTICE 'Account % is locked, aborting', to_id;
RETURN FALSE;
END;

-- Проверяем баланс
IF v_balance < amount THEN
RAISE EXCEPTION 'Insufficient funds';
END IF;

-- Выполняем перевод
UPDATE accounts SET balance = balance - amount WHERE account_id = from_id;
UPDATE accounts SET balance = balance + amount WHERE account_id = to_id;

RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

Advisory Locks (Рекомендательные блокировки)

Концепция Advisory Locks

Advisory locks — это блокировки на уровне приложения, не привязанные к конкретным строкам или таблицам. Приложение само решает, что блокировать.

-- Получение advisory lock по числовому ID
SELECT pg_advisory_lock(12345);

-- Блокировка получена, теперь только эта сессия может работать с "ресурсом 12345"

-- Выполняем критическую секцию...

-- Освобождаем блокировку
SELECT pg_advisory_unlock(12345);

-- Или автоматическое освобождение при завершении транзакции
BEGIN;
SELECT pg_advisory_xact_lock(12345);
-- Критическая секция
COMMIT; -- Автоматически освобождается

Типы Advisory Locks

-- 1. Сессионный уровень (session-level)
SELECT pg_advisory_lock(key); -- Эксклюзивная блокировка
SELECT pg_advisory_lock_shared(key); -- Разделяемая блокировка
SELECT pg_advisory_unlock(key); -- Освобождение

-- 2. Транзакционный уровень (transaction-level)
BEGIN;
SELECT pg_advisory_xact_lock(key); -- Эксклюзивная
SELECT pg_advisory_xact_lock_shared(key); -- Разделяемая
-- Автоматически освобождается при COMMIT/ROLLBACK
COMMIT;

-- 3. Неблокирующие варианты (try)
SELECT pg_try_advisory_lock(key); -- Возвращает true/false
SELECT pg_try_advisory_lock_shared(key);

-- 4. Двойной ключ (для namespace)
SELECT pg_advisory_lock(namespace, key);
SELECT pg_advisory_unlock(namespace, key);

Практические примеры Advisory Locks

Пример 1: Предотвращение дублирования задач

-- Гарантируем, что задача выполняется только один раз
CREATE OR REPLACE FUNCTION process_job_safely(job_id INTEGER)
RETURNS BOOLEAN AS $
BEGIN
-- Пытаемся получить блокировку для этой задачи
IF NOT pg_try_advisory_lock(job_id) THEN
RAISE NOTICE 'Job % is already being processed', job_id;
RETURN FALSE;
END IF;

-- Блокировка получена, обрабатываем задачу
BEGIN
RAISE NOTICE 'Processing job %', job_id;

-- Имитация обработки
PERFORM pg_sleep(5);

RAISE NOTICE 'Job % completed', job_id;

RETURN TRUE;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Error processing job %: %', job_id, SQLERRM;
RETURN FALSE;
FINALLY
-- Всегда освобождаем блокировку
PERFORM pg_advisory_unlock(job_id);
END;
END;
$ LANGUAGE plpgsql;

-- Использование в нескольких воркерах
-- Worker 1
SELECT process_job_safely(100);

-- Worker 2 (одновременно)
SELECT process_job_safely(100);
-- Вернет FALSE, так как job уже обрабатывается

Пример 2: Singleton паттерн для задач

-- Обеспечиваем, что только один процесс выполняет задачу
CREATE OR REPLACE FUNCTION run_daily_maintenance()
RETURNS VOID AS $
DECLARE
lock_key BIGINT := 999999; -- Уникальный ID для этой задачи
BEGIN
-- Пытаемся получить блокировку
IF NOT pg_try_advisory_lock(lock_key) THEN
RAISE NOTICE 'Maintenance is already running in another process';
RETURN;
END IF;

BEGIN
RAISE NOTICE 'Starting daily maintenance...';

-- VACUUM таблиц
VACUUM ANALYZE orders;
VACUUM ANALYZE customers;

-- Очистка старых данных
DELETE FROM logs WHERE created_at < NOW() - INTERVAL '90 days';

-- Обновление статистики
ANALYZE;

RAISE NOTICE 'Daily maintenance completed';

EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'Maintenance failed: %', SQLERRM;
FINALLY
-- Освобождаем блокировку
PERFORM pg_advisory_unlock(lock_key);
END;
END;
$ LANGUAGE plpgsql;

-- Можно вызывать из нескольких мест, выполнится только один раз
SELECT run_daily_maintenance();

Пример 3: Распределенная блокировка по пользователю

-- Предотвращаем одновременные операции одного пользователя
CREATE OR REPLACE FUNCTION process_user_action(
user_id INTEGER,
action_type VARCHAR
) RETURNS JSONB AS $
DECLARE
result JSONB;
BEGIN
-- Блокируем по user_id (namespace = 1000)
IF NOT pg_try_advisory_lock(1000, user_id) THEN
RETURN jsonb_build_object(
'success', false,
'message', 'Another operation is in progress for this user'
);
END IF;

BEGIN
-- Выполняем операцию
CASE action_type
WHEN 'withdraw' THEN
-- Снятие средств
UPDATE accounts
SET balance = balance - 100
WHERE account_id = user_id;

WHEN 'deposit' THEN
-- Пополнение
UPDATE accounts
SET balance = balance + 100
WHERE account_id = user_id;
END CASE;

result := jsonb_build_object(
'success', true,
'message', 'Operation completed successfully'
);

RETURN result;

EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object(
'success', false,
'message', SQLERRM
);
FINALLY
PERFORM pg_advisory_unlock(1000, user_id);
END;
END;
$ LANGUAGE plpgsql;

Пример 4: Rate Limiting с Advisory Locks

-- Ограничение частоты вызовов функции
CREATE TABLE rate_limit_state (
key TEXT PRIMARY KEY,
last_call TIMESTAMP,
call_count INTEGER
);

CREATE OR REPLACE FUNCTION rate_limited_operation(
operation_key TEXT,
max_calls_per_minute INTEGER DEFAULT 10
) RETURNS BOOLEAN AS $
DECLARE
lock_id BIGINT;
current_count INTEGER;
last_reset TIMESTAMP;
BEGIN
-- Генерируем числовой ID из текстового ключа
lock_id := hashtext(operation_key)::BIGINT;

-- Получаем блокировку
PERFORM pg_advisory_lock(lock_id);

BEGIN
-- Проверяем текущее состояние
SELECT call_count, last_call
INTO current_count, last_reset
FROM rate_limit_state
WHERE key = operation_key;

-- Сбрасываем счетчик если прошла минута
IF last_reset IS NULL OR
NOW() - last_reset > INTERVAL '1 minute' THEN
INSERT INTO rate_limit_state (key, last_call, call_count)
VALUES (operation_key, NOW(), 1)
ON CONFLICT (key) DO UPDATE
SET last_call = NOW(), call_count = 1;

PERFORM pg_advisory_unlock(lock_id);
RETURN TRUE;
END IF;

-- Проверяем лимит
IF current_count >= max_calls_per_minute THEN
PERFORM pg_advisory_unlock(lock_id);
RAISE NOTICE 'Rate limit exceeded for %', operation_key;
RETURN FALSE;
END IF;

-- Увеличиваем счетчик
UPDATE rate_limit_state
SET call_count = call_count + 1
WHERE key = operation_key;

PERFORM pg_advisory_unlock(lock_id);
RETURN TRUE;

EXCEPTION WHEN OTHERS THEN
PERFORM pg_advisory_unlock(lock_id);
RAISE;
END;
END;
$ LANGUAGE plpgsql;

-- Использование
SELECT rate_limited_operation('api_endpoint_xyz', 5);

Мониторинг блокировок

Просмотр текущих блокировок

-- Все активные блокировки
SELECT
locktype,
database,
relation::regclass AS table_name,
page,
tuple,
virtualxid,
transactionid,
mode,
granted,
pid
FROM pg_locks
WHERE NOT granted OR locktype = 'relation'
ORDER BY pid, locktype;

-- Блокировки таблиц
SELECT
pg_class.relname AS table_name,
pg_locks.locktype,
pg_locks.mode,
pg_locks.granted,
pg_stat_activity.pid,
pg_stat_activity.usename,
pg_stat_activity.query,
pg_stat_activity.state
FROM pg_locks
JOIN pg_class ON pg_locks.relation = pg_class.oid
LEFT JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
WHERE pg_class.relkind = 'r'
ORDER BY pg_class.relname, pg_locks.pid;

Поиск блокирующих запросов

-- Кто кого блокирует (подробно)
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocked_activity.query AS blocked_query,
blocked_activity.state AS blocked_state,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocking_activity.query AS blocking_query,
blocking_activity.state AS blocking_state,
blocked_activity.application_name AS blocked_app,
blocking_activity.application_name AS blocking_app
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity
ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity
ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

-- Упрощенная версия (PostgreSQL 9.6+)
SELECT
activity.pid,
activity.usename,
activity.query AS blocked_query,
blocking.pid AS blocking_pid,
blocking.query AS blocking_query,
NOW() - activity.query_start AS blocked_duration
FROM pg_stat_activity AS activity
JOIN pg_stat_activity AS blocking
ON blocking.pid = ANY(pg_blocking_pids(activity.pid))
WHERE activity.pid != blocking.pid;

Создание представления для мониторинга

-- Создаем удобное представление
CREATE OR REPLACE VIEW v_lock_monitor AS
SELECT
l.pid,
l.locktype,
l.relation::regclass AS relation,
l.mode,
l.granted,
a.usename,
a.application_name,
a.client_addr,
a.query,
a.state,
NOW() - a.query_start AS query_duration,
NOW() - a.state_change AS state_duration
FROM pg_locks l
LEFT JOIN pg_stat_activity a ON l.pid = a.pid
ORDER BY a.query_start;

-- Использование
SELECT * FROM v_lock_monitor WHERE NOT granted;

-- Долгие блокировки (> 1 минуты)
SELECT * FROM v_lock_monitor
WHERE query_duration > INTERVAL '1 minute'
ORDER BY query_duration DESC;

Мониторинг Advisory Locks

-- Просмотр активных advisory locks
SELECT
locktype,
classid,
objid,
objsubid,
virtualtransaction,
pid,
mode,
granted
FROM pg_locks
WHERE locktype IN ('advisory', 'advisory_xact');

-- С информацией о процессах
SELECT
l.locktype,
l.classid,
l.objid,
l.mode,
l.granted,
a.pid,
a.usename,
a.application_name,
a.query,
NOW() - a.query_start AS duration
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE l.locktype IN ('advisory', 'advisory_xact')
ORDER BY a.query_start;

Автоматическое логирование долгих блокировок

-- Настройка в postgresql.conf:
-- log_lock_waits = on
-- deadlock_timeout = 1s

-- Функция для периодической проверки
CREATE OR REPLACE FUNCTION log_long_locks()
RETURNS TABLE(
blocked_pid INTEGER,
blocked_query TEXT,
blocking_pid INTEGER,
blocking_query TEXT,
wait_duration INTERVAL
) AS $
BEGIN
RETURN QUERY
SELECT
activity.pid,
activity.query,
blocking.pid,
blocking.query,
NOW() - activity.state_change
FROM pg_stat_activity AS activity
JOIN pg_stat_activity AS blocking
ON blocking.pid = ANY(pg_blocking_pids(activity.pid))
WHERE activity.state = 'active'
AND NOW() - activity.state_change > INTERVAL '10 seconds';
END;
$ LANGUAGE plpgsql;

-- Периодический вызов (через pg_cron или внешний планировщик)
SELECT * FROM log_long_locks();

Оптимизация и Best Practices

Минимизация блокировок

-- 1. Используйте короткие транзакции
-- ПЛОХО:
BEGIN;
SELECT * FROM large_table FOR UPDATE;
-- Долгая обработка в приложении
-- ... 30 секунд ...
UPDATE large_table SET processed = true;
COMMIT;

-- ХОРОШО:
-- Обработка вне транзакции
-- ... обработка данных ...
BEGIN;
UPDATE large_table SET processed = true WHERE id = ?;
COMMIT;

-- 2. Блокируйте только необходимые строки
-- ПЛОХО:
BEGIN;
SELECT * FROM orders FOR UPDATE; -- Блокирует ВСЕ строки!
COMMIT;

-- ХОРОШО:
BEGIN;
SELECT * FROM orders
WHERE status = 'pending' AND customer_id = 123
FOR UPDATE;
COMMIT;

-- 3. Используйте индексы для блокировок
-- Без индекса: блокируется вся таблица для сканирования
CREATE INDEX idx_orders_status ON orders(status);

BEGIN;
SELECT * FROM orders WHERE status = 'pending' FOR UPDATE;
-- Теперь блокируются только найденные строки
COMMIT;

Избегание Lock Escalation

-- PostgreSQL не имеет автоматической эскалации блокировок,
-- но можно явно управлять уровнем блокировок

-- Для массовых операций: используйте LOCK TABLE
BEGIN;
LOCK TABLE orders IN EXCLUSIVE MODE;

-- Массовое обновление
UPDATE orders SET status = 'processed'
WHERE created_at < NOW() - INTERVAL '30 days';
COMMIT;

-- Альтернатива: батч-обработка
DO $
DECLARE
batch_size INTEGER := 1000;
affected_rows INTEGER;
BEGIN
LOOP
UPDATE orders
SET status = 'processed'
WHERE id IN (
SELECT id FROM orders
WHERE status = 'pending'
LIMIT batch_size
);

GET DIAGNOSTICS affected_rows = ROW_COUNT;
EXIT WHEN affected_rows = 0;

COMMIT;
-- Между батчами другие транзакции могут работать
END LOOP;
END $;

Использование правильного уровня изоляции

-- Для большинства операций: Read Committed
BEGIN; -- По умолчанию Read Committed
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
COMMIT;

-- Для аналитики: Repeatable Read (согласованный снимок)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT
DATE(created_at) as date,
COUNT(*) as orders,
SUM(amount) as total
FROM orders
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY DATE(created_at);
COMMIT;

-- Для критичных операций: Serializable
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Проверка и обновление с гарантией отсутствия конкурентных изменений
IF (SELECT balance FROM accounts WHERE account_id = 1) >= 100 THEN
UPDATE accounts SET balance = balance - 100 WHERE account_id = 1;
END IF;
COMMIT;

Паттерн: Lock-Free операции с CAS

-- Compare-And-Swap (CAS) для lock-free обновлений
CREATE TABLE counters (
counter_id INTEGER PRIMARY KEY,
value BIGINT NOT NULL,
version INTEGER NOT NULL DEFAULT 0
);

-- Функция increment с оптимистичной блокировкой
CREATE OR REPLACE FUNCTION increment_counter_cas(
p_counter_id INTEGER,
max_retries INTEGER DEFAULT 10
) RETURNS BIGINT AS $
DECLARE
current_value BIGINT;
current_version INTEGER;
new_value BIGINT;
retry_count INTEGER := 0;
rows_updated INTEGER;
BEGIN
LOOP
-- Читаем текущее значение
SELECT value, version INTO current_value, current_version
FROM counters
WHERE counter_id = p_counter_id;

new_value := current_value + 1;

-- Пытаемся обновить только если версия не изменилась
UPDATE counters
SET value = new_value,
version = version + 1
WHERE counter_id = p_counter_id
AND version = current_version;

GET DIAGNOSTICS rows_updated = ROW_COUNT;

IF rows_updated > 0 THEN
RETURN new_value;
END IF;

-- Версия изменилась, повторяем попытку
retry_count := retry_count + 1;
IF retry_count >= max_retries THEN
RAISE EXCEPTION 'Max retries exceeded for counter %', p_counter_id;
END IF;

-- Небольшая задержка перед повтором
PERFORM pg_sleep(0.001 * random());
END LOOP;
END;
$ LANGUAGE plpgsql;

-- Использование
SELECT increment_counter_cas(1);

Паттерн: Queue с SKIP LOCKED

-- Эффективная очередь задач без блокировок
CREATE TABLE task_queue (
task_id BIGSERIAL PRIMARY KEY,
task_type VARCHAR(50),
payload JSONB,
status VARCHAR(20) DEFAULT 'pending',
priority INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
worker_id TEXT
);

-- Индекс для эффективного поиска pending задач
CREATE INDEX idx_task_queue_pending
ON task_queue(priority DESC, created_at)
WHERE status = 'pending';

-- Функция для получения задачи воркером
CREATE OR REPLACE FUNCTION claim_next_task(
p_worker_id TEXT
) RETURNS TABLE(
task_id BIGINT,
task_type VARCHAR,
payload JSONB
) AS $
DECLARE
claimed_task RECORD;
BEGIN
-- Получаем первую доступную задачу
SELECT t.task_id, t.task_type, t.payload
INTO claimed_task
FROM task_queue t
WHERE t.status = 'pending'
ORDER BY t.priority DESC, t.created_at
LIMIT 1
FOR UPDATE SKIP LOCKED; -- Пропускаем заблокированные задачи

IF NOT FOUND THEN
RETURN; -- Нет доступных задач
END IF;

-- Помечаем задачу как выполняющуюся
UPDATE task_queue
SET status = 'processing',
started_at = NOW(),
worker_id = p_worker_id
WHERE task_queue.task_id = claimed_task.task_id;

RETURN QUERY
SELECT claimed_task.task_id, claimed_task.task_type, claimed_task.payload;
END;
$ LANGUAGE plpgsql;

-- Функция завершения задачи
CREATE OR REPLACE FUNCTION complete_task(
p_task_id BIGINT,
p_success BOOLEAN
) RETURNS VOID AS $
BEGIN
UPDATE task_queue
SET status = CASE WHEN p_success THEN 'completed' ELSE 'failed' END,
completed_at = NOW()
WHERE task_id = p_task_id;
END;
$ LANGUAGE plpgsql;

-- Использование в воркере
DO $
DECLARE
task RECORD;
BEGIN
-- Получаем задачу
SELECT * INTO task FROM claim_next_task('worker-1');

IF task.task_id IS NOT NULL THEN
-- Обрабатываем задачу
RAISE NOTICE 'Processing task %', task.task_id;

-- ... обработка ...

-- Завершаем задачу
PERFORM complete_task(task.task_id, true);
ELSE
RAISE NOTICE 'No tasks available';
END IF;
END $;

Диагностика проблем с блокировками

Автоматическое обнаружение deadlock

-- Скрипт для анализа логов deadlock
CREATE TABLE deadlock_history (
id SERIAL PRIMARY KEY,
detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
victim_pid INTEGER,
victim_query TEXT,
blocker_pid INTEGER,
blocker_query TEXT,
details TEXT
);

-- Функция для записи информации о deadlock
CREATE OR REPLACE FUNCTION log_deadlock_info()
RETURNS VOID AS $
DECLARE
deadlock_info RECORD;
BEGIN
-- Получаем информацию о блокировках
FOR deadlock_info IN
SELECT
blocked.pid AS victim_pid,
blocked.query AS victim_query,
blocking.pid AS blocker_pid,
blocking.query AS blocker_query
FROM pg_stat_activity AS blocked
JOIN pg_stat_activity AS blocking
ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
WHERE blocked.wait_event_type = 'Lock'
LOOP
INSERT INTO deadlock_history (
victim_pid, victim_query, blocker_pid, blocker_query
) VALUES (
deadlock_info.victim_pid,
deadlock_info.victim_query,
deadlock_info.blocker_pid,
deadlock_info.blocker_query
);
END LOOP;
END;
$ LANGUAGE plpgsql;

Анализ паттернов блокировок

-- Статистика блокировок по таблицам
SELECT
schemaname,
tablename,
COUNT(*) as lock_count,
array_agg(DISTINCT mode) as lock_modes
FROM pg_locks l
JOIN pg_class c ON l.relation = c.oid
JOIN pg_stat_user_tables t ON c.relname = t.tablename
WHERE l.locktype = 'relation'
GROUP BY schemaname, tablename
ORDER BY lock_count DESC;

-- Самые заблокированные запросы
SELECT
query,
COUNT(*) as blocked_count,
AVG(EXTRACT(EPOCH FROM (NOW() - query_start))) as avg_wait_seconds
FROM pg_stat_activity
WHERE wait_event_type = 'Lock'
GROUP BY query
ORDER BY blocked_count DESC;

Инструмент для kill зависших блокировок

-- Функция для завершения долгих блокировок
CREATE OR REPLACE FUNCTION kill_long_blocking_queries(
p_max_duration INTERVAL DEFAULT '5 minutes'
) RETURNS TABLE(
killed_pid INTEGER,
killed_query TEXT,
blocking_duration INTERVAL
) AS $
DECLARE
blocking_query RECORD;
BEGIN
FOR blocking_query IN
SELECT DISTINCT
blocking.pid,
blocking.query,
NOW() - blocking.query_start AS duration
FROM pg_stat_activity AS blocked
JOIN pg_stat_activity AS blocking
ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
WHERE blocking.state = 'active'
AND NOW() - blocking.query_start > p_max_duration
LOOP
-- Завершаем блокирующий процесс
PERFORM pg_terminate_backend(blocking_query.pid);

RETURN QUERY SELECT
blocking_query.pid,
blocking_query.query,
blocking_query.duration;

RAISE NOTICE 'Killed blocking query PID % (duration: %)',
blocking_query.pid, blocking_query.duration;
END LOOP;
END;
$ LANGUAGE plpgsql;

-- Использование (осторожно!)
SELECT * FROM kill_long_blocking_queries('10 minutes');

Заключение

Правильное управление блокировками критически важно для производительности и надежности PostgreSQL приложений. Ключевые выводы:

  1. Понимайте типы блокировок — table-level, row-level, advisory
  2. Минимизируйте время удержания блокировок — короткие транзакции
  3. Соблюдайте порядок блокировки ресурсов — предотвращает deadlocks
  4. Используйте SKIP LOCKED для очередей задач
  5. Применяйте NOWAIT когда блокировка необязательна
  6. Мониторьте блокировки регулярно
  7. Используйте Advisory Locks для координации на уровне приложения
  8. Реализуйте retry логику для обработки deadlocks
  9. Выбирайте правильный уровень изоляции
  10. Тестируйте под нагрузкой — deadlocks часто проявляются только в production
Золотое правило

Лучший способ справиться с deadlocks — спроектировать приложение так, чтобы они не возникали. Упорядочивание доступа к ресурсам и короткие транзакции решают 90% проблем.

Дополнительные ресурсы