.NET client-side SQL join

the most efficient way to transfer a large number of rows out of a relational database is not to compute everything in the database, but to perform the simplest possible sequential reads from the tables and do the join on the client

that way cpu usage and traffic on the database server decrease dramatically

we had a successful experience doing this with spark sql, and here is an attempt to do something similar with dotnet

here is our domain: we have a Resume entity which may have zero or more Experiences and Educations

we are reading all three tables in parallel, each ordered by resume identifier; from the database's perspective this is the simplest possible operation

on our side we keep a buffer for each of them, and whenever we read the next row from any table we check whether we have all the required data for the next resume and, if so, call the Push method

the idea is simple: imagine we have the following streams (the numbers are resume identifiers):

  • resume: 1, 2, 3, 4, 5
  • experiences: 2, 2, 4, 5, 5
  • education: 1, 2, 2, 5

because each table is read ordered by resume identifier, we can tell when all data for a particular resume has been retrieved, e.g. if we jumped from 2 to 4 while reading experiences we can definitely say that we have retrieved all experiences for resume 2 (and that resume 3 has none)

with that in place we can detect the moment we have read all data for a particular resume

and thanks to that we won't use too much memory (we only hold the data that is currently in flight)

here is an example

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Text.Json;
using Dapper;
using Microsoft.Data.Sqlite;

namespace joiner;

/// <summary>
/// Demo of a client-side sort-merge join: three threads stream the <c>resume</c>,
/// <c>education</c> and <c>experience</c> tables, each ordered by resume identifier,
/// and a resume is emitted as soon as both child streams have moved past its id.
/// </summary>
/// <remarks>
/// Buffers are unbounded: if one reader runs far ahead of the others, its rows stay
/// queued until the slower streams catch up.
/// </remarks>
public static class Program
{
    private const string ConnectionString = "Data Source=hello.db;Pooling=False;Cache=Private";

    // Guards the frontiers and serializes emitting, so that exactly one thread at a
    // time consumes from the three queues (producers still enqueue without the lock).
    private static readonly object Gate = new();

    // Rows read so far and not yet emitted, in the order the database returned them
    // (ascending resume id).
    private static readonly ConcurrentQueue<Resume> _cv = new();
    private static readonly ConcurrentQueue<Education> _edu = new();
    private static readonly ConcurrentQueue<Experience> _exp = new();

    // Per child stream: every row whose resume id is strictly below the frontier has
    // already been enqueued. long.MinValue = nothing read yet, long.MaxValue = stream
    // finished. Read and written only while holding Gate.
    private static long _eduFrontier = long.MinValue;
    private static long _expFrontier = long.MinValue;

    /// <summary>
    /// Recreates the demo database, then runs the three table readers in parallel and
    /// waits for all of them to finish.
    /// </summary>
    public static void Main()
    {
        // i'm here only for demo purposes, will recreate demo database
        // (disposed right away so the setup connection does not stay open while the readers run)
        using (var setup = new SqliteConnection(ConnectionString))
        {
            Initializer.Initialize(setup);
        }

        // here is the main point - we are starting 3 threads reading three tables (dictionary tables are not used here because it is obvious what to do with them)
        Task.WaitAll(
            Task.Run(ReadResume),
            Task.Run(ReadEducation),
            Task.Run(ReadExperience)
        );
    }

    /// <summary>
    /// Emits every buffered resume whose education and experience rows are known to be
    /// complete, in ascending resume id order.
    /// </summary>
    /// <param name="from">Name of the reader that triggered the call; used as the output prefix.</param>
    private static void Push(string from)
    {
        lock (Gate)
        {
            // A resume is complete once BOTH child streams have moved past its id.
            var limit = Math.Min(_eduFrontier, _expFrontier);

            while (_cv.TryPeek(out var cv) && cv.Id < limit)
            {
                // Single consumer under the lock, so this removes exactly the peeked resume.
                _cv.TryDequeue(out _);

                var edu = Take(_edu, cv.Id, row => row.ResumeId, from);
                var exp = Take(_exp, cv.Id, row => row.ResumeId, from);

                Console.WriteLine(from + ": " + JsonSerializer.Serialize(new
                {
                    cv,
                    edu,
                    exp,
                }));
            }
        }
    }

