Chapter 10: Advanced Concurrency & Caching - Building High-Performance Systems
Series: Low Level Design for .NET Developers | Previous: Chapter 9: Case Study - Splitwise
📖 Introduction
Modern applications face two critical challenges: handling concurrent operations and managing data access performance. This chapter covers advanced techniques for:
- Managing concurrent access to shared resources
- Preventing race conditions and deadlocks
- Implementing efficient caching strategies
- Distributed locking and synchronization
- Optimistic vs pessimistic concurrency control
- Cache invalidation patterns
1. Concurrency Fundamentals
1.1 Understanding Race Conditions
A race condition occurs when multiple threads access shared data simultaneously, leading to unpredictable results.
// ❌ Race Condition Example
public class BankAccount
{
// Deliberately broken "before" example: Balance is read and written with no
// synchronization, so two concurrent Deposit calls can both read the same
// starting value and one increment is lost.
public decimal Balance { get; private set; }
public void Deposit(decimal amount)
{
// Race condition: multiple threads can read and write simultaneously
var current = Balance;
// The window between this read and the write below is where another
// thread's update can be overwritten.
var newBalance = current + amount;
Balance = newBalance; // Lost update possible
}
}
// ✅ Thread-Safe Version
public class ThreadSafeBankAccount
{
    private decimal _balance;
    private readonly object _lock = new object();

    /// <summary>Current balance; the read is taken under the lock so it never observes a torn value.</summary>
    public decimal Balance
    {
        get
        {
            lock (_lock)
            {
                return _balance;
            }
        }
    }

    /// <summary>Atomically credits <paramref name="amount"/> to the balance.</summary>
    /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="amount"/> is not positive.</exception>
    public void Deposit(decimal amount)
    {
        // Guard: a negative "deposit" would silently drain the account and
        // bypass the insufficient-funds check in Withdraw.
        if (amount <= 0)
            throw new ArgumentOutOfRangeException(nameof(amount), "Deposit amount must be positive.");
        lock (_lock)
        {
            _balance += amount;
        }
    }

    /// <summary>Atomically debits <paramref name="amount"/> from the balance.</summary>
    /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="amount"/> is not positive.</exception>
    /// <exception cref="InvalidOperationException">Thrown when the balance is insufficient.</exception>
    public void Withdraw(decimal amount)
    {
        if (amount <= 0)
            throw new ArgumentOutOfRangeException(nameof(amount), "Withdrawal amount must be positive.");
        lock (_lock)
        {
            // Check-and-debit must happen under the same lock so no other
            // thread can slip in between the check and the subtraction.
            if (_balance >= amount)
                _balance -= amount;
            else
                throw new InvalidOperationException("Insufficient funds");
        }
    }
}
1.2 Thread-Safe Collections
// Demonstrates the System.Collections.Concurrent collection family.
// NOTE(review): WorkItem and ProcessItemAsync are not defined in this file's
// visible portion - presumably declared elsewhere; confirm before reuse.
public class ThreadSafeCache
{
// ConcurrentDictionary is thread-safe
private readonly ConcurrentDictionary<string, object> _cache = new();
// BlockingCollection for producer-consumer scenarios
private readonly BlockingCollection<WorkItem> _workQueue = new();
// ConcurrentBag for unordered collections
private readonly ConcurrentBag<Task> _tasks = new();
// ConcurrentQueue for FIFO operations
private readonly ConcurrentQueue<string> _messageQueue = new();
// ConcurrentStack for LIFO operations
private readonly ConcurrentStack<string> _undoStack = new();
// Returns the cached value for key, invoking valueFactory on a miss.
// Caveats: GetOrAdd may invoke the factory more than once under contention
// (only one result wins), and the (T) cast throws InvalidCastException if
// the stored value was added under the same key as a different type.
public T GetOrAdd<T>(string key, Func<string, T> valueFactory)
{
return (T)_cache.GetOrAdd(key, k => valueFactory(k));
}
// Removes the entry if present; value receives the removed object.
public bool TryRemove(string key, out object value)
{
return _cache.TryRemove(key, out value);
}
// Producer side: blocks if the collection has a bounded capacity and is full.
public void EnqueueWork(WorkItem item)
{
_workQueue.Add(item);
}
// Consumer side: GetConsumingEnumerable BLOCKS the calling thread while the
// queue is empty (until CompleteAdding or cancellation), even though this
// method is async - consider a dedicated long-running task or Channels for
// a fully asynchronous consumer.
public async Task ProcessWorkAsync(CancellationToken cancellationToken)
{
foreach (var item in _workQueue.GetConsumingEnumerable(cancellationToken))
{
await ProcessItemAsync(item);
}
}
}
2. Synchronization Primitives
2.1 Lock Statement and Monitor
public class AccountManager
{
    // Global tie-breaker used only when two accounts' lock objects have equal
    // hash codes: object hash codes are NOT unique, and on a collision the
    // original hash-based ordering gave two opposite transfers no common lock
    // order - exactly the deadlock it was trying to prevent.
    private static readonly object _tieBreakerLock = new object();
    private readonly object _balanceLock = new object();
    private readonly object _transactionLock = new object();
    private decimal _balance;
    private List<Transaction> _transactions = new();

    /// <summary>
    /// Moves <paramref name="amount"/> from this account to <paramref name="target"/>.
    /// The transfer is silently skipped when funds are insufficient (preserves
    /// the original demo behavior).
    /// </summary>
    public void Transfer(AccountManager target, decimal amount)
    {
        // Avoid deadlock by always acquiring locks in the same order.
        object lock1 = this._balanceLock;
        object lock2 = target._balanceLock;
        int h1 = lock1.GetHashCode();
        int h2 = lock2.GetHashCode();
        bool hashCollision = h1 == h2 && !ReferenceEquals(lock1, lock2);
        if (h1 > h2)
        {
            (lock1, lock2) = (lock2, lock1);
        }
        // On a hash collision there is no consistent pairwise order, so
        // serialize those (rare) transfers through one process-wide lock.
        if (hashCollision) Monitor.Enter(_tieBreakerLock);
        try
        {
            lock (lock1)
            {
                lock (lock2)
                {
                    if (_balance >= amount)
                    {
                        _balance -= amount;
                        target._balance += amount;
                    }
                }
            }
        }
        finally
        {
            if (hashCollision) Monitor.Exit(_tieBreakerLock);
        }
    }

