Advanced Concurrency & Caching for LLD - IndianTechnoEra
Latest update Android YouTube

Advanced Concurrency & Caching for LLD

Chapter 10: Advanced Concurrency & Caching - Building High-Performance Systems

Series: Low Level Design for .NET Developers | Previous: Chapter 9: Case Study - Splitwise


📖 Introduction

Modern applications face two critical challenges: handling concurrent operations and managing data access performance. This chapter covers advanced techniques for:

  • Managing concurrent access to shared resources
  • Preventing race conditions and deadlocks
  • Implementing efficient caching strategies
  • Distributed locking and synchronization
  • Optimistic vs pessimistic concurrency control
  • Cache invalidation patterns

1. Concurrency Fundamentals

1.1 Understanding Race Conditions

A race condition occurs when multiple threads access shared data simultaneously, leading to unpredictable results.

// ❌ Race Condition Example
public class BankAccount
{
    public decimal Balance { get; private set; }
    
    public void Deposit(decimal amount)
    {
        // Race condition: multiple threads can read and write simultaneously
        var current = Balance;
        var newBalance = current + amount;
        Balance = newBalance; // Lost update possible
    }
}

// ✅ Thread-Safe Version
public class ThreadSafeBankAccount
{
    private decimal _balance;
    private readonly object _lock = new object();
    
    public decimal Balance 
    { 
        get 
        {
            lock (_lock)
            {
                return _balance;
            }
        }
    }
    
    public void Deposit(decimal amount)
    {
        lock (_lock)
        {
            _balance += amount;
        }
    }
    
    public void Withdraw(decimal amount)
    {
        lock (_lock)
        {
            if (_balance >= amount)
                _balance -= amount;
            else
                throw new InvalidOperationException("Insufficient funds");
        }
    }
}

1.2 Thread-Safe Collections

public class ThreadSafeCache
{
    // ConcurrentDictionary is thread-safe
    private readonly ConcurrentDictionary<string, object> _cache = new();
    
    // BlockingCollection for producer-consumer scenarios
    private readonly BlockingCollection<WorkItem> _workQueue = new();
    
    // ConcurrentBag for unordered collections
    private readonly ConcurrentBag<Task> _tasks = new();
    
    // ConcurrentQueue for FIFO operations
    private readonly ConcurrentQueue<string> _messageQueue = new();
    
    // ConcurrentStack for LIFO operations
    private readonly ConcurrentStack<string> _undoStack = new();
    
    public T GetOrAdd<T>(string key, Func<string, T> valueFactory)
    {
        return (T)_cache.GetOrAdd(key, k => valueFactory(k));
    }
    
    public bool TryRemove(string key, out object value)
    {
        return _cache.TryRemove(key, out value);
    }
    
    public void EnqueueWork(WorkItem item)
    {
        _workQueue.Add(item);
    }
    
    public async Task ProcessWorkAsync(CancellationToken cancellationToken)
    {
        foreach (var item in _workQueue.GetConsumingEnumerable(cancellationToken))
        {
            await ProcessItemAsync(item);
        }
    }
}

2. Synchronization Primitives

2.1 Lock Statement and Monitor

public class AccountManager
{
    private readonly object _balanceLock = new object();
    private readonly object _transactionLock = new object();
    private decimal _balance;
    private List<Transaction> _transactions = new();
    
    public void Transfer(AccountManager target, decimal amount)
    {
        // Avoid deadlock by always acquiring locks in the same order
        object lock1 = this._balanceLock;
        object lock2 = target._balanceLock;
        
        if (lock1.GetHashCode() > lock2.GetHashCode())
        {
            (lock1, lock2) = (lock2, lock1);
        }
        
        lock (lock1)
        {
            lock (lock2)
            {
                if (_balance >= amount)
                {
                    _balance -= amount;
                    target._balance += amount;
                }
            }
        }
    }
    
    public void AddTransaction(Transaction transaction)
    {
        // Use Monitor.TryEnter with timeout
        if (Monitor.TryEnter(_transactionLock, TimeSpan.FromSeconds(5)))
        {
            try
            {
                _transactions.Add(transaction);
            }
            finally
            {
                Monitor.Exit(_transactionLock);
            }
        }
        else
        {
            throw new TimeoutException("Could not acquire lock");
        }
    }
}

2.2 ReaderWriterLockSlim

public class ThreadSafeDataStore<T>
{
    private readonly ReaderWriterLockSlim _lock = new();
    private readonly Dictionary<string, T> _data = new();
    private DateTime _lastReadTime;
    private DateTime _lastWriteTime;
    
