Войти
ПрограммированиеФорумОбщее

Сортировка однонаправленного списка за O(N log N) без рекурсии

Страницы: 1 2 3 4 Следующая »
#0
(Правка: 8:57) 8:31, 10 июля 2019

Как многие знают, однонаправленный список можно отсортировать за линейно-логарифмическое время, используя merge sort. Можно также легко нагуглить множество готовых реализаций этой сортировки. Но к моему удивлению я обнаружил, что большинство реализаций используют массивы с random access, те же немногие, что обходятся итераторами последовательного доступа, реализуют простую рекурсивную версию алгоритма, а те, что реализованы без рекурсии, почти всегда используют дополнительную память для хранения разбитых списков, и, наконец, оставшиеся полторы реализации на списках без рекурсии и за O(1) памяти, не вызвали у меня эстетического катарсиса от качества кода.

Короче, я написал свою — может, кому-то пригодится. Идея в том, что её можно гонять в шейдере для order-independent transparency для сортировки фрагментов без использования указателей и рекурсии на GPU. Точки хоть и лежат в массиве, но используются только как элементы списка, так как они не упорядочены. При желании переделать на любой другой тип указателей должно быть тривиально. Будет хорошо, если алгоритм кому-то покажется интересным, пригодится или будут комментарии, как улучшить мою реализацию:

using PointIndex = size_t;
struct Point
{
  float data;
  PointIndex next;
};

size_t GetListSize(Point *points, PointIndex head)
{
  size_t count = 0;
  PointIndex curr = head;
  while (curr != PointIndex(-1))
  {
    count++;
    curr = points[curr].next;
  }
  return count;
}

struct MergeResult
{
  PointIndex head;
  PointIndex tail;
};

MergeResult MergeLists(Point *points, PointIndex leftHead, PointIndex rightHead)
{
  assert(leftHead != PointIndex(-1) && rightHead != PointIndex(-1));
  PointIndex currLeft = leftHead;
  PointIndex currRight = rightHead;
  MergeResult res;
  if (points[currLeft].data < points[currRight].data)
  {
    res.head = currLeft;
    currLeft = points[currLeft].next;
  }
  else
  {
    res.head = currRight;
    currRight = points[currRight].next;
  }
  PointIndex currMerged = res.head;
  while (currLeft != PointIndex(-1) || currRight != PointIndex(-1))
  {
    //itCount++; //~10000 iterations for random 1000-element array
    if (currRight == PointIndex(-1) || (currLeft != PointIndex(-1) && points[currLeft].data < points[currRight].data))
    {
      points[currMerged].next = currLeft;
      currMerged = currLeft;
      currLeft = points[currLeft].next;
    }
    else
    {
      points[currMerged].next = currRight;
      currMerged = currRight;
      currRight = points[currRight].next;
    }
  }
  points[currMerged].next = PointIndex(-1);
  res.tail = currMerged;
  return res;
}

PointIndex SeparateList(Point *points, PointIndex head, size_t count)
{
  assert(head != PointIndex(-1));
  PointIndex curr = head;
  for (size_t i = 0; i < count - 1 && points[curr].next != PointIndex(-1); i++)
    curr = points[curr].next;
  PointIndex nextHead = points[curr].next;
  points[curr].next = PointIndex(-1);
  return nextHead;
}