    /// <summary>
    /// Appends a transaction, giving up after 5 seconds instead of blocking forever.
    /// </summary>
    /// <exception cref="TimeoutException">Thrown when the lock cannot be acquired in time.</exception>
    public void AddTransaction(Transaction transaction)
    {
        // Use Monitor.TryEnter with timeout
        if (Monitor.TryEnter(_transactionLock, TimeSpan.FromSeconds(5)))
        {
            try
            {
                _transactions.Add(transaction);
            }
            finally
            {
                Monitor.Exit(_transactionLock);
            }
        }
        else
        {
            throw new TimeoutException("Could not acquire lock");
        }
    }
}
2.2 ReaderWriterLockSlim
// Read-mostly store guarded by ReaderWriterLockSlim: many concurrent readers,
// exclusive writers, and an upgradeable path for read-then-populate.
public class ThreadSafeDataStore<T>
{
private readonly ReaderWriterLockSlim _lock = new();
private readonly Dictionary<string, T> _data = new();
// NOTE(review): _lastReadTime is written while holding only a READ lock, so
// multiple readers can write it concurrently; DateTime writes are not
// guaranteed atomic on 32-bit platforms - confirm this is acceptable or move
// the write under a write lock / use Interlocked on ticks.
private DateTime _lastReadTime;
private DateTime _lastWriteTime;
// Shared read: any number of readers may be inside simultaneously.
// Returns default(T) when the key is absent.
public T Read(string key)
{
_lock.EnterReadLock();
try
{
_lastReadTime = DateTime.UtcNow;
return _data.TryGetValue(key, out var value) ? value : default;
}
finally
{
_lock.ExitReadLock();
}
}
// Exclusive write: blocks until all readers have left.
public void Write(string key, T value)
{
_lock.EnterWriteLock();
try
{
_data[key] = value;
_lastWriteTime = DateTime.UtcNow;
}
finally
{
_lock.ExitWriteLock();
}
}
// Read-or-populate: takes the upgradeable read lock (only one thread may hold
// it at a time, but it coexists with plain readers) and upgrades to a write
// lock only on a miss. The enter/exit pairing below must stay exactly nested.
public T ReadUpgradable(string key)
{
_lock.EnterUpgradeableReadLock();
try
{
if (_data.TryGetValue(key, out var value))
return value;
// Upgrade to write lock
_lock.EnterWriteLock();
try
{
value = FetchFromDatabase(key);
_data[key] = value;
return value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
// Stand-in for a real data source; always returns default(T) in this demo.
private T FetchFromDatabase(string key)
{
// Simulate database fetch
return default;
}
}
2.3 SemaphoreSlim for Resource Pooling
public class DatabaseConnectionPool : IDisposable
{
    // Counts free slots: WaitAsync blocks callers once all connections are out.
    private readonly SemaphoreSlim _semaphore;
    private readonly ConcurrentBag<DbConnection> _connections;
    private readonly int _maxConnections;

    public DatabaseConnectionPool(int maxConnections)
    {
        _maxConnections = maxConnections;
        _semaphore = new SemaphoreSlim(maxConnections, maxConnections);
        _connections = new ConcurrentBag<DbConnection>();
        // Pre-create connections
        for (int i = 0; i < maxConnections; i++)
        {
            _connections.Add(CreateConnection());
        }
    }

    /// <summary>
    /// Rents a connection, waiting when the pool is exhausted. Dispose the
    /// returned wrapper to put the connection back.
    /// </summary>
    public async Task<PooledConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken);
        if (_connections.TryTake(out var connection))
        {
            return new PooledConnection(connection, this);
        }
        // Fallback - create new connection (should not happen with proper semaphore)
        connection = CreateConnection();
        return new PooledConnection(connection, this);
    }

    // internal (not private): PooledConnection is a separate top-level class
    // and calls this from its Dispose - as 'private' that call did not compile.
    internal void ReturnConnection(DbConnection connection)
    {
        _connections.Add(connection);
        _semaphore.Release();
    }

    private DbConnection CreateConnection()
    {
        // Create actual database connection
        return new SqlConnection("connection_string");
    }

    /// <summary>Disposes all pooled connections and the semaphore.</summary>
    public void Dispose()
    {
        foreach (var connection in _connections)
        {
            connection?.Dispose();
        }
        _semaphore.Dispose();
    }
}
/// <summary>
/// RAII-style wrapper around a rented <see cref="DbConnection"/>: disposing it
/// returns the connection to its pool instead of closing it.
/// </summary>
public class PooledConnection : IDisposable
{
    private readonly DbConnection _connection;
    private readonly DatabaseConnectionPool _pool;
    private bool _disposed;

    public PooledConnection(DbConnection connection, DatabaseConnectionPool pool)
    {
        _connection = connection;
        _pool = pool;
    }

    /// <summary>The underlying ADO.NET connection to execute commands on.</summary>
    public DbConnection Connection => _connection;

    /// <summary>Returns the connection to the pool; safe to call more than once.</summary>
    public void Dispose()
    {
        if (_disposed)
            return;
        _pool.ReturnConnection(_connection);
        _disposed = true;
    }
}
2.4 AsyncLocal for Context Propagation
/// <summary>
/// Ambient per-async-flow request context carried by <see cref="AsyncLocal{T}"/>,
/// so it flows across awaits without being passed explicitly.
/// </summary>
public class CorrelationContext
{
    private static readonly AsyncLocal<CorrelationContext> _current = new();

    public string CorrelationId { get; set; }
    public string UserId { get; set; }
    public string RequestPath { get; set; }
    public DateTime StartTime { get; set; }

    /// <summary>Context of the current async flow; null when no scope is active.</summary>
    public static CorrelationContext Current
    {
        get => _current.Value;
        set => _current.Value = value;
    }

    /// <summary>
    /// Installs a fresh context and returns a token that restores the previous
    /// one on dispose, so scopes nest correctly.
    /// </summary>
    public static IDisposable BeginScope(string correlationId, string userId)
    {
        var previous = Current;
        Current = new CorrelationContext
        {
            CorrelationId = correlationId,
            UserId = userId,
            StartTime = DateTime.UtcNow
        };
        return new RestoreScope(() => Current = previous);
    }

    // Runs the restore callback exactly when the scope is disposed.
    private sealed class RestoreScope : IDisposable
    {
        private readonly Action _restore;

        public RestoreScope(Action restore) => _restore = restore;

        public void Dispose() => _restore?.Invoke();
    }
}
// Middleware to set correlation context
/// <summary>
/// ASP.NET Core middleware: reads the incoming X-Correlation-ID header (minting
/// a GUID when absent), flows it via <see cref="CorrelationContext"/> for the
/// rest of the pipeline, and echoes it back on the response.
/// </summary>
public class CorrelationMiddleware
{
    private readonly RequestDelegate _next;

    public CorrelationMiddleware(RequestDelegate next) => _next = next;

    public async Task InvokeAsync(HttpContext context)
    {
        var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault();
        if (correlationId == null)
        {
            correlationId = Guid.NewGuid().ToString();
        }
        // Scope disposal restores whatever context was active before this request.
        using (CorrelationContext.BeginScope(correlationId, context.User?.Identity?.Name))
        {
            context.Response.Headers["X-Correlation-ID"] = correlationId;
            await _next(context);
        }
    }
}
3. Optimistic vs Pessimistic Concurrency
3.1 Optimistic Concurrency with EF Core
public class Product
{
    public int Id { get; set; }
    public string Name { get; set; }
    public int Quantity { get; set; }
    // Concurrency token - tracks row version. The [Timestamp] attribute (or
    // equivalent IsRowVersion() fluent configuration) is what makes EF Core
    // treat this column as a rowversion concurrency token; without it,
    // DbUpdateConcurrencyException is never raised for conflicting writes and
    // the retry loop in InventoryService is dead code.
    [System.ComponentModel.DataAnnotations.Timestamp]
    public byte[] RowVersion { get; set; }
}
public class InventoryService
{
    private readonly AppDbContext _context;
    private readonly ILogger<InventoryService> _logger;

