null

UPSERT в PostgreSQL

При реализации одной задачи в PostgreSQL понадобилось что-то похожее на UPSERT. Сама проблема достаточно известна: в таблицу надо вставить некий ключ со значением, а если этот ключ уже существует в таблице, то обновить данные по этому ключу. По указанной ссылке также есть пример реализации этого функционала в продуктивных версиях PostgreSQL. Если рассматривать простую тестовую таблицу вида:

CREATE TABLE test (
  id    INTEGER PRIMARY KEY,
  value TEXT
);

То необходимая функция будет выглядеть так:

CREATE OR REPLACE FUNCTION testfunc_1(key INT, data TEXT) RETURNS VOID AS
$$
BEGIN
    LOOP
        UPDATE test SET value = data WHERE id = key;
        IF found THEN
            RETURN;
        END IF;
        BEGIN
            INSERT INTO test(id, value) VALUES (key, data);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;

Но в этом примере "немного" смутило наличие цикла, который обеспечивает попадение данных в СУБД даже при работе нескольких параллельных потоков. Может можно сделать проще?

Первой была возникла идея использовать блокировку таблицы, которая исключает возможность возникнования ситуации, из-за которой сделан цикл и может возникнуть необходимость повторного выполнения UPDATE:

CREATE OR REPLACE FUNCTION testfunc_2(key INT, data TEXT) RETURNS VOID AS
$$
BEGIN
    LOCK TABLE test IN EXCLUSIVE MODE;
    UPDATE test SET value = data WHERE id = key;
    IF found THEN
        RETURN;
    END IF;
    INSERT INTO test(id, value) VALUES (key, data);
END;
$$
LANGUAGE plpgsql;

В этом случае в любом случае будет выполнено только два запроса на обновление данных, но используется полная блокировка таблицы на запись.

В качестве второго варианта переделки было предложено сначала выполнять INSERT и при возникновении исключения выполнять UPDATE:

CREATE OR REPLACE FUNCTION testfunc_3(key INT, data TEXT) RETURNS VOID AS
$$
BEGIN
    BEGIN
        INSERT INTO test(id, value) VALUES (key, data);
        RETURN;
    EXCEPTION WHEN unique_violation THEN
        UPDATE test SET value = data WHERE id = key;
    END;
END;
$$
LANGUAGE plpgsql;

В данном случае при отсутствии данных в таблице запрос должен выполниться быстрее, а выполнение запросов на обновление может оказаться медленее из-за возможно медленной обработки исключений.

А при наличии трёх вариантов решения проблемы встаёт закономерный вопрос - какой из них эффективнее? Для оценки оценки эффективности я не придумал ничего лучше, чем набросать простой код на C:

#include <stdio.h>
#include <stdarg.h>
#include <libpq-fe.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

#define CONNINFO "host=XXX user=XXX password=XXX dbname=XXX"
#define COUNT 512
#define MASK 0x3ff
#define THREADS 4

typedef struct {
        char *func;
        char value[2];
} params_t;

void *
thread(void *arg)
{
        PGconn *conn = PQconnectdb(CONNINFO);
        params_t *params = arg;
        char *value = strdup("");
        char *req, *s;
        int i;

        for (i = 0; i < COUNT; i++) {
                asprintf(&s, "%s%s", value, params->value);
                free(value);
                value = s;

                asprintf(&req, "SELECT %s(%li, '%s')",
                        params->func, random() & MASK, value);
                PQclear(PQexec(conn, req));
                free(req);
        }

        PQfinish(conn);
        free(value);

        return NULL;
}

int
main(int argc, char *argv[])
{
        params_t params[THREADS];
        pthread_t threads[THREADS];
        int i;

        PGconn *conn = PQconnectdb(CONNINFO);
        PQclear(PQexec(conn, "DELETE FROM test"));
        PQfinish(conn);

        srandom(1024);

        for (i = 0; i < THREADS; i++) {
                params[i].func = argv[1];
                params[i].value[0] = 'A' + i;
                params[i].value[1] = 0;

                pthread_create(threads + i, NULL, thread, params + i);
        }

        for (i = 0; i < THREADS; i++)
                pthread_join(threads[i], NULL);

        return 0;
}

Код сознательно упрощён, в нём нет проверок на ошибки и т.д., но написание правильного и красивого кода не являлось целью исследования. Маска была подобрана таким образом, чтобы примерно половина запросов к СУБД вызывала вставку новых данных, а половина - обновление уже вставленных.

Данный код был несколько раз запущен на двух разных системах и времена выполнения восьми последовательных запусков программы в секундах для каждой из описанных функций представлены в таблице:

  testfunc_1 testfunc_2 testfunc_3
Система 1, 4 потока, маска 3FF 92,6 (101%) 197,67 (215%) 91,77 (100%)
Система 2, 4 потока, маска 3FF 39,65 (101%) 84,49 (214%) 39,4 (100%)
Система 2, 8 потоков, маска 7FF 42,68 (100%) 165,97 (389%) 47,7 (112%)
Система 2, 4 потока, маска 1FF 40,4 (100%) 83,68 (207%) 41,64 (103%)
Система 2, 4 потока, маска 7FF 39,65 (100%) 80.85 (204%) 39,94 (101%)

В качестве эксперимента было увеличено количество выполняющихся потоков до восьми, что очень сильно замедлило вариант с глобальной блокировкой таблицы (testfunc_2), не оказав существенного влияния на время выполнения двух других вариантов. Также было уменьшено (маска 1FF) и увеличено (маска 7FF) соотношение количества INSERT-ов к количеству UPDATE-ов.

Результаты наглядно демонстрируют, что использование глобальной блокировки таблицы (testfunc_2) является наихудшим вариантом, что только подтверждает известную мысль о том, что блокировками должна заниматься СУБД, а не приложение.

А изложенный в документации вариант (testfunc_1) оказался действительно не плох, и его результаты и результаты более простого варианта с выполнением INSERT и без использования цикла (testfunc_3) расходятся минимально, что можно посчитать ошибкой эксперимента. Но при этом исходный вариант чуть менее читаем, чем предлагаемый в статье.

Допускаю, что каком-то другом характере нагрузки или иных факторах эта разница может оказаться больше, но для решавшейся задачи это не так принципиально.

Коротко о себе:

Работаю в компании Tune-IT и тьютором кафедры Вычислительной техники в СПбГУИТМО.

Очень люблю команду cat, core solaris и IPv6.