    public T Read(string key)
    {
        _lock.EnterReadLock();
        try
        {
            _lastReadTime = DateTime.UtcNow;
            return _data.TryGetValue(key, out var value) ? value : default;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }
    
    public void Write(string key, T value)
    {
        _lock.EnterWriteLock();
        try
        {
            _data[key] = value;
            _lastWriteTime = DateTime.UtcNow;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }
    
    public T ReadUpgradable(string key)
    {
        _lock.EnterUpgradeableReadLock();
        try
        {
            if (_data.TryGetValue(key, out var value))
                return value;
                
            // Upgrade to write lock
            _lock.EnterWriteLock();
            try
            {
                value = FetchFromDatabase(key);
                _data[key] = value;
                return value;
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();
        }
    }
    
    private T FetchFromDatabase(string key)
    {
        // Simulate database fetch
        return default;
    }
}

2.3 SemaphoreSlim for Resource Pooling

public class DatabaseConnectionPool : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly ConcurrentBag<DbConnection> _connections;
    private readonly int _maxConnections;
    
    public DatabaseConnectionPool(int maxConnections)
    {
        _maxConnections = maxConnections;
        _semaphore = new SemaphoreSlim(maxConnections, maxConnections);
        _connections = new ConcurrentBag<DbConnection>();
        
        // Pre-create connections
        for (int i = 0; i < maxConnections; i++)
        {
            _connections.Add(CreateConnection());
        }
    }
    
    public async Task<PooledConnection> GetConnectionAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken);
        
        if (_connections.TryTake(out var connection))
        {
            return new PooledConnection(connection, this);
        }
        
        // Fallback - create new connection (should not happen with proper semaphore)
        connection = CreateConnection();
        return new PooledConnection(connection, this);
    }
    
    private void ReturnConnection(DbConnection connection)
    {
        _connections.Add(connection);
        _semaphore.Release();
    }
    
    private DbConnection CreateConnection()
    {
        // Create actual database connection
        return new SqlConnection("connection_string");
    }
    
    public void Dispose()
    {
        foreach (var connection in _connections)
        {
            connection?.Dispose();
        }
        _semaphore.Dispose();
    }
}

public class PooledConnection : IDisposable
{
    private readonly DbConnection _connection;
    private readonly DatabaseConnectionPool _pool;
    private bool _disposed;
    
    public PooledConnection(DbConnection connection, DatabaseConnectionPool pool)
    {
        _connection = connection;
        _pool = pool;
    }
    
    public DbConnection Connection => _connection;
    
    public void Dispose()
    {
        if (!_disposed)
        {
            _pool.ReturnConnection(_connection);
            _disposed = true;
        }
    }
}

2.4 AsyncLocal for Context Propagation

public class CorrelationContext
{
    private static readonly AsyncLocal<CorrelationContext> _current = new();
    
    public string CorrelationId { get; set; }
    public string UserId { get; set; }
    public string RequestPath { get; set; }
    public DateTime StartTime { get; set; }
    
    public static CorrelationContext Current
    {
        get => _current.Value;
        set => _current.Value = value;
    }
    
    public static IDisposable BeginScope(string correlationId, string userId)
    {
        var original = Current;
        Current = new CorrelationContext
        {
            CorrelationId = correlationId,
            UserId = userId,
            StartTime = DateTime.UtcNow
        };
        
        return new DisposableScope(() => Current = original);
    }
    
    private class DisposableScope : IDisposable
    {
        private readonly Action _onDispose;
        
        public DisposableScope(Action onDispose)
        {
            _onDispose = onDispose;
        }
        
        public void Dispose()
        {
            _onDispose?.Invoke();
        }
    }
}

// Middleware to set correlation context
public class CorrelationMiddleware
{
    private readonly RequestDelegate _next;
    
    public CorrelationMiddleware(RequestDelegate next)
    {
        _next = next;
    }
    
    public async Task InvokeAsync(HttpContext context)
    {
        var correlationId = context.Request.Headers["X-Correlation-ID"].FirstOrDefault() 
            ?? Guid.NewGuid().ToString();
            
        using (CorrelationContext.BeginScope(correlationId, context.User?.Identity?.Name))
        {
            context.Response.Headers["X-Correlation-ID"] = correlationId;
            await _next(context);
        }
    }
}

3. Optimistic vs Pessimistic Concurrency

3.1 Optimistic Concurrency with EF Core

public class Product
{
    public int Id { get; set; }
    public string Name { get; set; }
    public int Quantity { get; set; }
    
    // Concurrency token - tracks row version
    public byte[] RowVersion { get; set; }
}

public class InventoryService
{
    private readonly AppDbContext _context;
    private readonly ILogger<InventoryService> _logger;
    
    public InventoryService(AppDbContext context, ILogger<InventoryService> logger)
    {
        _context = context;
        _logger = logger;
    }
    
    public async Task<bool> DeductStockAsync(int productId, int quantity, int retryCount = 3)
    {
        for (int attempt = 1; attempt <= retryCount; attempt++)
        {
            try
            {
                var product = await _context.Products.FindAsync(productId);
                
                if (product.Quantity < quantity)
                    return false;
                    
                product.Quantity -= quantity;
                
                await _context.SaveChangesAsync();
                return true;
            }
            catch (DbUpdateConcurrencyException ex)
            {
                _logger.LogWarning(ex, "Concurrency conflict on attempt {Attempt}", attempt);
                
                // Reload and retry
                foreach (var entry in ex.Entries)
                {
                    await entry.ReloadAsync();
                }
                
                if (attempt == retryCount)
                {
                    _logger.LogError("Failed to update after {RetryCount} attempts", retryCount);
                    throw;
                }
            }
        }
        
        return false;
    }
}

3.2 Optimistic Concurrency with Version Number

public class Document
{
    public int Id { get; set; }
    public string Title { get; set; }
    public string Content { get; set; }
    public int Version { get; private set; }
    
