Войти
ПрограммированиеФорумОбщее

lockfree ring buffer

#0
7:48, 11 мая 2018

Я для тренировки написал сабж, но он не работает, помогите разобраться.

буфер работает нормально при отношениях количества потоков для записи и чтения - *:1, 1:1, 1:*. Но не работает для общего случая *:*
т.е. когда 2 потока пишут и 2 читают, то уже падает в ошибку доступа памяти, не могу понять где проблема.

Весь код приложения (upd: уже рабочий код)

#define _ENABLE_ATOMIC_ALIGNMENT_FIX

#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>

template<typename T>
class CircularBufferLockFree
{
private:
  struct BufferIdx1 { uint16_t NextRead, WriteCapture; };
  struct BufferIdx2 { uint16_t NextWrite, ReadCapture; };
  std::atomic<BufferIdx1> m_Idx1;
  std::atomic<BufferIdx2> m_Idx2;
  std::vector<T> m_Data;
public:
  CircularBufferLockFree(uint16_t size) :m_Idx1({ 0, 0 }), m_Idx2({ 0, 0 }), m_Data(size) {
    static_assert(m_Idx1.is_always_lock_free);
    static_assert(m_Idx2.is_always_lock_free);
  }

  bool try_push(const T& val)
  {
    BufferIdx1 oldIdx1 = m_Idx1.load(std::memory_order_relaxed), newIdx1;
    do {
      newIdx1.NextRead = oldIdx1.NextRead;
      newIdx1.WriteCapture = (oldIdx1.WriteCapture + 1) % m_Data.size();
      if (newIdx1.WriteCapture == newIdx1.NextRead)
        return false;
    } while (!m_Idx1.compare_exchange_weak(oldIdx1, newIdx1, std::memory_order_relaxed));
    m_Data[oldIdx1.WriteCapture] = val;
    std::atomic_thread_fence(std::memory_order_release);
    BufferIdx2 oldIdx2 = m_Idx2.load(std::memory_order_relaxed), newIdx2;
    newIdx2.NextWrite = newIdx1.WriteCapture;
    do {
      oldIdx2.NextWrite = oldIdx1.WriteCapture;
      newIdx2.ReadCapture = oldIdx2.ReadCapture;
    }
    while (!m_Idx2.compare_exchange_weak(oldIdx2, newIdx2, std::memory_order_relaxed));
    return true;
  }
  bool try_pop(T& rVal)
  {
    BufferIdx2 oldIdx2 = m_Idx2.load(std::memory_order_relaxed), newIdx2;
    do {
      if (oldIdx2.ReadCapture == oldIdx2.NextWrite)
        return false;
      newIdx2.NextWrite = oldIdx2.NextWrite;
      newIdx2.ReadCapture = (oldIdx2.ReadCapture + 1) % m_Data.size();
    } while (!m_Idx2.compare_exchange_weak(oldIdx2, newIdx2, std::memory_order_relaxed));
    std::atomic_thread_fence(std::memory_order_acquire);
    rVal = m_Data[oldIdx2.ReadCapture];
    std::atomic_thread_fence(std::memory_order_release);
    BufferIdx1 oldIdx1 = m_Idx1.load(std::memory_order_relaxed), newIdx1;
    newIdx1.NextRead = newIdx2.ReadCapture;
    do {
      oldIdx1.NextRead = oldIdx2.ReadCapture;
      newIdx1.WriteCapture = oldIdx1.WriteCapture;
    }
    while (!m_Idx1.compare_exchange_weak(oldIdx1, newIdx1, std::memory_order_relaxed));
    return true;
  }
};

struct SpinLock {
  std::atomic_flag m_flag;
  void lock() { while (m_flag.test_and_set(std::memory_order_acquire)); }
  void unlock() { m_flag.clear(std::memory_order_release); }
};
template<typename T>
class CircularBuffer
{
private:
  uint16_t m_NextRead, m_NextWrite;
  std::vector<T> m_Data;
  SpinLock m_mutex;
  //std::mutex m_mutex;
public:
  CircularBuffer(uint16_t size) :m_NextRead(0), m_NextWrite(0), m_Data(size) { }