    /// <summary>
    /// Removes from the head of <paramref name="buffer"/> every row that belongs to resume
    /// <paramref name="id"/> or to a lower id, and returns those belonging to <paramref name="id"/>.
    /// Must be called while holding <see cref="Gate"/> and only for an id below that stream's frontier.
    /// </summary>
    /// <param name="buffer">Child rows in ascending resume id order.</param>
    /// <param name="id">Identifier of the resume being emitted.</param>
    /// <param name="resumeIdOf">Extracts the owning resume id from a child row.</param>
    /// <param name="from">Name of the reader that triggered the emit; used in the orphan diagnostic.</param>
    /// <returns>The rows of <paramref name="id"/>, possibly empty.</returns>
    private static List<T> Take<T>(ConcurrentQueue<T> buffer, int id, Func<T, int> resumeIdOf, string from)
    {
        var matched = new List<T>();

        while (buffer.TryPeek(out var row))
        {
            var owner = resumeIdOf(row);
            if (owner > id)
            {
                break;
            }

            buffer.TryDequeue(out _);

            if (owner == id)
            {
                matched.Add(row);
            }
            else
            {
                // Resumes are emitted in ascending order, so a child row with a lower id
                // can never be claimed any more: report it and drop it instead of leaking it.
                Console.WriteLine($"{owner} - no resume {from}");
            }
        }

        return matched;
    }

    /// <summary>
    /// Buffers a child stream and advances its frontier every time the resume id changes.
    /// </summary>
    /// <param name="rows">Child rows ordered by resume id ascending.</param>
    /// <param name="buffer">Queue the rows are appended to.</param>
    /// <param name="resumeIdOf">Extracts the owning resume id from a child row.</param>
    /// <param name="frontier">Frontier field of this stream.</param>
    /// <param name="from">Name of the calling reader, forwarded to <see cref="Push"/>.</param>
    private static void Pump<T>(IEnumerable<T> rows, ConcurrentQueue<T> buffer, Func<T, int> resumeIdOf, ref long frontier, string from)
    {
        int? current = null;

        foreach (var row in rows)
        {
            // Enqueue first: once the frontier says "ids below X are complete",
            // all their rows must already be visible to the consumer.
            buffer.Enqueue(row);

            var id = resumeIdOf(row);
            if (current != id)
            {
                current = id;
                Advance(ref frontier, id, from);
            }
        }

        // End of stream: no id can receive further rows.
        Advance(ref frontier, long.MaxValue, from);
    }

    /// <summary>Moves a stream frontier and emits whatever became complete because of it.</summary>
    private static void Advance(ref long frontier, long value, string from)
    {
        lock (Gate)
        {
            frontier = value;
            Push(from); // Monitor locks are re-entrant, so the nested lock is fine.
        }
    }

    /// <summary>Streams the <c>resume</c> table ordered by id and buffers every row.</summary>
    private static void ReadResume()
    {
        using var con = new SqliteConnection(ConnectionString);
        var rows = con.Query<Resume>("SELECT id AS Id, name AS Name, city_id AS CityId, rubric_id AS RubricId FROM resume ORDER BY id ASC", buffered: false);
        foreach (var row in rows)
        {
            _cv.Enqueue(row);
            // The child streams may already be past this id, in which case it is emitted right away.
            Push("ReadResume");
        }
    }

    /// <summary>Streams the <c>education</c> table ordered by resume id.</summary>
    private static void ReadEducation()
    {
        using var con = new SqliteConnection(ConnectionString);
        var rows = con.Query<Education>("SELECT resume_id AS ResumeId, name AS Name FROM education ORDER BY resume_id ASC", buffered: false);
        Pump(rows, _edu, row => row.ResumeId, ref _eduFrontier, "ReadEducation");
    }

    /// <summary>Streams the <c>experience</c> table ordered by resume id.</summary>
    private static void ReadExperience()
    {
        using var con = new SqliteConnection(ConnectionString);
        var rows = con.Query<Experience>("SELECT resume_id AS ResumeId, name AS Name FROM experience ORDER BY resume_id ASC", buffered: false);
        Pump(rows, _exp, row => row.ResumeId, ref _expFrontier, "ReadExperience");
    }
}

/// <summary>
/// Recreates the demo schema (city, rubric, resume, education, experience) and fills it
/// with a small random data set.
/// </summary>
internal static class Initializer
{
    // Number of rows in each dictionary table (city, rubric); ids run from 1 to this value.
    private const int DictionarySize = 10;

    // Number of resumes generated; ids run from 1 to this value.
    private const int ResumeCount = 10;