    public InventoryService(AppDbContext context, ILogger<InventoryService> logger)
    {
        _context = context;
        _logger = logger;
    }

    /// <summary>
    /// Deducts stock with optimistic concurrency: on a
    /// DbUpdateConcurrencyException the entity is reloaded and the deduction
    /// retried, up to <paramref name="retryCount"/> attempts.
    /// </summary>
    /// <returns>true when stock was deducted; false when the product does not exist or stock is insufficient.</returns>
    public async Task<bool> DeductStockAsync(int productId, int quantity, int retryCount = 3)
    {
        for (int attempt = 1; attempt <= retryCount; attempt++)
        {
            try
            {
                var product = await _context.Products.FindAsync(productId);
                // FindAsync returns null for an unknown id; the original
                // dereferenced it unconditionally -> NullReferenceException.
                if (product == null)
                    return false;
                if (product.Quantity < quantity)
                    return false;
                product.Quantity -= quantity;
                await _context.SaveChangesAsync();
                return true;
            }
            catch (DbUpdateConcurrencyException ex)
            {
                _logger.LogWarning(ex, "Concurrency conflict on attempt {Attempt}", attempt);
                // Reload the conflicting entities so the next attempt sees
                // the current database values (and fresh concurrency token).
                foreach (var entry in ex.Entries)
                {
                    await entry.ReloadAsync();
                }
                if (attempt == retryCount)
                {
                    _logger.LogError("Failed to update after {RetryCount} attempts", retryCount);
                    throw;
                }
            }
        }
        return false;
    }
}
3.2 Optimistic Concurrency with Version Number
/// <summary>
/// Entity using an integer version number as its optimistic-concurrency token.
/// </summary>
public class Document
{
    public int Id { get; set; }
    public string Title { get; set; }
    public string Content { get; set; }

    /// <summary>Bumped on every update; compared by the repository's WHERE clause.</summary>
    public int Version { get; private set; }

    /// <summary>Replaces title and content and increments the version.</summary>
    public void Update(string title, string content)
    {
        (Title, Content) = (title, content);
        Version++;
    }
}
public class DocumentRepository
{
    private readonly AppDbContext _context;

    // The original class had no constructor, so _context stayed null and every
    // call threw NullReferenceException; inject it like the sibling services.
    public DocumentRepository(AppDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Optimistic update: succeeds only when the stored Version still equals
    /// <paramref name="expectedVersion"/>; the version is bumped in the same
    /// atomic statement.
    /// </summary>
    /// <returns>true when the row was updated; false when another writer won the race.</returns>
    public async Task<bool> UpdateDocumentAsync(int id, string title, string content, int expectedVersion)
    {
        // {0}-style placeholders are turned into SQL parameters by EF Core,
        // so this is injection-safe despite the raw SQL.
        var affectedRows = await _context.Database.ExecuteSqlRawAsync(
            "UPDATE Documents SET Title = {0}, Content = {1}, Version = Version + 1 " +
            "WHERE Id = {2} AND Version = {3}",
            title, content, id, expectedVersion);
        return affectedRows > 0;
    }
}
3.3 Pessimistic Locking
public class SeatBookingService
{
    private readonly AppDbContext _context;

    // The original class had no constructor, so _context stayed null and every
    // call threw NullReferenceException.
    public SeatBookingService(AppDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Books a seat under a Serializable transaction with a pessimistic row
    /// lock (UPDLOCK), so two concurrent bookings of the same seat serialize.
    /// </summary>
    /// <returns>true when the seat was booked; false when it is missing or already booked.</returns>
    public async Task<bool> BookSeatAsync(int seatId, int userId)
    {
        // Use transaction with isolation level
        using var transaction = await _context.Database.BeginTransactionAsync(
            System.Data.IsolationLevel.Serializable);
        try
        {
            // UPDLOCK + ROWLOCK: take an update lock on just this row so other
            // writers block until we commit or roll back.
            var seat = await _context.Seats
                .FromSqlRaw("SELECT * FROM Seats WITH (UPDLOCK, ROWLOCK) WHERE Id = {0}", seatId)
                .FirstOrDefaultAsync();
            if (seat == null || seat.IsBooked)
                return false;
            seat.Book(userId);
            await _context.SaveChangesAsync();
            await transaction.CommitAsync();
            return true;
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }

    /// <summary>
    /// Loads a seat holding UPDLOCK + HOLDLOCK; the caller must run this inside
    /// a transaction, otherwise the lock is released as soon as the statement ends.
    /// </summary>
    public async Task<Seat> GetSeatWithLockAsync(int seatId)
    {
        // UPDLOCK - Update lock, HOLDLOCK - Hold until transaction completes
        return await _context.Seats
            .FromSqlRaw("SELECT * FROM Seats WITH (UPDLOCK, HOLDLOCK) WHERE Id = {0}", seatId)
            .FirstOrDefaultAsync();
    }
}
4. Distributed Locking
4.1 Redis Distributed Lock (single-instance SET NX pattern — note this is not the full Redlock algorithm, which acquires the lock on a majority of several independent Redis nodes)
// Single-instance Redis lock built on atomic SET NX + ownership-checked release.
public class RedisDistributedLock : IDistributedLock
{
    private readonly IDatabase _redis;
    private readonly string _lockKey;
    private readonly TimeSpan _expiry;
    // Unique token proving ownership; checked by the release script.
    private string _lockValue;

    public RedisDistributedLock(IConnectionMultiplexer redis, string lockKey, TimeSpan expiry)
    {
        _redis = redis.GetDatabase();
        _lockKey = $"lock:{lockKey}";
        _expiry = expiry;
    }

    /// <summary>
    /// Tries to take the lock once. SET NX is atomic, so a successful reply
    /// already proves ownership - the original's follow-up GET was redundant
    /// and could even report a false negative if the key expired between the
    /// two round trips.
    /// </summary>
    public async Task<bool> AcquireAsync(CancellationToken cancellationToken = default)
    {
        _lockValue = Guid.NewGuid().ToString();
        // SET NX (Not Exists) with expiry
        var acquired = await _redis.StringSetAsync(
            _lockKey,
            _lockValue,
            _expiry,
            When.NotExists);
        return acquired;
    }

    /// <summary>Releases the lock only if we still own it (atomic check-and-delete via Lua).</summary>
    public async Task ReleaseAsync()
    {
        // Lua script to ensure atomic release (only release if we own it)
        var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end";
        await _redis.ScriptEvaluateAsync(
            script,
            new RedisKey[] { _lockKey },
            new RedisValue[] { _lockValue });
    }

    /// <summary>
    /// Acquires and returns a disposable releaser, or null when the lock is
    /// held by someone else - callers must null-check.
    /// </summary>
    public async Task<IDisposable> AcquireWithAutoReleaseAsync(CancellationToken cancellationToken = default)
    {
        if (await AcquireAsync(cancellationToken))
        {
            return new LockReleaser(this);
        }
        return null;
    }

    private class LockReleaser : IDisposable
    {
        private readonly RedisDistributedLock _owner;

        // Parameter renamed from 'lock': that is a C# keyword and the original
        // declaration did not compile.
        public LockReleaser(RedisDistributedLock owner)
        {
            _owner = owner;
        }

        public void Dispose()
        {
            // Sync-over-async is tolerated here because release is a single
            // short Redis call; prefer IAsyncDisposable where the platform allows.
            _owner.ReleaseAsync().GetAwaiter().GetResult();
        }
    }
}
// Usage with Polly retry
public class DistributedLockService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<DistributedLockService> _logger;

    public DistributedLockService(IConnectionMultiplexer redis, ILogger<DistributedLockService> logger)
    {
        _redis = redis;
        _logger = logger;
    }

    /// <summary>
    /// Runs <paramref name="action"/> under a Redis distributed lock, retrying
    /// acquisition up to 3 times with linear backoff; the lock is always
    /// released in a finally block.
    /// </summary>
    /// <exception cref="TimeoutException">Thrown when the lock cannot be acquired after all retries.</exception>
    public async Task<T> ExecuteWithLockAsync<T>(
        string lockKey,
        Func<Task<T>> action,
        TimeSpan lockExpiry,
        CancellationToken cancellationToken = default)
    {
        // Retry while AcquireAsync keeps returning false (lock held elsewhere).
        var retryPolicy = Policy
            .HandleResult<bool>(acquired => !acquired)
            .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromMilliseconds(100 * retryAttempt));
        // Renamed from 'var lock = ...': 'lock' is a C# keyword and the
        // original statement did not compile.
        var distributedLock = new RedisDistributedLock(_redis, lockKey, lockExpiry);
        var acquired = await retryPolicy.ExecuteAsync(() => distributedLock.AcquireAsync(cancellationToken));
        if (!acquired)
        {
            throw new TimeoutException($"Could not acquire lock for key: {lockKey}");
        }
        try
        {
            return await action();
        }
        finally
        {
            await distributedLock.ReleaseAsync();
        }
    }
}
4.2 SQL Server Distributed Lock (Application Lock)
// Distributed lock backed by SQL Server application locks (sp_getapplock).
public class SqlServerDistributedLock : IDistributedLock
{
    private readonly SqlConnection _connection;
    private readonly string _lockName;
    private readonly int _timeoutSeconds;