    public void Update(string title, string content)
    {
        Title = title;
        Content = content;
        Version++;
    }
}

public class DocumentRepository
{
    private readonly AppDbContext _context;
    
    public async Task<bool> UpdateDocumentAsync(int id, string title, string content, int expectedVersion)
    {
        // Update with version check
        var affectedRows = await _context.Database.ExecuteSqlRawAsync(
            "UPDATE Documents SET Title = {0}, Content = {1}, Version = Version + 1 " +
            "WHERE Id = {2} AND Version = {3}",
            title, content, id, expectedVersion);
            
        return affectedRows > 0;
    }
}

3.3 Pessimistic Locking

public class SeatBookingService
{
    private readonly AppDbContext _context;
    
    public async Task<bool> BookSeatAsync(int seatId, int userId)
    {
        // Use transaction with isolation level
        using var transaction = await _context.Database.BeginTransactionAsync(
            System.Data.IsolationLevel.Serializable);
            
        try
        {
            // Lock the seat for update
            var seat = await _context.Seats
                .FromSqlRaw("SELECT * FROM Seats WITH (UPDLOCK, ROWLOCK) WHERE Id = {0}", seatId)
                .FirstOrDefaultAsync();
                
            if (seat == null || seat.IsBooked)
                return false;
                
            seat.Book(userId);
            await _context.SaveChangesAsync();
            
            await transaction.CommitAsync();
            return true;
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
    
    // SQL Server specific table hints
    public async Task<Seat> GetSeatWithLockAsync(int seatId)
    {
        // UPDLOCK - Update lock, HOLDLOCK - Hold until transaction completes
        return await _context.Seats
            .FromSqlRaw("SELECT * FROM Seats WITH (UPDLOCK, HOLDLOCK) WHERE Id = {0}", seatId)
            .FirstOrDefaultAsync();
    }
}

4. Distributed Locking

4.1 Redis Distributed Lock (Redlock Algorithm)

public class RedisDistributedLock : IDistributedLock
{
    private readonly IDatabase _redis;
    private readonly string _lockKey;
    private readonly TimeSpan _expiry;
    private string _lockValue;
    
    public RedisDistributedLock(IConnectionMultiplexer redis, string lockKey, TimeSpan expiry)
    {
        _redis = redis.GetDatabase();
        _lockKey = $"lock:{lockKey}";
        _expiry = expiry;
    }
    
    public async Task<bool> AcquireAsync(CancellationToken cancellationToken = default)
    {
        _lockValue = Guid.NewGuid().ToString();
        
        // SET NX (Not Exists) with expiry
        var acquired = await _redis.StringSetAsync(
            _lockKey, 
            _lockValue, 
            _expiry, 
            When.NotExists);
            
        if (!acquired)
        {
            // Lock is held by someone else
            return false;
        }
        
        // Verify we're the lock owner (prevent race conditions)
        var currentValue = await _redis.StringGetAsync(_lockKey);
        return currentValue == _lockValue;
    }
    
    public async Task ReleaseAsync()
    {
        // Lua script to ensure atomic release (only release if we own it)
        var script = @"
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end";
            
        await _redis.ScriptEvaluateAsync(
            script,
            new RedisKey[] { _lockKey },
            new RedisValue[] { _lockValue });
    }
    
    public async Task<IDisposable> AcquireWithAutoReleaseAsync(CancellationToken cancellationToken = default)
    {
        if (await AcquireAsync(cancellationToken))
        {
            return new LockReleaser(this);
        }
        
        return null;
    }
    
    private class LockReleaser : IDisposable
    {
        private readonly RedisDistributedLock _lock;
        
        public LockReleaser(RedisDistributedLock lock)
        {
            _lock = lock;
        }
        
        public void Dispose()
        {
            _lock.ReleaseAsync().GetAwaiter().GetResult();
        }
    }
}

// Usage with Polly retry
public class DistributedLockService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<DistributedLockService> _logger;
    
    public DistributedLockService(IConnectionMultiplexer redis, ILogger<DistributedLockService> logger)
    {
        _redis = redis;
        _logger = logger;
    }
    
    public async Task<T> ExecuteWithLockAsync<T>(
        string lockKey, 
        Func<Task<T>> action, 
        TimeSpan lockExpiry,
        CancellationToken cancellationToken = default)
    {
        var retryPolicy = Policy
            .HandleResult<bool>(acquired => !acquired)
            .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromMilliseconds(100 * retryAttempt));
            
        var lock = new RedisDistributedLock(_redis, lockKey, lockExpiry);
        
        var acquired = await retryPolicy.ExecuteAsync(() => lock.AcquireAsync(cancellationToken));
        
        if (!acquired)
        {
            throw new TimeoutException($"Could not acquire lock for key: {lockKey}");
        }
        
        try
        {
            return await action();
        }
        finally
        {
            await lock.ReleaseAsync();
        }
    }
}

4.2 SQL Server Distributed Lock (Application Lock)

public class SqlServerDistributedLock : IDistributedLock
{
    private readonly SqlConnection _connection;
    private readonly string _lockName;
    private readonly int _timeoutSeconds;
    