    /// <summary>
    /// Drops and recreates all demo tables on <paramref name="connection"/> and inserts the demo rows.
    /// </summary>
    /// <param name="connection">
    /// Connection to the demo database. It is opened if it is not open yet and is left open;
    /// disposing it is the caller's responsibility.
    /// </param>
    public static void Initialize(IDbConnection connection)
    {
        var random = new Random();
        if (connection.State != ConnectionState.Open)
        {
            connection.Open();
        }


        connection.Exec("DROP TABLE IF EXISTS city");
        connection.Exec("CREATE TABLE IF NOT EXISTS city (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)");
        for (var i = 1; i <= DictionarySize; i++)
        {
            connection.Exec($"INSERT INTO city(id, name) VALUES ({i}, 'city {i}')");
        }


        connection.Exec("DROP TABLE IF EXISTS rubric");
        connection.Exec("CREATE TABLE IF NOT EXISTS rubric (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)");
        for (var i = 1; i <= DictionarySize; i++)
        {
            connection.Exec($"INSERT INTO rubric(id, name) VALUES ({i}, 'rubric {i}')");
        }


        connection.Exec("DROP TABLE IF EXISTS resume");
        connection.Exec("DROP TABLE IF EXISTS education");
        connection.Exec("DROP TABLE IF EXISTS experience");

        connection.Exec(@"CREATE TABLE IF NOT EXISTS resume (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, city_id INTEGER, rubric_id INTEGER)");
        connection.Exec(@"CREATE TABLE IF NOT EXISTS education (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, resume_id INTEGER)");
        connection.Exec(@"CREATE TABLE IF NOT EXISTS experience (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, resume_id INTEGER)");

        for (var i = 1; i <= ResumeCount; i++)
        {
            // Random.Next's upper bound is exclusive, hence "+ 1" to be able to reference every dictionary row.
            var cityId = random.Next(1, DictionarySize + 1);
            var rubricId = random.Next(1, DictionarySize + 1);
            connection.Exec($"INSERT INTO resume(id, name, city_id, rubric_id) VALUES ({i}, 'resume {i}', {cityId}, {rubricId})");

            // Resume 2 deliberately gets no education rows, so the join has a resume without children to handle.
            if (i != 2)
            {
                // Roll the row count once (1 or 2) instead of re-rolling the loop bound on every iteration.
                var educationCount = random.Next(1, 3);
                for (var j = 1; j <= educationCount; j++)
                {
                    connection.Exec($"INSERT INTO education(name, resume_id) VALUES ('education {j}', {i})");
                }
            }

            var experienceCount = random.Next(1, 3);
            for (var j = 1; j <= experienceCount; j++)
            {
                connection.Exec($"INSERT INTO experience(name, resume_id) VALUES ('experience {j}', {i})");
            }
        }
    }

    /// <summary>Executes a single non-query SQL statement and releases its command.</summary>
    /// <param name="connection">Open connection to run the statement on.</param>
    /// <param name="sql">Statement text; only compile-time literals and loop counters are passed in here.</param>
    private static void Exec(this IDbConnection connection, string sql)
    {
        // IDbCommand is IDisposable; without "using" each statement leaves an undisposed command behind.
        using var command = connection.CreateCommand();
        command.CommandText = sql;
        command.ExecuteNonQuery();
    }
}

/// <summary>
/// Immutable row shape with an identifier and a name; presumably mirrors the <c>city</c>
/// dictionary table (id, name). Not referenced by the readers in this file.
/// </summary>
public record City
{
    /// <summary>Identifier of the city.</summary>
    public int Id { get; init; }
    /// <summary>Display name; may be <c>null</c>.</summary>
    public string? Name { get; init; }
}

/// <summary>
/// Immutable row shape with an identifier and a name; presumably mirrors the <c>rubric</c>
/// dictionary table (id, name). Not referenced by the readers in this file.
/// </summary>
public record Rubric
{
    /// <summary>Identifier of the rubric.</summary>
    public int Id { get; init; }
    /// <summary>Display name; may be <c>null</c>.</summary>
    public string? Name { get; init; }
}

/// <summary>
/// One row of the <c>resume</c> table, as selected by the resume reader query
/// (<c>id</c>, <c>name</c>, <c>city_id</c>, <c>rubric_id</c> aliased to the property names below).
/// </summary>
public record Resume
{
    /// <summary>Resume identifier; the key the education and experience rows are joined on.</summary>
    public int Id { get; init; }
    /// <summary>Resume name; may be <c>null</c>.</summary>
    public string? Name { get; init; }
    /// <summary>Value of the <c>city_id</c> column.</summary>
    public int CityId { get; init; }
    /// <summary>Value of the <c>rubric_id</c> column.</summary>
    public int RubricId { get; init; }
}

/// <summary>
/// One row of the <c>education</c> table, as selected by the education reader query
/// (<c>resume_id</c> and <c>name</c> only).
/// </summary>
public record Education
{
    // The table's own id column is not selected by the reader query, so it is not mapped here.
    /// <summary>Identifier of the resume this education entry belongs to.</summary>
    public int ResumeId { get; init; }
    /// <summary>Education name; may be <c>null</c>.</summary>
    public string? Name { get; init; }
}

/// <summary>
/// One row of the <c>experience</c> table, as selected by the experience reader query
/// (<c>resume_id</c> and <c>name</c> only).
/// </summary>
public record Experience
{
    // The table's own id column is not selected by the reader query, so it is not mapped here.
    /// <summary>Identifier of the resume this experience entry belongs to.</summary>
    public int ResumeId { get; init; }
    /// <summary>Experience name; may be <c>null</c>.</summary>
    public string? Name { get; init; }
}

unfortunately it does not seem to be possible to do this in a usable generic way