    public SqlServerDistributedLock(string connectionString, string lockName, int timeoutSeconds = 30)
    {
        _connection = new SqlConnection(connectionString);
        _lockName = lockName;
        _timeoutSeconds = timeoutSeconds;
    }

    /// <summary>
    /// Acquires a session-owned exclusive application lock, waiting up to the
    /// configured timeout.
    /// </summary>
    public async Task<bool> AcquireAsync(CancellationToken cancellationToken = default)
    {
        // A failed acquire leaves the connection open; guard so that a retry
        // does not call OpenAsync on an already-open connection (which throws).
        if (_connection.State != ConnectionState.Open)
        {
            await _connection.OpenAsync(cancellationToken);
        }
        using var command = new SqlCommand(
            "sp_getapplock", _connection)
        {
            CommandType = CommandType.StoredProcedure
        };
        command.Parameters.AddWithValue("@Resource", _lockName);
        command.Parameters.AddWithValue("@LockMode", "Exclusive");
        // Session ownership: the lock persists until released or the session ends.
        command.Parameters.AddWithValue("@LockOwner", "Session");
        command.Parameters.AddWithValue("@LockTimeout", _timeoutSeconds * 1000);
        var returnParameter = command.Parameters.Add("@Result", SqlDbType.Int);
        returnParameter.Direction = ParameterDirection.ReturnValue;
        await command.ExecuteNonQueryAsync(cancellationToken);
        var result = (int)returnParameter.Value;
        // sp_getapplock return codes: 0 = granted immediately, 1 = granted
        // after waiting, negative (-1 timeout, -2 canceled, -3 deadlock victim,
        // -999 error) = failure.
        return result >= 0;
    }

    /// <summary>Releases the application lock and closes the connection.</summary>
    public async Task ReleaseAsync()
    {
        using var command = new SqlCommand(
            "sp_releaseapplock", _connection)
        {
            CommandType = CommandType.StoredProcedure
        };
        command.Parameters.AddWithValue("@Resource", _lockName);
        command.Parameters.AddWithValue("@LockOwner", "Session");
        await command.ExecuteNonQueryAsync();
        await _connection.CloseAsync();
    }

    public void Dispose()
    {
        _connection?.Dispose();
    }
}
5. Caching Strategies
5.1 In-Memory Caching with IMemoryCache
// Cache-aside over IMemoryCache with stampede protection: concurrent misses
// are funneled through a semaphore so the database is hit only once.
public class CachedProductService
{
private readonly IMemoryCache _cache;
private readonly IProductRepository _repository;
private readonly ILogger<CachedProductService> _logger;
// NOTE(review): one global semaphore serializes the miss path for ALL
// products, so a slow load of one id briefly blocks misses for every other
// id as well - consider a per-key lock if that contention matters.
private readonly SemaphoreSlim _cacheLock = new(1, 1);
public CachedProductService(
IMemoryCache cache,
IProductRepository repository,
ILogger<CachedProductService> logger)
{
_cache = cache;
_repository = repository;
_logger = logger;
}
// Cache-Aside Pattern
// Fast path: lock-free read. Miss path: serialize, double-check, then load
// from the repository and populate the cache. Null products (unknown id) are
// returned but never cached.
public async Task<Product> GetProductAsync(int id, CancellationToken cancellationToken = default)
{
var cacheKey = $"product_{id}";
// Try to get from cache
if (_cache.TryGetValue(cacheKey, out Product cachedProduct))
{
_logger.LogDebug("Cache hit for product {ProductId}", id);
return cachedProduct;
}
_logger.LogDebug("Cache miss for product {ProductId}", id);
// Use semaphore to prevent multiple simultaneous cache misses
await _cacheLock.WaitAsync(cancellationToken);
try
{
// Double-check after acquiring lock
if (_cache.TryGetValue(cacheKey, out cachedProduct))
return cachedProduct;
// Fetch from database
var product = await _repository.GetByIdAsync(id, cancellationToken);
if (product != null)
{
// Set cache with expiration policies
var cacheOptions = new MemoryCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(15),
SlidingExpiration = TimeSpan.FromMinutes(5),
Priority = CacheItemPriority.Normal
};
// Register cache eviction callback
cacheOptions.RegisterPostEvictionCallback((key, value, reason, state) =>
{
_logger.LogInformation("Cache entry {Key} evicted due to {Reason}", key, reason);
});
_cache.Set(cacheKey, product, cacheOptions);
}
return product;
}
finally
{
_cacheLock.Release();
}
}
// Cache invalidation
// Write path: persist first, then evict so the next read repopulates with
// fresh data.
public async Task UpdateProductAsync(Product product, CancellationToken cancellationToken = default)
{
await _repository.UpdateAsync(product, cancellationToken);
// Invalidate cache
var cacheKey = $"product_{product.Id}";
_cache.Remove(cacheKey);
_logger.LogInformation("Cache invalidated for product {ProductId}", product.Id);
}
}
5.2 Distributed Caching with Redis
/// <summary>
/// Cache-aside over a distributed (Redis) cache: products are serialized to
/// JSON strings and loaded from the repository on a miss.
/// </summary>
public class RedisCachedService
{
    private readonly IDistributedCache _cache;
    private readonly IProductRepository _repository;
    private readonly ILogger<RedisCachedService> _logger;

