Azure Storage Tables: mass cleanup of unwanted records
Here is a use case: we have a pretty big table with a bunch of unwanted records that we want to remove.
The goal is to remove records older than X months.
Doing this one by one will be incredibly slow.
But Azure Storage is designed to handle a bazillion requests.
So instead, let's parallelize everything.
Here is a conceptual flow.
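Before the code, a rough sketch of how the pieces below fit together (the sections follow in the same order):

readers (one task per partition-range filter)
  -> entities (BlockingCollection)
  -> grouper (builds per-partition batches of up to 100 records)
  -> batches (BlockingCollection)
  -> remover (parallel batch deletes)
printer runs on the side and prints the counters once per second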
readers
First of all, we can retrieve records like so:
client.QueryAsync<TableEntity>("PartitionKey ge '0' and PartitionKey lt '1'")
which will return only records whose partition key starts with "0"
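The same range filter can be combined with a Timestamp condition to express the "older than X months" goal; a minimal sketch (the three month cutoff is just an example, it mirrors the commented-out filters in the full script at the end):

var cutoff = DateTime.UtcNow.AddMonths(-3).ToString("o"); // e.g. 2023-09-01T00:00:00.0000000Z
var filter = $"PartitionKey ge '0' and PartitionKey lt '1' and Timestamp lt datetime'{cutoff}'";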
with this in place we can do something like:
var entities = new BlockingCollection<TableEntity>();
var filters = new[]
{
    "PartitionKey ge '0' and PartitionKey lt '2'",
    "PartitionKey ge '2' and PartitionKey lt '4'",
    "PartitionKey ge '4' and PartitionKey lt '6'",
    "PartitionKey ge '6'"
};
var readers = filters
    .Select(filter => Task.Run(async () =>
    {
        await foreach (var entity in client.QueryAsync<TableEntity>(filter))
        {
            entities.Add(entity);
        }
    }))
    .ToArray();
as you can guess, we are going to create four connections and read data in parallel; you will notice the slowness of your Wi-Fi long before Azure Storage, which handles this easily
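Once the readers are done, the blocking collection has to be marked complete so the consumer below can finish; the full script at the end does exactly this:

await Task.WhenAll(readers);
entities.CompleteAdding();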
grouper
Even though each response arrives with records sorted by partition key, we still need to group them into batches for deletion: a table transaction accepts at most 100 operations, and all of them must share the same partition key.
So our readers push records into a blocking collection, and a dedicated task processes that collection:
Task.Run(() =>
{
var partitions = new ConcurrentDictionary<string, List<TableEntity>>();
var prev = string.Empty;
foreach (var entity in entities.GetConsumingEnumerable(token.Token))
{
if (prev == string.Empty)
{
prev = entity.PartitionKey;
}
partitions.AddOrUpdate(entity.PartitionKey,
key => [entity],
(key, list) =>
{
list.Add(entity);
return list;
});
if (partitions[entity.PartitionKey].Count >= 100 || entity.PartitionKey != prev)
{
if (partitions.TryRemove(entity.PartitionKey, out var batch))
{
batches.Add(batch);
prev = entity.PartitionKey;
}
}
}
foreach (var batch in partitions.Values)
{
batches.Add(batch);
}
batches.CompleteAdding();
});
remover
and finally we have a blocking collection with prepared batches, and deleting them may be as easy as:
// Parallel.ForEachAsync (rather than Parallel.ForEach with an async lambda) so the deletes are actually awaited
await Parallel.ForEachAsync(
    batches.GetConsumingEnumerable(token.Token),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    async (batch, ct) =>
    {
        await client.SubmitTransactionAsync(batch.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)), ct).ConfigureAwait(false);
    });
printer
just for fun, we may want to collect some counters of how we are doing and print them from a dedicated task; in my case the output is something like:
elapsed: 00:00:49
rps: 3,281
consumed: 161,000
skipped: 160,696
pushed: 304
partitions: 58
batched: 188
deleted: 188
failures: 0
memory: 110
With all this in place the whole table was processed in an hour or two; the average consumption speed was roughly ~10K documents per second, and in a single minute the script deleted almost ~100K docs.
An interesting fact: replacing TableEntity with a dedicated class triples memory usage.
Whenever we need to clean up something big, the goal is simply to run the script somewhere with an Ethernet connection, as close as possible to the storage account, and play with the number of filters.
From the runner's perspective, CPU and disk are not a problem; network utilization may become the bottleneck, but even then you can run several such scripts from different places.
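The finer-grained splits (like the commented-out list in the script below) do not have to be typed by hand; a small sketch of generating them, assuming single-digit partition key prefixes and a three month cutoff:

var cutoff = DateTime.UtcNow.AddMonths(-3).ToString("o"); // ISO 8601, matches the datetime'...' literals below
var filters = Enumerable.Range(0, 10)
    .Select(i => i == 9
        ? $"PartitionKey ge '9' and Timestamp lt datetime'{cutoff}'"
        : $"PartitionKey ge '{i}' and PartitionKey lt '{i + 1}' and Timestamp lt datetime'{cutoff}'")
    .ToArray();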
Here is what I ended up with:
using System.Collections.Concurrent;
using System.Diagnostics;
using Azure;
using Azure.Data.Tables; // dotnet add package Azure.Data.Tables
var token = new CancellationTokenSource();
var client = new TableClient(new Uri("https://my-awesome-app.table.core.windows.net/"), "Sample", new TableSharedKeyCredential("my-awesome-app", "xxxxxxxxxxxxxxxxx"));
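// counters, updated with Interlocked by the worker tasks and displayed by the printer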
var consumed = 0;
var skipped = 0;
var pushed = 0;
var batched = 0;
var deleted = 0;
var partitions = 0;
var failures = 0;
var entities = new BlockingCollection<Sample>();
var batches = new BlockingCollection<IList<Sample>>();
var filters = new[]
{
"PartitionKey ge '0' and PartitionKey lt '5'",
"PartitionKey ge '5'",
// "PartitionKey ge '0' and PartitionKey lt '1' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '1' and PartitionKey lt '2' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '2' and PartitionKey lt '3' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '3' and PartitionKey lt '4' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '4' and PartitionKey lt '5' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '5' and PartitionKey lt '6' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '6' and PartitionKey lt '7' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '7' and PartitionKey lt '8' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '8' and PartitionKey lt '9' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '9' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
// "PartitionKey ge '0' and PartitionKey lt '1'",
// "PartitionKey ge '1' and PartitionKey lt '2'",
// "PartitionKey ge '2' and PartitionKey lt '3'",
// "PartitionKey ge '3' and PartitionKey lt '4'",
// "PartitionKey ge '4' and PartitionKey lt '5'",
// "PartitionKey ge '5' and PartitionKey lt '6'",
// "PartitionKey ge '6' and PartitionKey lt '7'",
// "PartitionKey ge '7' and PartitionKey lt '8'",
// "PartitionKey ge '8' and PartitionKey lt '9'",
// "PartitionKey ge '9'",
};
Console.Clear();
Console.CancelKeyPress += (sender, args) =>
{
token.Cancel();
Console.WriteLine();
Console.WriteLine("Exiting...");
Environment.Exit(0);
};
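// readers: one task per filter; entities older than the cutoff are pushed to the blocking collection, the rest are skipped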
var readers = filters.Select(filter => Task.Run(async () =>
{
await foreach (var entity in client.QueryAsync<Sample>(filter, 1000, ["PartitionKey", "RowKey", "Timestamp"], token.Token))
{
Interlocked.Increment(ref consumed);
if (entity.Timestamp >= DateTime.UtcNow.Date.AddMonths(-3))
{
Interlocked.Increment(ref skipped);
continue;
}
Interlocked.Increment(ref pushed);
entities.Add(entity);
}
})).ToArray();
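// grouper: collects entities into per-partition lists and emits them as batches of at most 100 items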
var grouper = Task.Run(() =>
{
var dict = new ConcurrentDictionary<string, List<Sample>>();
var prev = string.Empty;
foreach (var entity in entities.GetConsumingEnumerable(token.Token))
{
if (prev == string.Empty)
{
prev = entity.PartitionKey;
}
dict.AddOrUpdate(entity.PartitionKey,
key => {
Interlocked.Increment(ref partitions);
return [entity];
},
(key, list) =>
{
list.Add(entity);
return list;
});
if (dict[entity.PartitionKey].Count >= 100 || entity.PartitionKey != prev)
{
if (dict.TryRemove(entity.PartitionKey, out var batch))
{
batches.Add(batch);
prev = entity.PartitionKey;
Interlocked.Increment(ref batched);
Interlocked.Decrement(ref partitions);
}
}
}
foreach (var batch in dict.Values)
{
batches.Add(batch);
Interlocked.Increment(ref batched);
Interlocked.Decrement(ref partitions);
}
batches.CompleteAdding();
});
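// remover: each batch is deleted with a single submitted transaction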
// Parallel.ForEachAsync (rather than Parallel.ForEach with an async lambda) so the deletes are actually awaited before the task completes
var remover = Parallel.ForEachAsync(
    batches.GetConsumingEnumerable(token.Token),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    async (batch, ct) =>
    {
        try
        {
            await client.SubmitTransactionAsync(batch.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)), ct).ConfigureAwait(false);
            Interlocked.Add(ref deleted, batch.Count);
        }
        catch (Exception)
        {
            Interlocked.Increment(ref failures);
        }
    });
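// printer: redraws the counters once per second until the batches collection is drained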
var printer = Task.Run(() => {
var timer = Stopwatch.StartNew();
while(!token.IsCancellationRequested && !batches.IsCompleted)
{
var b = batches.IsCompleted ? "done" : "running";
Console.SetCursorPosition(0, 0);
Console.WriteLine(timer.Elapsed.ToString("'elapsed: 'hh':'mm':'ss"));
Console.WriteLine($"rps: {consumed / timer.Elapsed.TotalSeconds,10:N0}");
Console.WriteLine($"consumed: {consumed,10:N0}");
Console.WriteLine($"skipped: {skipped,10:N0}");
Console.WriteLine($"pushed: {pushed,10:N0}");
Console.WriteLine($"partitions: {partitions,10:N0}");
Console.WriteLine($"batched: {batched,10:N0}");
Console.WriteLine($"deleted: {deleted,10:N0}");
Console.WriteLine($"failures: {failures,10:N0}");
Console.WriteLine($"memory: {Process.GetCurrentProcess().WorkingSet64 / 1024 / 1024,10:N0}");
Console.WriteLine($"batches: {b}");
Thread.Sleep(1000);
}
Console.WriteLine();
Console.WriteLine("Done");
});
await Task.WhenAll(readers);
entities.CompleteAdding();
await grouper;
await remover;
await printer;
record Sample: ITableEntity {
public required string PartitionKey { get; set; }
public required string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }
}
Ideally, a bunch of this code could be removed with the help of a custom partitioner, or maybe Rx, or even better some LINQ, but that was not the goal.
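For example, the grouper could in principle shrink to a LINQ one-liner; a rough sketch, with the caveat that GroupBy buffers the entire stream before yielding anything, which is exactly what the hand-rolled grouper avoids:

// trades the streaming behaviour of the manual grouper for brevity:
// GroupBy materializes every group before SelectMany can emit the first chunk
var linqBatches = entities.GetConsumingEnumerable(token.Token)
    .GroupBy(entity => entity.PartitionKey)
    .SelectMany(group => group.Chunk(100));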