PointIndex MergeSort(Point *points, PointIndex head)
{
  size_t count = GetListSize(points, head);
  for (size_t gap = 1; gap < count; gap *= 2)
  {
    PointIndex lastTail = PointIndex(-1);
    PointIndex curr = head;
    while (curr != PointIndex(-1))
    {
      PointIndex leftHead = curr;
      PointIndex rightHead = SeparateList(points, leftHead, gap);
      if (rightHead == PointIndex(-1))
        break;

      PointIndex nextHead = SeparateList(points, rightHead, gap);

      MergeResult mergeResult = MergeLists(points, leftHead, rightHead);
      assert(mergeResult.head != PointIndex(-1));
      assert(mergeResult.tail != PointIndex(-1));
      if (lastTail != PointIndex(-1))
        points[lastTail].next = mergeResult.head;
      points[mergeResult.tail].next = nextHead;
      lastTail = mergeResult.tail;
      if (curr == head)
        head = mergeResult.head;
      curr = nextHead;
    }
  }
  return head;
}
void PrintList(Point *points, PointIndex head)
{
  size_t index = 0;
  float last = -1.0f;
  for(PointIndex curr = head; curr != PointIndex(-1); curr = points[curr].next)
  {
    std::cout << "[" << index << "]: " << points[curr].data << "\n";
    if (last > points[curr].data)
      std::cout << "\n\nFAYYYUUUL\n\n";
    last = points[curr].data;
    index++;
  }
}
void TestSort()
{
  std::default_random_engine eng;
  std::uniform_real_distribution<float> dis(0.0f, 1.0f);

  std::vector<Point> points;
  for (size_t i = 0; i < 1000; i++)
  {
    if (points.size() > 0)
      points.back().next = points.size();
    Point point;
    point.data = dis(eng);
    point.next = PointIndex(-1);
    points.push_back(point);
  }
  size_t itCount = 0;
  PointIndex newHead = MergeSort(points.data(), PointIndex(0));
  PrintList(points.data(), newHead);
}

#1
10:11, 10 июля 2019

Suslik
> Идея в том, что её можно гонять в шейдере для order-independent transparency
> для сортировки фрагментов без использования указателей и рекурсии на GPU.
Интересно на скольки элементах начинается профит от такой сортировки. Я твою реализацию не пробовал конечно, но на собственной реализации даже на 30 элементах у меня пузырёк доминировал.

#2
11:22, 10 июля 2019

