Chapter 8: Async Streams & Streaming Responses (IAsyncEnumerable) - IndianTechnoEra

Chapter 8: Async Streams & Streaming Responses (IAsyncEnumerable)

Introduction

Async streams let your API produce data gradually instead of loading everything into memory at once.

This helps with:

  • Large datasets
  • Real-time updates
  • Log streaming
  • Progress streaming
  • Reducing memory pressure
  • Faster first-byte latency

8.1 What is IAsyncEnumerable<T>?

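A materialized collection such as List<T> hands back all of its data at once, and even a lazy IEnumerable<T> blocks the calling thread while it waits for each element:

IEnumerable<T>

Async streams return data one element at a time, and waiting for the next element is asynchronous:

IAsyncEnumerable<T>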

Comparison

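Feature          | IEnumerable<T>               | IAsyncEnumerable<T>
---------------- | ---------------------------- | ---------------------
Sync             | Yes                          | No
Async            | No                           | Yes
Suitable for I/O | No                           | Yes
Memory usage     | Large when fully materialized | Minimal when streamed
Streaming        | No                           | Yes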

8.2 How Async Streams Work Internally

The await foreach loop consumes data as it becomes available.

Example:

await foreach (var item in GetNumbersAsync()) 
{ 
    Console.WriteLine(item); 
}

Execution:

  1. Fetch 1st item (await)
  2. Process
  3. Fetch next item
  4. Continue…

No need to load entire data in memory.
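
To make the execution steps concrete, here is a minimal sketch of roughly what the compiler generates for an await foreach loop. It is written as a top-level program (C# 9 or later), and the GetNumbersAsync defined here is a local stand-in for illustration, not the chapter's repository code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var seen = new List<int>();

// Roughly what "await foreach (var item in GetNumbersAsync())" expands to
IAsyncEnumerator<int> enumerator = GetNumbersAsync().GetAsyncEnumerator();
try
{
    // Steps 1 and 3: asynchronously fetch the next item
    while (await enumerator.MoveNextAsync())
    {
        // Step 2: process the current item
        int item = enumerator.Current;
        seen.Add(item);
        Console.WriteLine(item);
    }
}
finally
{
    // The enumerator is always disposed, even if the loop body throws
    await enumerator.DisposeAsync();
}

// Stand-in producer for illustration (not from any library)
static async IAsyncEnumerable<int> GetNumbersAsync()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(10); // simulate async work
        yield return i;
    }
}
```

Only one item is alive at a time: Current is overwritten by each successful MoveNextAsync call.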

8.3 Implementing Your Own Async Stream

public async IAsyncEnumerable<int> GetNumbersAsync() 
{ 
    for (int i = 1; i <= 5; i++) 
    { 
        await Task.Delay(500); // simulate async work 
        yield return i; // stream element 
    } 
}

Consumer:

await foreach (var number in GetNumbersAsync()) 
{ 
    Console.WriteLine(number); 
}

8.4 Async Streaming in ASP.NET Core APIs

You can send streaming results directly to the client.

Controller Example

[HttpGet("stream/prices")] 
public async IAsyncEnumerable<decimal> StreamPrices() 
{ 
    for (int i = 0; i < 10; i++) 
    { 
        await Task.Delay(1000); 
        yield return Random.Shared.Next(100, 200); 
    } 
}

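In .NET 6 and later (with System.Text.Json), ASP.NET Core serializes the sequence as a JSON array and writes each element as it is produced, so the client sees the array grow over time:

[101,103,150,...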

Benefits

  • Low memory usage
  • Client gets data progressively
  • Ideal for live dashboards, logs, events
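
On the client side, the progressive delivery only pays off if the response is parsed incrementally too. The sketch below assumes the endpoint returns a top-level JSON array and uses JsonSerializer.DeserializeAsyncEnumerable (.NET 6 or later). A MemoryStream stands in for the HTTP response body so the example runs without a server.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;

// Stand-in for the HTTP response body. With HttpClient you would obtain the
// stream via GetStreamAsync, or ReadAsStreamAsync after a request made with
// HttpCompletionOption.ResponseHeadersRead.
using var body = new MemoryStream(Encoding.UTF8.GetBytes("[101,103,150]"));

var prices = new List<decimal>();

// DeserializeAsyncEnumerable reads a top-level JSON array element by element
await foreach (decimal price in JsonSerializer.DeserializeAsyncEnumerable<decimal>(body))
{
    prices.Add(price); // collected here only so the result can be inspected
    Console.WriteLine(price);
}
```

In a real client you would act on each price inside the loop instead of collecting them.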

8.5 Real-Life Example: Streaming Logs from Database

Repository

public async IAsyncEnumerable<LogEntry> StreamLogsAsync( 
    [EnumeratorCancellation] CancellationToken token) 
{ 
    await using var conn = new SqlConnection(_connString); 
    await conn.OpenAsync(token); 
    
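    // Name the columns so the ordinals below do not depend on table layout
    // (column names are assumed to match the LogEntry properties)
    await using var cmd = new SqlCommand("SELECT Id, Level, Message FROM Logs ORDER BY Id", conn); 
    await using var reader = await cmd.ExecuteReaderAsync(token); 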
    
    while (await reader.ReadAsync(token)) 
    { 
        yield return new LogEntry 
        { 
            Id = reader.GetInt32(0), 
            Level = reader.GetString(1), 
            Message = reader.GetString(2) 
        }; 
    } 
}

Controller

[HttpGet("logs/stream")] 
public async IAsyncEnumerable<LogEntry> StreamLogsAsync( 
    CancellationToken token) 
{ 
    await foreach (var log in _repo.StreamLogsAsync(token)) 
    { 
        yield return log; 
    } 
}

📌 Perfect for:

  • Monitoring dashboards
  • Log viewers
  • Admin portals

8.6 Chunked HTTP Streaming

To stream non-JSON data, write directly to Response.Body (or to Response.BodyWriter if you prefer the pipelines API).

Example: Streaming Large File (Chunk-by-Chunk)

[HttpGet("download")] 
public async Task DownloadLargeFile(CancellationToken token) 
{ 
    var filePath = "very-large.zip"; 
    Response.ContentType = "application/octet-stream"; 
    
    await using var fs = System.IO.File.OpenRead(filePath); 
    var buffer = new byte[81920]; 
    int bytesRead; 
    
    while ((bytesRead = await fs.ReadAsync(buffer, token)) > 0) 
    { 
        await Response.Body.WriteAsync(buffer.AsMemory(0, bytesRead), token); 
        await Response.Body.FlushAsync(token); 
    } 
}

Benefits:

  • No memory explosion
  • Faster first-byte delivery
  • Ideal for large file downloads

8.7 Streaming With Minimal APIs

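app.MapGet("/stream/time", (CancellationToken token) => 
{ 
    async IAsyncEnumerable<string> Stream() 
    { 
        while (!token.IsCancellationRequested) 
        { 
            yield return DateTime.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss'Z'"); 
            await Task.Delay(1000, token); // stops when the client disconnects 
        } 
    } 
    return Stream(); 
});

Client receives a JSON array that grows by one element per second:

["2025-02-10T12:34:01Z","2025-02-10T12:34:02Z",...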

8.8 Async Streams With CancellationToken

Cancellation is often required in streaming.

Example

public async IAsyncEnumerable<int> CountAsync( 
    [EnumeratorCancellation] CancellationToken token) 
{ 
    for (int i = 0; i < 100000; i++) 
    { 
        token.ThrowIfCancellationRequested(); 
        await Task.Delay(100, token); 
        yield return i; 
    } 
}

Controller

[HttpGet("count")] 
public IAsyncEnumerable<int> GetCount(CancellationToken token) 
{ 
    return CountAsync(token); 
}
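
The consumer can also supply the token after the fact with WithCancellation. The sketch below is a self-contained top-level program (C# 9 or later) with a local CountAsync modeled on the one above; it shows that [EnumeratorCancellation] is what routes the consumer's token into the iterator's parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var received = new List<int>();
bool cancelled = false;

try
{
    // CountAsync() is called without a token; WithCancellation supplies one.
    // [EnumeratorCancellation] passes that token into the iterator's parameter.
    await foreach (int n in CountAsync().WithCancellation(cts.Token))
    {
        received.Add(n);
        if (n == 2) cts.Cancel(); // stop after the third item
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"received {received.Count} items, cancelled: {cancelled}");

static async IAsyncEnumerable<int> CountAsync(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 100000; i++)
    {
        await Task.Delay(10, token); // throws once the token is cancelled
        yield return i;
    }
}
```

Without the attribute, the token passed to WithCancellation would never reach Task.Delay and the loop would keep running.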

8.9 Database Streaming vs Fetching Entire List

Approach      | Memory Usage | Latency         | Scalability
------------- | ------------ | --------------- | -----------
ToListAsync() | High         | First byte slow | Medium
Async Stream  | Low          | First byte fast | High

Async streaming is ideal when:

  • Dataset is very large
  • Client can consume partial data
  • Real-time updates required
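
The latency difference comes from the order in which work happens. The following sketch records that order for both approaches; ReadRowsAsync is a hypothetical stand-in for a database reader, and the program uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var events = new List<string>();

// Buffered: every row is produced before the first one is consumed
var list = new List<int>();
await foreach (int row in ReadRowsAsync("buffered")) list.Add(row);
foreach (int row in list) events.Add($"buffered: consumed {row}");

// Streamed: production and consumption interleave, one row at a time
await foreach (int row in ReadRowsAsync("streamed"))
    events.Add($"streamed: consumed {row}");

foreach (string e in events) Console.WriteLine(e);

// Hypothetical data source standing in for a database reader
async IAsyncEnumerable<int> ReadRowsAsync(string label)
{
    for (int i = 1; i <= 2; i++)
    {
        await Task.Yield(); // simulate asynchronous I/O
        events.Add($"{label}: produced {i}");
        yield return i;
    }
}
```

In the buffered run both "produced" entries come before any "consumed" entry; in the streamed run they alternate, which is why the first byte can leave the server sooner.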

8.10 Streaming JSON Lines (NDJSON)

Example: real-time event emitting API.

[HttpGet("events")] 
public async Task StreamEvents(CancellationToken token) 
{ 
    Response.ContentType = "application/x-ndjson"; 
    
    for (int i = 0; i < 100; i++) 
    { 
        var json = JsonSerializer.Serialize( 
            new { id = i, time = DateTime.UtcNow }); 
        
        await Response.WriteAsync(json + "\n", token); 
        await Response.Body.FlushAsync(token); 
        await Task.Delay(500, token); // pass the token so a disconnect ends the loop promptly
    } 
}

Client receives each JSON object as a separate line.

Used by:

  • Elasticsearch
  • Logging engines
  • Streaming pipelines
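
A client can process NDJSON with nothing more than a line reader, because every line is a complete JSON document. This is a minimal sketch: the payload is a hard-coded stand-in for the response body, with property names matching the anonymous object in the endpoint above.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;

// Stand-in for a response body with content type application/x-ndjson
var payload =
    "{\"id\":0,\"time\":\"2025-02-10T12:34:01Z\"}\n" +
    "{\"id\":1,\"time\":\"2025-02-10T12:34:02Z\"}\n";
using var reader = new StreamReader(new MemoryStream(Encoding.UTF8.GetBytes(payload)));

var ids = new List<int>();
string? line;
while ((line = await reader.ReadLineAsync()) != null)
{
    if (line.Length == 0) continue; // tolerate blank lines

    // Each line parses on its own; no enclosing array is needed
    using JsonDocument doc = JsonDocument.Parse(line);
    ids.Add(doc.RootElement.GetProperty("id").GetInt32());
}

Console.WriteLine(string.Join(",", ids));
```

With a real HttpClient call, request the response with HttpCompletionOption.ResponseHeadersRead so the reader sees lines as they arrive instead of after the whole body is buffered.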

8.11 SignalR vs Async Streaming (When to Use What)

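Feature                     | Async Streams | SignalR
--------------------------- | ------------- | ----------------------------
Plain HTTP request/response | Yes           | No
Real-time                   | Good          | Excellent
Bidirectional               | No            | Yes
Scaling                     | Medium        | High
Browser compatibility       | Standard      | Requires WebSockets/Fallback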

Async streaming is great for:

  • Logs
  • Monitoring
  • Reporting
  • Streaming large datasets

SignalR is great for:

  • Chat apps
  • Real-time dashboards
  • Notifications

8.12 Memory Considerations (Critical)

Async streams:

  • Do not buffer entire dataset
  • Send entries as they are generated
  • Avoid large allocations

Worst thing you can do:

var list = new List<LogEntry>();
await foreach (var log in StreamLogsAsync(token)) list.Add(log); // NEVER accumulate the entire stream

Streaming is about not storing everything.
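
When the consumer needs a summary instead of the items themselves, it can keep only running totals. The sketch below assumes a hypothetical ReadValuesAsync source standing in for a large query result.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

long count = 0;
long sum = 0;

// Only the running totals are kept; no item outlives its iteration
await foreach (int value in ReadValuesAsync(1_000))
{
    count++;
    sum += value;
}

Console.WriteLine($"count={count}, sum={sum}");

// Hypothetical source standing in for a large query result
static async IAsyncEnumerable<int> ReadValuesAsync(int n)
{
    for (int i = 1; i <= n; i++)
    {
        if (i % 100 == 0) await Task.Yield(); // occasional async hop
        yield return i;
    }
}
```

Memory use stays constant regardless of how many items the source produces.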

8.13 Error Handling in Async Streams

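Catch and log errors inside the generator, close to the resource that failed. C# does not allow yield return inside a try block that has a catch clause, so guard only the MoveNextAsync call and yield outside the try/catch.

Example:

public async IAsyncEnumerable<LogEntry> StreamLogsAsync( 
    [EnumeratorCancellation] CancellationToken token) 
{ 
    // ReadLogsAsync is a placeholder for the real data source
    await using var source = ReadLogsAsync(token).GetAsyncEnumerator(token); 

    while (true) 
    { 
        try 
        { 
            // Only the fetch is guarded by the catch
            if (!await source.MoveNextAsync()) break; 
        } 
        catch (Exception ex) when (ex is not OperationCanceledException) 
        { 
            _logger.LogError(ex, "Streaming failed"); 
            yield break; // ends the stream; the client receives a truncated result
        } 

        yield return source.Current; 
    } 
}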

8.14 Common Mistakes (And Fixes)

❌ Using ToListAsync() for huge datasets

Solution → Use streaming (yield return)

❌ Not adding [EnumeratorCancellation]

Solution → Always add the attribute

❌ Not flushing response

Solution → Use Response.Body.FlushAsync()

❌ Blocking calls inside stream

Solution → Always await

❌ Streaming sensitive or too much data

Solution → Apply pagination or chunk filtering

8.15 Best Practices Checklist

  • ✅ Use IAsyncEnumerable<T> for large datasets
  • ✅ Always pass CancellationToken
  • ✅ Use [EnumeratorCancellation] attribute
  • ✅ Flush periodically for real-time streaming
  • ✅ Avoid buffering data
  • ✅ Log errors inside stream, not in consumer
  • ✅ Use chunked encoding for large binary files
  • ✅ Use NDJSON for event streaming

8.16 EF Core Async Streaming with AsAsyncEnumerable()

The Critical Method: AsAsyncEnumerable()

// WRONG: Loads ALL data into memory first
public async Task<List<Product>> GetProductsAsync()
{
    return await _db.Products
        .Where(p => p.IsActive)
        .ToListAsync();
    // Entire dataset loaded into memory
}

// CORRECT: Streams data one by one
public async IAsyncEnumerable<Product> StreamProductsAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    var query = _db.Products
        .Where(p => p.IsActive)
        .AsNoTracking()
        .AsAsyncEnumerable();
        // Returns IAsyncEnumerable<Product>
    
    await foreach (var product in query.WithCancellation(token))
    {
        yield return product;
    }
}

Performance Comparison: 1 Million Records

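// Illustrative figures, not benchmark results: actual values depend on
// row size, database provider, and network. Measure your own workload.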
Method              | Memory Peak | Time to First Byte
------------------- | ----------- | ------------------
ToListAsync()       | 500 MB      | 5 seconds
AsAsyncEnumerable() | 50 KB       | 50 milliseconds

// ToListAsync(): Loads all, then sends
// AsAsyncEnumerable(): Streams as they're read from DB

EF Core Streaming Best Practices

public async IAsyncEnumerable<ProductDto> StreamProductsDtoAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    // Project to DTO BEFORE streaming to reduce memory
    var query = _db.Products
        .Where(p => p.IsActive)
        .Select(p => new ProductDto
        {
            Id = p.Id,
            Name = p.Name,
            Price = p.Price
            // DON'T include navigation properties unless needed
        })
        .AsNoTracking()
        .AsAsyncEnumerable();
    
    await foreach (var dto in query.WithCancellation(token))
    {
        // No manual Task.Yield() is needed for the GC: each item becomes
        // collectable as soon as the consumer drops its reference
        yield return dto;
    }
}

8.17 ConfigureAwait(false) with IAsyncEnumerable

The Problem: Async Streams and SynchronizationContext

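// DANGER (UI and library code): awaits in an async stream capture the SynchronizationContext.
// ASP.NET Core has no SynchronizationContext, so this mainly affects WinForms/WPF callers.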
public async IAsyncEnumerable<string> StreamDataAsync()
{
    // Each "await foreach" iteration captures context
    await foreach (var item in GetItemsAsync())
    {
        // This runs on captured context (UI thread in WinForms/WPF)
        yield return item;
    }
}

Solution: ConfigureAwait(false) for Library Code

public async IAsyncEnumerable<string> StreamDataAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    // ConfigureAwait(false) on each async operation
    await using var connection = await OpenConnectionAsync()
        .ConfigureAwait(false);
    
    await foreach (var item in GetItemsAsync()
        .WithCancellation(token)
        .ConfigureAwait(false))
    {
        yield return item;
        // Continue on ThreadPool, not captured context
    }
}

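// Wrapper shown for completeness only. The built-in ConfigureAwait extension on
// IAsyncEnumerable<T> (used above) already does this, and a caller that iterates
// the wrapper still needs its own ConfigureAwait(false).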
public static async IAsyncEnumerable<T> WithConfigureAwait<T>(
    this IAsyncEnumerable<T> source,
    bool continueOnCapturedContext = false)
{
    await foreach (var item in source.ConfigureAwait(continueOnCapturedContext))
    {
        yield return item;
    }
}

8.18 Backpressure Handling in Async Streams

The Problem: Producer/Consumer Speed Mismatch

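// yield return suspends this method until the consumer asks for the next item,
// so items do not pile up here. The memory risk is each fetched batch, plus
// whatever the upstream service queues while a slow consumer is being served.
public async IAsyncEnumerable<LogEntry> StreamLogsFastAsync()
{
    while (true)
    {
        var logs = await _logService.GetNewLogsAsync(); // whole batch held in memory
        foreach (var log in logs)
        {
            yield return log; // paused here while the consumer is busy
        }
        await Task.Delay(100); // polls again almost immediately, with no cancellation
    }
}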

Solution: SemaphoreSlim for Rate Limiting

// Shared by all callers: at most 10 batches may be held in memory at once
private static readonly SemaphoreSlim _batchSlots = new(initialCount: 10, maxCount: 10);

public async IAsyncEnumerable<LogEntry> StreamLogsWithBackpressureAsync(
    [EnumeratorCancellation] CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        // Wait for a slot before fetching the next batch
        await _batchSlots.WaitAsync(token);
        
        try
        {
            var logs = await _logService.GetNewLogsAsync(token);
            
            foreach (var log in logs)
            {
                // The consumer controls the pace: the method pauses here
                // until the next item is requested
                yield return log;
            }
        }
        finally
        {
            // Always release; otherwise the 11th batch would wait forever
            _batchSlots.Release();
        }
        
        await Task.Delay(1000, token); // Rate limit
    }
}

Channel-Based Backpressure (Advanced)

public async IAsyncEnumerable<DataPoint> StreamWithChannelAsync(
    [EnumeratorCancellation] CancellationToken token)
{
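    // Channel provides built-in backpressure
    var channel = Channel.CreateBounded<DataPoint>(
        new BoundedChannelOptions(1000) // Buffer capacity
        {
            FullMode = BoundedChannelFullMode.Wait // WriteAsync waits (asynchronously) when full
        });
    
    // Linked source lets the stream stop the producer when the consumer stops early
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
    
    // Producer task
    var producer = Task.Run(async () =>
    {
        Exception? error = null;
        try
        {
            while (!cts.Token.IsCancellationRequested)
            {
                var data = await FetchDataAsync(cts.Token);
                await channel.Writer.WriteAsync(data, cts.Token);
            }
        }
        catch (OperationCanceledException) { /* normal shutdown */ }
        catch (Exception ex) { error = ex; }
        finally
        {
            // Always complete the channel, or the reader below would wait forever
            channel.Writer.Complete(error);
        }
    });
    
    // Consumer (stream)
    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync(token))
        {
            yield return item;
        }
    }
    finally
    {
        cts.Cancel();   // stop the producer if the consumer left early
        await producer; // the producer catches its own exceptions
    }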
}
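
The backpressure behaviour of a bounded channel can be observed without any tasks at all. This small sketch uses a capacity of 2 and the non-blocking TryWrite/TryRead calls; where TryWrite returns false, WriteAsync would wait instead.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

bool first = channel.Writer.TryWrite(1);  // fits
bool second = channel.Writer.TryWrite(2); // fits, buffer is now full
bool third = channel.Writer.TryWrite(3);  // rejected: WriteAsync would wait here instead

// The consumer takes one item, which frees a slot for the producer
bool read = channel.Reader.TryRead(out int taken);
bool retry = channel.Writer.TryWrite(3);

Console.WriteLine($"{first} {second} {third} | read {taken} | {retry}");
```

The capacity is therefore the upper bound on how far the producer can run ahead of the consumer.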

8.19 Parallel Processing with Async Streams

Processing Items in Parallel Batches

public async IAsyncEnumerable<ProcessedItem> ProcessInParallelAsync(
    IAsyncEnumerable<RawItem> source,
    [EnumeratorCancellation] CancellationToken token,
    int maxDegreeOfParallelism = 4)
{
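    // Bounded so workers cannot run far ahead of a slow consumer
    var channel = Channel.CreateBounded<ProcessedItem>(maxDegreeOfParallelism * 2);
    
    // Lets the stream stop the workers if the consumer leaves early
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
    
    // Parallel.ForEachAsync (.NET 6+) enumerates the source ONCE and hands each
    // item to one worker. Running "await foreach" over the same source in every
    // worker would enumerate it once per worker and process each item N times.
    var options = new ParallelOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism,
        CancellationToken = cts.Token
    };
    var processing = Parallel.ForEachAsync(source, options, async (item, ct) =>
    {
        var processed = await ProcessItemAsync(item, ct);
        await channel.Writer.WriteAsync(processed, ct);
    });
    
    // Close the channel when processing ends, passing any failure to the reader
    _ = processing.ContinueWith(
        t => channel.Writer.Complete(t.Exception?.GetBaseException()),
        TaskScheduler.Default);
    
    // Stream results as they complete (input order is NOT preserved)
    try
    {
        await foreach (var result in channel.Reader.ReadAllAsync(token))
        {
            yield return result;
        }
    }
    finally
    {
        cts.Cancel();
    }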
}

Ordered Parallel Processing (Maintain Order)

public async IAsyncEnumerable<ProcessedItem> ProcessInParallelOrderedAsync(
    IAsyncEnumerable<RawItem> source,
    [EnumeratorCancellation] CancellationToken token)
{
    const int batchSize = 100;
    var batch = new List<RawItem>(batchSize);
    
    await foreach (var item in source.WithCancellation(token))
    {
        batch.Add(item);
        if (batch.Count < batchSize) continue;
        
        // Task.WhenAll returns results in the same order as its input tasks,
        // so no index bookkeeping is needed to preserve order
        var results = await Task.WhenAll(
            batch.Select(async i => await ProcessItemAsync(i, token)));
        
        foreach (var result in results)
        {
            yield return result;
        }
        
        batch.Clear();
    }
    
    // Process the final, partially filled batch
    var remaining = await Task.WhenAll(
        batch.Select(async i => await ProcessItemAsync(i, token)));
    
    foreach (var result in remaining)
    {
        yield return result;
    }
}

8.20 Streaming Pagination for Very Large Datasets

The Problem: Offset/Limit Doesn't Scale

// BAD: Offset pagination for streaming
public async IAsyncEnumerable<Product> StreamProductsOffsetAsync(
    int pageSize = 1000,
    // A default is required here because an optional parameter precedes it
    [EnumeratorCancellation] CancellationToken token = default)
{
    int offset = 0;
    
    while (true)
    {
        var page = await _db.Products
            .OrderBy(p => p.Id)
            .Skip(offset)
            .Take(pageSize)
            .ToListAsync(token);
        
        if (!page.Any()) break;
        
        foreach (var product in page)
        {
            yield return product;
        }
        
        offset += pageSize;
        
        // Problem: Skip gets slower as offset increases
        // O(offset) performance degradation
    }
}

Solution: Keyset Pagination (Seek Method)

public async IAsyncEnumerable<Product> StreamProductsKeysetAsync(
    [EnumeratorCancellation] CancellationToken token,
    int pageSize = 1000)
{
    int? lastId = null;
    
    while (!token.IsCancellationRequested)
    {
        IQueryable<Product> query = _db.Products.AsNoTracking();
        
        // Seek instead of Skip: filter on the key first, then order
        if (lastId.HasValue)
        {
            var after = lastId.Value;
            query = query.Where(p => p.Id > after);
        }
        
        var page = await query
            .OrderBy(p => p.Id)
            .Take(pageSize)
            .ToListAsync(token);
        
        foreach (var product in page)
        {
            yield return product;
        }
        
        // A short (or empty) page means there is nothing left to read
        if (page.Count < pageSize) break;
        
        lastId = page[^1].Id;
    }
}
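
The seek logic can be tried without a database. The sketch below runs the same loop over an in-memory array standing in for a table with a unique, indexed Id column; the ids and page size are arbitrary illustration values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for a table with an indexed, unique Id column
int[] ids = Enumerable.Range(1, 10).Select(i => i * 10).ToArray(); // 10, 20, ..., 100
const int pageSize = 4;

var pages = new List<int[]>();
int? lastId = null;

while (true)
{
    // Keyset step, equivalent to: WHERE Id > lastId ORDER BY Id, first pageSize rows
    int[] page = ids
        .Where(id => lastId == null || id > lastId)
        .OrderBy(id => id)
        .Take(pageSize)
        .ToArray();

    if (page.Length > 0) pages.Add(page);
    if (page.Length < pageSize) break; // short page: nothing left

    lastId = page[^1]; // resume after the last key seen
}

foreach (int[] p in pages) Console.WriteLine(string.Join(",", p));
```

Each iteration only needs the last key of the previous page, so on a real database the cost per page stays roughly constant instead of growing with the offset.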

8.21 Testing Async Streams

Unit Testing Async Stream Methods

[Fact]
public async Task StreamProductsAsync_ReturnsExpectedItems()
{
    // Arrange
    var mockData = new List<Product>
    {
        new Product { Id = 1, Name = "Product 1" },
        new Product { Id = 2, Name = "Product 2" }
    };
    
    var mockDb = new Mock<IProductRepository>();
    mockDb.Setup(r => r.StreamProductsAsync(It.IsAny<CancellationToken>()))
          .Returns(mockData.ToAsyncEnumerable()); // ToAsyncEnumerable is provided by the System.Linq.Async package
    
    var service = new ProductService(mockDb.Object);
    
    // Act
    var results = new List<Product>();
    await foreach (var product in service.StreamProductsAsync(CancellationToken.None))
    {
        results.Add(product);
    }
    
    // Assert
    Assert.Equal(2, results.Count);
    Assert.Equal("Product 1", results[0].Name);
}

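[Fact]
public async Task StreamProductsAsync_Cancels_WhenTokenCancelled()
{
    // Arrange: a source that never ends on its own but honors the token
    static async IAsyncEnumerable<Product> Endless(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        while (true)
        {
            await Task.Delay(10, token);
            yield return new Product { Id = 1, Name = "Product 1" };
        }
    }
    
    var mockDb = new Mock<IProductRepository>();
    mockDb.Setup(r => r.StreamProductsAsync(It.IsAny<CancellationToken>()))
          .Returns((CancellationToken t) => Endless(t));
    
    var service = new ProductService(mockDb.Object);
    using var cts = new CancellationTokenSource();
    
    // Act + Assert: ThrowsAnyAsync also accepts the derived TaskCanceledException
    await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
    {
        await foreach (var item in service.StreamProductsAsync(cts.Token))
        {
            cts.Cancel(); // cancel once the first item has arrived
        }
    });
}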

Integration Testing Streaming Endpoints

[Fact]
public async Task StreamEndpoint_ReturnsStreamingResponse()
{
    // Arrange
    var factory = new WebApplicationFactory<Program>();
    var client = factory.CreateClient();
    
    // Act
    var response = await client.GetAsync("/api/products/stream", 
        HttpCompletionOption.ResponseHeadersRead);
    
    // Assert the response started. The test server runs in memory, so the
    // Transfer-Encoding header is not a reliable signal of streaming here.
    response.EnsureSuccessStatusCode();
    
    // Read stream incrementally
    var stream = await response.Content.ReadAsStreamAsync();
    var reader = new StreamReader(stream);
    
    var firstLine = await reader.ReadLineAsync();
    Assert.NotNull(firstLine);
    
    // Verify JSON lines format
    var product = JsonSerializer.Deserialize<Product>(firstLine);
    Assert.NotNull(product);
}

8.22 Memory Diagnostics for Async Streams

Detecting Memory Leaks in Async Streams

public class StreamMemoryMonitor
{
    private readonly ILogger<StreamMemoryMonitor> _logger;
    
    public StreamMemoryMonitor(ILogger<StreamMemoryMonitor> logger) => _logger = logger;
    
    public async IAsyncEnumerable<T> MonitorStreamAsync<T>(
        IAsyncEnumerable<T> source,
        string streamName,
        [EnumeratorCancellation] CancellationToken token)
    {
        const long thresholdBytes = 100 * 1024 * 1024; // 100 MB threshold
        var initialMemory = GC.GetTotalMemory(forceFullCollection: false);
        long itemCount = 0;
        
        await foreach (var item in source.WithCancellation(token))
        {
            // Sample every 10,000 items. GetTotalMemory is process-wide and
            // approximate, so treat the delta as a hint, not an exact figure.
            if (++itemCount % 10_000 == 0)
            {
                var delta = GC.GetTotalMemory(forceFullCollection: false) - initialMemory;
                
                if (delta > thresholdBytes)
                {
                    _logger.LogWarning(
                        "Heap grew by {DeltaBytes} bytes while streaming {StreamName} ({ItemCount} items)",
                        delta, streamName, itemCount);
                }
            }
            
            yield return item;
            // No GC.Collect here: forcing collections from request code
            // usually costs more throughput than it saves memory
        }
    }
}

// Usage
await foreach (var item in _monitor.MonitorStreamAsync(
    _repo.StreamLargeDatasetAsync(token),
    "LargeDatasetStream",
    token))
{
    // Process item
}

Diagnostic Source for Async Streams

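// Subscribe to ASP.NET Core request diagnostics.
// There is no streaming-specific diagnostic event, so this sketch logs the start of
// requests whose path contains "/stream" (an assumed route convention).
// "logger" is any ILogger instance available at startup.
DiagnosticListener.AllListeners.Subscribe(new AsyncStreamObserver(logger));

public class AsyncStreamObserver : IObserver<DiagnosticListener>
{
    private readonly ILogger _logger;
    public AsyncStreamObserver(ILogger logger) => _logger = logger;
    
    public void OnNext(DiagnosticListener listener)
    {
        if (listener.Name == "Microsoft.AspNetCore")
        {
            listener.Subscribe(new RequestEventsObserver(_logger));
        }
    }
    public void OnCompleted() { }
    public void OnError(Exception error) { }
    
    private class RequestEventsObserver : IObserver<KeyValuePair<string, object?>>
    {
        private readonly ILogger _logger;
        public RequestEventsObserver(ILogger logger) => _logger = logger;
        
        public void OnNext(KeyValuePair<string, object?> value)
        {
            if (value.Key == "Microsoft.AspNetCore.Hosting.HttpRequestIn.Start"
                && value.Value is HttpContext context
                && context.Request.Path.Value?.Contains("/stream") == true)
            {
                _logger.LogInformation("Streaming started for {Path}", 
                    context.Request.Path);
            }
        }
        public void OnCompleted() { }
        public void OnError(Exception error) { }
    }
}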

8.23 Async Streams with Compression

Compressing Async Stream Responses

[HttpGet("stream/compressed")]
public async Task StreamCompressedAsync(CancellationToken token)
{
    // Assumes the client sent "Accept-Encoding: gzip". The response compression
    // middleware handles that negotiation and is usually the simpler choice.
    Response.Headers["Content-Encoding"] = "gzip"; // indexer does not throw if the header exists
    Response.ContentType = "application/x-ndjson"; // one JSON document per line
    
    await using var gzipStream = new GZipStream(
        Response.Body, 
        CompressionLevel.Fastest, 
        leaveOpen: true);
    
    await using var writer = new StreamWriter(gzipStream);
    
    await foreach (var item in _repo.StreamLargeDatasetAsync(token))
    {
        var json = JsonSerializer.Serialize(item);
        await writer.WriteLineAsync(json);
        
        // Flushing per item gives low latency but a worse compression ratio;
        // flush every N items instead if throughput matters more
        await writer.FlushAsync();
    }
}

Progressive Compression with Chunked Encoding

public async IAsyncEnumerable<byte[]> StreamWithCompressionAsync(
    IAsyncEnumerable<string> source,
    [EnumeratorCancellation] CancellationToken token)
{
    await using var compressedStream = new MemoryStream();
    
    await using (var gzipStream = new GZipStream(
        compressedStream, 
        CompressionMode.Compress, 
        leaveOpen: true))
    {
        await foreach (var data in source.WithCancellation(token))
        {
            var bytes = Encoding.UTF8.GetBytes(data + "\n");
            await gzipStream.WriteAsync(bytes, token);
            await gzipStream.FlushAsync(token);
            
            // Return compressed chunks as they're ready
            if (compressedStream.Length > 8192) // 8KB chunks
            {
                var chunk = compressedStream.ToArray();
                compressedStream.SetLength(0);
                yield return chunk;
            }
        }
    } // Disposing the GZipStream writes the gzip trailer; without it the output is truncated
    
    // Final chunk (includes the trailer)
    if (compressedStream.Length > 0)
    {
        yield return compressedStream.ToArray();
    }
}
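
When chunks are emitted progressively, the concatenated output is only a valid gzip stream once the GZipStream has been disposed, because that is when the trailer is written. A minimal round-trip check, using fixed sample text and in-memory streams as stand-ins, looks like this:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

byte[] input = Encoding.UTF8.GetBytes("line 1\nline 2\n");

using var compressed = new MemoryStream();
using (var gzip = new GZipStream(compressed, CompressionMode.Compress, leaveOpen: true))
{
    gzip.Write(input, 0, input.Length);
    gzip.Flush();
} // Dispose writes the final block and the gzip trailer

// Round trip: decompress and compare with the input
compressed.Position = 0;
using var decompressed = new MemoryStream();
using (var gunzip = new GZipStream(compressed, CompressionMode.Decompress))
{
    gunzip.CopyTo(decompressed);
}

string roundTrip = Encoding.UTF8.GetString(decompressed.ToArray());
Console.WriteLine(roundTrip == "line 1\nline 2\n");
```

A test like this, fed with the concatenated chunks of a streaming method, is a cheap way to catch a missing trailer.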

Chapter 8 Summary

  • Async streams let APIs send data piece-by-piece
  • Reduces memory usage dramatically
  • Consumers get results much faster
  • Perfect for:
    • Logs
    • Events
    • Metrics
    • Large datasets
    • Real-time feeds
  • Supports cancellation tokens for safe streaming
  • Works with EF Core, ADO.NET, files, external APIs

✅ Chapter 8 (Deep Dive) is now complete.

Next chapter will take async to the next level:

👉 Chapter 9: Background Work & Async Processing

  • IHostedService / BackgroundService
  • Queued background jobs
  • Email/SMS processing
  • Long processing tasks without blocking requests
  • Reliable async job scheduling
  • Real-life example with queue + worker + API integration
