Популярное
Свежее
Моя лента
Сообщения
Рейтинг
Пополнить Steam
Низкая комиссия
Темы
Игры
Гайды
Офтоп
Ночной музпостинг
Вопросы
Hollow Knight
Творчество
Музыка
Кино и сериалы
Инди
Показать все
DTF
О проекте
Правила
Реклама
Приложения
Аккаунт удален
26.07.2020

Статья удалена

Предисловие

На этой неделе я решил заняться оптимизацией методов, которые возвращают IAsyncEnumerable<T> в моей библиотеке. Использовать этот тип я начал недавно, да и он сам довольно молодой (впервые появился в preview-релизах .NET Core 3.0 в прошлом году). А тут появилось немного времени, да и интересно было, и я решил реализовать соответствующий метод.

В рамках данной "статьи" я поделюсь своим видением и реализацией параллельного исполнения кода в методе, возвращающем IAsyncEnumerable<T>. Я не утверждаю, что представленное мной решение верное или хорошее, скорее всего можно сделать лучше. Если у вас есть какие-то советы/предложения или я где-то ошибся (все-таки я недоджун и вполне возможно, что я просто в очередной раз позорюсь своим незнанием чего-либо) -- пишите в комментариях!

Проблема

Поначалу все казалось просто -- мне нужно ускорить метод-producer, который делает ДОЛГИЕ ВЕЩИ и асинхронно возвращает данные. Пример такого метода:

// Как это выглядит в не-асинхронном варианте private IEnumerable<int> ProduceInts() { for (int i = 0; i < 30; i++) { // Симулируем какую-то долгую работу Task.Delay(500).Wait(); yield return i; } } // Как это выглядит в асинхронном варианте private async IAsyncEnumerable<int> ProduceIntsAsync() { for (int i = 0; i < 30; i++) { // Симулируем какую-то долгую работу await Task.Delay(500); yield return i; // Или await ValueTask.FromResult(i) при особом желании } }

Логичным выходом для меня в такой ситуации всегда было использование Parallel.For/Parallel.ForEach или SemaphoreSlim. Однако, делать yield return через Parallel'ы невозможно, а код на SemaphoreSlim на мой взгляд не очень красив (и имеет определенные минусы).

Поиск в гугле не увенчался особым успехом. Было найдено интересное issue на GitHub в репозитории dotnet/runtime и несколько похожих статей/вопросов на StackOverflow, суть которых в параллелизации consumer'а. (К слову метод, IAsyncEnumerable.ForEachAsync доступен в nuget-пакете System.Linq.Async, а информация чуть новее, чем в вышеуказанном issue о IAsyncEnumerable.ParallelForEach написана в неплохой статье тут).

Решение?

И единственным решением для producer'а, которое я нашел в интернете, было решение с семафорами:

private async IAsyncEnumerable<int> ProduceIntsAsync() { using SemaphoreSlim semaphoreSlim = new SemaphoreSlim(Environment.ProcessorCount); HashSet<Task<int>> tasks = new HashSet<Task<int>>(); for (int i = 0; i < 30; i++) { await semaphoreSlim.WaitAsync(); int inner = i; async Task<int> ProduceInt() { // Симулируем какую-то долгую работу await Task.Delay(500); semaphoreSlim.Release(); return inner; } tasks.Add(ProduceInt()); } while (tasks.Count != 0) { using Task<int> task = await Task.WhenAny(tasks); tasks.Remove(task); yield return await task; } }

Но лично мне не нравится в вышеуказанном решении несколько вещей. Во-первых, выделение памяти на коллекцию Task'ов, коих может быть очень и очень много. Во-вторых, отсутствие возможности выбирать степень параллелизации автоматически (как это сделано в Parallel'ах; в данном случае я задал это число, равным Environment.ProcessorCount).

Что же делать, как реализовать это через Parallel.For? Потратив на бессмысленные попытки что-то сделать и думные думы весь день, я совершенно случайно вспомнил про еще одну замечательную фичу C# 8.0 -- Channels (если вдруг вам это понадобится -- вот хорошая статья для начинающих об использовании каналов в producer-consumer паттерне с примерами).

Я сразу вспомнил, что у возвращаемый тип у метода ChannelReader.ReadAllAsync -- IAsyncEnumerable, и в моей голове мгновенно возник дьявольский план -- использовать Parallel'ы с помощью каналов (лучше делать это просто через каналы и читать ChannelReader в consumer'е без IAsyncEnumerable, но это не является целью в данной статье)!

Первая попытка реализовать это дело была успешной по производительности, но имела один изъян -- лишний перебор готовой коллекции. Ра

private async Task ProduceIntsAsync(ChannelWriter<int> channelWriter) { // Симулируем какую-то долгую работу await Task.Delay(Delay); // Parallel не очень хорошо дружит с асинхронными методами внутри, поэтому вместо WriteAsync я использую TryWrite await Task.Run(() => Parallel.For(Min, Max, i => channelWriter.TryWrite(i))); } public async IAsyncEnumerable<int> ProduceIntsAsync() { Channel<int> channel = Channel.CreateUnbounded<int>(); using Task producer = ProduceIntsAsync(channel.Writer); await producer.ContinueWith(_ => channel.Writer.TryComplete()); await foreach (int i in channel.Reader.ReadAllAsync()) yield return i; }

Но решение нашлось быстро: достаточно возвращаться ChannelReader.ReadAllAsync() не делая метод асинхронным:

private async Task ProduceIntsAsync(ChannelWriter<int> channelWriter) { // Симулируем какую-то долгую работу await Task.Delay(Delay); // Parallel не очень хорошо дружит с асинхронными методами внутри, поэтому вместо WriteAsync я использую TryWrite await Task.Run(() => Parallel.For(Min, Max, i => channelWriter.TryWrite(i))); } private IAsyncEnumerable<int> ProduceInts() { Channel<int> channel = Channel.CreateUnbounded<int>(); Task producer = ProduceIntsAsync(channel.Writer); producer.ContinueWith(_ => channel.Writer.TryComplete()); return channel.Reader.ReadAllAsync(); } // Или так, если вам очень хочется await'ить Task снаружи и Dispose()'ить его, не хуже по производительности private IAsyncEnumerable<int> ProduceInts(out Task producer) { Channel<int> channel = Channel.CreateUnbounded<int>(); producer = ProduceIntsAsync(channel.Writer); producer.ContinueWith(_ => channel.Writer.TryComplete()).ConfigureAwait(false); return channel.Reader.ReadAllAsync(); }

Бенчмарки

Для написания бенчмарков я использовал библиотеку BenchmarkDotNet. Помимо производительности замеряем Allocated memory и Allocated native memory.

Было произведено два вида бенчмарков: для быстрого producer'а и для медленного.

Методы тестировались следующие:

  • IeDefault -- простой однопоточный producer, возвращающий IEnumerable<int>;
  • IaeDefaultAsync -- простой однопоточный асинхронный producer, возвращающий IAsyncEnumerable<int>;
  • IaeChannels -- не асинхронное решение с каналами (финальный вариант, предложенный в статье);
  • IaeChannelsAsync -- асинхронное решение с каналами (предпоследний вариант, предложенный в статье);
  • IaeSemaphoreAsync -- асинхронное решение с семафорами (первый вариант, предложенный в статье);

В цикле создавалось 30 элементов, задержка 100 мс, NUMBER_OF_PROCESSORS == 8. Консольное приложение собиралось и запускалось на последнем на (данный момент -- 7) preview-билде .NET 5.0.

BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19041.388 (2004/?/20H1) Intel Core i7-6700K CPU 4.00GHz (Skylake), 1 CPU, 8 logical and 4 physical cores .NET Core SDK=5.0.100-preview.7.20366.6 [Host] : .NET Core 5.0.0 (CoreCLR 5.0.20.36411, CoreFX 5.0.20.36411), X64 RyuJIT DefaultJob : .NET Core 5.0.0 (CoreCLR 5.0.20.36411, CoreFX 5.0.20.36411), X64 RyuJIT

Результаты для быстрого producer'а (сорри, что скриншотом):

Быстрый producer
Быстрый producer

Результаты для медленного producer'a:

Медленный producer<br />
Медленный producer

Итоги

Предложенный вариант использования Parallel с помощью каналов показывает себя хорошо как с точки производительности, так и с точки потребления памяти. Однако, стоит заметить, что ощутимый прирост вы получите только если у вас producer медленнее concumer'а.

Если кому-то будет интересен полный исходный код бенчмарков -- выложу позже на GitHub.

Теги: #программирование #csharp