    public SqlServerDistributedLock(string connectionString, string lockName, int timeoutSeconds = 30)
    {
        _connection = new SqlConnection(connectionString);
        _lockName = lockName;
        _timeoutSeconds = timeoutSeconds;
    }
    
    public async Task<bool> AcquireAsync(CancellationToken cancellationToken = default)
    {
        await _connection.OpenAsync(cancellationToken);
        
        using var command = new SqlCommand(
            "sp_getapplock", _connection)
        {
            CommandType = CommandType.StoredProcedure
        };
        
        command.Parameters.AddWithValue("@Resource", _lockName);
        command.Parameters.AddWithValue("@LockMode", "Exclusive");
        command.Parameters.AddWithValue("@LockOwner", "Session");
        command.Parameters.AddWithValue("@LockTimeout", _timeoutSeconds * 1000);
        
        var returnParameter = command.Parameters.Add("@Result", SqlDbType.Int);
        returnParameter.Direction = ParameterDirection.ReturnValue;
        
        await command.ExecuteNonQueryAsync(cancellationToken);
        
        var result = (int)returnParameter.Value;
        return result >= 0; // 0 = lock acquired, 1 = lock granted after wait
    }
    
    public async Task ReleaseAsync()
    {
        using var command = new SqlCommand(
            "sp_releaseapplock", _connection)
        {
            CommandType = CommandType.StoredProcedure
        };
        
        command.Parameters.AddWithValue("@Resource", _lockName);
        command.Parameters.AddWithValue("@LockOwner", "Session");
        
        await command.ExecuteNonQueryAsync();
        await _connection.CloseAsync();
    }
    
    public void Dispose()
    {
        _connection?.Dispose();
    }
}

5. Caching Strategies

5.1 In-Memory Caching with IMemoryCache

public class CachedProductService
{
    private readonly IMemoryCache _cache;
    private readonly IProductRepository _repository;
    private readonly ILogger<CachedProductService> _logger;
    private readonly SemaphoreSlim _cacheLock = new(1, 1);
    
    public CachedProductService(
        IMemoryCache cache, 
        IProductRepository repository,
        ILogger<CachedProductService> logger)
    {
        _cache = cache;
        _repository = repository;
        _logger = logger;
    }
    
    // Cache-Aside Pattern
    public async Task<Product> GetProductAsync(int id, CancellationToken cancellationToken = default)
    {
        var cacheKey = $"product_{id}";
        
        // Try to get from cache
        if (_cache.TryGetValue(cacheKey, out Product cachedProduct))
        {
            _logger.LogDebug("Cache hit for product {ProductId}", id);
            return cachedProduct;
        }
        
        _logger.LogDebug("Cache miss for product {ProductId}", id);
        
        // Use semaphore to prevent multiple simultaneous cache misses
        await _cacheLock.WaitAsync(cancellationToken);
        
        try
        {
            // Double-check after acquiring lock
            if (_cache.TryGetValue(cacheKey, out cachedProduct))
                return cachedProduct;
                
            // Fetch from database
            var product = await _repository.GetByIdAsync(id, cancellationToken);
            
            if (product != null)
            {
                // Set cache with expiration policies
                var cacheOptions = new MemoryCacheEntryOptions
                {
                    AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(15),
                    SlidingExpiration = TimeSpan.FromMinutes(5),
                    Priority = CacheItemPriority.Normal
                };
                
                // Register cache eviction callback
                cacheOptions.RegisterPostEvictionCallback((key, value, reason, state) =>
                {
                    _logger.LogInformation("Cache entry {Key} evicted due to {Reason}", key, reason);
                });
                
                _cache.Set(cacheKey, product, cacheOptions);
            }
            
            return product;
        }
        finally
        {
            _cacheLock.Release();
        }
    }
    
    // Cache invalidation
    public async Task UpdateProductAsync(Product product, CancellationToken cancellationToken = default)
    {
        await _repository.UpdateAsync(product, cancellationToken);
        
        // Invalidate cache
        var cacheKey = $"product_{product.Id}";
        _cache.Remove(cacheKey);
        
        _logger.LogInformation("Cache invalidated for product {ProductId}", product.Id);
    }
}

5.2 Distributed Caching with Redis

public class RedisCachedService
{
    private readonly IDistributedCache _cache;
    private readonly IProductRepository _repository;
    private readonly ILogger<RedisCachedService> _logger;
    
    public RedisCachedService(
        IDistributedCache cache,
        IProductRepository repository,
        ILogger<RedisCachedService> logger)
    {
        _cache = cache;
        _repository = repository;
        _logger = logger;
    }
    
    public async Task<Product> GetProductAsync(int id, CancellationToken cancellationToken = default)
    {
        var cacheKey = $"product:{id}";
        
        // Try to get from Redis
        var cachedData = await _cache.GetStringAsync(cacheKey, cancellationToken);
        
        if (cachedData != null)
        {
            _logger.LogDebug("Redis cache hit for product {ProductId}", id);
            return JsonSerializer.Deserialize<Product>(cachedData);
        }
        
        _logger.LogDebug("Redis cache miss for product {ProductId}", id);
        
        // Fetch from database
        var product = await _repository.GetByIdAsync(id, cancellationToken);
        
        if (product != null)
        {
            var options = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(15),
                SlidingExpiration = TimeSpan.FromMinutes(5)
            };
            
            var jsonData = JsonSerializer.Serialize(product);
            await _cache.SetStringAsync(cacheKey, jsonData, options, cancellationToken);
        }
        