    public RedisCachedService(
        IDistributedCache cache,
        IProductRepository repository,
        ILogger<RedisCachedService> logger)
    {
        _cache = cache;
        _repository = repository;
        _logger = logger;
    }

    /// <summary>Reads through the cache; unknown products are returned as null and not cached.</summary>
    public async Task<Product> GetProductAsync(int id, CancellationToken cancellationToken = default)
    {
        var cacheKey = $"product:{id}";
        var cachedData = await _cache.GetStringAsync(cacheKey, cancellationToken);
        if (cachedData != null)
        {
            _logger.LogDebug("Redis cache hit for product {ProductId}", id);
            return JsonSerializer.Deserialize<Product>(cachedData);
        }
        _logger.LogDebug("Redis cache miss for product {ProductId}", id);
        var product = await _repository.GetByIdAsync(id, cancellationToken);
        if (product == null)
        {
            return null;
        }
        var options = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(15),
            SlidingExpiration = TimeSpan.FromMinutes(5)
        };
        await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(product), options, cancellationToken);
        return product;
    }

    /// <summary>Same cache-aside flow expressed through the GetOrCreateAsync extension.</summary>
    public async Task<Product> GetOrCreateAsync(int id, CancellationToken cancellationToken = default)
    {
        var cacheKey = $"product:{id}";
        return await _cache.GetOrCreateAsync(cacheKey, async entry =>
        {
            // The factory configures the entry's expirations before the value is stored.
            entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(15);
            entry.SlidingExpiration = TimeSpan.FromMinutes(5);
            _logger.LogDebug("Loading product {ProductId} from database", id);
            return await _repository.GetByIdAsync(id, cancellationToken);
        }, cancellationToken);
    }
}
// Extension method for IDistributedCache
// Extension method for IDistributedCache
public static class DistributedCacheExtensions
{
    /// <summary>
    /// Returns the cached value for <paramref name="key"/>, or invokes
    /// <paramref name="factory"/> on a miss and caches its result. The factory
    /// receives the entry options so it can set expiration policies.
    /// </summary>
    public static async Task<T> GetOrCreateAsync<T>(
        this IDistributedCache cache,
        string key,
        Func<DistributedCacheEntryOptions, Task<T>> factory,
        CancellationToken cancellationToken = default)
    {
        var cachedData = await cache.GetStringAsync(key, cancellationToken);
        if (cachedData != null)
        {
            return JsonSerializer.Deserialize<T>(cachedData);
        }
        var options = new DistributedCacheEntryOptions();
        var value = await factory(options);
        // Only cache real values: the original serialized null to the literal
        // string "null", pinning a missing record in the cache for its full
        // TTL. This also matches how every other service in this chapter
        // skips caching null results.
        if (value != null)
        {
            var jsonData = JsonSerializer.Serialize(value);
            await cache.SetStringAsync(key, jsonData, options, cancellationToken);
        }
        return value;
    }
}
5.3 Cache Patterns
// Write-Through Cache Pattern
/// <summary>
/// Write-through cache: writes go to the repository first and then refresh the
/// cache, so the cache never holds data the database does not.
/// </summary>
public class WriteThroughCache<TKey, TValue>
{
    private readonly IDistributedCache _cache;
    private readonly IRepository<TKey, TValue> _repository;
    private readonly TimeSpan _expiration;

    public WriteThroughCache(IDistributedCache cache, IRepository<TKey, TValue> repository, TimeSpan expiration)
    {
        _cache = cache;
        _repository = repository;
        _expiration = expiration;
    }

    /// <summary>Reads through the cache, falling back to the repository and repopulating on a miss.</summary>
    public async Task<TValue> GetAsync(TKey key, CancellationToken cancellationToken = default)
    {
        var cacheKey = BuildKey(key);
        var cached = await _cache.GetStringAsync(cacheKey, cancellationToken);
        if (cached != null)
        {
            return JsonSerializer.Deserialize<TValue>(cached);
        }
        var value = await _repository.GetAsync(key, cancellationToken);
        if (value != null)
        {
            await StoreAsync(cacheKey, value, cancellationToken);
        }
        return value;
    }

    /// <summary>Write-through: persist to the repository first, then update the cache.</summary>
    public async Task SetAsync(TKey key, TValue value, CancellationToken cancellationToken = default)
    {
        await _repository.SetAsync(key, value, cancellationToken);
        await StoreAsync(BuildKey(key), value, cancellationToken);
    }

    // Cache keys are namespaced by the value type's name.
    private static string BuildKey(TKey key) => $"{typeof(TValue).Name}:{key}";

    // Shared serialization + expiration policy for both the read-repopulate
    // and the write paths.
    private Task StoreAsync(string cacheKey, TValue value, CancellationToken cancellationToken) =>
        _cache.SetStringAsync(
            cacheKey,
            JsonSerializer.Serialize(value),
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = _expiration },
            cancellationToken);
}
// Write-Behind (Write-Back) Cache Pattern
// Write-Behind (Write-Back) Cache Pattern: cache is updated synchronously,
// database writes are batched and flushed on a timer.
public class WriteBehindCache<TKey, TValue> : IDisposable
{
    private readonly IDistributedCache _cache;
    private readonly IRepository<TKey, TValue> _repository;
    private readonly ConcurrentQueue<WriteOperation> _writeQueue = new();
    private readonly Timer _flushTimer;
    private readonly int _batchSize;

    public WriteBehindCache(IDistributedCache cache, IRepository<TKey, TValue> repository, int flushIntervalSeconds = 30, int batchSize = 100)
    {
        _cache = cache;
        _repository = repository;
        _batchSize = batchSize;
        _flushTimer = new Timer(FlushQueue, null, TimeSpan.FromSeconds(flushIntervalSeconds), TimeSpan.FromSeconds(flushIntervalSeconds));
    }