Suslik
> }
> else
> {
Не вижу никакого смысла отделять закрывашку от else

#3
11:23, 10 июля 2019

Если элементов больше 100-200, квадратичные сортировки уже не катят, обычно, задержки становятся ощутимыми даже на глаз.
Другое дело, что у пузыря есть свойство использования уже имеющегося порядка. На некоторых задачах он чуть ли не за линейное время работать может.

Логарифмические сортировки чаще всего требуют полного доступа к данным, а слияние не требует. Таким способом не то что связанные списки, магнитофонные ленты сортировали. Можно сортировать файлы, не затягивая их целиком в память.

Странно, что не встретились нормальные реализации. Вещь то хрестоматийная...

#4
(Правка: 12:00) 11:55, 10 июля 2019

MrShoor
Написал бабблсорт для сравнения:

PointIndex BubbleSort(Point *points, PointIndex head)
{
  bool wasChanged;
  do
  {
    PointIndex curr = head;
    PointIndex prev = PointIndex(-1);
    PointIndex next = points[head].next;
    wasChanged = false;
    while (next != PointIndex(-1)) 
    {
      if (points[curr].data > points[next].data)
      {
        wasChanged = true;

        if (prev != PointIndex(-1))
        {
          PointIndex tmp = points[next].next;

          points[prev].next = next;
          points[next].next = curr;
          points[curr].next = tmp;
        }
        else 
        {
          PointIndex tmp = points[next].next;

          head = next;
          points[next].next = curr;
          points[curr].next = tmp;
        }

        prev = next;
        next = points[curr].next;
      }
      else
      {
        prev = curr;
        curr = next;
        next = points[next].next;
      }
    }
  } while (wasChanged);
  return head;
}

код сравнения производительности:

void TestSort()
{
  std::default_random_engine eng;
  std::uniform_real_distribution<float> dis(0.0f, 1.0f);

  size_t pointsCount = 10;
  size_t trialsCount = 10000;

  std::vector<Point> points;
  for (size_t i = 0; i < pointsCount; i++)
  {
    if (points.size() > 0)
      points.back().next = points.size();
    Point point;
    point.data = dis(eng);
    point.next = PointIndex(-1);
    points.push_back(point);
  }
  using hrc = std::chrono::high_resolution_clock;


  std::vector<Point> bckpPoints = points;

  hrc::time_point t1 = hrc::now();
  float test1 = 0;
  for (int i = 0; i < trialsCount; i++)
  {
    points = bckpPoints;
    PointIndex newHead = BubbleSort(points.data(), PointIndex(0));
    test1 += points[newHead].data;
  }
  hrc::time_point t2 = hrc::now();
  float test2 = 0;
  for (int i = 0; i < trialsCount; i++)
  {
    points = bckpPoints;
    PointIndex newHead = MergeSort(points.data(), PointIndex(0));
    test2 += points[newHead].data;
  }
  hrc::time_point t3 = hrc::now();

  std::cout << "Bubble : " << std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count() << "us\n";
  std::cout << "Merge: " << std::chrono::duration_cast<std::chrono::microseconds>(t3 - t2).count() << "us\n";
  std::cout << "test1:" << test1 << " test2: " << test2 << "\n";
}
в коде профайлинга не учитывается время копирования массива, но оно для обеих сортировок одинаковое. короче, по моим результатам переломная точка — примерно 10 элементов. если меньше 10 элементов в массиве, то моя реализация бабблсорта быстрее. если больше, то мерджсорт быстрее. для 100 элементов MergeSort() быстрее BubbleSort() примерно в 8 раз, для 1000 элементов — в 30 раз. это на самом деле не очень хорошо, потому что мне важнее всего производительность именно в районе 10 элементов. может, кто-то знает более эффективную сортировку для этого случая?

#5
(Правка: 14:27) 14:25, 10 июля 2019

Жесть. Написал insertion sort, который оказался быстрее баблсорта примерно раза в 4-10 раз на данных порядка 10-50 элементов:

PointIndex SortedInsert(Point *points, PointIndex head, PointIndex newPoint)
{
  if (head == PointIndex(-1) || points[head].data >= points[newPoint].data)
  {
    points[newPoint].next = head;
    head = newPoint;
  }
  else
  {
    PointIndex curr = head;
    for (; points[curr].next != PointIndex(-1) && points[points[curr].next].data < points[newPoint].data; curr = points[curr].next);
    points[newPoint].next = points[curr].next;
    points[curr].next = newPoint;
  }
  return head;
}
PointIndex InsertionSort(Point *points, PointIndex head)
{
  PointIndex newHead = PointIndex(-1);
  PointIndex curr = head;
  while (curr != PointIndex(-1))
  {
    PointIndex next = points[curr].next;
    newHead = SortedInsert(points, newHead, curr);
    curr = next;
  }
  return newHead;
}
совершенно глупый алгоритм со сложностью O(N^2), заруливает merge sort примерно до 60 элементов и стабильно быстрее баблсорта в 4-10 раз. возможно, оптимальную производительность можно получить, если использовать insertion sort вместо первых этапов работы merge sort'а, но не факт, что это покатит на GPU, так как будет использовать больше регистров, хуже occupancy и всё такое.

#6
(Правка: 15:22) 15:22, 10 июля 2019

Suslik
> insertion sort
Конечно. В большинстве std библиотек на многих языках он используется для сортировки небольших массивов. И более того, он используется как финальная стадия сортировки подмассивов в квиксорте.

#7
19:10, 10 июля 2019

Suslik
> Жесть. Написал insertion sort, который оказался быстрее баблсорта примерно раза
> в 4-10 раз на данных порядка 10-50 элементов
Да, сортировка вставками будет еще быстрее бабла за счет того, что операций записи меньше. В пользу бабла могу сказать, что:
1. Последний элемент можно выкидывать из сортировки и сразу же обрабатывать
2. Выходить из сортировки для OIT можно раньше, т.к. на если 0.95% света ты уже "погасил" первыми 4-5 семплами, то отсортированы или нет остальные фрагменты - обычно уже не важно.

#8
(Правка: 19:43) 19:31, 10 июля 2019

Suslik
Вот, я сделялъ:

void BubbleSortWithWork(Point *points, PointIndex head, std::function<void(PointIndex)> process)
{
  while (true)
  {
    PointIndex curr = head;
    PointIndex prev = PointIndex(-1);
    PointIndex next = points[head].next;
    if (next == PointIndex(-1)) {
        process(head);
        break;
    }
    while (next != PointIndex(-1)) 
    {
      if (points[curr].data > points[next].data)
      {
        PointIndex nextnext = points[next].next;

        if (prev != PointIndex(-1))
          points[prev].next = next;
        else
          head = next;
        points[next].next = curr;
        points[curr].next = nextnext;

        prev = next;
        next = nextnext;
      }
      else
      {
        prev = curr;
        curr = next;
        next = points[next].next;
      }
    }
    points[prev].next = -1; //откусываем от списка curr элемент
    process(curr);
  };
}
#9
19:58, 10 июля 2019

Suslik
> но не факт, что это покатит на GPU
Может тогда надо тестировать на гпу?
Какой-нибудь OpenCL поддерживает С++ и можно сразу же запустить этот алгоритм.

#10
3:43, 11 июля 2019

/A\
> Может тогда надо тестировать на гпу?
ну так потестирую, как только будет, что, лол. изобретаю очередной GI.

9К720
> В большинстве std библиотек на многих языках он используется для сортировки
> небольших массивов.
как я уже сказал, на GPU специфика другая и чем больше регистров в алгоритме задействовано, тем меньше тредов могут исполняться одновременно. поэтому иногда более простой с точки зрения объёма кода алгоритм работает быстрее более сложного и вычислительно эффективного.

MrShoor
> 2. Выходить из сортировки для OIT можно раньше, т.к. на если 0.95% света ты уже
> "погасил" первыми 4-5 семплами, то отсортированы или нет остальные фрагменты -
> обычно уже не важно.
да, только у меня не OIT.

MrShoor
> Вот, я сделялъ:
потестил. на 10 элементах, вроде, процентов на 20 быстрее бабблсорта, но если не использовать функцию process(), то всё равно значительно медленнее insertion sort'а.

#11
(Правка: 7:15) 3:46, 11 июля 2019

новый вопрос
раз уж тут все собрались, то спрошу в этом треде, не создавая нового. требуется найти или придумать алгоритм, суть в следующем. имеется N отсортированных списков, каждый пусть будет по M элементов. требуется проитерироваться в порядке сортировки по всем NM элементов быстрее, чем за MN^2, используя не более чем O(N) памяти. сами списки менять (сливать, например), запрещено, нужно именно проитерироваться по результату их слияния без сохранения этого результата.

#12
5:18, 11 июля 2019

Suslik
> требуется проитерироваться по всем NM элементов быстрее, чем за MN^2, используя
> не более чем O(N) памяти.
Ничего не понял, тебе же явно не только проитерироваться надо, т.к. проитерироваться это вот:

for (int i = 0; i < N; i++) {
  int curr = heads[i];
  while (curr != -1) {
    curr = items[curr].next;
  }
}
Давай, договаривай.
#13
7:15, 11 июля 2019

MrShoor
> Ничего не понял, тебе же явно не только проитерироваться надо, т.к.
> проитерироваться это вот:
а, ну в смысле проитерироваться отсортированно. добавил.

#14
7:20, 11 июля 2019

Suslik

> совершенно глупый алгоритм со сложностью O(N^2), заруливает merge sort
Ну, это каждый должен для себя выяснить, когда начинает копаться в алгоритмах сортировки ;)
А ещё есть такаие вещи, как сети сортировки. На малых N они зарулят вставку. Я не шарю в шейдерах, но сети параллелятся (bitonic sort, например), и, возможно, на GPU это может что-то дать.

Страницы: 1 2 3 4 Следующая »
ПрограммированиеФорумОбщее