        return product;
    }
    
    // Cache-Aside with lazy loading
    public async Task<Product> GetOrCreateAsync(int id, CancellationToken cancellationToken = default)
    {
        var cacheKey = $"product:{id}";
        
        return await _cache.GetOrCreateAsync(cacheKey, async entry =>
        {
            entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(15);
            entry.SlidingExpiration = TimeSpan.FromMinutes(5);
            
            _logger.LogDebug("Loading product {ProductId} from database", id);
            return await _repository.GetByIdAsync(id, cancellationToken);
        }, cancellationToken);
    }
}

// Extension method for IDistributedCache
public static class DistributedCacheExtensions
{
    public static async Task<T> GetOrCreateAsync<T>(
        this IDistributedCache cache,
        string key,
        Func<DistributedCacheEntryOptions, Task<T>> factory,
        CancellationToken cancellationToken = default)
    {
        var cachedData = await cache.GetStringAsync(key, cancellationToken);
        
        if (cachedData != null)
        {
            return JsonSerializer.Deserialize<T>(cachedData);
        }
        
        var options = new DistributedCacheEntryOptions();
        var value = await factory(options);
        
        var jsonData = JsonSerializer.Serialize(value);
        await cache.SetStringAsync(key, jsonData, options, cancellationToken);
        
        return value;
    }
}

5.3 Cache Patterns

// Write-Through Cache Pattern
public class WriteThroughCache<TKey, TValue>
{
    private readonly IDistributedCache _cache;
    private readonly IRepository<TKey, TValue> _repository;
    private readonly TimeSpan _expiration;
    
    public WriteThroughCache(IDistributedCache cache, IRepository<TKey, TValue> repository, TimeSpan expiration)
    {
        _cache = cache;
        _repository = repository;
        _expiration = expiration;
    }
    
    public async Task<TValue> GetAsync(TKey key, CancellationToken cancellationToken = default)
    {
        var cacheKey = $"{typeof(TValue).Name}:{key}";
        var cached = await _cache.GetStringAsync(cacheKey, cancellationToken);
        
        if (cached != null)
            return JsonSerializer.Deserialize<TValue>(cached);
            
        var value = await _repository.GetAsync(key, cancellationToken);
        
        if (value != null)
        {
            await _cache.SetStringAsync(
                cacheKey, 
                JsonSerializer.Serialize(value), 
                new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = _expiration },
                cancellationToken);
        }
        
        return value;
    }
    
    public async Task SetAsync(TKey key, TValue value, CancellationToken cancellationToken = default)
    {
        // Write to database first
        await _repository.SetAsync(key, value, cancellationToken);
        
        // Then update cache
        var cacheKey = $"{typeof(TValue).Name}:{key}";
        await _cache.SetStringAsync(
            cacheKey,
            JsonSerializer.Serialize(value),
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = _expiration },
            cancellationToken);
    }
}

// Write-Behind (Write-Back) Cache Pattern
public class WriteBehindCache<TKey, TValue>
{
    private readonly IDistributedCache _cache;
    private readonly IRepository<TKey, TValue> _repository;
    private readonly ConcurrentQueue<WriteOperation> _writeQueue = new();
    private readonly Timer _flushTimer;
    private readonly int _batchSize;
    
    public WriteBehindCache(IDistributedCache cache, IRepository<TKey, TValue> repository, int flushIntervalSeconds = 30, int batchSize = 100)
    {
        _cache = cache;
        _repository = repository;
        _batchSize = batchSize;
        _flushTimer = new Timer(FlushQueue, null, TimeSpan.FromSeconds(flushIntervalSeconds), TimeSpan.FromSeconds(flushIntervalSeconds));
    }
    
    public async Task SetAsync(TKey key, TValue value, CancellationToken cancellationToken = default)
    {
        // Write to cache immediately
        var cacheKey = $"{typeof(TValue).Name}:{key}";
        await _cache.SetStringAsync(
            cacheKey,
            JsonSerializer.Serialize(value),
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1) },
            cancellationToken);
            
        // Queue for database write
        _writeQueue.Enqueue(new WriteOperation { Key = key, Value = value });
    }
    
    private async void FlushQueue(object state)
    {
        var batch = new List<WriteOperation>();
        
        while (batch.Count < _batchSize && _writeQueue.TryDequeue(out var operation))
        {
            batch.Add(operation);
        }
        
        if (batch.Any())
        {
            await _repository.BatchSetAsync(batch.ToDictionary(b => b.Key, b => b.Value));
        }
    }
    
    private class WriteOperation
    {
        public TKey Key { get; set; }
        public TValue Value { get; set; }
    }
}

6. Cache Invalidation Strategies

6.1 Time-Based Invalidation

public class TimeBasedCacheInvalidation
{
    private readonly IMemoryCache _cache;
    
    public void SetWithSlidingExpiration(string key, object value)
    {
        _cache.Set(key, value, new MemoryCacheEntryOptions
        {
            SlidingExpiration = TimeSpan.FromMinutes(5),
            AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
        });
    }
    
    public void SetWithAbsoluteExpiration(string key, object value)
    {
        // Cache expires at specific time
        var expirationTime = DateTime.UtcNow.Date.AddDays(1).AddHours(3); // 3 AM tomorrow
        _cache.Set(key, value, expirationTime - DateTime.UtcNow);
    }
}

6.2 Event-Based Invalidation (Pub/Sub)

public class CacheInvalidationService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ISubscriber _subscriber;
    private readonly IMemoryCache _memoryCache;
    
    public CacheInvalidationService(IConnectionMultiplexer redis, IMemoryCache memoryCache)
    {
        _redis = redis;
        _memoryCache = memoryCache;
        _subscriber = redis.GetSubscriber();
        
        // Subscribe to invalidation events
        _subscriber.Subscribe("cache:invalidate", (channel, message) =>
        {
            var key = message.ToString();
            _memoryCache.Remove(key);
        });
    }
    
    public async Task InvalidateAsync(string key)
    {
        // Invalidate local cache
        _memoryCache.Remove(key);
        
        // Publish invalidation to other instances
        await _subscriber.PublishAsync("cache:invalidate", key);
    }
}

6.3 Version-Based Invalidation

public class VersionedCacheService
{
    private readonly IDistributedCache _cache;
    private readonly IProductRepository _repository;
    
    public async Task<Product> GetProductAsync(int id)
    {
        var versionKey = $"product_version:{id}";
        var currentVersion = await _cache.GetStringAsync(versionKey) ?? "1";
        var cacheKey = $"product:{id}:v{currentVersion}";
        
        var cached = await _cache.GetStringAsync(cacheKey);
        if (cached != null)
            return JsonSerializer.Deserialize<Product>(cached);
            
        var product = await _repository.GetByIdAsync(id);
        
        // Store with version
        await _cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(product));
        
        return product;
    }
    
    public async Task UpdateProductAsync(Product product)
    {
        await _repository.UpdateAsync(product);
        
        // Increment version
        var versionKey = $"product_version:{product.Id}";
        var currentVersion = await _cache.GetStringAsync(versionKey) ?? "1";
        var newVersion = (int.Parse(currentVersion) + 1).ToString();
        
        await _cache.SetStringAsync(versionKey, newVersion);
        
        // Old cache entries with old version are now stale
    }
}

7. Performance Optimization Techniques

7.1 Batched Operations

public class BatchedDataService
{
    private readonly IProductRepository _repository;
    private readonly List<int> _pendingIds = new();
    private readonly object _lock = new();
    private readonly Timer _batchTimer;
    private readonly int _batchSize;
    
    public BatchedDataService(IProductRepository repository, int batchSize = 50, int batchIntervalMs = 100)
    {
        _repository = repository;
        _batchSize = batchSize;
        _batchTimer = new Timer(ProcessBatch, null, batchIntervalMs, batchIntervalMs);
    }
    
    public async Task<Product> GetProductAsync(int id)
    {
        lock (_lock)
        {
            _pendingIds.Add(id);
            
            if (_pendingIds.Count >= _batchSize)
            {
                ProcessBatch(null);
            }
        }
        
        // Wait for batch processing
        return await WaitForProductAsync(id);
    }
    
    private async void ProcessBatch(object state)
    {
        List<int> ids;
        lock (_lock)
        {
            ids = _pendingIds.ToList();
            _pendingIds.Clear();
        }
        
        if (ids.Any())
        {
            var products = await _repository.GetByIdsAsync(ids);
            // Store in local cache
        }
    }
}

7.2 Connection Pooling

public class HttpClientPool
{
    private readonly ConcurrentBag<HttpClient> _clients = new();
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConnections;
    
    public HttpClientPool(int maxConnections = 10)
    {
        _maxConnections = maxConnections;
        _semaphore = new SemaphoreSlim(maxConnections, maxConnections);
        
        // Pre-create connections
        for (int i = 0; i < maxConnections; i++)
        {
            _clients.Add(CreateHttpClient());
        }
    }
    
    public async Task<PooledHttpClient> GetClientAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken);
        
        if (_clients.TryTake(out var client))
        {
            return new PooledHttpClient(client, this);
        }
        
        // Fallback
        client = CreateHttpClient();
        return new PooledHttpClient(client, this);
    }
    
    private void ReturnClient(HttpClient client)
    {
        _clients.Add(client);
        _semaphore.Release();
    }
    
    private HttpClient CreateHttpClient()
    {
        var handler = new SocketsHttpHandler
        {
            PooledConnectionLifetime = TimeSpan.FromMinutes(5),
            PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2),
            MaxConnectionsPerServer = 10,
            EnableMultipleHttp2Connections = true
        };
        
        return new HttpClient(handler);
    }
    
    private class PooledHttpClient : IDisposable
    {
        private readonly HttpClient _client;
        private readonly HttpClientPool _pool;
        
        public PooledHttpClient(HttpClient client, HttpClientPool pool)
        {
            _client = client;
            _pool = pool;
        }
        
        public HttpClient Client => _client;
        
        public void Dispose()
        {
            _pool.ReturnClient(_client);
        }
    }
}

8. Deadlock Prevention and Detection

8.1 Deadlock Detection

public class DeadlockDetector
{
    private readonly ConcurrentDictionary<string, LockInfo> _activeLocks = new();
    private readonly ILogger<DeadlockDetector> _logger;
    private readonly Timer _monitorTimer;
    
    public DeadlockDetector(ILogger<DeadlockDetector> logger)
    {
        _logger = logger;
        _monitorTimer = new Timer(DetectDeadlocks, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
    }
    
    public IDisposable AcquireLock(string resourceId, string threadId)
    {
        var lockInfo = new LockInfo
        {
            ResourceId = resourceId,
            ThreadId = threadId,
            AcquiredAt = DateTime.UtcNow,
            StackTrace = Environment.StackTrace
        };
        
        if (!_activeLocks.TryAdd(resourceId, lockInfo))
        {
            throw new InvalidOperationException($"Resource {resourceId} already locked");
        }
        
        return new LockReleaser(this, resourceId);
    }
    
    private void DetectDeadlocks(object state)
    {
        var now = DateTime.UtcNow;
        
        foreach (var lockInfo in _activeLocks.Values)
        {
            var duration = now - lockInfo.AcquiredAt;
            
            if (duration > TimeSpan.FromMinutes(5))
            {
                _logger.LogWarning("Potential deadlock detected on resource {ResourceId} held for {Duration}",
                    lockInfo.ResourceId, duration);
                    
                // Log stack trace for debugging
                _logger.LogDebug("Lock held by thread {ThreadId}:\n{StackTrace}",
                    lockInfo.ThreadId, lockInfo.StackTrace);
            }
        }
    }
    
    private class LockReleaser : IDisposable
    {
        private readonly DeadlockDetector _detector;
        private readonly string _resourceId;
        
        public LockReleaser(DeadlockDetector detector, string resourceId)
        {
            _detector = detector;
            _resourceId = resourceId;
        }
        
        public void Dispose()
        {
            _detector._activeLocks.TryRemove(_resourceId, out _);
        }
    }
    
    private class LockInfo
    {
        public string ResourceId { get; set; }
        public string ThreadId { get; set; }
        public DateTime AcquiredAt { get; set; }
        public string StackTrace { get; set; }
    }
}

8.2 Deadlock Prevention Strategies

public class DeadlockSafeTransaction
{
    public async Task TransferMoneyAsync(
        IAccountRepository repository, 
        int fromId, 
        int toId, 
        decimal amount,
        ILogger logger)
    {
        // Always acquire locks in consistent order
        var firstId = Math.Min(fromId, toId);
        var secondId = Math.Max(fromId, toId);
        
        using var tx = await repository.BeginTransactionAsync();
        
        try
        {
            var firstAccount = await repository.GetWithLockAsync(firstId);
            var secondAccount = await repository.GetWithLockAsync(secondId);
            
            // Now we have both locks safely
            if (firstAccount.Id == fromId)
            {
                firstAccount.Withdraw(amount);
                secondAccount.Deposit(amount);
            }
            else
            {
                secondAccount.Withdraw(amount);
                firstAccount.Deposit(amount);
            }
            
            await tx.CommitAsync();
        }
        catch (Exception ex)
        {
            await tx.RollbackAsync();
            logger.LogError(ex, "Transaction failed");
            throw;
        }
    }
    
    public async Task TransferWithTimeoutAsync(
        IAccountRepository repository,
        int fromId,
        int toId,
        decimal amount,
        TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        
        try
        {
            await TransferMoneyAsync(repository, fromId, toId, amount, null, cts.Token);
        }
        catch (OperationCanceledException)
        {
            throw new TimeoutException("Transaction timed out");
        }
    }
}

9. Performance Monitoring and Metrics

public class PerformanceMetricsService
{
    private readonly ILogger<PerformanceMetricsService> _logger;
    private readonly ConcurrentDictionary<string, Metrics> _metrics = new();
    
    public PerformanceMetricsService(ILogger<PerformanceMetricsService> logger)
    {
        _logger = logger;
    }
    
    public IDisposable MeasureOperation(string operationName)
    {
        return new OperationMeasurer(this, operationName);
    }
    
    public void RecordExecutionTime(string operationName, TimeSpan duration)
    {
        var metrics = _metrics.GetOrAdd(operationName, _ => new Metrics());
        metrics.AddExecutionTime(duration);
        
        // Log slow operations
        if (duration > TimeSpan.FromSeconds(1))
        {
            _logger.LogWarning("Slow operation {Operation} took {Duration}ms", 
                operationName, duration.TotalMilliseconds);
        }
    }
    
    public void RecordError(string operationName, Exception ex)
    {
        var metrics = _metrics.GetOrAdd(operationName, _ => new Metrics());
        metrics.RecordError(ex.GetType().Name);
        
        _logger.LogError(ex, "Error in operation {Operation}", operationName);
    }
    
    public Dictionary<string, MetricsSummary> GetMetrics()
    {
        return _metrics.ToDictionary(
            m => m.Key,
            m => m.Value.GetSummary());
    }
    
    private class OperationMeasurer : IDisposable
    {
        private readonly PerformanceMetricsService _service;
        private readonly string _operationName;
        private readonly Stopwatch _stopwatch;
        
        public OperationMeasurer(PerformanceMetricsService service, string operationName)
        {
            _service = service;
            _operationName = operationName;
            _stopwatch = Stopwatch.StartNew();
        }
        
        public void Dispose()
        {
            _stopwatch.Stop();
            _service.RecordExecutionTime(_operationName, _stopwatch.Elapsed);
        }
    }
    
    private class Metrics
    {
        private long _totalCalls;
        private long _totalTimeMs;
        private long _minTimeMs = long.MaxValue;
        private long _maxTimeMs;
        private readonly ConcurrentDictionary<string, long> _errors = new();
        
        public void AddExecutionTime(TimeSpan duration)
        {
            Interlocked.Increment(ref _totalCalls);
            Interlocked.Add(ref _totalTimeMs, (long)duration.TotalMilliseconds);
            
            var ms = (long)duration.TotalMilliseconds;
            
            // Update min
            var currentMin = _minTimeMs;
            while (ms < currentMin)
            {
                var original = Interlocked.CompareExchange(ref _minTimeMs, ms, currentMin);
                if (original == currentMin) break;
                currentMin = original;
            }
            
            // Update max
            var currentMax = _maxTimeMs;
            while (ms > currentMax)
            {
                var original = Interlocked.CompareExchange(ref _maxTimeMs, ms, currentMax);
                if (original == currentMax) break;
                currentMax = original;
            }
        }
        
        public void RecordError(string errorType)
        {
            _errors.AddOrUpdate(errorType, 1, (_, count) => count + 1);
        }
        
        public MetricsSummary GetSummary()
        {
            var totalCalls = Interlocked.Read(ref _totalCalls);
            var totalTimeMs = Interlocked.Read(ref _totalTimeMs);
            
            return new MetricsSummary
            {
                TotalCalls = totalCalls,
                AverageTimeMs = totalCalls > 0 ? totalTimeMs / totalCalls : 0,
                MinTimeMs = _minTimeMs == long.MaxValue ? 0 : _minTimeMs,
                MaxTimeMs = _maxTimeMs,
                Errors = _errors.ToDictionary(e => e.Key, e => e.Value)
            };
        }
    }
    
    public class MetricsSummary
    {
        public long TotalCalls { get; set; }
        public long AverageTimeMs { get; set; }
        public long MinTimeMs { get; set; }
        public long MaxTimeMs { get; set; }
        public Dictionary<string, long> Errors { get; set; }
    }
}

// Usage
public class OrderServiceWithMetrics
{
    private readonly PerformanceMetricsService _metrics;
    
    public OrderServiceWithMetrics(PerformanceMetricsService metrics)
    {
        _metrics = metrics;
    }
    
    public async Task<Order> CreateOrderAsync(OrderRequest request)
    {
        using (_metrics.MeasureOperation("CreateOrder"))
        {
            try
            {
                // Business logic
                var order = new Order();
                await Task.Delay(100);
                return order;
            }
            catch (Exception ex)
            {
                _metrics.RecordError("CreateOrder", ex);
                throw;
            }
        }
    }
}

10. Summary and Best Practices

10.1 Concurrency Best Practices

  • Choose the right synchronization primitive: lock for simple scenarios, ReaderWriterLockSlim for read-heavy, SemaphoreSlim for resource pooling
  • Avoid lock(this), lock(typeof(MyClass)), lock("string") - Use private readonly objects
  • Always acquire locks in consistent order to prevent deadlocks
  • Use timeout mechanisms for lock acquisition
  • Prefer Concurrent collections over manual synchronization
  • Use async/await with SemaphoreSlim.WaitAsync instead of blocking

10.2 Caching Best Practices

  • Set appropriate expiration policies - Absolute vs Sliding
  • Implement cache invalidation - Don't rely solely on time-based expiration
  • Use distributed cache for multi-instance deployments
  • Monitor cache hit rates to optimize caching strategy
  • Implement cache-aside pattern with retry logic
  • Consider write-through/write-behind for critical data

10.3 Performance Optimization Checklist

  • ✅ Use connection pooling for database and HTTP clients
  • ✅ Implement batching for high-frequency operations
  • ✅ Cache expensive computations
  • ✅ Use async I/O to free threads
  • ✅ Monitor and log slow operations
  • ✅ Implement circuit breaker patterns for external dependencies
  • ✅ Use compression for large payloads
  • ✅ Profile and optimize hot paths

📝 Practice Exercises:

  1. Implement a thread-safe producer-consumer queue using BlockingCollection
  2. Create a distributed lock using Redis with automatic renewal (lease)
  3. Build a cache-aside service with memory and Redis fallback layers
  4. Implement deadlock detection with graph analysis
  5. Create a performance metrics dashboard with percentile calculations
  6. Build a write-behind cache for high-throughput data ingestion
  7. Implement optimistic concurrency with version numbers in EF Core

Happy Coding! 🚀

إرسال تعليق

Feel free to ask your query...
Cookie Consent
We serve cookies on this site to analyze traffic, remember your preferences, and optimize your experience.
Oops!
It seems there is something wrong with your internet connection. Please connect to the internet and start browsing again.
AdBlock Detected!
We have detected that you are using adblocking plugin in your browser.
The revenue we earn by the advertisements is used to manage this website, we request you to whitelist our website in your adblocking plugin.
Site is Blocked
Sorry! This site is not available in your country.