    /// <summary>Writes to the cache immediately; the database write is deferred to the flush timer.</summary>
    public async Task SetAsync(TKey key, TValue value, CancellationToken cancellationToken = default)
    {
        // Write to cache immediately
        var cacheKey = $"{typeof(TValue).Name}:{key}";
        await _cache.SetStringAsync(
            cacheKey,
            JsonSerializer.Serialize(value),
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1) },
            cancellationToken);
        // Queue for database write
        _writeQueue.Enqueue(new WriteOperation { Key = key, Value = value });
    }

    // Timer callback. 'async void' is unavoidable for Timer callbacks, so every
    // exception MUST be caught here - an unhandled one crashes the process.
    private async void FlushQueue(object state)
    {
        var batch = new List<WriteOperation>();
        while (batch.Count < _batchSize && _writeQueue.TryDequeue(out var operation))
        {
            batch.Add(operation);
        }
        if (batch.Count == 0)
            return;
        // Collapse duplicate keys (last write wins): the original ToDictionary
        // threw ArgumentException when the same key was written twice per batch.
        var latest = new Dictionary<TKey, TValue>();
        foreach (var operation in batch)
        {
            latest[operation.Key] = operation.Value;
        }
        try
        {
            await _repository.BatchSetAsync(latest);
        }
        catch
        {
            // Persist failed: re-queue the batch for the next flush instead of
            // silently losing the writes (the original dropped them).
            foreach (var operation in batch)
            {
                _writeQueue.Enqueue(operation);
            }
        }
    }

    /// <summary>Stops the timer and attempts one final flush of queued writes.</summary>
    public void Dispose()
    {
        _flushTimer.Dispose();
        FlushQueue(null);
    }

    private class WriteOperation
    {
        public TKey Key { get; set; }
        public TValue Value { get; set; }
    }
}
6. Cache Invalidation Strategies
6.1 Time-Based Invalidation
public class TimeBasedCacheInvalidation
{
    private readonly IMemoryCache _cache;

    // The original class had no constructor, so _cache stayed null and both
    // methods threw NullReferenceException.
    public TimeBasedCacheInvalidation(IMemoryCache cache)
    {
        _cache = cache;
    }

    /// <summary>
    /// Entry survives while it keeps being accessed (5-minute sliding window)
    /// but is evicted after 1 hour regardless.
    /// </summary>
    public void SetWithSlidingExpiration(string key, object value)
    {
        _cache.Set(key, value, new MemoryCacheEntryOptions
        {
            SlidingExpiration = TimeSpan.FromMinutes(5),
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
        });
    }

    /// <summary>Entry expires at a fixed wall-clock moment (3 AM UTC tomorrow).</summary>
    public void SetWithAbsoluteExpiration(string key, object value)
    {
        // Cache expires at specific time
        var expirationTime = DateTime.UtcNow.Date.AddDays(1).AddHours(3); // 3 AM tomorrow
        _cache.Set(key, value, expirationTime - DateTime.UtcNow);
    }
}
6.2 Event-Based Invalidation (Pub/Sub)
/// <summary>
/// Keeps per-instance memory caches coherent across a cluster via Redis
/// pub/sub: each instance evicts locally when any instance publishes a key.
/// </summary>
public class CacheInvalidationService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ISubscriber _subscriber;
    private readonly IMemoryCache _memoryCache;

    public CacheInvalidationService(IConnectionMultiplexer redis, IMemoryCache memoryCache)
    {
        _redis = redis;
        _memoryCache = memoryCache;
        _subscriber = redis.GetSubscriber();
        // Every instance listens for invalidation messages published by its peers.
        _subscriber.Subscribe("cache:invalidate", OnInvalidationMessage);
    }

    // Pub/sub handler: evict the named entry from this instance's local cache.
    private void OnInvalidationMessage(RedisChannel channel, RedisValue message)
    {
        _memoryCache.Remove(message.ToString());
    }

    /// <summary>Evicts the key locally, then broadcasts so every other instance evicts it too.</summary>
    public async Task InvalidateAsync(string key)
    {
        _memoryCache.Remove(key);
        await _subscriber.PublishAsync("cache:invalidate", key);
    }
}
6.3 Version-Based Invalidation
// Version-based invalidation: bumping a per-product version number makes all
// previously cached entries unreachable without deleting them individually.
public class VersionedCacheService
{
    private readonly IDistributedCache _cache;
    private readonly IProductRepository _repository;

    // The original class had no constructor, so both fields stayed null and
    // every call threw NullReferenceException.
    public VersionedCacheService(IDistributedCache cache, IProductRepository repository)
    {
        _cache = cache;
        _repository = repository;
    }

    /// <summary>Reads the product cached under the CURRENT version, loading and caching on a miss.</summary>
    public async Task<Product> GetProductAsync(int id)
    {
        var versionKey = $"product_version:{id}";
        var currentVersion = await _cache.GetStringAsync(versionKey) ?? "1";
        var cacheKey = $"product:{id}:v{currentVersion}";
        var cached = await _cache.GetStringAsync(cacheKey);
        if (cached != null)
            return JsonSerializer.Deserialize<Product>(cached);
        var product = await _repository.GetByIdAsync(id);
        // Give versioned entries a TTL: they are never explicitly removed, so
        // without one every superseded version lingered in the cache forever.
        await _cache.SetStringAsync(
            cacheKey,
            JsonSerializer.Serialize(product),
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1) });
        return product;
    }

    /// <summary>Persists the update, then bumps the version so readers stop seeing stale entries.</summary>
    public async Task UpdateProductAsync(Product product)
    {
        await _repository.UpdateAsync(product);
        var versionKey = $"product_version:{product.Id}";
        // NOTE(review): this read-increment-write is not atomic; two concurrent
        // updates can compute the same new version. Prefer an atomic INCR
        // against Redis for the bump.
        var currentVersion = await _cache.GetStringAsync(versionKey) ?? "1";
        var newVersion = (int.Parse(currentVersion) + 1).ToString();
        await _cache.SetStringAsync(versionKey, newVersion);
        // Entries cached under the old version are now stale and age out via their TTL.
    }
}
7. Performance Optimization Techniques
7.1 Batched Operations
// Request-coalescing sketch: individual GetProductAsync calls are collected and
// fetched from the repository in batches (by size or timer).
// NOTE(review): this demo is incomplete as shown - WaitForProductAsync is not
// defined in the visible source, and the products fetched in ProcessBatch are
// never handed back to waiting callers ("Store in local cache" is a stub).
public class BatchedDataService
{
private readonly IProductRepository _repository;
// Ids accumulated since the last batch; guarded by _lock.
private readonly List<int> _pendingIds = new();
private readonly object _lock = new();
private readonly Timer _batchTimer;
private readonly int _batchSize;
public BatchedDataService(IProductRepository repository, int batchSize = 50, int batchIntervalMs = 100)
{
_repository = repository;
_batchSize = batchSize;
_batchTimer = new Timer(ProcessBatch, null, batchIntervalMs, batchIntervalMs);
}
// Registers the id and triggers an early flush when the batch is full.
public async Task<Product> GetProductAsync(int id)
{
lock (_lock)
{
_pendingIds.Add(id);
if (_pendingIds.Count >= _batchSize)
{
// Calling ProcessBatch while holding _lock is safe only because
// Monitor is reentrant and ProcessBatch releases the lock before
// its first await.
ProcessBatch(null);
}
}
// Wait for batch processing
return await WaitForProductAsync(id);
}
// Timer callback; async void, so any repository exception here is unobserved
// and would crash the process - wrap in try/catch before production use.
private async void ProcessBatch(object state)
{
List<int> ids;
lock (_lock)
{
// Snapshot-and-clear under the lock so new ids go into the next batch.
ids = _pendingIds.ToList();
_pendingIds.Clear();
}
if (ids.Any())
{
var products = await _repository.GetByIdsAsync(ids);
// Store in local cache
}
}
}
7.2 Connection Pooling
public class HttpClientPool
{
    private readonly ConcurrentBag<HttpClient> _clients = new();
    // Counts free clients: WaitAsync blocks callers once all are rented.
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConnections;