  bool try_push(const T& val)
  {
    std::unique_lock<decltype(m_mutex)> lock(m_mutex);
    uint16_t newNextWrite = (m_NextWrite + 1) % m_Data.size();
    if (newNextWrite == m_NextRead) return false;
    m_Data[m_NextWrite] = val;
    m_NextWrite = newNextWrite;
    return true;
  }
  bool try_pop(T& rVal)
  {
    std::unique_lock<decltype(m_mutex)> lock(m_mutex);
    if (m_NextRead == m_NextWrite) return false;
    rVal = m_Data[m_NextRead];
    m_NextRead = (m_NextRead + 1) % m_Data.size();
    return true;
  }
};

class ThreadPool {
public:
  ThreadPool():m_IsContinue(false) { }
  ~ThreadPool() { stop(); }
  void start(uint32_t countthread, void(*Callback)()) {
    stop();
    m_IsContinue.store(true);
    m_Threads.reserve(countthread);
    for (size_t i = 0; i < countthread; ++i) {
      m_Threads.push_back(std::thread([this, Callback]() {
        while (m_IsContinue.load(std::memory_order_seq_cst)) Callback();
      }));
    }
  }
  void stop() {
    m_IsContinue.store(false);
    for (std::thread& t : m_Threads)
      t.join();
    m_Threads.clear();
  }
private:
  std::vector<std::thread> m_Threads;
  std::atomic<bool> m_IsContinue;
};

CircularBuffer<int*> g_buffer(50000);
std::atomic<size_t> g_Counter = 0;
std::chrono::steady_clock::time_point g_Start = std::chrono::high_resolution_clock::now();

void WorkThreadPush() {
  int * data = new int(0);
  if (g_buffer.try_push(data)) {
    size_t c;
    if ((c = g_Counter.fetch_add(1, std::memory_order_seq_cst)) % 1000000 == 0) {
      std::chrono::steady_clock::time_point End = std::chrono::high_resolution_clock::now();
      std::cout << (std::chrono::duration_cast<std::chrono::nanoseconds>(End - g_Start).count()/1000000.0) << std::endl;
      g_Start = End;
    }
  }
  else { delete data; /*std::this_thread::yield();*/ }
}
void WorkThreadPop() {
  int * data;
  if(g_buffer.try_pop(data)) {
    delete data;
    size_t c;
    if ((c = g_Counter.fetch_add(1, std::memory_order_seq_cst)) % 1000000 == 0) {
      std::chrono::steady_clock::time_point End = std::chrono::high_resolution_clock::now();
      std::cout << (std::chrono::duration_cast<std::chrono::nanoseconds>(End - g_Start).count() / 1000000.0) << std::endl;
      g_Start = End;
    }
  }
  else { /*std::this_thread::yield();*/ }
}

int main()
{
#ifndef NDEBUD
  _CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF);
  //_CrtSetBreakAlloc(16219);
#endif
  ThreadPool TPPush, TPPop;
  size_t CountPushThreads = 4, CountPopThreads = 4;
  TPPush.start(CountPushThreads, WorkThreadPush);
  TPPop.start(CountPopThreads, WorkThreadPop);
  std::cin.get();
  TPPush.stop();
  std::cin.get();
  return 0;
}


#1
10:24, 11 мая 2018

