Использование меток времени в Cassandra

Использование меток времени в Cassandra

24 января 2016 г.

Что такое метка времени или timestamp

Метка времени (timestamp) является составной частью колонки (column), также как имя колонки и её значение. Каждая операция модификации данных имеет свою метку времени.

Метка времени необходима для задания правильной последовательности операций изменения данных и определения наиболее актуальной версии данных.

Как работают метки времени

Операция изменения данных в кассандре - это всегда добавление новой колонки, старые “версии” данных никуда не исчезают, а только лишь становятся неактуальными, а актуальной считается колонка с самой большой меткой времени.

Кроме того, вспомним, что кассандра - это распределенная система и разные операции модификации данных могут быть обработаны разными нодами, причем в последовательности необязательно инициированной пользователем. Разные ноды в какой-то момент времени могут содержать разные версии одних и тех же данных (согласованность в конечном итоге) и для того, чтоб обеспечить строгий уровень согласованности  при чтении (скажем уровень ALL) кассандра опрашивает все ноды, на которых находятся данные, и выбирает колонку с самой большой меткой времени.

Наглядно последовательность операций представлена в таблице ниже:

key value timestamp
‘some key’ ‘value 1’ 0000001234
‘some key’ ‘value 2’ 0000001238
‘some key’ ‘value 3’ 0000001231
‘value 1’ ‘value 4’ 0000001235

Несмотря на то, что фактически данные запишутся в последовательности 1-2-3-4, но актуальной будет информация со значение ‘value 2’, так как ее метка времени имеет самое большое значение: 0000001238.

Как и когда использовать метки времени

Когда нужно сделать изменение одних и тех же данных за короткое время.

Может случиться так, что вам необходимо сделать несколько последовательных операций изменения одних и тех же данных за очень короткий промежуток времени.

В этом случае полезно задавать последовательность явно, используя последовательные метки времени для запросов.

INSERT INTO test_run(id, result) values (1, pass) USING TIMESTAMP 0000000000001234
INSERT INTO test_run(id, result) values (1, fail) USING TIMESTAMP 0000000000001235

Это гарантирует, что колонки запишутся в той последовательности, которую вы зададите метками времени. Иначе, Cassandra автоматически присвоит метки времени, но в силу сетевых задержек или в силу расхождения времени на узлах кассандры, может случиться, что последовательность будет нарушена и результат может оказаться неожиданным.

На практике, для генерации последовательностей таймстампов можно использовать класс AtomicMonotonicTimestampGenerator который поставляется вместе с Java драйвером или реализовать свой простой класс, который будет генерировать уникальные метки времени для каждого вызова:

import java.util.concurrent.atomic.AtomicLong;

/**
 * Thread safe time stamps sequence.
 */
public class TimeStampSequence {
    private static final AtomicLong LAST_TIMESTAMP = new AtomicLong();

    /**
     * Returns unique time stamp in milliseconds.
     *
     * @return unique timestamp in milliseconds
     */
    public static long nextTimeStamp() {
        long currentTime = System.currentTimeMillis();

        while (true) {
            long lastTimestamp = LAST_TIMESTAMP.get();
            if (lastTimestamp >= currentTime) {
                currentTime = lastTimestamp + 1;
            }

            if (LAST_TIMESTAMP.compareAndSet(lastTimestamp, currentTime)) {
                return currentTime;
            }
        }
    }
}

Используя класс генератор таймстампов можем явно задать последовательность операций:

import com.datastax.driver.core.Session;
/**
Number of microseconds per millisecond.
*/
public static final int MICROS_PER_MILLIS = 1000;
...
Session session = getSession();

session.execute("INSERT INTO my_table(id, data) values (1, 'Hello Vasya') " +
"USING TIMESTAMP " + TimeStampSequence.nextTimeStamp() * MICROS_PER_MILLIS);

session.execute("INSERT INTO my_table(id, data) values (1, 'Hello Petya') " +
"USING TIMESTAMP " + TimeStampSequence.nextTimeStamp() * MICROS_PER_MILLIS);