    public HttpClientPool(int maxConnections = 10)
    {
        _maxConnections = maxConnections;
        _semaphore = new SemaphoreSlim(maxConnections, maxConnections);
        // Pre-create connections
        for (int i = 0; i < maxConnections; i++)
        {
            _clients.Add(CreateHttpClient());
        }
    }

    /// <summary>
    /// Rents a client, waiting when the pool is exhausted. Dispose the returned
    /// wrapper to put the client back in the pool.
    /// </summary>
    public async Task<PooledHttpClient> GetClientAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken);
        if (_clients.TryTake(out var client))
        {
            return new PooledHttpClient(client, this);
        }
        // Fallback
        client = CreateHttpClient();
        return new PooledHttpClient(client, this);
    }

    private void ReturnClient(HttpClient client)
    {
        _clients.Add(client);
        _semaphore.Release();
    }

    private HttpClient CreateHttpClient()
    {
        var handler = new SocketsHttpHandler
        {
            PooledConnectionLifetime = TimeSpan.FromMinutes(5),
            PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2),
            MaxConnectionsPerServer = 10,
            EnableMultipleHttp2Connections = true
        };
        return new HttpClient(handler);
    }

    // Must be public: the public GetClientAsync returns this type, and a
    // private nested return type is a compile error (CS0050, inconsistent
    // accessibility). Nested classes may still call the pool's private members.
    public class PooledHttpClient : IDisposable
    {
        private readonly HttpClient _client;
        private readonly HttpClientPool _pool;

        public PooledHttpClient(HttpClient client, HttpClientPool pool)
        {
            _client = client;
            _pool = pool;
        }

        public HttpClient Client => _client;

        public void Dispose()
        {
            _pool.ReturnClient(_client);
        }
    }
}
8. Deadlock Prevention and Detection
8.1 Deadlock Detection
// Diagnostic helper: records who holds which resource and periodically flags
// locks held suspiciously long. This detects only LONG-HELD locks, not true
// wait-for cycles (no graph analysis).
public class DeadlockDetector
{
private readonly ConcurrentDictionary<string, LockInfo> _activeLocks = new();
private readonly ILogger<DeadlockDetector> _logger;
// Sweeps _activeLocks every 30 seconds.
private readonly Timer _monitorTimer;
public DeadlockDetector(ILogger<DeadlockDetector> logger)
{
_logger = logger;
_monitorTimer = new Timer(DetectDeadlocks, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
}
// Registers ownership of resourceId; dispose the returned token to release.
// Throws instead of waiting when the resource is already registered - this is
// a tracker, not a real lock. Environment.StackTrace is captured for later
// diagnosis (note: it is relatively expensive per acquisition).
public IDisposable AcquireLock(string resourceId, string threadId)
{
var lockInfo = new LockInfo
{
ResourceId = resourceId,
ThreadId = threadId,
AcquiredAt = DateTime.UtcNow,
StackTrace = Environment.StackTrace
};
if (!_activeLocks.TryAdd(resourceId, lockInfo))
{
throw new InvalidOperationException($"Resource {resourceId} already locked");
}
return new LockReleaser(this, resourceId);
}
// Timer callback: any lock held longer than 5 minutes is reported as a
// potential deadlock (or leak) together with its acquisition stack trace.
private void DetectDeadlocks(object state)
{
var now = DateTime.UtcNow;
foreach (var lockInfo in _activeLocks.Values)
{
var duration = now - lockInfo.AcquiredAt;
if (duration > TimeSpan.FromMinutes(5))
{
_logger.LogWarning("Potential deadlock detected on resource {ResourceId} held for {Duration}",
lockInfo.ResourceId, duration);
// Log stack trace for debugging
_logger.LogDebug("Lock held by thread {ThreadId}:\n{StackTrace}",
lockInfo.ThreadId, lockInfo.StackTrace);
}
}
}
// Release token: removing the entry ends tracking. Nested classes may access
// the outer class's private _activeLocks directly.
private class LockReleaser : IDisposable
{
private readonly DeadlockDetector _detector;
private readonly string _resourceId;
public LockReleaser(DeadlockDetector detector, string resourceId)
{
_detector = detector;
_resourceId = resourceId;
}
public void Dispose()
{
_detector._activeLocks.TryRemove(_resourceId, out _);
}
}
// Immutable-by-convention record of a single acquisition.
private class LockInfo
{
public string ResourceId { get; set; }
public string ThreadId { get; set; }
public DateTime AcquiredAt { get; set; }
public string StackTrace { get; set; }
}
}
8.2 Deadlock Prevention Strategies
public class DeadlockSafeTransaction
{
    /// <summary>
    /// Transfers money between two accounts, always locking the lower-numbered
    /// account first so two opposite transfers can never deadlock.
    /// </summary>
    /// <remarks>
    /// The CancellationToken parameter was added (with a default) because
    /// TransferWithTimeoutAsync already called this method with a token - the
    /// original 5-parameter signature made that call a compile error.
    /// </remarks>
    public async Task TransferMoneyAsync(
        IAccountRepository repository,
        int fromId,
        int toId,
        decimal amount,
        ILogger logger,
        CancellationToken cancellationToken = default)
    {
        // Always acquire locks in consistent order
        var firstId = Math.Min(fromId, toId);
        var secondId = Math.Max(fromId, toId);
        using var tx = await repository.BeginTransactionAsync();
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            var firstAccount = await repository.GetWithLockAsync(firstId);
            cancellationToken.ThrowIfCancellationRequested();
            var secondAccount = await repository.GetWithLockAsync(secondId);
            // Now we have both locks safely; map first/second back to from/to.
            if (firstAccount.Id == fromId)
            {
                firstAccount.Withdraw(amount);
                secondAccount.Deposit(amount);
            }
            else
            {
                secondAccount.Withdraw(amount);
                firstAccount.Deposit(amount);
            }
            await tx.CommitAsync();
        }
        catch (Exception ex)
        {
            await tx.RollbackAsync();
            // logger may legitimately be null (TransferWithTimeoutAsync passes
            // null); the original dereferenced it unconditionally.
            logger?.LogError(ex, "Transaction failed");
            throw;
        }
    }