Не гарантирую, что это всё (в многопоточной среде вообще тяжело что-то гарантировать), но вот то, что я вижу:
Вообще, std::memory_order_relaxed - он на то и relaxed, что не накладывает никаких ограничений на порядок. Все перестановки, допустимые для обычных переменных, распространяются в том числе и на атомики с relaxed.
В твоём же коде он позволяет сделать вот такие преобразования

  bool try_push(const T& val)
  {
    // вынесли чтение за цикл - фейлим больше записей, чем нужно - некритично
    size_t nextRead = m_NextRead.load(std::memory_order_relaxed);
    size_t oldWriteCapture = m_WriteCapture.load(std::memory_order_relaxed), newWriteCapture;
    do {
      newWriteCapture = (oldWriteCapture + 1) % m_Data.size();
      if (newWriteCapture == nextRead)
        return false;
    } while (!m_WriteCapture.compare_exchange_weak(oldWriteCapture, newWriteCapture, std::memory_order_relaxed));
    // другие процессоры могут увидеть новую m_Data перед обновлением m_WriteCapture - некритично
    m_Data[oldWriteCapture] = val;
    std::atomic_thread_fence(std::memory_order_release);
    // перезаписывает свежие m_NextWrite с других потоков более старыми значениями
    // следующий try_push запишет более новый m_NextWrite, вернув потерянные элементы обратно
    // при неудачной деструкцией - потерянные элементы утекут
    while (!m_NextWrite.compare_exchange_weak(oldWriteCapture, newWriteCapture, std::memory_order_relaxed));
    return true;
  }
  bool try_pop(T& rVal)
  {
    // вынесли чтение за цикл - новые поступления станут видны только при следующем вызове
    size_t nextWrite = m_NextWrite.load(std::memory_order_relaxed);
    size_t oldReadCapture = m_ReadCapture.load(std::memory_order_relaxed), newReadCapture;
    do {
      if (oldReadCapture == nextWrite)
        return false;
      newReadCapture = (oldReadCapture + 1) % m_Data.size();
    } while (!m_ReadCapture.compare_exchange_weak(oldReadCapture, newReadCapture, std::memory_order_relaxed));
    std::atomic_thread_fence(std::memory_order_acquire);
    // операции не синхронизированы
    // освобождает слот прежде, чем закончится чтение! - критический баг
    // перезаписывает свежие m_NextRead с других потоков более старыми значениями
    // m_WriteCapture способен перескочить через откат m_NextRead! - критический баг
    while (!m_NextRead.compare_exchange_weak(oldReadCapture, newReadCapture, std::memory_order_relaxed));
    rVal = m_Data[oldReadCapture];
    return true;
  }
Если сделаешь корректную версию - замерь бенчмарком и сравни с вариантом, где вся структура закрывается одним мьютексом на std::atomic_flag. Есть вероятность, что с уменьшением числа atomic-операций весь код станет быстрее.

Дополнительно, если захочешь проверить по-хорошему - проверь на платформах помимо x86. Например, найди какой-нибудь компилятор для Андроида и запусти пример у себя на телефоне. Дело в том, что у x86 инструкции чтения и записи сами по себе уже обладают семантиками acquire/release (именно для использования этого факта атомики и ввели); тогда как у того же ARM - модель памяти более жидкая, и неправильно написанный код, по воле случая получивший нужные гарантии на x86, получит больше возможностей сломаться и показать свою истинную натуру на ARM.
Слабая модель так же и у PowerPC, но его ещё поискать надо - PlayStation 3 SDK у тебя вряд ли будет, а онлайн-компиляторов на PPC-серверах я не встречал.

#2
11:12, 11 мая 2018

Delfigamer
Вынос чтения m_NextRead и m_NextWrite не помогло.
> перезаписывает свежие m_NextWrite с других потоков более старыми значениями
> перезаписывает свежие m_NextRead с других потоков более старыми значениями
как это возможно если CAS в цикле перед заменой сравнивает их с oldReadCapture/oldWriteCapture которые были ранее зарезервированы
т.е. потоки должны по порядку инкрементить этот счетчик, а если это не удается то ждать в спин блокировке пока другой поток доинкрементит его до зарезервированного значения.
> m_WriteCapture способен перескочить через откат m_NextRead! - критический баг
вот это не понял, что за откат?

#3
11:26, 11 мая 2018

