.NET distributed cache

Have you ever thought about what a distributed cache actually brings?

digraph G {
    loadbalancer -> app1;
    loadbalancer -> app2;
    app1 -> cache;
    app2 -> cache;
}

To me, it looks less like a distributed cache and more like a single point of failure: every app instance depends on the one cache server.

So here is a challenge: let's implement our own distributed cache.

cache interface

In general, our cache should look something like this:

/// <summary>Minimal key/value cache with optional per-entry expiration.</summary>
/// <typeparam name="TKey">Key type, compared via <see cref="IEquatable{T}"/>.</typeparam>
/// <typeparam name="TVal">Cached value type; a reference type so that a miss can be reported as <c>null</c>.</typeparam>
public interface INoCache<TKey, TVal> where TKey: IEquatable<TKey> where TVal: class
{
    /// <summary>Returns the cached value for <paramref name="key"/>, or <c>null</c> when it is missing or expired.</summary>
    TVal? Get(TKey key);

    /// <summary>Stores <paramref name="val"/> under <paramref name="key"/>, replacing any existing entry.</summary>
    /// <param name="key">Cache key.</param>
    /// <param name="val">Value to cache.</param>
    /// <param name="expire">
    /// Time to live; <c>null</c> (the default) lets the implementation apply its default lifetime.
    /// The default matches the implementation's signature, so callers using the interface may omit it too.
    /// </param>
    void Set(TKey key, TVal val, TimeSpan? expire = null);

    /// <summary>Removes the entry for <paramref name="key"/>, if any.</summary>
    void Remove(TKey key);
}

cache impl

Nothing special here — a simple interface, which we can implement like this:

/// <summary>
/// In-memory cache whose entries expire after a per-entry time to live (10 minutes unless
/// <see cref="Set"/> is given one). Expired entries are evicted lazily by <see cref="Get"/> and
/// eagerly by a timer that fires once a minute.
/// </summary>
/// <remarks>
/// The eviction timer callback runs on a thread-pool thread concurrently with callers of
/// Get/Set/Remove, so entries live in a ConcurrentDictionary rather than a plain Dictionary.
/// Expiry times are kept in UTC so local daylight-saving changes cannot stretch or shrink lifetimes.
/// Dispose the cache to stop the timer.
/// </remarks>
public class NoCache<TKey, TVal> : INoCache<TKey, TVal>, IDisposable where TKey : IEquatable<TKey> where TVal : class
{
    private readonly TimeSpan _timeSpan = TimeSpan.FromMinutes(10);
    private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, (TVal Value, DateTime ExpiresAt)> _dictionary = new();
    private readonly ILogger<NoCache<TKey, TVal>> _logger;
    private readonly Timer _timer;

    public NoCache(ILogger<NoCache<TKey, TVal>> logger)
    {
        _logger = logger;
        _timer = new Timer(RemoveExpiredItems, null, TimeSpan.Zero, TimeSpan.FromMinutes(1));
    }

    /// <inheritdoc />
    public TVal? Get(TKey key)
    {
        if (!_dictionary.TryGetValue(key, out var entry))
        {
            return null;
        }

        if (entry.ExpiresAt < DateTime.UtcNow)
        {
            // Remove only this exact entry: a concurrent Set may already have stored a fresh value.
            _dictionary.TryRemove(KeyValuePair.Create(key, entry));
            return null;
        }

        return entry.Value;
    }

    /// <inheritdoc />
    public void Set(TKey key, TVal val, TimeSpan? expire = null)
    {
        _dictionary[key] = (val, DateTime.UtcNow.Add(expire ?? _timeSpan));
    }

    /// <inheritdoc />
    public void Remove(TKey key)
    {
        _dictionary.TryRemove(key, out _);
    }

    /// <summary>Stops the eviction timer.</summary>
    public void Dispose()
    {
        _timer.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>Timer callback: evicts every entry whose expiry time has passed.</summary>
    private void RemoveExpiredItems(object? state)
    {
        var now = DateTime.UtcNow;
        // Enumerating a ConcurrentDictionary is safe while other threads modify it.
        foreach (var entry in _dictionary)
        {
            // TryRemove(KeyValuePair) only succeeds if the entry was not replaced in the meantime.
            if (entry.Value.ExpiresAt < now && _dictionary.TryRemove(entry))
            {
                _logger.LogInformation("Timer removing expired {Key}", entry.Key);
            }
        }
    }
}

cache usage

To use it, we need a fake API, for example:

// Fake read-through endpoints: serve the cached player, otherwise create one (standing in for a
// database load) and cache it before returning it.
app.MapGet("/", ([FromServices] NoCache<int, Player> players) =>
{
    if (players.Get(1) is { } cached)
    {
        return cached;
    }

    var created = new Player(1, "Player Number One", 1);
    players.Set(created.Id, created);
    return created;
});

app.MapGet("/{id:int}", ([FromServices] NoCache<int, Player> players, int id) =>
{
    if (players.Get(id) is { } cached)
    {
        return cached;
    }

    var created = new Player(id, $"Player {id}", id);
    players.Set(created.Id, created);
    return created;
});

We pretend this is a real service that talks to some kind of database and caches its responses.

distributed cache

And now the fun part: we will have something like this:

digraph G {
    LB -> app1;
    LB -> app2;
    app1 -> app2;
}

The idea is that each app instance should notify the other instances whenever a cached item changes.

To do so, we record such changes in a queue and notify all instances from a background worker.

Here is the end result:

using System.Net;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<IDateTimeProvider, DateTimeProvider>();
// The same NoCache instance is both the cache injected into endpoints and the hosted service that
// broadcasts changes, so the hosted-service registration resolves the existing singleton.
builder.Services.AddSingleton<NoCache<int, Player>>();
builder.Services.AddHostedService(services => services.GetRequiredService<NoCache<int, Player>>());

var app = builder.Build();

// Read-through helper standing in for a database call: return the cached player, otherwise build one and cache it.
static Player FetchPlayer(NoCache<int, Player> cache, int id, string name)
{
    if (cache.Get(id) is { } hit)
    {
        return hit;
    }

    var loaded = new Player(id, name, id);
    cache.Set(loaded.Id, loaded);
    return loaded;
}

app.MapGet("/", ([FromServices] NoCache<int, Player> players) => FetchPlayer(players, 1, "Player Number One"));

app.MapGet("/{id:int}", ([FromServices] NoCache<int, Player> players, int id) => FetchPlayer(players, id, $"Player {id}"));

// Peers post their cache changes here.
// NOTE(review): the endpoint is unauthenticated; anything that can reach the pod can write to the cache.
var playerCache = app.Services.GetRequiredService<NoCache<int, Player>>();
app.MapPost("/sync/players", playerCache.Sync);

app.Run();


/// <summary>Demo payload that the endpoints fabricate and cache.</summary>
/// <param name="Id">Player identifier; the endpoints also use it as the cache key.</param>
/// <param name="Name">Display name.</param>
/// <param name="Age">Age; the demo endpoints simply set it equal to <paramref name="Id"/>.</param>
public record Player(int Id, string Name, int Age);

/// <summary>Key/value cache whose changes can be replicated to other app instances.</summary>
/// <typeparam name="TKey">Key type, compared via <see cref="IEquatable{T}"/>.</typeparam>
/// <typeparam name="TVal">Cached value type; a reference type so that a miss can be reported as <c>null</c>.</typeparam>
public interface INoCache<TKey, TVal> where TKey: IEquatable<TKey> where TVal: class
{
    /// <summary>Returns the cached value for <paramref name="key"/>, or <c>null</c> when it is missing or expired.</summary>
    TVal? Get(TKey key);

    /// <summary>Stores <paramref name="val"/> under <paramref name="key"/>.</summary>
    /// <param name="key">Cache key.</param>
    /// <param name="val">Value to cache.</param>
    /// <param name="expire">
    /// Time to live; <c>null</c> (the default) lets the implementation apply its default lifetime.
    /// The default matches the implementation's signature, so callers using the interface may omit it too.
    /// </param>
    void Set(TKey key, TVal val, TimeSpan? expire = null);

    /// <summary>Removes the entry for <paramref name="key"/>, if any.</summary>
    void Remove(TKey key);

    /// <summary>Applies a change that was made on another instance.</summary>
    /// <param name="item">The replicated set/remove operation.</param>
    void Sync(NoCacheOperation<TKey, TVal> item);
}

/// <summary>
/// In-memory cache that is also a <see cref="BackgroundService"/>. Every local <see cref="Set"/> and
/// <see cref="Remove"/> is queued and broadcast to the other app instances, which apply it through
/// <see cref="Sync"/>.
/// </summary>
/// <remarks>
/// Peers are found by resolving the host name in the <c>NOCACHE_SYNC_HOST</c> environment variable
/// (a headless service, per the TODO below). This instance skips its own address, read from <c>MY_POD_IP</c>.
/// Request threads, the eviction timer and the background loop all touch the state concurrently,
/// so it lives in concurrent collections.
/// </remarks>
public class NoCache<TKey, TVal> : BackgroundService, INoCache<TKey, TVal> where TKey : IEquatable<TKey> where TVal : class
{
    private readonly TimeSpan _timeSpan = TimeSpan.FromMinutes(10);
    // Released once per queued operation; the background loop waits on it once per item.
    private readonly SemaphoreSlim _semaphore = new(0);
    private readonly System.Collections.Concurrent.ConcurrentQueue<NoCacheOperation<TKey, TVal>> _queue = new();
    private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, (TVal Value, DateTime ExpiresAt)> _dictionary = new();
    // One client for every peer: creating an HttpClient per message exhausts sockets. The short timeout
    // keeps a hung peer from stalling the whole queue for HttpClient's 100-second default.
    private readonly HttpClient _client = new() { Timeout = TimeSpan.FromSeconds(5) };

    private readonly IDateTimeProvider _dateTimeProvider;
    private readonly ILogger<NoCache<TKey, TVal>> _logger;
    private readonly Timer _timer;

    public NoCache(IDateTimeProvider dateTimeProvider, ILogger<NoCache<TKey, TVal>> logger)
    {
        _dateTimeProvider = dateTimeProvider;
        _logger = logger;
        _timer = new Timer(RemoveExpiredItems, null, TimeSpan.Zero, TimeSpan.FromMinutes(1));
    }

    /// <inheritdoc />
    public TVal? Get(TKey key)
    {
        if (!_dictionary.TryGetValue(key, out var entry))
        {
            return null;
        }

        if (entry.ExpiresAt < _dateTimeProvider.Now)
        {
            // Remove only this exact entry: a concurrent Set or Sync may already have stored a fresh value.
            _dictionary.TryRemove(KeyValuePair.Create(key, entry));
            return null;
        }

        return entry.Value;
    }

    /// <inheritdoc />
    public void Set(TKey key, TVal val, TimeSpan? expire = null)
    {
        Enqueue(NoCacheOperationEnum.Set, key, val);
        _dictionary[key] = (val, _dateTimeProvider.Now.Add(expire ?? _timeSpan));
    }

    /// <inheritdoc />
    public void Remove(TKey key)
    {
        Enqueue(NoCacheOperationEnum.Remove, key);
        _dictionary.TryRemove(key, out _);
    }

    /// <summary>Queues an operation for broadcast and wakes the background loop.</summary>
    private void Enqueue(NoCacheOperationEnum operation, TKey key, TVal? val = null)
    {
        _queue.Enqueue(new NoCacheOperation<TKey, TVal>(operation, key, val));
        _semaphore.Release();
    }

    /// <summary>Background loop: waits for each queued operation and broadcasts it to the peers.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Background service started");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Wait on every iteration (one release per Enqueue) so the loop sleeps while the
                // queue is empty instead of spinning on TryDequeue.
                await _semaphore.WaitAsync(stoppingToken);
                if (!_queue.TryDequeue(out var item))
                {
                    continue;
                }

                _logger.LogInformation("Dequeued {Item}", item);
                await BroadcastAsync(item, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host is shutting down; operations still in the queue are dropped.
        }
    }

    /// <summary>Posts <paramref name="item"/> to every address resolved from NOCACHE_SYNC_HOST except this pod's own.</summary>
    /// <remarks>
    /// DNS and HTTP failures are logged and skipped. If they escaped, they would fault
    /// <see cref="ExecuteAsync"/>, which by default stops the host on .NET 6+, so one dead peer
    /// could take the others down.
    /// </remarks>
    private async Task BroadcastAsync(NoCacheOperation<TKey, TVal> item, CancellationToken stoppingToken)
    {
        // TODO: each T have its own endpoint, all T have same headless service, move to configuration
        var host = Environment.GetEnvironmentVariable("NOCACHE_SYNC_HOST");
        if (string.IsNullOrEmpty(host))
        {
            return;
        }

        var ownAddress = Environment.GetEnvironmentVariable("MY_POD_IP");

        IPAddress[] addresses;
        try
        {
            addresses = await Dns.GetHostAddressesAsync(host, stoppingToken);
        }
        catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogWarning(ex, "Failed to resolve {Host}; {Item} not sent", host, item);
            return;
        }

        foreach (var address in addresses)
        {
            var ip = address.ToString();
            if (string.Equals(ip, ownAddress, StringComparison.Ordinal))
            {
                continue;
            }

            // IPv6 literals must be bracketed inside a URL.
            var authority = address.AddressFamily == System.Net.Sockets.AddressFamily.InterNetworkV6 ? $"[{ip}]" : ip;
            try
            {
                using var response = await _client.PostAsJsonAsync($"http://{authority}/sync/players", item, cancellationToken: stoppingToken);
                response.EnsureSuccessStatusCode();
                _logger.LogInformation("Sent {Item} to {Address}", item, address);
            }
            catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
            {
                _logger.LogWarning(ex, "Failed to send {Item} to {Address}", item, address);
            }
        }
    }

    /// <summary>Timer callback: evicts every entry whose expiry time has passed.</summary>
    private void RemoveExpiredItems(object? state)
    {
        var now = _dateTimeProvider.Now;
        // Enumerating a ConcurrentDictionary is safe while other threads modify it.
        foreach (var entry in _dictionary)
        {
            // TryRemove(KeyValuePair) only succeeds if the entry was not replaced in the meantime.
            if (entry.Value.ExpiresAt < now && _dictionary.TryRemove(entry))
            {
                _logger.LogInformation("Timer removing expired {Key}", entry.Key);
            }
        }
    }

    /// <summary>
    /// Applies an operation received from another instance. The dictionary is written directly
    /// rather than through Set/Remove, so the change is not queued again and does not echo between pods.
    /// </summary>
    /// <param name="item">Operation a peer posted to <c>/sync/players</c>.</param>
    public void Sync(NoCacheOperation<TKey, TVal> item)
    {
        ArgumentNullException.ThrowIfNull(item);

        switch (item.Operation)
        {
            case NoCacheOperationEnum.Remove:
                if (_dictionary.TryRemove(item.Key, out _))
                {
                    _logger.LogInformation("Synced removal of {Key}", item.Key);
                }
                break;

            case NoCacheOperationEnum.Set when item.Value is not null:
                // Upsert, so a peer's update to an existing key replaces the stale local value.
                // The sender's TTL is not part of the message, so the default lifetime applies.
                _dictionary[item.Key] = (item.Value, _dateTimeProvider.Now.Add(_timeSpan));
                _logger.LogInformation("Synced addition of {Item}", item.Value);
                break;
        }
    }

    /// <summary>Stops the eviction timer and releases the HTTP client.</summary>
    public override void Dispose()
    {
        _timer.Dispose();
        _client.Dispose();
        base.Dispose();
        GC.SuppressFinalize(this);
    }
}

/// <summary>Kind of change carried by a replicated cache operation.</summary>
public enum NoCacheOperationEnum
{
    /// <summary>A value was stored under the key.</summary>
    Set = 0,

    /// <summary>The key was removed.</summary>
    Remove = 1,
}
/// <summary>A cache change that is queued locally and posted to peer instances as JSON.</summary>
/// <typeparam name="TKey">Cache key type.</typeparam>
/// <typeparam name="TValue">Cached value type.</typeparam>
/// <param name="Operation">Whether the key was set or removed.</param>
/// <param name="Key">The affected cache key.</param>
/// <param name="Value">New value for a set; <c>null</c> for a removal.</param>
/// <remarks>The expiry passed to Set is not carried, so receivers apply their own default lifetime.</remarks>
public record NoCacheOperation<TKey, TValue>(NoCacheOperationEnum Operation, TKey Key, TValue? Value);

/// <summary>
/// Clock abstraction that NoCache uses for expiry calculations (presumably so tests can substitute a fake clock).
/// </summary>
/// <remarks>
/// NOTE(review): despite the name <c>Now</c>, the <c>DateTimeProvider</c> implementation in this file
/// returns <see cref="DateTime.UtcNow"/>, so treat the value as UTC.
/// </remarks>
public interface IDateTimeProvider
{
    /// <summary>Current point in time used to stamp and check cache expiry.</summary>
    DateTime Now { get; }
}

/// <summary>Default clock registered for <see cref="IDateTimeProvider"/>; reports the current time in UTC.</summary>
public class DateTimeProvider : IDateTimeProvider
{
    /// <summary>The current UTC time.</summary>
    public DateTime Now
    {
        get
        {
            return DateTime.UtcNow;
        }
    }
}

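Take a closer look at how changes are synchronized: ExecuteAsync broadcasts every queued change to the other instances, and the Sync method applies the changes received from them.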

To find the other instances, we rely on DNS returning the IP addresses of all instances of our app.

Here are the follow-up files that wire everything up:

Dockerfile

FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /app

# Restore in its own layer so the dependency download is cached until the .csproj changes
COPY *.csproj .
RUN dotnet restore

COPY . .
RUN dotnet publish -c Release -o publish


FROM mcr.microsoft.com/dotnet/aspnet:6.0
WORKDIR /app
# Network troubleshooting tools, presumably for use after `kubectl exec` into a pod.
# apt-get (not apt) is the stable CLI for scripts; removing the package lists keeps the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends netcat dnsutils curl iputils-ping \
    && rm -rf /var/lib/apt/lists/*
COPY --from=build /app/publish .
EXPOSE 80
ENV ASPNETCORE_ENVIRONMENT=Production
ENTRYPOINT [ "dotnet", "DistributedCache.dll"]

kube.yml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nocache
  namespace: dev
  labels:
    app: nocache
  annotations:
    # Quoted: unquoted, a value starting with "[" parses as a YAML flow sequence, and annotation
    # values must be strings. TODO: restore the real owner address (it was obfuscated when published).
    owner: "[email protected]"
spec:
  replicas: 2
  revisionHistoryLimit: 1
  selector:
    matchLabels:
      app: nocache
  template:
    metadata:
      labels:
        app: nocache
      annotations:
        owner: "[email protected]"  # quoted for the same reason; TODO: restore the real address
    spec:
      containers:
        - name: nocache
          image: gcr.io/majestic-cairn-171208/nocache:1
          imagePullPolicy: Always
          ports:
            - containerPort: 80
          # The headless service publishes only ready pods in DNS. Without a probe a pod counts as ready
          # as soon as its container starts, so peers may post cache syncs to it before the app listens.
          readinessProbe:
            tcpSocket:
              port: 80
          env:
            # Headless service whose DNS name resolves to the pod IPs (peer discovery for cache sync)
            - name: NOCACHE_SYNC_HOST
              value: nocacheheadless
            # This pod's own IP, so the app can skip itself when broadcasting
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: poolDestination
                    operator: In
                    values:
                      - app
---
# Regular ClusterIP service: one virtual IP that load-balances client traffic across the pods
apiVersion: v1
kind: Service
metadata:
  name: nocache
  namespace: dev
  labels:
    app: nocache
  annotations:
    # Quoted so YAML does not parse the leading "[" as a flow sequence (annotation values must be strings).
    # TODO: restore the real owner address (it was obfuscated when published).
    owner: "[email protected]"
spec:
  type: ClusterIP
  ports:
    - port: 80
  selector:
    app: nocache
---
# Headless service
# Apart from its name, the only difference from the regular service is "clusterIP: None", which makes
# the service "headless": there is no dedicated virtual IP that balances requests; instead, DNS
# returns the IP addresses of all the ready pods behind the service.
apiVersion: v1
kind: Service
metadata:
  name: nocacheheadless
  namespace: dev
  labels:
    app: nocache
  annotations:
    # Quoted so YAML does not parse the leading "[" as a flow sequence (annotation values must be strings).
    # TODO: restore the real owner address (it was obfuscated when published).
    owner: "[email protected]"
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - port: 80
  selector:
    app: nocache

# Deploy:
# kubectl apply -f kube.yml

# Open a shell in one of the pods (replace with an actual pod name):
# kubectl exec -it nocache-xxxxx -- bash

We use a so-called "headless" Kubernetes service: inside the cluster, resolving the service name returns the IP addresses of all the pods behind it, which we can then iterate over.

Notes:

  • Yes, of course the notifications could be sent in parallel, and we should reuse keep-alive connections, etc., but for a demo this is good enough.
  • Alternatively, we could query the Kubernetes API directly to get all instances. That is much more complicated and not needed here, but on the other hand it would let us detect newly added instances and pre-fill their caches.
  • Indeed, race conditions are possible — but that is the trade-off compared to a single point of failure. (Yes, you could deploy a highly available cache, but how can you be sure it won't have the same race conditions?)