    /// <summary>
    /// Same transfer, but abandons the attempt after <paramref name="timeout"/>.
    /// </summary>
    /// <exception cref="TimeoutException">Thrown when the transfer does not finish in time.</exception>
    public async Task TransferWithTimeoutAsync(
        IAccountRepository repository,
        int fromId,
        int toId,
        decimal amount,
        TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            await TransferMoneyAsync(repository, fromId, toId, amount, null, cts.Token);
        }
        catch (OperationCanceledException)
        {
            throw new TimeoutException("Transaction timed out");
        }
    }
}
9. Performance Monitoring and Metrics
// Lightweight in-process metrics: per-operation call counts, min/avg/max
// latency, and error tallies, all maintained lock-free with Interlocked.
public class PerformanceMetricsService
{
private readonly ILogger<PerformanceMetricsService> _logger;
private readonly ConcurrentDictionary<string, Metrics> _metrics = new();
public PerformanceMetricsService(ILogger<PerformanceMetricsService> logger)
{
_logger = logger;
}
// Returns a stopwatch token; disposing it records the elapsed time.
public IDisposable MeasureOperation(string operationName)
{
return new OperationMeasurer(this, operationName);
}
// Folds one sample into the named operation's aggregates and warns on
// anything slower than one second.
public void RecordExecutionTime(string operationName, TimeSpan duration)
{
var metrics = _metrics.GetOrAdd(operationName, _ => new Metrics());
metrics.AddExecutionTime(duration);
// Log slow operations
if (duration > TimeSpan.FromSeconds(1))
{
_logger.LogWarning("Slow operation {Operation} took {Duration}ms",
operationName, duration.TotalMilliseconds);
}
}
// Counts an error by exception type name under the named operation.
public void RecordError(string operationName, Exception ex)
{
var metrics = _metrics.GetOrAdd(operationName, _ => new Metrics());
metrics.RecordError(ex.GetType().Name);
_logger.LogError(ex, "Error in operation {Operation}", operationName);
}
// Snapshot of all operations' aggregates at the moment of the call.
public Dictionary<string, MetricsSummary> GetMetrics()
{
return _metrics.ToDictionary(
m => m.Key,
m => m.Value.GetSummary());
}
// Disposable that measures wall-clock time between construction and Dispose.
private class OperationMeasurer : IDisposable
{
private readonly PerformanceMetricsService _service;
private readonly string _operationName;
private readonly Stopwatch _stopwatch;
public OperationMeasurer(PerformanceMetricsService service, string operationName)
{
_service = service;
_operationName = operationName;
_stopwatch = Stopwatch.StartNew();
}
public void Dispose()
{
_stopwatch.Stop();
_service.RecordExecutionTime(_operationName, _stopwatch.Elapsed);
}
}
// Per-operation aggregates, updated concurrently without locks.
private class Metrics
{
private long _totalCalls;
private long _totalTimeMs;
// Sentinel long.MaxValue means "no samples yet"; mapped to 0 in GetSummary.
private long _minTimeMs = long.MaxValue;
private long _maxTimeMs;
private readonly ConcurrentDictionary<string, long> _errors = new();
public void AddExecutionTime(TimeSpan duration)
{
Interlocked.Increment(ref _totalCalls);
Interlocked.Add(ref _totalTimeMs, (long)duration.TotalMilliseconds);
var ms = (long)duration.TotalMilliseconds;
// Update min
// Classic CAS loop: retry only while our sample is still smaller than
// the latest observed minimum; CompareExchange returns the value it saw,
// so original == currentMin means our swap won.
var currentMin = _minTimeMs;
while (ms < currentMin)
{
var original = Interlocked.CompareExchange(ref _minTimeMs, ms, currentMin);
if (original == currentMin) break;
currentMin = original;
}
// Update max
// Mirror-image CAS loop for the maximum.
var currentMax = _maxTimeMs;
while (ms > currentMax)
{
var original = Interlocked.CompareExchange(ref _maxTimeMs, ms, currentMax);
if (original == currentMax) break;
currentMax = original;
}
}
public void RecordError(string errorType)
{
_errors.AddOrUpdate(errorType, 1, (_, count) => count + 1);
}
// Builds a point-in-time summary. Average uses integer division (truncates
// sub-millisecond remainders); min/max are read without Interlocked, so a
// summary taken mid-update may be slightly stale - acceptable for metrics.
public MetricsSummary GetSummary()
{
var totalCalls = Interlocked.Read(ref _totalCalls);
var totalTimeMs = Interlocked.Read(ref _totalTimeMs);
return new MetricsSummary
{
TotalCalls = totalCalls,
AverageTimeMs = totalCalls > 0 ? totalTimeMs / totalCalls : 0,
MinTimeMs = _minTimeMs == long.MaxValue ? 0 : _minTimeMs,
MaxTimeMs = _maxTimeMs,
Errors = _errors.ToDictionary(e => e.Key, e => e.Value)
};
}
}
// Immutable-by-convention snapshot returned to callers.
public class MetricsSummary
{
public long TotalCalls { get; set; }
public long AverageTimeMs { get; set; }
public long MinTimeMs { get; set; }
public long MaxTimeMs { get; set; }
public Dictionary<string, long> Errors { get; set; }
}
}
// Usage
/// <summary>
/// Example consumer: wraps order creation in a timing measurement and records
/// failures under the same metric name.
/// </summary>
public class OrderServiceWithMetrics
{
    private readonly PerformanceMetricsService _metrics;

    public OrderServiceWithMetrics(PerformanceMetricsService metrics) => _metrics = metrics;

    /// <summary>Creates an order; elapsed time is recorded when the measurement scope ends.</summary>
    public async Task<Order> CreateOrderAsync(OrderRequest request)
    {
        // using declaration: disposed (and the duration recorded) when the method scope exits.
        using var measurement = _metrics.MeasureOperation("CreateOrder");
        try
        {
            // Business logic
            var order = new Order();
            await Task.Delay(100);
            return order;
        }
        catch (Exception ex)
        {
            _metrics.RecordError("CreateOrder", ex);
            throw;
        }
    }
}
10. Summary and Best Practices
10.1 Concurrency Best Practices
- Choose the right synchronization primitive: lock for simple scenarios, ReaderWriterLockSlim for read-heavy, SemaphoreSlim for resource pooling
- Avoid lock(this), lock(typeof(MyClass)), lock("string") - Use private readonly objects
- Always acquire locks in consistent order to prevent deadlocks
- Use timeout mechanisms for lock acquisition
- Prefer Concurrent collections over manual synchronization
- Use async/await with SemaphoreSlim.WaitAsync instead of blocking
10.2 Caching Best Practices
- Set appropriate expiration policies - Absolute vs Sliding
- Implement cache invalidation - Don't rely solely on time-based expiration
- Use distributed cache for multi-instance deployments
- Monitor cache hit rates to optimize caching strategy
- Implement cache-aside pattern with retry logic
- Consider write-through/write-behind for critical data
10.3 Performance Optimization Checklist
- ✅ Use connection pooling for database and HTTP clients
- ✅ Implement batching for high-frequency operations
- ✅ Cache expensive computations
- ✅ Use async I/O to free threads
- ✅ Monitor and log slow operations
- ✅ Implement circuit breaker patterns for external dependencies
- ✅ Use compression for large payloads
- ✅ Profile and optimize hot paths
📝 Practice Exercises:
- Implement a thread-safe producer-consumer queue using BlockingCollection
- Create a distributed lock using Redis with automatic renewal (lease)
- Build a cache-aside service with memory and Redis fallback layers
- Implement deadlock detection with graph analysis
- Create a performance metrics dashboard with percentile calculations
- Build a write-behind cache for high-throughput data ingestion
- Implement optimistic concurrency with version numbers in EF Core
Happy Coding! 🚀