Azure Storage Tables: mass cleanup of unwanted records

Here is a use case: we have a pretty big table with a bunch of unwanted records that we want to remove

The goal is to remove records older than X months
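In OData terms, "older than X months" is just a Timestamp filter; a minimal sketch for X = 3 (the cutoff value here is only an example):

var cutoff = DateTimeOffset.UtcNow.AddMonths(-3);
var filter = $"Timestamp lt datetime'{cutoff.UtcDateTime:O}'"; // e.g. Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'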

Doing this one record at a time would be incredibly slow

But Azure Storage is designed to handle a bazillion requests

So instead let's parallelize everything

Here is the conceptual flow

readers

First of all, we can retrieve records like so:

client.QueryAsync<TableEntity>("PartitionKey ge '0' and PartitionKey lt '1'")

which will return only records whose partition key starts with "0"
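Under the hood QueryAsync pages through results for us; if you ever need the pages explicitly, AsPages exposes them (a sketch; pageSizeHint is only a hint to the service):

await foreach (var page in client.QueryAsync<TableEntity>("PartitionKey ge '0' and PartitionKey lt '1'").AsPages(pageSizeHint: 1000))
{
    // each page carries up to ~1000 entities plus a continuation token
    Console.WriteLine(page.Values.Count);
}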

with this in place we can do something like:

var entities = new BlockingCollection<TableEntity>();

var filters = new [] {
  "PartitionKey ge '0' and PartitionKey lt '2'",
  "PartitionKey ge '2' and PartitionKey lt '4'",
  "PartitionKey ge '4' and PartitionKey lt '6'",
  "PartitionKey ge '6'"
};

var readers = filters.Select(filter => Task.Run(async () =>
{
    await foreach (var entity in client.QueryAsync<TableEntity>(filter))
    {
        entities.Add(entity);
    }
})).ToArray(); // ToArray materializes the lazy Select so every reader starts right away

as you can guess, we are going to create four connections and read data in parallel; you will notice the slowness of your Wi-Fi long before Azure Storage struggles, it handles this load easily
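One detail the snippet above glosses over: the grouper below consumes the blocking collection and will wait forever unless someone signals that no more entities are coming, so once all readers finish we complete the collection (the full script at the end does exactly this):

await Task.WhenAll(readers);
entities.CompleteAdding(); // unblocks the grouper's GetConsumingEnumerable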

grouper

Even though responses have records sorted by partition key, we still need to group them into batches of at most 100 items from the same partition for further deletion

so our readers will push records to a blocking collection, and we will have a dedicated task to process this collection:

Task.Run(() =>
{
    // accumulate entities per partition; a plain dictionary is enough
    // since this is the only task touching it
    var partitions = new Dictionary<string, List<TableEntity>>();

    foreach (var entity in entities.GetConsumingEnumerable(token.Token))
    {
        if (!partitions.TryGetValue(entity.PartitionKey, out var list))
        {
            partitions[entity.PartitionKey] = list = new List<TableEntity>();
        }

        list.Add(entity);

        // a table transaction accepts at most 100 actions, all within one partition
        if (list.Count >= 100)
        {
            partitions.Remove(entity.PartitionKey);
            batches.Add(list);
        }
    }

    // readers are done, flush the partial batches that never reached 100
    foreach (var batch in partitions.Values)
    {
        batches.Add(batch);
    }

    batches.CompleteAdding();
});

remover

and finally we have our blocking collection with prepared batches, so deletion may be as easy as:

Task.Run(() => Parallel.ForEachAsync(
    batches.GetConsumingEnumerable(token.Token),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    async (batch, ct) =>
    {
        // Parallel.ForEachAsync, unlike Parallel.ForEach, actually awaits async bodies,
        // and it pulls items from the enumerable on demand, so no Partitioner is needed
        await client.SubmitTransactionAsync(batch.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)), ct).ConfigureAwait(false);
    }));
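A detail worth calling out: SubmitTransactionAsync requires every action in a batch to share one PartitionKey, caps a batch at 100 actions, and fails the whole transaction if a single action fails. A possible fallback (a sketch, not part of the final script below) is to retry a failed batch entity by entity:

try
{
    var actions = batch.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity));
    await client.SubmitTransactionAsync(actions, token.Token);
}
catch (TableTransactionFailedException)
{
    // fall back to individual deletes so one bad entity does not block the other 99
    foreach (var entity in batch)
    {
        await client.DeleteEntityAsync(entity.PartitionKey, entity.RowKey, ETag.All, token.Token);
    }
}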

printer

just for fun we may want to collect some counters of how we are doing and print them in a dedicated thread; in my case the output is something like:

elapsed:       00:00:49
rps:              3,281
consumed:       161,000
skipped:        160,696
pushed:             304
partitions:          58
batched:            188
deleted:            188
failures:             0
memory:             110

With all this in place the whole table was processed in an hour or two; the average consumption speed was approximately ~10K documents per second, and in a minute this script deleted almost ~100K docs

An interesting fact: replacing TableEntity with a dedicated class triples memory usage

Whenever we need to clean up something big, the goal is just to run the script somewhere with an Ethernet connection, as close to the storage as possible, and play with the number of filters
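For instance, if partition keys are hex strings (an assumption; adjust the alphabet to whatever your keys look like), the filter list can be generated instead of hard-coded:

var alphabet = "0123456789abcdef";
var generated = alphabet
    .Select((c, i) => i == alphabet.Length - 1
        ? $"PartitionKey ge '{c}'"                                          // last range is open-ended
        : $"PartitionKey ge '{c}' and PartitionKey lt '{alphabet[i + 1]}'") // half-open [c, next)
    .ToArray();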

From a runner perspective, CPU and disk are not a problem; network utilization may become the bottleneck, but even then you may run multiple such scripts in different places

Here is what I ended up with

using System.Collections.Concurrent;
using System.Diagnostics;
using Azure;
using Azure.Data.Tables; // dotnet add package Azure.Data.Tables

var token = new CancellationTokenSource();
var client = new TableClient(new Uri("https://my-awesome-app.table.core.windows.net/"), "Sample", new TableSharedKeyCredential("my-awesome-app", "xxxxxxxxxxxxxxxxx"));
var consumed = 0;
var skipped = 0;
var pushed = 0;
var batched = 0;
var deleted = 0;
var partitions = 0;
var failures = 0;
var entities = new BlockingCollection<Sample>();
var batches = new BlockingCollection<IList<Sample>>();
var filters = new[]
{
    "PartitionKey ge '0' and PartitionKey lt '5'",
    "PartitionKey ge '5'",

    // "PartitionKey ge '0' and PartitionKey lt '1' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '1' and PartitionKey lt '2' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '2' and PartitionKey lt '3' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '3' and PartitionKey lt '4' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '4' and PartitionKey lt '5' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '5' and PartitionKey lt '6' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '6' and PartitionKey lt '7' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '7' and PartitionKey lt '8' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '8' and PartitionKey lt '9' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",
    // "PartitionKey ge '9' and Timestamp lt datetime'2023-09-01T00:00:00.0000000Z'",

    // "PartitionKey ge '0' and PartitionKey lt '1'",
    // "PartitionKey ge '1' and PartitionKey lt '2'",
    // "PartitionKey ge '2' and PartitionKey lt '3'",
    // "PartitionKey ge '3' and PartitionKey lt '4'",
    // "PartitionKey ge '4' and PartitionKey lt '5'",
    // "PartitionKey ge '5' and PartitionKey lt '6'",
    // "PartitionKey ge '6' and PartitionKey lt '7'",
    // "PartitionKey ge '7' and PartitionKey lt '8'",
    // "PartitionKey ge '8' and PartitionKey lt '9'",
    // "PartitionKey ge '9'",
};


Console.Clear();
Console.CancelKeyPress += (sender, args) =>
{
    token.Cancel();
    Console.WriteLine();
    Console.WriteLine("Exiting...");
    Environment.Exit(0);
};

var readers = filters.Select(filter => Task.Run(async () =>
{
    await foreach (var entity in client.QueryAsync<Sample>(filter, 1000, ["PartitionKey", "RowKey", "Timestamp"], token.Token))
    {
        Interlocked.Increment(ref consumed);
        if (entity.Timestamp >= DateTime.UtcNow.Date.AddMonths(-3))
        {
            Interlocked.Increment(ref skipped);
            continue;
        }
        Interlocked.Increment(ref pushed);
        entities.Add(entity);
    }
})).ToArray(); // ToArray materializes the lazy Select so every reader starts right away

var grouper = Task.Run(() =>
{
    // only this task touches the dictionary, so no ConcurrentDictionary is needed
    var dict = new Dictionary<string, List<Sample>>();

    foreach (var entity in entities.GetConsumingEnumerable(token.Token))
    {
        if (!dict.TryGetValue(entity.PartitionKey, out var list))
        {
            dict[entity.PartitionKey] = list = new List<Sample>();
            Interlocked.Increment(ref partitions);
        }

        list.Add(entity);

        // a table transaction accepts at most 100 actions, all within one partition
        if (list.Count >= 100)
        {
            dict.Remove(entity.PartitionKey);
            batches.Add(list);
            Interlocked.Increment(ref batched);
            Interlocked.Decrement(ref partitions);
        }
    }

    // readers are done, flush the partial batches that never reached 100
    foreach (var batch in dict.Values)
    {
        batches.Add(batch);
        Interlocked.Increment(ref batched);
        Interlocked.Decrement(ref partitions);
    }

    batches.CompleteAdding();
});

var remover = Task.Run(() => Parallel.ForEachAsync(
    batches.GetConsumingEnumerable(token.Token),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    async (batch, ct) =>
    {
        try
        {
            await client.SubmitTransactionAsync(batch.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)), ct).ConfigureAwait(false);
            Interlocked.Add(ref deleted, batch.Count);
        }
        catch (Exception)
        {
            // count the whole batch as one failure and keep going
            Interlocked.Increment(ref failures);
        }
    }));

var printer = Task.Run(() => {

    var timer = Stopwatch.StartNew();
    while(!token.IsCancellationRequested && !batches.IsCompleted)
    {
        var b = batches.IsCompleted ? "done" : "running";
        Console.SetCursorPosition(0, 0);
        Console.WriteLine(timer.Elapsed.ToString("'elapsed:       'hh':'mm':'ss"));
        Console.WriteLine($"rps:         {consumed / timer.Elapsed.TotalSeconds,10:N0}");
        Console.WriteLine($"consumed:    {consumed,10:N0}");
        Console.WriteLine($"skipped:     {skipped,10:N0}");
        Console.WriteLine($"pushed:      {pushed,10:N0}");
        Console.WriteLine($"partitions:  {partitions,10:N0}");
        Console.WriteLine($"batched:     {batched,10:N0}");
        Console.WriteLine($"deleted:     {deleted,10:N0}");
        Console.WriteLine($"failures:    {failures,10:N0}");
        Console.WriteLine($"memory:      {Process.GetCurrentProcess().WorkingSet64 / 1024 / 1024,10:N0}");
        Console.WriteLine($"batches: {b}");
        Thread.Sleep(1000);
    }

    Console.WriteLine();
    Console.WriteLine("Done");
});

await Task.WhenAll(readers);
entities.CompleteAdding();
await grouper;
await remover;
await printer;


record Sample: ITableEntity {
    public required string PartitionKey { get; set; }
    public required string RowKey { get; set; }
    public DateTimeOffset? Timestamp { get; set; }
    public ETag ETag { get; set; }
}

Ideally, a bunch of this code could be removed with the help of a custom partitioner, or maybe Rx, or even better some LINQ, but that was not the goal
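For example, if buffering the whole result set in memory is acceptable, the grouper collapses into a GroupBy plus Chunk (a sketch; unlike the streaming version above, this holds every pushed entity until the readers finish):

foreach (var batch in entities.GetConsumingEnumerable(token.Token)
             .GroupBy(e => e.PartitionKey)   // buffers the entire stream before emitting groups
             .SelectMany(g => g.Chunk(100))) // then slices each partition into transaction-sized pieces
{
    batches.Add(batch); // arrays implement IList<Sample>, so they drop straight in
}
batches.CompleteAdding();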