Introduction
Async streams let your API produce data gradually instead of loading everything into memory at once.
This helps with:
- Large datasets
- Real-time updates
- Log streaming
- Progress streaming
- Reducing memory pressure
- Faster first-byte latency
8.1 What is IAsyncEnumerable<T>?
Most collections return all data at once:
IEnumerable<T>
Async streams return data one element at a time, asynchronously:
IAsyncEnumerable<T>
Comparison
| Feature | IEnumerable&lt;T&gt; | IAsyncEnumerable&lt;T&gt; |
|---|---|---|
| Consumption | Synchronous (foreach) | Asynchronous (await foreach) |
| Suitable for I/O | ❌ blocks the thread | ✅ awaits without blocking |
| Memory usage | Often buffers the whole result | One element at a time |
| Streaming | No | Yes |
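To make the comparison concrete, here is a minimal, self-contained sketch (the names are illustrative): the synchronous iterator blocks on each element, while the async iterator awaits between elements and is consumed with await foreach:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Synchronous: the consumer blocks on each element
foreach (var n in Numbers())
    Console.WriteLine(n);

// Asynchronous: the consumer awaits each element
await foreach (var n in NumbersAsync())
    Console.WriteLine(n);

static IEnumerable<int> Numbers()
{
    for (int i = 1; i <= 3; i++)
        yield return i;
}

static async IAsyncEnumerable<int> NumbersAsync()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(10); // simulate I/O per element
        yield return i;
    }
}
```

Both loops print 1, 2, 3; the difference is that the second never blocks a thread while waiting.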
8.2 How Async Streams Work Internally
The await foreach loop consumes data as it becomes available.
Example:
await foreach (var item in GetNumbersAsync())
{
    Console.WriteLine(item);
}
Execution:
- Fetch 1st item (await)
- Process
- Fetch next item
- Continue…
No need to load entire data in memory.
8.3 Implementing Your Own Async Stream
public async IAsyncEnumerable<int> GetNumbersAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // simulate async work
        yield return i;        // stream element
    }
}
Consumer:
await foreach (var number in GetNumbersAsync())
{
    Console.WriteLine(number);
}
8.4 Async Streaming in ASP.NET Core APIs
You can send streaming results directly to the client.
Controller Example
[HttpGet("stream/prices")]
public async IAsyncEnumerable<decimal> StreamPrices()
{
    for (int i = 0; i < 10; i++)
    {
        await Task.Delay(1000);
        yield return Random.Shared.Next(100, 200);
    }
}
Client receives a stream of numbers like:
101
103
150
...
Benefits
- Low memory usage
- Client gets data progressively
- Ideal for live dashboards, logs, events
8.5 Real-Life Example: Streaming Logs from Database
Repository
public async IAsyncEnumerable<LogEntry> StreamLogsAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    await using var conn = new SqlConnection(_connString);
    await conn.OpenAsync(token);

    using var cmd = new SqlCommand("SELECT * FROM Logs ORDER BY Id", conn);
    using var reader = await cmd.ExecuteReaderAsync(token);

    while (await reader.ReadAsync(token))
    {
        yield return new LogEntry
        {
            Id = reader.GetInt32(0),
            Level = reader.GetString(1),
            Message = reader.GetString(2)
        };
    }
}
Controller
[HttpGet("logs/stream")]
public async IAsyncEnumerable<LogEntry> StreamLogsAsync(
    CancellationToken token)
{
    await foreach (var log in _repo.StreamLogsAsync(token))
    {
        yield return log;
    }
}
📌 Perfect for:
- Monitoring dashboards
- Log viewers
- Admin portals
8.6 Chunked HTTP Streaming
To stream non-JSON data, write to Response.Body (or Response.BodyWriter) directly.
Example: Streaming Large File (Chunk-by-Chunk)
[HttpGet("download")]
public async Task DownloadLargeFile(CancellationToken token)
{
    var filePath = "very-large.zip";
    Response.ContentType = "application/octet-stream";

    await using var fs = System.IO.File.OpenRead(filePath);
    var buffer = new byte[81920];
    int bytesRead;

    while ((bytesRead = await fs.ReadAsync(buffer, token)) > 0)
    {
        await Response.Body.WriteAsync(buffer.AsMemory(0, bytesRead), token);
        await Response.Body.FlushAsync(token);
    }
}
Benefits:
- No memory explosion
- Faster first-byte delivery
- Ideal for large file downloads
8.7 Streaming With Minimal APIs
app.MapGet("/stream/time", () =>
{
    async IAsyncEnumerable<string> Stream()
    {
        while (true)
        {
            yield return DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ");
            await Task.Delay(1000);
        }
    }
    return Stream();
});
Client receives:
2025-02-10T12:34:01Z
2025-02-10T12:34:02Z
...
8.8 Async Streams With CancellationToken
Cancellation is often required in streaming.
Example
public async IAsyncEnumerable<int> CountAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    for (int i = 0; i < 100000; i++)
    {
        token.ThrowIfCancellationRequested();
        await Task.Delay(100, token);
        yield return i;
    }
}
Controller
[HttpGet("count")]
public IAsyncEnumerable<int> GetCount(CancellationToken token)
{
    return CountAsync(token);
}
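On the consumer side, a token can also be attached at enumeration time with WithCancellation; the [EnumeratorCancellation] attribute is what routes that token into the generator's parameter. A minimal self-contained sketch (names illustrative):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var received = 0;
try
{
    // WithCancellation flows cts.Token into the generator's
    // [EnumeratorCancellation] parameter
    await foreach (var n in CountAsync().WithCancellation(cts.Token))
    {
        received++;
        if (received == 3)
            cts.Cancel(); // the next awaited Task.Delay will throw
    }
}
catch (OperationCanceledException)
{
    // Expected: the stream was cancelled mid-enumeration
}
Console.WriteLine($"Received {received} items before cancellation");

static async IAsyncEnumerable<int> CountAsync(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; ; i++)
    {
        await Task.Delay(10, token); // honors the consumer's token
        yield return i;
    }
}
```

Passing a token both as an argument and via WithCancellation is also legal; the compiler-generated enumerator links the two.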
8.9 Database Streaming vs Fetching Entire List
| Approach | Memory Usage | Latency | Scalability |
|---|---|---|---|
| ToListAsync() | High | First byte slow | Medium |
| Async Stream | Low | First byte fast | High |
Async streaming is ideal when:
- Dataset is very large
- Client can consume partial data
- Real-time updates required
8.10 Streaming JSON Lines (NDJSON)
Example: real-time event emitting API.
[HttpGet("events")]
public async Task StreamEvents(CancellationToken token)
{
    Response.ContentType = "application/x-ndjson";

    for (int i = 0; i < 100; i++)
    {
        var json = JsonSerializer.Serialize(
            new { id = i, time = DateTime.UtcNow });

        await Response.WriteAsync(json + "\n", token);
        await Response.Body.FlushAsync(token);
        await Task.Delay(500);
    }
}
Client receives each JSON object as a separate line.
Used by:
- Elasticsearch
- Logging engines
- Streaming pipelines
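On the consuming side, each line is a complete JSON document, so a client can deserialize line by line. A hedged sketch, using a MemoryStream to stand in for the HTTP response body:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Simulated response body; in practice this would be
// await response.Content.ReadAsStreamAsync()
var ndjson = "{\"id\":1,\"msg\":\"a\"}\n{\"id\":2,\"msg\":\"b\"}\n";
using var body = new MemoryStream(Encoding.UTF8.GetBytes(ndjson));

var ids = new List<int>();
await foreach (var evt in ReadNdjsonAsync<JsonElement>(body))
{
    ids.Add(evt.GetProperty("id").GetInt32());
}
Console.WriteLine(string.Join(",", ids)); // 1,2

// One object per line: read a line, deserialize it, yield it
static async IAsyncEnumerable<T> ReadNdjsonAsync<T>(Stream stream)
{
    using var reader = new StreamReader(stream);
    string? line;
    while ((line = await reader.ReadLineAsync()) != null)
    {
        if (line.Length == 0) continue; // skip keep-alive blank lines
        yield return JsonSerializer.Deserialize<T>(line)!;
    }
}
```

With HttpClient, request the response with HttpCompletionOption.ResponseHeadersRead so the client starts parsing before the server finishes sending.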
8.11 SignalR vs Async Streaming (When to Use What)
| Feature | Async Streams | SignalR |
|---|---|---|
| Transport | Plain HTTP response | WebSockets, SSE, or long polling |
| Real-time | Good | Excellent |
| Bidirectional | No | Yes |
| Scaling | Medium | High (with a backplane) |
| Client requirements | Any HTTP client | SignalR client library |
Async streaming is great for:
- Logs
- Monitoring
- Reporting
- Streaming large datasets
SignalR is great for:
- Chat apps
- Real-time dashboards
- Notifications
8.12 Memory Considerations (Critical)
Async streams:
- Do not buffer entire dataset
- Send entries as they are generated
- Avoid large allocations
Worst thing you can do is buffer the whole stream back into a list:

var list = new List<LogEntry>();
await foreach (var item in StreamLogsAsync())
{
    list.Add(item); // NEVER accumulate the entire stream
}

Streaming is about not storing everything.
8.13 Error Handling in Async Streams
Handle errors inside the generator, not the consumer. One caveat: C# forbids yield return inside a try block that has a catch clause, so wrap the fetch in try/catch and yield outside it (ReadNextLogAsync below stands in for whatever operation feeds your stream):

public async IAsyncEnumerable<LogEntry> StreamLogsAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    while (true)
    {
        LogEntry? entry;
        try
        {
            entry = await ReadNextLogAsync(token); // fetch inside try/catch
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Streaming failed");
            yield break;
        }

        if (entry is null)
            yield break; // end of stream

        yield return entry; // yield outside the try/catch
    }
}
8.14 Common Mistakes (And Fixes)
❌ Using ToListAsync() for huge datasets
Solution → Use streaming (yield return)
❌ Not adding [EnumeratorCancellation]
Solution → Always add the attribute
❌ Not flushing response
Solution → Use Response.Body.FlushAsync()
❌ Blocking calls inside stream
Solution → Always await
❌ Streaming sensitive or too much data
Solution → Apply pagination or chunk filtering
8.15 Best Practices Checklist
- ✅ Use IAsyncEnumerable<T> for large datasets
- ✅ Always pass CancellationToken
- ✅ Use [EnumeratorCancellation] attribute
- ✅ Flush periodically for real-time streaming
- ✅ Avoid buffering data
- ✅ Log errors inside stream, not in consumer
- ✅ Use chunked encoding for large binary files
- ✅ Use NDJSON for event streaming
8.16 EF Core Async Streaming with AsAsyncEnumerable()
The Critical Method: AsAsyncEnumerable()
// WRONG: loads ALL data into memory first
public async Task<List<Product>> GetProductsAsync()
{
    return await _db.Products
        .Where(p => p.IsActive)
        .ToListAsync();
    // Entire dataset loaded into memory
}

// CORRECT: streams rows one by one
public async IAsyncEnumerable<Product> StreamProductsAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    var query = _db.Products
        .Where(p => p.IsActive)
        .AsNoTracking()
        .AsAsyncEnumerable(); // returns IAsyncEnumerable<Product>

    await foreach (var product in query.WithCancellation(token))
    {
        yield return product;
    }
}
Performance Comparison: 1 Million Records (illustrative figures)
// Memory usage comparison
Method | Memory Peak | Time to First Byte
------------------- | ----------- | ------------------
ToListAsync() | 500 MB | 5 seconds
AsAsyncEnumerable() | 50 KB | 50 milliseconds
// ToListAsync(): Loads all, then sends
// AsAsyncEnumerable(): Streams as they're read from DB
EF Core Streaming Best Practices
public async IAsyncEnumerable<ProductDto> StreamProductsDtoAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    // Project to DTO BEFORE streaming to reduce memory
    var query = _db.Products
        .Where(p => p.IsActive)
        .Select(p => new ProductDto
        {
            Id = p.Id,
            Name = p.Name,
            Price = p.Price
            // DON'T include navigation properties unless needed
        })
        .AsNoTracking()
        .AsAsyncEnumerable();

    var itemCount = 0;
    await foreach (var dto in query.WithCancellation(token))
    {
        yield return dto;

        // Optional: yield the thread every N items
        if (++itemCount % 1000 == 0)
            await Task.Yield();
    }
}
8.17 ConfigureAwait(false) with IAsyncEnumerable
The Problem: Async Streams and SynchronizationContext
// DANGER: awaits inside an async stream capture SynchronizationContext
public async IAsyncEnumerable<string> StreamDataAsync()
{
    // Each "await foreach" iteration resumes on the captured context
    await foreach (var item in GetItemsAsync())
    {
        // This runs on the captured context (UI thread in WinForms/WPF)
        yield return item;
    }
}
Solution: ConfigureAwait(false) for Library Code
public async IAsyncEnumerable<string> StreamDataAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    // ConfigureAwait(false) on each async operation
    await using var connection = await OpenConnectionAsync()
        .ConfigureAwait(false);

    await foreach (var item in GetItemsAsync()
        .WithCancellation(token)
        .ConfigureAwait(false))
    {
        yield return item;
        // Continues on the thread pool, not the captured context
    }
}

// Extension method: ConfigureAwait for a whole IAsyncEnumerable
public static async IAsyncEnumerable<T> WithConfigureAwait<T>(
    this IAsyncEnumerable<T> source,
    bool continueOnCapturedContext = false)
{
    await foreach (var item in source.ConfigureAwait(continueOnCapturedContext))
    {
        yield return item;
    }
}
8.18 Backpressure Handling in Async Streams
The Problem: Producer/Consumer Speed Mismatch
// Fast producer, slow consumer = memory pressure
public async IAsyncEnumerable<LogEntry> StreamLogsFastAsync()
{
    while (true)
    {
        var logs = await _logService.GetNewLogsAsync();
        foreach (var log in logs)
        {
            yield return log; // produces faster than consumed
        }
        await Task.Delay(100); // still too fast
    }
}
Solution: SemaphoreSlim for Rate Limiting
The pull model already provides backpressure for a single consumer: yield return suspends the generator until the consumer asks for the next item. A SemaphoreSlim is still useful as a shared gate that caps concurrent fetches across streams; release it in finally so a slot is never leaked:

private static readonly SemaphoreSlim _fetchGate = new(10, 10); // shared gate

public async IAsyncEnumerable<LogEntry> StreamLogsWithBackpressureAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        await _fetchGate.WaitAsync(token);
        List<LogEntry> logs;
        try
        {
            logs = await _logService.GetNewLogsAsync(token);
        }
        finally
        {
            _fetchGate.Release(); // always release the slot
        }

        foreach (var log in logs)
        {
            // The consumer controls the pace: execution pauses here
            // until the next MoveNextAsync is awaited
            yield return log;
        }

        await Task.Delay(1000, token); // rate limit polling
    }
}
Channel-Based Backpressure (Advanced)
public async IAsyncEnumerable<DataPoint> StreamWithChannelAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    // Channel provides built-in backpressure
    var channel = Channel.CreateBounded<DataPoint>(
        new BoundedChannelOptions(1000) // buffer capacity
        {
            FullMode = BoundedChannelFullMode.Wait // writer waits when full
        });

    // Producer task
    _ = Task.Run(async () =>
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var data = await FetchDataAsync(token);
                await channel.Writer.WriteAsync(data, token);
            }
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            // Propagate failures to the consumer instead of losing them
            channel.Writer.TryComplete(ex);
        }
    }, token);

    // Consumer (stream)
    await foreach (var item in channel.Reader.ReadAllAsync(token))
    {
        yield return item;
    }
}
8.19 Parallel Processing with Async Streams
Processing Items in Parallel Batches
public async IAsyncEnumerable<ProcessedItem> ProcessInParallelAsync(
    IAsyncEnumerable<RawItem> source,
    [EnumeratorCancellation] CancellationToken token,
    int maxDegreeOfParallelism = 4)
{
    // NOTE: workers must not enumerate `source` concurrently —
    // each await foreach would start its own enumeration.
    // A single feeder reads the source; workers share a channel.
    var input = Channel.CreateBounded<RawItem>(maxDegreeOfParallelism * 2);
    var output = Channel.CreateUnbounded<ProcessedItem>();

    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in source.WithCancellation(token))
            {
                await input.Writer.WriteAsync(item, token);
            }
        }
        finally
        {
            input.Writer.Complete();
        }
    }, token);

    // Parallel processors
    var processorTasks = Enumerable.Range(0, maxDegreeOfParallelism)
        .Select(_ => Task.Run(async () =>
        {
            await foreach (var item in input.Reader.ReadAllAsync(token))
            {
                var processed = await ProcessItemAsync(item, token);
                await output.Writer.WriteAsync(processed, token);
            }
        }, token))
        .ToArray();

    // Close the output channel when all processors complete
    _ = Task.WhenAll(processorTasks)
        .ContinueWith(_ => output.Writer.Complete());

    // Stream results as they complete (order not guaranteed)
    await foreach (var result in output.Reader.ReadAllAsync(token))
    {
        yield return result;
    }
}
Ordered Parallel Processing (Maintain Order)
Since Task.WhenAll preserves the order of its input tasks, ordering falls out naturally when you process batch by batch:

public async IAsyncEnumerable<ProcessedItem> ProcessInParallelOrderedAsync(
    IAsyncEnumerable<RawItem> source,
    [EnumeratorCancellation] CancellationToken token)
{
    // Buffer comes from the System.Interactive.Async package
    await foreach (var batch in source.Buffer(100).WithCancellation(token))
    {
        // Each batch runs in parallel, then is yielded in source order
        var results = await Task.WhenAll(
            batch.Select(item => ProcessItemAsync(item, token)));

        foreach (var result in results)
        {
            yield return result;
        }
    }
}
8.20 Streaming Pagination for Very Large Datasets
The Problem: Offset/Limit Doesn't Scale
// BAD: offset pagination for streaming
public async IAsyncEnumerable<Product> StreamProductsOffsetAsync(
    [EnumeratorCancellation] CancellationToken token,
    int pageSize = 1000)
{
    int offset = 0;
    while (true)
    {
        var page = await _db.Products
            .OrderBy(p => p.Id)
            .Skip(offset)
            .Take(pageSize)
            .ToListAsync(token);

        if (!page.Any()) break;

        foreach (var product in page)
        {
            yield return product;
        }

        offset += pageSize;
        // Problem: Skip gets slower as offset increases —
        // O(offset) performance degradation
    }
}
Solution: Keyset Pagination (Seek Method)
public async IAsyncEnumerable<Product> StreamProductsKeysetAsync(
    [EnumeratorCancellation] CancellationToken token,
    int pageSize = 1000)
{
    int? lastId = null;

    while (!token.IsCancellationRequested)
    {
        var query = _db.Products
            .OrderBy(p => p.Id)
            .AsNoTracking();

        // Seek instead of Skip
        if (lastId.HasValue)
        {
            query = query.Where(p => p.Id > lastId.Value);
        }

        var page = await query
            .Take(pageSize)
            .ToListAsync(token);

        if (!page.Any()) break;

        foreach (var product in page)
        {
            yield return product;
        }

        lastId = page.Last().Id;

        // Give the GC a chance between pages
        if (page.Count == pageSize)
            await Task.Yield();
    }
}
8.21 Testing Async Streams
Unit Testing Async Stream Methods
[Fact]
public async Task StreamProductsAsync_ReturnsExpectedItems()
{
    // Arrange
    var mockData = new List<Product>
    {
        new Product { Id = 1, Name = "Product 1" },
        new Product { Id = 2, Name = "Product 2" }
    };

    var mockDb = new Mock<IProductRepository>();
    mockDb.Setup(r => r.StreamProductsAsync(It.IsAny<CancellationToken>()))
        .Returns(mockData.ToAsyncEnumerable()); // System.Linq.Async

    var service = new ProductService(mockDb.Object);

    // Act
    var results = new List<Product>();
    await foreach (var product in service.StreamProductsAsync(CancellationToken.None))
    {
        results.Add(product);
    }

    // Assert
    Assert.Equal(2, results.Count);
    Assert.Equal("Product 1", results[0].Name);
}
[Fact]
public async Task StreamProductsAsync_Cancels_WhenTokenCancelled()
{
    // Arrange
    var cts = new CancellationTokenSource();
    var service = new ProductService();

    // Act
    var streamTask = Task.Run(async () =>
    {
        var results = new List<Product>();
        await foreach (var item in service.StreamProductsAsync(cts.Token))
        {
            results.Add(item);
        }
        return results;
    });

    // Cancel immediately
    cts.Cancel();

    // Assert — ThrowsAnyAsync also accepts TaskCanceledException,
    // which derives from OperationCanceledException
    await Assert.ThrowsAnyAsync<OperationCanceledException>(
        () => streamTask);
}
Integration Testing Streaming Endpoints
[Fact]
public async Task StreamEndpoint_ReturnsStreamingResponse()
{
    // Arrange
    var factory = new WebApplicationFactory<Program>();
    var client = factory.CreateClient();

    // Act
    var response = await client.GetAsync("/api/products/stream",
        HttpCompletionOption.ResponseHeadersRead);

    // Assert streaming headers
    Assert.Equal("chunked",
        response.Headers.TransferEncoding.ToString());

    // Read the stream incrementally
    var stream = await response.Content.ReadAsStreamAsync();
    var reader = new StreamReader(stream);
    var firstLine = await reader.ReadLineAsync();
    Assert.NotNull(firstLine);

    // Verify JSON Lines format
    var product = JsonSerializer.Deserialize<Product>(firstLine);
    Assert.NotNull(product);
}
8.22 Memory Diagnostics for Async Streams
Detecting Memory Leaks in Async Streams
public class StreamMemoryMonitor
{
    private readonly ILogger<StreamMemoryMonitor> _logger;
    private long _itemsSeen;

    public StreamMemoryMonitor(ILogger<StreamMemoryMonitor> logger)
        => _logger = logger;

    public async IAsyncEnumerable<T> MonitorStreamAsync<T>(
        IAsyncEnumerable<T> source,
        string streamName,
        [EnumeratorCancellation] CancellationToken token)
    {
        var initialMemory = GC.GetTotalMemory(forceFullCollection: false);

        await foreach (var item in source.WithCancellation(token))
        {
            var currentMemory = GC.GetTotalMemory(forceFullCollection: false);
            var delta = currentMemory - initialMemory;

            if (delta > 100 * 1024 * 1024) // 100 MB threshold
            {
                _logger.LogWarning(
                    "Stream {StreamName} grew the heap by {DeltaBytes} bytes",
                    streamName, delta);
            }

            yield return item;

            // Optional gen-0 collection for very long streams;
            // normally the GC should be left alone
            if (++_itemsSeen % 10_000 == 0)
            {
                GC.Collect(0, GCCollectionMode.Optimized);
            }
        }
    }
}

// Usage
await foreach (var item in _monitor.MonitorStreamAsync(
    _repo.StreamLargeDatasetAsync(token),
    "LargeDatasetStream",
    token))
{
    // Process item
}
Diagnostic Source for Async Streams
// Subscribe to streaming-related diagnostics
// (event handling below is illustrative — inspect the events your
// ASP.NET Core version actually emits)
DiagnosticListener.AllListeners.Subscribe(new AsyncStreamObserver());

public class AsyncStreamObserver : IObserver<DiagnosticListener>
{
    public void OnNext(DiagnosticListener listener)
    {
        if (listener.Name == "Microsoft.AspNetCore")
        {
            listener.Subscribe(new AsyncStreamEventsObserver());
        }
    }

    public void OnCompleted() { }
    public void OnError(Exception error) { }

    private class AsyncStreamEventsObserver : IObserver<KeyValuePair<string, object?>>
    {
        public void OnNext(KeyValuePair<string, object?> value)
        {
            // Inspect value.Key / value.Value for the events of interest
            Console.WriteLine($"Diagnostic event: {value.Key}");
        }

        public void OnCompleted() { }
        public void OnError(Exception error) { }
    }
}
8.23 Async Streams with Compression
Compressing Async Stream Responses
[HttpGet("stream/compressed")]
public async Task StreamCompressedAsync(CancellationToken token)
{
    Response.Headers.Append("Content-Encoding", "gzip");
    Response.ContentType = "application/json";

    await using var gzipStream = new GZipStream(
        Response.Body,
        CompressionLevel.Fastest,
        leaveOpen: true);
    await using var writer = new StreamWriter(gzipStream);

    await foreach (var item in _repo.StreamLargeDatasetAsync(token))
    {
        var json = JsonSerializer.Serialize(item);
        await writer.WriteLineAsync(json);
        await writer.FlushAsync(); // important for streaming
    }
}
Progressive Compression with Chunked Encoding
public async IAsyncEnumerable<byte[]> StreamWithCompressionAsync(
    IAsyncEnumerable<string> source,
    [EnumeratorCancellation] CancellationToken token)
{
    using var buffer = new MemoryStream();
    var gzip = new GZipStream(buffer, CompressionMode.Compress, leaveOpen: true);

    await foreach (var data in source.WithCancellation(token))
    {
        var bytes = Encoding.UTF8.GetBytes(data + "\n");
        await gzip.WriteAsync(bytes, token);
        await gzip.FlushAsync(token);

        // Return compressed chunks as they're ready
        if (buffer.Length > 8192) // 8 KB chunks
        {
            yield return buffer.ToArray();
            buffer.SetLength(0);
        }
    }

    // Disposing the GZipStream writes the gzip trailer;
    // only then is the final chunk complete
    await gzip.DisposeAsync();
    if (buffer.Length > 0)
    {
        yield return buffer.ToArray();
    }
}
Chapter 8 Summary
- Async streams let APIs send data piece-by-piece
- Reduces memory usage dramatically
- Consumers get results much faster
- Perfect for:
- Logs
- Events
- Metrics
- Large datasets
- Real-time feeds
- Supports cancellation tokens for safe streaming
- Works with EF Core, ADO.NET, files, external APIs
Next chapter will take async to the next level:
👉 Chapter 9: Background Work & Async Processing
- IHostedService / BackgroundService
- Queued background jobs
- Email/SMS processing
- Long processing tasks without blocking requests
- Reliable async job scheduling
- Real-life example with queue + worker + API integration