dotnet distributed cache
have you ever thought about what a distributed cache actually brings?
to me it often looks less like a distributed cache and more like a single point of failure
so here is a challenge - let's implement our own distributed cache
cache interface
so in general our cache should be something like:
public interface INoCache<TKey, TVal> where TKey : IEquatable<TKey> where TVal : class
{
    TVal? Get(TKey key);
    void Set(TKey key, TVal val, TimeSpan? expire);
    void Remove(TKey key);
}
cache impl
nothing special here, just a simple interface, which we may implement like:
public class NoCache<TKey, TVal> : INoCache<TKey, TVal> where TKey : IEquatable<TKey> where TVal : class
{
    private readonly TimeSpan _timeSpan = TimeSpan.FromMinutes(10);
    // note: a plain Dictionary is not thread-safe; good enough for a demo, real code would need locking or a ConcurrentDictionary
    private readonly IDictionary<TKey, Tuple<TVal, DateTime>> _dictionary = new Dictionary<TKey, Tuple<TVal, DateTime>>();
    private readonly ILogger<NoCache<TKey, TVal>> _logger;
    private Timer? _timer;

    public NoCache(ILogger<NoCache<TKey, TVal>> logger)
    {
        _logger = logger;
        // periodically sweep out expired items
        _timer = new Timer(RemoveExpiredItems, null, TimeSpan.Zero, TimeSpan.FromMinutes(1));
    }

    public TVal? Get(TKey key)
    {
        if (!_dictionary.ContainsKey(key))
        {
            return null;
        }
        if (_dictionary[key].Item2 < DateTime.Now)
        {
            // lazily evict an expired item on read
            _dictionary.Remove(key);
            return null;
        }
        return _dictionary[key].Item1;
    }

    public void Set(TKey key, TVal val, TimeSpan? expire = null)
    {
        _dictionary[key] = new Tuple<TVal, DateTime>(val, DateTime.Now.Add(expire ?? _timeSpan));
    }

    public void Remove(TKey key)
    {
        _dictionary.Remove(key);
    }

    private void RemoveExpiredItems(object? state)
    {
        // snapshot the keys so we do not mutate the dictionary while enumerating it
        foreach (var key in _dictionary.Keys.ToList())
        {
            if (_dictionary[key].Item2 < DateTime.Now)
            {
                _dictionary.Remove(key);
                _logger.LogInformation("Timer removing expired {Key}", key);
            }
        }
    }
}
cache usage
to use it we are gonna need some fake api:
app.MapGet("/", ([FromServices] NoCache<int, Player> players) =>
{
var player = players.Get(1);
if (player != null) return player;
player = new Player(1, "Player Number One", 1);
players.Set(player.Id, player);
return player;
});
app.MapGet("/{id:int}", ([FromServices] NoCache<int, Player> players, int id) =>
{
var player = players.Get(id);
if (player != null) return player;
player = new Player(id, $"Player {id}", id);
players.Set(player.Id, player);
return player;
});
we are pretending that this is a real service talking to some kind of database and caching responses
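in case you want to run this version on its own, the wiring is roughly the following; a minimal sketch of the registration (the full Program.cs is shown in the next section):

var builder = WebApplication.CreateBuilder(args);

// one shared cache instance per application process
builder.Services.AddSingleton<NoCache<int, Player>>();

var app = builder.Build();

// ... the two MapGet endpoints from above ...

app.Run();

public record Player(int Id, string Name, int Age);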
distributed cache
and now for the fun part
the idea is that we should somehow notify the other instances of the app whenever a cached item changes
to do so we will capture such changes in a queue and notify all the instances in the background
here is the end result:
using System.Net;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<IDateTimeProvider, DateTimeProvider>();
builder.Services.AddSingleton<NoCache<int, Player>>();
// the same singleton instance also runs as a background service that pushes changes to the other instances
builder.Services.AddHostedService(p => p.GetRequiredService<NoCache<int, Player>>());

var app = builder.Build();

app.MapGet("/", ([FromServices] NoCache<int, Player> players) =>
{
    var player = players.Get(1);
    if (player != null) return player;
    player = new Player(1, "Player Number One", 1);
    players.Set(player.Id, player);
    return player;
});

app.MapGet("/{id:int}", ([FromServices] NoCache<int, Player> players, int id) =>
{
    var player = players.Get(id);
    if (player != null) return player;
    player = new Player(id, $"Player {id}", id);
    players.Set(player.Id, player);
    return player;
});

// endpoint other instances call to apply cache changes
app.MapPost("/sync/players", app.Services.GetRequiredService<NoCache<int, Player>>().Sync);

app.Run();
public record Player(int Id, string Name, int Age);

public interface INoCache<TKey, TVal> where TKey : IEquatable<TKey> where TVal : class
{
    TVal? Get(TKey key);
    void Set(TKey key, TVal val, TimeSpan? expire);
    void Remove(TKey key);
    void Sync(NoCacheOperation<TKey, TVal> item);
}
public class NoCache<TKey, TVal> : BackgroundService, INoCache<TKey, TVal> where TKey : IEquatable<TKey> where TVal : class
{
    private readonly TimeSpan _timeSpan = TimeSpan.FromMinutes(10);
    // the semaphore counts pending queue items so the background loop can wait instead of busy-spinning
    private readonly SemaphoreSlim _semaphore = new(0);
    private readonly Queue<NoCacheOperation<TKey, TVal>> _queue = new();
    // note: Dictionary and Queue are not thread-safe; fine for a demo, real code would need locking or concurrent collections
    private readonly IDictionary<TKey, Tuple<TVal, DateTime>> _dictionary = new Dictionary<TKey, Tuple<TVal, DateTime>>();
    private readonly IDateTimeProvider _dateTimeProvider;
    private readonly ILogger<NoCache<TKey, TVal>> _logger;
    private Timer? _timer;

    public NoCache(IDateTimeProvider dateTimeProvider, ILogger<NoCache<TKey, TVal>> logger)
    {
        _dateTimeProvider = dateTimeProvider;
        _logger = logger;
        _timer = new Timer(RemoveExpiredItems, null, TimeSpan.Zero, TimeSpan.FromMinutes(1));
    }

    public TVal? Get(TKey key)
    {
        if (!_dictionary.ContainsKey(key))
        {
            return null;
        }
        if (_dictionary[key].Item2 < _dateTimeProvider.Now)
        {
            _dictionary.Remove(key);
            return null;
        }
        return _dictionary[key].Item1;
    }

    public void Set(TKey key, TVal val, TimeSpan? expire = null)
    {
        Enqueue(NoCacheOperationEnum.Set, key, val);
        _dictionary[key] = new Tuple<TVal, DateTime>(val, _dateTimeProvider.Now.Add(expire ?? _timeSpan));
    }

    public void Remove(TKey key)
    {
        Enqueue(NoCacheOperationEnum.Remove, key);
        _dictionary.Remove(key);
    }

    private void Enqueue(NoCacheOperationEnum operation, TKey key, TVal? val = null)
    {
        _queue.Enqueue(new NoCacheOperation<TKey, TVal>(operation, key, val));
        _semaphore.Release();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Background service started");
        // reuse a single HttpClient instead of creating one per dequeued item
        var client = new HttpClient();
        while (!stoppingToken.IsCancellationRequested)
        {
            // wait until something is enqueued instead of spinning on an empty queue
            await _semaphore.WaitAsync(stoppingToken);
            if (!_queue.TryDequeue(out var item) || item == null) continue;
            _logger.LogInformation("Dequeued {Item}", item);
            // TODO: each T have its own endpoint, all T have same headless service, move to configuration
            var host = Environment.GetEnvironmentVariable("NOCACHE_SYNC_HOST");
            if (string.IsNullOrEmpty(host)) continue;
            // the headless service name resolves to the ip addresses of all pods
            var addresses = await Dns.GetHostAddressesAsync(host, stoppingToken);
            foreach (var address in addresses)
            {
                // skip ourselves
                if (address.ToString().Equals(Environment.GetEnvironmentVariable("MY_POD_IP"))) continue;
                await client.PostAsJsonAsync($"http://{address}/sync/players", item, cancellationToken: stoppingToken);
                _logger.LogInformation("Sent {Item} to {Address}", item, address);
            }
        }
    }

    private void RemoveExpiredItems(object? state)
    {
        // snapshot the keys so we do not mutate the dictionary while enumerating it
        foreach (var key in _dictionary.Keys.ToList())
        {
            if (_dictionary[key].Item2 < _dateTimeProvider.Now)
            {
                _dictionary.Remove(key);
                _logger.LogInformation("Timer removing expired {Key}", key);
            }
        }
    }

    public void Sync(NoCacheOperation<TKey, TVal> item)
    {
        if (item.Operation == NoCacheOperationEnum.Remove && _dictionary.ContainsKey(item.Key))
        {
            _dictionary.Remove(item.Key);
            _logger.LogInformation("Synced removal of {Key}", item.Key);
        }
        if (item.Operation == NoCacheOperationEnum.Set && !_dictionary.ContainsKey(item.Key) && item.Value != null)
        {
            _dictionary[item.Key] = new Tuple<TVal, DateTime>(item.Value, _dateTimeProvider.Now.Add(_timeSpan));
            _logger.LogInformation("Synced addition of {Item}", item.Value);
        }
    }
}
public enum NoCacheOperationEnum { Set, Remove }

public record NoCacheOperation<TKey, TValue>(NoCacheOperationEnum Operation, TKey Key, TValue? Value);

public interface IDateTimeProvider
{
    DateTime Now { get; }
}

public class DateTimeProvider : IDateTimeProvider
{
    public DateTime Now => DateTime.UtcNow;
}
take a closer look at how changes are synced: the background loop in ExecuteAsync resolves every instance via DNS and posts each change to it, while the Sync method is the endpoint each instance exposes to apply those incoming changes
we are relying on the ability of DNS to return the ip addresses of all instances of our app
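stripped of the cache details, the discovery part is just a DNS lookup against the headless service name from kube.yml below; a minimal sketch:

using System.Net;

// inside the cluster the headless service name resolves to one address per pod
var addresses = await Dns.GetHostAddressesAsync("nocacheheadless");
foreach (var address in addresses)
{
    Console.WriteLine(address); // prints each replica's pod ip
}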
here are the follow-up files to wire everything up
Dockerfile
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /app
COPY *.csproj .
RUN dotnet restore
COPY . .
RUN dotnet publish -c Release -o publish
FROM mcr.microsoft.com/dotnet/aspnet:6.0
WORKDIR /app
RUN apt update && apt install -y netcat dnsutils curl iputils-ping
COPY --from=build /app/publish .
EXPOSE 80
ENV ASPNETCORE_ENVIRONMENT=Production
ENTRYPOINT [ "dotnet", "DistributedCache.dll"]
kube.yml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nocache
  namespace: dev
  labels:
    app: nocache
  annotations:
    owner: [email protected]
spec:
  replicas: 2
  revisionHistoryLimit: 1
  selector:
    matchLabels:
      app: nocache
  template:
    metadata:
      labels:
        app: nocache
      annotations:
        owner: [email protected]
    spec:
      containers:
        - name: nocache
          image: gcr.io/majestic-cairn-171208/nocache:1
          imagePullPolicy: Always
          ports:
            - containerPort: 80
          env:
            - name: NOCACHE_SYNC_HOST
              value: nocacheheadless
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: poolDestination
                    operator: In
                    values:
                      - app
---
# Usual service, like everywhere else
apiVersion: v1
kind: Service
metadata:
  name: nocache
  namespace: dev
  labels:
    app: nocache
  annotations:
    owner: [email protected]
spec:
  type: ClusterIP
  ports:
    - port: 80
  selector:
    app: nocache
---
# Headless service
# the only difference, apart from the name, is "clusterIP: None" which makes the service "headless"
# headless here means that there won't be a dedicated ip address to balance requests
# instead dns will just return the ip addresses of all alive pods
apiVersion: v1
kind: Service
metadata:
  name: nocacheheadless
  namespace: dev
  labels:
    app: nocache
  annotations:
    owner: [email protected]
spec:
  type: ClusterIP
  clusterIP: None
  ports:
    - port: 80
  selector:
    app: nocache

# kubectl apply -f kube.yml
# kubectl exec -it nocache-xxxxx -- bash
we are utilizing a so called "headless" kubernetes service, which means that inside kubernetes, whenever we try to resolve the service name, we receive the ip addresses of all the pods, which we can then traverse
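to see this in action you can exec into one of the pods (dnsutils is installed in the Dockerfile above) and resolve the headless service name; the output should look roughly like this (the addresses are illustrative):

$ nslookup nocacheheadless
Name:    nocacheheadless.dev.svc.cluster.local
Address: 10.52.1.23
Name:    nocacheheadless.dev.svc.cluster.local
Address: 10.52.2.17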
Notes:
- yes, of course the fan-out may be done in parallel, and we should reuse keepalive connections and so on, but for a demo it is good enough already (see the sketch below)
- alternatively we could talk directly to the kubernetes api to get all instances, but that would be much more complicated without any real need; on the other hand it would let us catch the addition of new instances and pre-fill their cache
- indeed there is a possibility of race conditions - but that is the tradeoff compared to a single point of failure (ok, you may deploy a highly available cache - but how can you be sure it won't have the same race conditions)
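a minimal sketch of the parallel fan-out hinted at in the first note, reusing the single HttpClient from the listing and Task.WhenAll (names match the listing above; this is an illustration, not a tested drop-in change):

// inside ExecuteAsync, after dequeuing an item:
var addresses = await Dns.GetHostAddressesAsync(host, stoppingToken);
var myIp = Environment.GetEnvironmentVariable("MY_POD_IP");
var posts = addresses
    .Where(a => !a.ToString().Equals(myIp))                                                 // skip ourselves
    .Select(a => client.PostAsJsonAsync($"http://{a}/sync/players", item, stoppingToken));  // one post per peer
await Task.WhenAll(posts);                                                                  // send to all peers concurrently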