ashujon
> как это возможно если CAS в цикле перед заменой сравнивает их с oldReadCapture/oldWriteCapture которые были ранее зарезервированы
ах, я понял свою ошибку, compare_exchange_weak перезапиывает их
Исправил, но все равно не работает, хотя работает на треть секунды дольше :)

  bool try_push(const T& val)
  {
    std::unique_lock<std::mutex> lock(mutex1);
    size_t oldWriteCapture, newWriteCapture, NextRead = m_NextRead.load(std::memory_order_relaxed);
    oldWriteCapture = m_WriteCapture.load(std::memory_order_relaxed);
    do {
      newWriteCapture = (oldWriteCapture + 1) % m_Data.size();
      if (newWriteCapture == NextRead)
        return false;
    } while (!m_WriteCapture.compare_exchange_weak(oldWriteCapture, newWriteCapture, std::memory_order_relaxed));
    m_Data[oldWriteCapture] = val;
    std::atomic_thread_fence(std::memory_order_release);
    size_t nextWrite;
    do { nextWrite = oldWriteCapture; }
    while (!m_NextWrite.compare_exchange_weak(nextWrite, newWriteCapture, std::memory_order_relaxed));
    return true;
  }
  bool try_pop(T& rVal)
  {
    std::unique_lock<std::mutex> lock(mutex1);
    size_t oldReadCapture, newReadCapture, NextWrite = m_NextWrite.load(std::memory_order_relaxed);
    oldReadCapture = m_ReadCapture.load(std::memory_order_relaxed);
    do {
      if (oldReadCapture == NextWrite)
        return false;
      newReadCapture = (oldReadCapture + 1) % m_Data.size();
    } while (!m_ReadCapture.compare_exchange_weak(oldReadCapture, newReadCapture, std::memory_order_relaxed));
    std::atomic_thread_fence(std::memory_order_acquire);
    rVal = m_Data[oldReadCapture];
    size_t nextRead;
    do { nextRead = oldReadCapture; }
    while (!m_NextRead.compare_exchange_weak(nextRead, newReadCapture, std::memory_order_relaxed));
    return true;
  }
#4
12:19, 11 мая 2018

Все, разобрался, работает при 4 пишущих и 4 читающих потоках
Вынос чтения m_NextRead и m_NextWrite за цикл было ошибкой, в итоге old* переменные проскакивали его.
первоначальная и единственная ошибка была именно в
> перезаписывает свежие m_NextRead с других потоков более старыми значениями
> перезаписывает свежие m_NextWrite с других потоков более старыми значениями
так что спасибо.

#5
15:07, 11 мая 2018

Нет, не единственная.
Delfigamer
> // операции не синхронизированы
> // освобождает слот прежде, чем закончится чтение! - критический баг

#6
15:12, 11 мая 2018

Delfigamer
а как это исправить?

std::atomic_thread_fence(std::memory_order_acquire);
rVal = m_Data[oldReadCapture];
std::atomic_thread_fence(std::memory_order_release);
так?

#7
15:20, 11 мая 2018

и еще я рано обрадовался, оказывается чтение m_NextRead и m_NextWrite в цикле не достаточно, все равно происходит гонка.
Это проявляется когда буфер маленький, допустим 20 элементов, тогда спин-блокировки происходят достаточно долго чтобы проблема гонки проявилась.

#8
16:22, 11 мая 2018

решил проблему с гонкой, пришлось объединить пары переменных в общие атомик структуры, работает идеально, добавил код в нулевом посту
по поводу тестов получил такие результаты, работало 4 потока на вставку и 4 на удаление, время замерял на каждые 1000000 вставок/удалений

размер буфера 20:
    буфер на std::mutex
        ~620мс
    буфер на спин блокировке через std::atomic_flag
        ~420мс
    lockfree буфер
        ~170мс
размер буфера 50000:
    буфер на std::mutex
        ~75мс
    буфер на спин блокировке через std::atomic_flag
        ~320мс
    lockfree буфер
        ~170мс

в общем, никому не нужная фигня, удаляю проект и забываю как плохой сон.

#9
16:54, 11 мая 2018

Я бы сделал примерно вот так:

+ Показать

Основная идея в том, чтобы объявить ряд инвариантов - утверждений, который всегда должны быть верны - а затем строить алгоритмы исходя из них.

ПрограммированиеФорумОбщее

Тема в архиве.