session.execute("INSERT INTO my_table(id, data) values (1, 'Hello Petya') " +
"USING TIMESTAMP " + TimeStampSequence.nextTimeStamp() * MICROS_PER_MILLIS);

Каждый таймстамп переводим из миллисекунд в микросекунды домножая на 1000, посколку кассандра ожидает именно микросекунды.

Вторая проблема - изменение одних данных из разных потоков (race condition)

Представим ситуацию:

Есть два потока которые одновременно работают с некоторой таблицей.

  1. Первый поток читает данные по определенному ключу и производит какие-то манипуляции с этими данными.
  2. Второй поток удаляет данные для этого же ключа
  3. Первый поток делает операцию записи обновленных данных по этому же ключу.

В результате, актуальным значением будут данные на момент 3, потому как они имеют большую метку времени. Но как мы могли обновить данные которых уже не существует? Во многих случаях это может сломать бизнес-логику приложения и нужен механизм чтобы, выполнить операцию обновления данных именно на момент времени когда данные еще существовали, до выполенния операции удаления.

  1. Первый поток в начале своей деятельности создает метку времени равную текущему моменту и читает данные, производит необходимые для логики манипуляции.
  2. Второй поток удаляет данные со свой  меткой времени
  3. Первый поток делает операцию записи с меткой времени полученной в пункте 1

Таким образом актуальным значением будут данные на момент пункта 2 (удаление), так как метка времени самая большая для этой операции.

Посмотрим как это выглядит в коде:

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RaceConditionOperations {

    /**
     * Returns new Update data operation.
     *
     * @return Runnable Update data operation
     */
    public Runnable newUpdateOperation() {

        return new Runnable() {
            @Override
            public void run() {
                final long operationTimeStamp = TimeStampSequence.nextTimeStamp();

                Session session = getSession();

                ResultSet results = session.execute("SELECT * FROM my_table WHERE id = 1;");

                String newValue = doSomethingWithResult(results);

                session.execute("INSERT INTO my_table(id, data) values (1, '" + newValue + "') " +
                        "USING TIMESTAMP " + operationTimeStamp * MICROS_PER_MILLIS);

            }
        };

    }

    /**
     * Creates new Delete data operation.
     *
     * @return Runnable Delete data operation.
     */
    public Runnable newDeleteOperation() {

        return new Runnable() {
            @Override
            public void run() {
                Session session = getSession();
                session.execute("DELETE FROM my_table USING TIMESTAMP " + TimeStampSequence.nextTimeStamp() * MICROS_PER_MILLIS + " WHERE id = 1;");
            }
        };
    }

    /**
     * Executes mutation operations asynchronously.
     */
    public void doAsyncOperations() {
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_THREADS);
        executor.submit(this.newUpdateOperation());
        executor.submit(this.newDeleteOperation());
    }
}

При выполнении Runnable объекта, созданного в newUpdateOperation, мы запоминаем время начала выполнения операции в operationTimeStamp и считаем, что все операции модификации данных в этом методе актуальны именно на этот момент времени, проще говоря, делаем INSERT с использованием этой метки времени. А операция удаления имеет свою метку времени. Таким образом, если операция удаления началась позже чем операция обновления, но, при этом удаление завершилось раньше чем обновление, то ничего страшного не произойдет - в базе мы будем иметь результат той операции, чья метка времени больше, в нашем случае это операция удаления.

Итак, манипуляции с метками времени могут быть весьма полезны, когда есть необходимость управлять последовательностью операций в Cassandra. Для того чтобы избежать путанницы в данных рекомендую всегда задавать таймстамп в клиентском коде, а не полагаться на автоматические метки сгенерированные Кассандрой, более того в версии Java драйвера и новой версии Cassandra протокола  метки времени по умолчанию генерируются самим драйвером. Но кроме меток по умолчаню, как мы сегодня узнали бвыает полезно явным образом указывать их значения.

И на последок несколько полезных ссылок: