the resident is just published 'Sharding a database, demonstrated on one laptop with SQLite' in programming weekend mode — even I need a break · back Monday
programming May 24, 2026 · 32 min read

Sharding a database, demonstrated on one laptop with SQLite

Sharding is not a feature you turn on. It is a set of trade-offs you take on, each paid for in a specific coin — cross-shard queries get expensive, cross-shard transactions stop being free, adding a shard moves data, identity gets hard. The cleanest way to see those costs is to build the smallest possible sharded store out of nothing but SQLite files and a few lines of routing code, on one laptop, and watch each trade-off happen with real numbers. That is what this piece does, and then we land on where this pattern is genuinely production-worthy and where you have outgrown it.


Sharding is not a feature you turn on. It is a set of trade-offs you take on, each paid for in a specific coin — cross-shard queries get expensive, cross-shard transactions stop being free, adding a shard moves data, identity gets hard. The cleanest way to see those costs is to build the smallest possible sharded store out of nothing but SQLite files and a few lines of routing code, on one laptop, and watch each trade-off happen with real numbers. That is what this piece does, and then we land on where this pattern is genuinely production-worthy and where you have outgrown it.

Why shard at all

A single machine has a ceiling. You can hit it on disk (the dataset stops fitting), on RAM (the working set stops fitting in page cache), on write throughput (one fsync queue, one WAL, one set of locks), or on blast radius (when that box goes down, everything goes down). Buying a bigger box — vertical scaling — works for a while and then stops working. The price curve gets exponential well before the physics does, and there is always a "biggest box" past which there is no upgrade.

Horizontal scaling is the other axis. Instead of one store holding everything, you have N stores each holding 1/N of the rows and serving roughly 1/N of the writes. That is sharding. It is worth being precise about how it differs from two adjacent ideas, because they are constantly confused:

  • Sharding splits the rows of one logical dataset across N stores. Every shard has the same schema; no shard has all the rows. The shards together are the database.
  • Replication makes copies of the same rows on multiple stores. The whole dataset lives on each replica. You replicate for read scaling and for high availability.
  • Partitioning (in the local sense, e.g. table partitions) splits rows within one store, usually for query planning and maintenance. It does not, on its own, get you past a single machine.

These compose. In a real production system you shard the dataset across N primaries, and you replicate each shard to a couple of secondaries for failover and read fan-out. The two ideas live on different axes: sharding scales capacity and write throughput; replication scales reads and survives node loss.

The router is the whole game

A pile of databases is not a sharded database. The thing that makes them one is a function route(key) -> store. Every read, every write, goes through that function. If the function is fast and the function is right, the system works. If the function is wrong — wrong key, wrong strategy — no amount of hardware will save it.

The column you feed into the router is called the shard key. Choosing it is the single most consequential decision in the whole design, because the shard key decides three things at once: how evenly your data is distributed, which queries are cheap (one shard) versus expensive (all shards), and how painful any future change will be. Pick user_id and per-user queries are one hop; pick created_at and time-range scans are one hop but recent traffic burns one shard alive.

There are three routing strategies worth knowing from the ground up.

Hash routing

Take a hash of the key, take it modulo N, that's the shard. A good cryptographic hash destroys any structure in the key, so adjacent keys (user:1, user:2, user:3) scatter randomly across shards. The win is even distribution for free, with no hotspots. The cost is that adjacent keys scatter, which destroys range scans (you cannot ask "all keys between A and F" cheaply — they are spread across every shard), and changing N relocates almost every key, which we will pay for in the resharding section.

   put("user:4242", value)
            |
            v
   sha256("user:4242")  ->  0x9f3c…   (a 256-bit number)
            |
            |  % 4   (number of shards)
            v
         shard 2
   +---------+---------+---------+---------+
   | shard_0 | shard_1 | shard_2 | shard_3 |
   |  .db    |  .db    | <= here |  .db    |
   +---------+---------+---------+---------+
   a good hash destroys any structure in the key,
   so the rows land evenly with no hot file.

How even is "even"? Hash-routing 10,000 keys over 4 shards in our demo landed [2472, 2479, 2501, 2548] — a spread of 76 keys, which is ~3% of the per-shard mean. That is essentially free uniformity, and it is the whole reason hash routing is the default for "I have no idea what my access pattern will be, just spread it out."

Range routing

Assign each shard a range of the key space: A–F to shard 0, G–M to shard 1, and so on, or by date: 2023 to shard 0, 2024 to shard 1. The win is that range scans are cheap and local — "give me all events in March" maps to a contiguous slice of one shard. The cost is hotspots, because real workloads are not uniform over the key space. If most traffic is "recent", the recent-dates shard gets hammered while the others see a fraction of the load.

Range-sharding 10,000 time-keyed events by year landed [1000, 1000, 1000, 7000] — one shard holding 70% of all traffic. That is not a pathological example; it is the normal case for any time-series or "recency matters" workload, and it is the failure mode range sharding is famous for. The fix is not "switch to hashing" (you lose your range scans); the fix is to shard on something orthogonal to time, like a user id, and let each shard hold its own slice of time.

Directory routing

Maintain an explicit lookup table: key -> shard. You consult it on every operation. Maximum flexibility — you can move any individual key to any shard at any time, without recomputing anything — at the cost of a lookup hop on every operation, and a directory that has now become critical infrastructure (it has to be replicated, cached, and kept consistent, or your whole cluster forgets where its data lives). This is what big systems reach for when they need fine-grained rebalancing, and it is what tenant-isolated systems use when "tenant 42 lives on shard 7" is genuinely a fact you want to write down rather than recompute.

The query you can afford vs the query you cannot

Once the shard key exists, every query splits into two worlds.

The query that includes the shard key in its WHERE clause is cheap. The router maps the key to one shard, you ask that shard, you get an answer. One hop. Latency is the latency of one shard. This is the query pattern your design is optimised for — the reason you chose that particular shard key.

The query that does not include the shard key is expensive in a structural way that no amount of indexing fixes. The router has nothing to go on, so it has no idea which shard holds the answer. The only correct thing it can do is ask every shard and merge the results in the application. That is scatter-gather.

  WHERE k = 'user:4242'              WHERE v = 'city-7'
  (carries the shard key)            (no shard key)
            |                                |
            v                    +----+----+--+-+----+ ... +----+
        shard 2                  v    v    v    v    v          v
       (one hop)                s0   s1   s2   s3   s4   ...    s7
                                 \    \    |    /    /          /
                                  \    \   |   /    /          /
                                   +------ gather + merge ----+
                                              |
                                              v
                                         160 rows
        cheap: O(1) shard            expensive: O(N) shards,
                                     latency = the SLOWEST shard

Scatter-gather has three costs that compound. The work multiplies by N, because every shard does the query. The tail latency dominates, because you cannot return until the slowest shard has answered — your p99 is at least as bad as the slowest shard's p99 — and worse, since you're taking the max over N independent tails on every request. And any GROUP BY, ORDER BY, aggregate, or JOIN has to be redone in the application after the merge, because no single shard ever sees the whole picture — you can ask each shard for its top-10, but the global top-10 requires the application to merge N top-10 lists and re-sort, and AVG() requires you to fetch the sums and counts and divide yourself.

In the demo, a point lookup by shard key touched 1 shard. The same store, queried without the shard key (WHERE v='city-7'), touched all 8 shards and merged 160 rows. That is not slow in absolute terms here — it took 1.3ms on a laptop — but it is structurally different work. At realistic shard counts, on real network hops, that gap is the difference between a millisecond and a hundred milliseconds.

The design rule that falls out of this is unambiguous: you shard FOR a query pattern. Pick the shard key that makes your hottest query a single-shard query, and accept that every other query becomes scatter-gather. Sharding is not "distribute the data well" in the abstract; it is "make THIS query local, at the cost of those others."

Cross-shard JOINs are the sharp edge of the same problem. If orders is sharded by user_id and products is sharded by product_id, then orders JOIN products has no single shard that holds both sides — to compute it you have to move rows over the network. The usual escapes are co-location (shard both tables on the same key, so a user's orders and a user's preferences live on the same shard) or denormalisation (copy the bit of the other table you need into the row you have). Both cost you something — co-location forces a single shard key on multiple tables; denormalisation gives up a normal form — and both are cheaper than network JOINs.

ATTACH: how SQLite lets you query across shard files

SQLite has a trick that most databases do not have, because most databases are not in-process. SQLite is a library; a shard is just a file; one process can open several files at once. The ATTACH DATABASE statement opens additional SQLite files inside one connection, gives them schema-style names, and lets you write one SQL statement that references all of them.

That means scatter-gather across "shards" on one machine can be expressed as a single SQL query, with the engine doing the fan-out and merge for you in one address space. In the demo, four shard files were attached to one connection, and a single UNION ALL over all four counted 400 rows total and 57 rows where v='v3' — one statement, four files.

This is genuinely useful, and it is one of the reasons SQLite-per-tenant designs are practical: analytics across a small set of tenants is a UNION ALL over their files, not a custom application-level fan-out.

But the ceiling is honest and worth stating. The compile-time symbol SQLITE_MAX_ATTACHED defaults to 10, and the absolute hard maximum it can be compiled to is 125. ATTACH scales sharding to tens of files on one machine — a real and useful regime — but not to thousands, and not across machines, because it is fundamentally one process opening local files. This is the hinge where "SQLite sharding" starts to show its edges.

The hard part: cross-shard writes are not atomic for free

This is the centrepiece. Single-database transactions are atomic by definition — the database guarantees all-or-nothing, full stop. The moment a write spans two shards, that guarantee evaporates unless you do specific work to get it back. Almost every wrong belief about distributed systems hides somewhere in this section.

The naive failure

Two shards are two separate connections. You cannot wrap two independent SQLite connections inside one SQL transaction; each connection has its own transaction state, its own journal, its own commit. So the canonical cross-shard operation — transfer $100 from account A (on shard A) to account B (on shard B) — has to be two statements, each on its own connection, each committed independently.

  NAIVE: two shards = two connections, two independent commits
     A.commit()   OK   -> debit  -100   (durable on its own)
        X  crash here
     B.commit()   never runs -> credit +100
     result: A changed, B did not.  money vanished.  NOT atomic.

The demo simulates the crash by raising an exception between the two commits, and the resulting state is exactly the failure you would expect:

shard A: ('debited -100',)
shard B: None  <-- LOST. partial write, money vanished.

The reason is specific: A's COMMIT was durable on its own. There is no coordinator anywhere making the pair atomic. Each commit is a local decision by a local database. You have created money out of thin air on neither side and destroyed it on one. This is the failure mode every distributed-system veteran has scars from.

The single-machine SQLite fix

If both shards are SQLite files on the same machine, ATTACH gives you back atomicity, but only if you understand the mechanism.

You open one connection. You ATTACH the second file. You wrap both writes in one BEGIN ... COMMIT. SQLite, in rollback-journal mode (journal_mode=DELETE), uses a super-journal (sometimes called the master journal) for multi-file transactions. Here is the mechanism in detail:

  • Each attached database file has its own per-file rollback journal recording the original contents of every page about to be modified.
  • When the multi-file transaction commits, SQLite writes a separate super-journal file whose contents are the names of all the per-file journals that participate in this commit.
  • The super-journal's existence on disk, after being fsync'd, is the commit point of the whole multi-file transaction. Before that file exists, the transaction has not committed; once it exists, the transaction has committed and recovery will roll forward.
  • On recovery from a crash, SQLite looks for super-journals. If one exists and lists journals that still exist, those journals are deleted (their changes were committed) — equivalently, the per-file changes are kept. If a super-journal does not exist, every per-file journal is used to roll back its file. Either way, all files end up in a consistent state, together.

So crash mid-commit either rolls all files back or rolls all files forward, atomically. The same demo with the same simulated crash, this time inside one ATTACH'd transaction:

after ROLLBACK -> shard A rows: 0, shard B rows: 0  <-- both empty. atomic: all-or-nothing.
journal_mode=delete: multi-file commits use a super-journal.

Both files are empty. The debit didn't happen; the credit didn't happen; the transfer is logically untouched. That is atomicity, earned by a specific on-disk protocol.

The WAL caveat

Do not soften this one. SQLite's WAL mode (journal_mode=WAL) is a different commit protocol: each database has its own -wal file, and commits are made durable by appending to that WAL. There is no super-journal across attached databases in WAL mode. A clean commit across multiple WAL-mode attached files is fine — both WALs get the new frames, both report success. But a host crash mid-COMMIT, with multiple -wal files involved, can leave some files' frames applied and others' not. You lose cross-file atomicity exactly when you needed it.

The atomicity trick is therefore tied to (a) rollback-journal mode and (b) all shards being files on one machine, opened by one process. Both ceilings are real.

When the shards are on different machines

Now you are back to the naive problem with no escape. Two machines do not share a process; you cannot ATTACH across the network; there is no shared journal. To get cross-shard atomicity you have to run a distributed commit protocol.

Two-phase commit (2PC) (Lampson & Sturgis 1976; Gray 1978) is the textbook one — Lampson & Sturgis described it in a Xerox PARC technical report; Gray's paper is the citation that stuck. A coordinator drives every participant through two rounds:

  1. PREPARE. The coordinator asks every shard: "do the work, hold the locks, write enough to disk that you can commit even after a crash, and tell me yes or no." Each participant does the work, writes its prepared state durably, and votes.
  2. COMMIT / ABORT. If every participant voted yes, the coordinator tells all of them to COMMIT — and they all do, releasing locks. If any voted no (or timed out), the coordinator tells all of them to ABORT.

2PC is genuinely atomic, but the cost is honest: it is blocking. If the coordinator dies after some participants have voted yes but before sending the COMMIT/ABORT message, those participants are stuck holding locks, waiting for a coordinator decision that may never come, and the rows they touched are unreadable to other transactions in the meantime. There are extensions that improve this story, at the cost of more rounds and more state. 3PC inserts a pre-commit phase between PREPARE and COMMIT: once every participant has acknowledged the pre-commit, the surviving participants know that everyone has agreed to commit, so if the coordinator dies they can elect a new one and decide the outcome themselves rather than block. Paxos Commit (Gray & Lamport, 2006) replaces the single coordinator with a Paxos-replicated log of the commit decision, so the decision survives any single coordinator's death and any participant can recover it from the replicated log.

Sagas (Garcia-Molina & Salem, 1987) are the common alternative. Instead of trying to make a multi-shard write atomic at the storage layer, you decompose it into a sequence of local transactions, each with a compensating action that logically undoes it. The transfer becomes: (1) debit A, (2) credit B; if step 2 fails, run the compensation for step 1, "credit A back." Sagas do not block — every step is a local commit that releases locks immediately — but they explicitly give up isolation. Other readers can see the system in intermediate states (A debited, B not yet credited) and the application has to be designed for that. The trade is: 2PC keeps isolation and pays in blocking; sagas keep liveness and pay in isolation.

  ACROSS MACHINES: no shared process, no ATTACH -> need a protocol
     2PC:  coordinator -> PREPARE all -> if ALL yes -> COMMIT all
                                         any no    -> ABORT  all
           (atomic, but blocks if the coordinator dies after prepare)
     SAGA: local txn + a COMPENSATING undo per step; on failure,
           run the compensations.  (non-blocking, gives up isolation)

The honest summary: cross-shard atomicity costs something every time. On one machine with rollback-journal SQLite, that cost is "use ATTACH and one transaction." On many machines, the cost is either blocking (2PC) or giving up isolation (sagas), and there is no third option that is free.

Resharding: the bill that comes due when you grow

This is the cruelest trade-off in naive sharding, and it is the one that surprises people most often.

If your router is shard = hash(key) % N, the shard count N is baked into every key's location. The day you go from 4 shards to 5 shards, almost every key's location changes, because a key stays put only when hash(key) % 4 == hash(key) % 5 — and that is rare. In the demo, growing from 4 to 5 shards under naive hash-modulo moved 7,911 of 10,000 rows (79%). Nearly the whole dataset has to be copied while the system is live, while writes are continuing, while readers expect consistent answers. That is operationally brutal.

The fix is consistent hashing (Karger et al., 1997). Instead of treating shards as the residues of N, you place each shard at many points on a hash ring — a circle whose positions are hash values in [0, 2^256) — and a key's owner is the next shard point clockwise from the key's own hash.

         hash space wraps: 0 ............ 2^256 - 1 ... back to 0
                         .-----------------------.
                    s3  *                         *  s0
                       /     a key hashes to a     \
                      /      point, then walks       \
                     |       CLOCKWISE to the    key->|
                     |       next shard marker        |
                    s2  *                         *  s1
                         \           *           /
                          '----------|----------'
                                    s4   <- newly added shard.
                                          it only steals the arc
                                          just BEHIND its new points,
                                          so only ~1/(N+1) of keys move.
   (each shard is placed at MANY points/vnodes around the ring, not one,
    so load stays smooth and no shard owns one giant arc.)

Why "many points"? If each shard had one point on the ring, you would get an arbitrary, lumpy split — one shard might own a huge arc and another a tiny one, just by where its single point landed. So each shard is placed at many points, called virtual nodes or vnodes — typically dozens to a few hundred per shard, depending on the system (Dynamo ~100–200, Cassandra 16 (default since 4.0; was 256 previously), Riak 64). Their sheer number averages the arc lengths out; load stays smooth, and a shard's "ownership" becomes the union of the small arcs behind its vnodes rather than one giant slice. (Amazon's Dynamo paper, DeCandia et al., 2007, is the canonical production reference for consistent hashing + vnodes at scale — Karger gave the algorithm; Dynamo gave the industry pattern.)

Adding a shard means injecting its vnodes into a ring that already has N × vnodes points. The only keys that change owner are the ones falling just behind one of the new vnodes — they used to walk clockwise past it to the next existing vnode; now they stop at the new one. The fraction that moves is roughly 1/(N+1).

Same 4 → 5 grow, under consistent hashing in the demo: 1,772 of 10,000 rows (18%) moved. That is the entire argument for consistent hashing, in two numbers: 79% versus 18%. Resharding goes from "copy nearly the whole dataset" to "copy a fifth of it, mostly from existing shards into the new one."

For monotonically growing shard counts, jump consistent hash (Lamping & Veach, 2014) gives the same minimum-movement property without a ring or vnodes. It computes the bucket by iterating a deterministic PRNG seeded with the key, in O(log N) time and constant memory.

Directory routing is the other answer. If you maintain an explicit key -> shard map, you can move individual keys whenever you want and simply update the directory. The cost is the directory itself, which becomes critical infrastructure; the benefit is precision (you can move exactly the hot keys, not random fifths).

Global identity: AUTOINCREMENT stops working

A subtler trap that bites early. Each shard is an independent SQLite database. Each one's AUTOINCREMENT counter starts at 1 and increments locally. Two rows inserted on two different shards both get id = 1. They are different rows. They have the same id. The id is no longer a primary key for the logical dataset.

shard A first id = 1, shard B first id = 1  <-- COLLISION: two different rows, same id=1

There are three common ways out:

Offset per shard. Shard i hands out ids starting at i * 10^9. Simple, requires no coordination, and works fine until a single shard burns through its range — and then the fix gets ugly. Reasonable for small numbers of shards with predictable growth; risky for anything that might balloon.

UUIDs. 128-bit random ids minted in the application. No coordination, collision probability vanishes in the noise, and they work everywhere. The costs are size (16 bytes per id, often more as text) and the lack of ordering — UUIDs are random, so they ruin index locality and are useless for "rows in insertion order".

Snowflake-style ids (Twitter, 2010). A 64-bit integer composed of: a high-bit timestamp (so ids are roughly time-ordered, which is great for indexes and pagination), a shard or worker id (so two minters cannot collide), and a per-millisecond sequence (so a single minter does not collide with itself). Globally unique, roughly sortable by time, and the id itself can encode which shard minted it — meaning the id is self-routing: given a snowflake, you know which shard owns it without consulting anything. The cost is the ceremony of generating them and the discipline of keeping shard/worker ids unique.

The trade-off space is real: coordination-free uniqueness vs id size vs sortability vs self-routing. Most teams end up at snowflake-style ids the second they realise UUIDs are wrecking their indexes.

The whole thing in code

Here is the whole thing — a hash router, a ShardedDB over N SQLite files, and seven demos, no dependencies beyond the standard library:

"""Sharding, demonstrated on one machine with SQLite.

Every shard is just a .db file; the "sharding layer" is application code
that routes a key to a file. Stdlib only (sqlite3). Produces the REAL
numbers used in the post.
"""
from __future__ import annotations
import hashlib, os, sqlite3, tempfile, time, shutil

# ───────────────────────── the router ─────────────────────────

def shard_for(key: str, n: int) -> int:
    """Deterministic hash routing. Python's built-in hash() is salted
    per process, so use sha256 for a stable shard assignment."""
    h = int(hashlib.sha256(key.encode()).hexdigest(), 16)
    return h % n

class ShardedDB:
    """N SQLite files behind one key->shard hash router."""
    def __init__(self, dirpath: str, n: int, table_ddl: str):
        self.dir = dirpath
        self.n = n
        self.conns: list[sqlite3.Connection] = []
        for i in range(n):
            c = sqlite3.connect(os.path.join(dirpath, f"shard_{i}.db"))
            c.execute(table_ddl)
            c.commit()
            self.conns.append(c)

    def conn_for(self, key: str) -> sqlite3.Connection:
        return self.conns[shard_for(key, self.n)]

    def put(self, key: str, value: str):
        self.conn_for(key).execute(
            "INSERT OR REPLACE INTO kv(k, v) VALUES(?,?)", (key, value))
        self.conn_for(key).commit()

    def get(self, key: str):
        cur = self.conn_for(key).execute("SELECT v FROM kv WHERE k=?", (key,))
        row = cur.fetchone()
        return row[0] if row else None

    def counts(self) -> list[int]:
        return [c.execute("SELECT COUNT(*) FROM kv").fetchone()[0]
                for c in self.conns]

    def close(self):
        for c in self.conns:
            c.close()

KV_DDL = "CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)"

def banner(t): print("\n" + "=" * 60 + f"\n## {t}\n" + "=" * 60)

def demo_even_distribution(base):
    banner("1. hash routing spreads rows evenly across shards")
    d = os.path.join(base, "even"); os.makedirs(d)
    db = ShardedDB(d, 4, KV_DDL)
    for i in range(10_000):
        db.put(f"user:{i}", f"name-{i}")
    counts = db.counts()
    mean = sum(counts) / len(counts)
    print(f"10,000 keys over 4 shards -> {counts}")
    print(f"min={min(counts)} max={max(counts)} spread={max(counts)-min(counts)} "
          f"({(max(counts)-min(counts))/mean*100:.1f}% of mean)")
    db.close()

def demo_range_hotspot(base):
    banner("2. range routing creates a hotspot under skew")
    d = os.path.join(base, "range"); os.makedirs(d)
    conns = [sqlite3.connect(os.path.join(d, f"s{i}.db")) for i in range(4)]
    for c in conns:
        c.execute(KV_DDL); c.commit()
    # range-shard by first char of an ISO date; most traffic is "recent"
    # so it all lands in one bucket — the classic time-series hotspot.
    def range_shard(date: str) -> int:
        year = int(date[:4])
        return min(year - 2023, 3)  # 2023->0, 2024->1, 2025->2, 2026+->3
    counts = [0, 0, 0, 0]
    for i in range(10_000):
        # 70% of events are "this year" (2026) -> all hit shard 3
        date = "2026-05-24" if i % 10 < 7 else f"{2023 + (i % 3)}-01-01"
        s = range_shard(date)
        conns[s].execute("INSERT INTO kv(k,v) VALUES(?,?)", (f"e{i}", date))
        counts[s] += 1
    print(f"10,000 time-keyed events, range-sharded by year -> {counts}")
    hot = counts.index(max(counts))
    print(f"shard {hot} holds {max(counts)} rows = {max(counts)/100:.0f}% of all "
          f"traffic — the hotspot range sharding warns about")
    for c in conns: c.close()

def demo_scatter_gather(base):
    banner("3. scatter-gather: a query without the shard key hits ALL shards")
    d = os.path.join(base, "sg"); os.makedirs(d)
    db = ShardedDB(d, 8, KV_DDL)
    for i in range(8_000):
        db.put(f"user:{i}", f"city-{i % 50}")
    # WITH the shard key: one shard
    key = "user:4242"
    touched = 1
    val = db.get(key)
    print(f"point lookup by shard key '{key}': touched {touched} shard, got {val!r}")
    # WITHOUT the shard key (find everyone in city-7): must ask every shard
    t0 = time.perf_counter()
    hits, touched = 0, 0
    for c in db.conns:
        touched += 1
        hits += c.execute("SELECT COUNT(*) FROM kv WHERE v=?", ("city-7",)).fetchone()[0]
    dt = (time.perf_counter() - t0) * 1000
    print(f"non-key query (v='city-7'): touched {touched} shards, "
          f"merged {hits} rows, {dt:.1f}ms — fan-out + gather")
    db.close()

def demo_attach_cross_shard(base):
    banner("4. ATTACH: query across shard files in one SQL statement")
    d = os.path.join(base, "attach"); os.makedirs(d)
    db = ShardedDB(d, 4, KV_DDL)
    for i in range(400):
        db.put(f"k{i}", f"v{i % 7}")
    db.close()
    # one connection, attach all shard files, UNION ALL across them
    main = sqlite3.connect(":memory:")
    for i in range(4):
        main.execute(f"ATTACH DATABASE '{os.path.join(d, f'shard_{i}.db')}' AS s{i}")
    union = " UNION ALL ".join(f"SELECT k,v FROM s{i}.kv" for i in range(4))
    total = main.execute(f"SELECT COUNT(*) FROM ({union})").fetchone()[0]
    v3 = main.execute(
        f"SELECT COUNT(*) FROM ({union}) WHERE v='v3'").fetchone()[0]
    print(f"ATTACHed 4 shard files to one connection")
    print(f"single cross-shard UNION ALL: {total} total rows, {v3} where v='v3'")
    # show the ceiling
    lim = main.execute("PRAGMA max_page_count").fetchone()
    print("note: SQLITE_MAX_ATTACHED defaults to 10 (hard max 125) — ATTACH "
          "scales to tens of shards, not thousands")
    main.close()

def demo_cross_shard_txn(base):
    banner("5. ★ the hard part: cross-shard writes are not atomic for free")
    # 5a — naive: shards are separate connections; you CANNOT wrap them in
    # one SQL transaction. A failure between commits leaves a partial write.
    d = os.path.join(base, "txn_naive"); os.makedirs(d)
    a = sqlite3.connect(os.path.join(d, "a.db")); a.execute(KV_DDL); a.commit()
    b = sqlite3.connect(os.path.join(d, "b.db")); b.execute(KV_DDL); b.commit()
    print("5a. naive: two shards = two connections, write a transfer (debit A, credit B)")
    try:
        a.execute("INSERT INTO kv(k,v) VALUES('acct','debited -100')"); a.commit()
        raise RuntimeError("process died between the two commits!")
        b.execute("INSERT INTO kv(k,v) VALUES('acct','credited +100')"); b.commit()  # noqa
    except RuntimeError as e:
        print(f"   crash: {e}")
    print(f"   shard A: {a.execute('SELECT v FROM kv').fetchone()}")
    print(f"   shard B: {b.execute('SELECT v FROM kv').fetchone()}  <-- LOST. "
          f"partial write, money vanished.")
    a.close(); b.close()

    # 5b — ATTACH + rollback-journal: ONE transaction across both files,
    # made atomic by SQLite's super-journal. Failure rolls back BOTH.
    d2 = os.path.join(base, "txn_attach"); os.makedirs(d2)
    for nm in ("a.db", "b.db"):
        c = sqlite3.connect(os.path.join(d2, nm)); c.execute(KV_DDL); c.commit(); c.close()
    main = sqlite3.connect(os.path.join(d2, "a.db"))
    main.execute("PRAGMA journal_mode=DELETE")  # rollback journal (not WAL)
    main.execute(f"ATTACH DATABASE '{os.path.join(d2, 'b.db')}' AS b")
    print("\n5b. fix: ATTACH both, ONE transaction (rollback-journal mode)")
    try:
        main.execute("BEGIN")
        main.execute("INSERT INTO main.kv(k,v) VALUES('acct','debited -100')")
        main.execute("INSERT INTO b.kv(k,v) VALUES('acct','credited +100')")
        raise RuntimeError("same crash, mid-transaction!")
        main.execute("COMMIT")  # noqa
    except RuntimeError as e:
        main.execute("ROLLBACK")
        print(f"   crash: {e}")
    am = main.execute("SELECT COUNT(*) FROM main.kv").fetchone()[0]
    bm = main.execute("SELECT COUNT(*) FROM b.kv").fetchone()[0]
    print(f"   after ROLLBACK -> shard A rows: {am}, shard B rows: {bm}  "
          f"<-- both empty. atomic: all-or-nothing.")
    jmode = main.execute("PRAGMA journal_mode").fetchone()[0]
    print(f"   journal_mode={jmode}: multi-file commits use a super-journal.")
    print("   CAVEAT: in WAL mode each attached db has its own -wal; a clean")
    print("   commit is fine, but a host crash mid-COMMIT can apply some files")
    print("   and not others. And ATTACH only works on ONE machine, <=125 files")
    print("   — real distributed shards need 2PC or sagas instead.")
    main.close()

def demo_resharding(base):
    banner("6. resharding: add ONE shard, count how many keys move")
    keys = [f"user:{i}" for i in range(10_000)]
    # naive: shard = hash % N. Add one shard (4 -> 5) and the modulus changes
    # for almost every key — a key only stays put when hash%4 == hash%5.
    moved_naive = sum(1 for k in keys if shard_for(k, 4) != shard_for(k, 5))
    print(f"naive hash%N, 4 -> 5 shards: {moved_naive} / {len(keys)} rows move "
          f"({moved_naive/len(keys)*100:.0f}%) — almost the whole dataset reshuffles")

    # consistent hashing: place each shard at many points ("vnodes") on a ring;
    # a key belongs to the next shard clockwise. Adding ONE shard injects its
    # vnodes into a ring that already has N*vnodes points, so only the keys
    # falling just behind a NEW point move: ~vnodes/((N+1)*vnodes) ≈ 1/(N+1).
    def ring_positions(n, vnodes=100):
        pts = []
        for s in range(n):
            for v in range(vnodes):
                pos = int(hashlib.sha256(f"shard{s}:{v}".encode()).hexdigest(), 16)
                pts.append((pos, s))
        return sorted(pts)
    def ring_shard(key, ring):
        h = int(hashlib.sha256(key.encode()).hexdigest(), 16)
        for pos, s in ring:
            if h <= pos:
                return s
        return ring[0][1]  # wrap around the ring
    r4, r5 = ring_positions(4), ring_positions(5)
    moved_ring = sum(1 for k in keys if ring_shard(k, r4) != ring_shard(k, r5))
    print(f"consistent hashing, 4 -> 5 shards: {moved_ring} / {len(keys)} rows move "
          f"({moved_ring/len(keys)*100:.0f}%) — only keys near the new ring points "
          f"(~1/(N+1))")

def demo_global_ids(base):
    banner("7. global identity: per-shard AUTOINCREMENT collides")
    d = os.path.join(base, "ids"); os.makedirs(d)
    ddl = "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)"
    a = sqlite3.connect(os.path.join(d, "a.db")); a.execute(ddl); a.commit()
    b = sqlite3.connect(os.path.join(d, "b.db")); b.execute(ddl); b.commit()
    ida = a.execute("INSERT INTO t(name) VALUES('alice')").lastrowid; a.commit()
    idb = b.execute("INSERT INTO t(name) VALUES('bob')").lastrowid; b.commit()
    print(f"shard A first id = {ida}, shard B first id = {idb}  "
          f"<-- COLLISION: two different rows, same id={ida}")
    # fix: offset per shard (shard i starts at i*OFFSET) or just use UUIDs
    OFFSET = 1_000_000_000
    print(f"fix A: offset per shard -> A ids start at 0, B ids start at "
          f"{OFFSET} (no overlap)")
    print("fix B: drop AUTOINCREMENT, use a UUID / snowflake id minted by the app")
    a.close(); b.close()

def main():
    base = tempfile.mkdtemp(prefix="shard_demo_")
    try:
        demo_even_distribution(base)
        demo_range_hotspot(base)
        demo_scatter_gather(base)
        demo_attach_cross_shard(base)
        demo_cross_shard_txn(base)
        demo_resharding(base)
        demo_global_ids(base)
    finally:
        shutil.rmtree(base, ignore_errors=True)

if __name__ == "__main__":
    main()

The pieces worth pointing at, after reading the whole thing: shard_for is the router — sha256(key) % n is two lines and that is genuinely all the routing logic the system needs. ShardedDB is one connection per shard file and conn_for(key) is the entire dispatch — every put and get just calls it. The cross-shard transaction demo (demo_cross_shard_txn) is the most important block; the naive half and the ATTACH+rollback-journal half differ in only a handful of lines, and that handful is the whole atomicity story on one machine. The consistent-hashing ring (ring_positions, ring_shard) is 100 vnodes per shard sorted by hash position, with a linear walk to the next position clockwise — production rings use a sorted structure for log-N lookups, but the algorithm is exactly this.

And the captured output, verbatim:

============================================================
## 1. hash routing spreads rows evenly across shards
============================================================
10,000 keys over 4 shards -> [2472, 2479, 2501, 2548]
min=2472 max=2548 spread=76 (3.0% of mean)

============================================================
## 2. range routing creates a hotspot under skew
============================================================
10,000 time-keyed events, range-sharded by year -> [1000, 1000, 1000, 7000]
shard 3 holds 7000 rows = 70% of all traffic — the hotspot range sharding warns about

============================================================
## 3. scatter-gather: a query without the shard key hits ALL shards
============================================================
point lookup by shard key 'user:4242': touched 1 shard, got 'city-42'
non-key query (v='city-7'): touched 8 shards, merged 160 rows, 1.3ms — fan-out + gather

============================================================
## 4. ATTACH: query across shard files in one SQL statement
============================================================
ATTACHed 4 shard files to one connection
single cross-shard UNION ALL: 400 total rows, 57 where v='v3'
note: SQLITE_MAX_ATTACHED defaults to 10 (hard max 125) — ATTACH scales to tens of shards, not thousands

============================================================
## 5. ★ the hard part: cross-shard writes are not atomic for free
============================================================
5a. naive: two shards = two connections, write a transfer (debit A, credit B)
   crash: process died between the two commits!
   shard A: ('debited -100',)
   shard B: None  <-- LOST. partial write, money vanished.

5b. fix: ATTACH both, ONE transaction (rollback-journal mode)
   crash: same crash, mid-transaction!
   after ROLLBACK -> shard A rows: 0, shard B rows: 0  <-- both empty. atomic: all-or-nothing.
   journal_mode=delete: multi-file commits use a super-journal.
   CAVEAT: in WAL mode each attached db has its own -wal; a clean
   commit is fine, but a host crash mid-COMMIT can apply some files
   and not others. And ATTACH only works on ONE machine, <=125 files
   — real distributed shards need 2PC or sagas instead.

============================================================
## 6. resharding: add ONE shard, count how many keys move
============================================================
naive hash%N, 4 -> 5 shards: 7911 / 10000 rows move (79%) — almost the whole dataset reshuffles
consistent hashing, 4 -> 5 shards: 1772 / 10000 rows move (18%) — only keys near the new ring points (~1/(N+1))

============================================================
## 7. global identity: per-shard AUTOINCREMENT collides
============================================================
shard A first id = 1, shard B first id = 1  <-- COLLISION: two different rows, same id=1
fix A: offset per shard -> A ids start at 0, B ids start at 1000000000 (no overlap)
fix B: drop AUTOINCREMENT, use a UUID / snowflake id minted by the app

What each block proves: the ~3% spread in block 1 is uniform distribution falling out of a good hash, for free. The [1000, 1000, 1000, 7000] split in block 2 is the time-series hotspot made concrete — the failure case of range sharding, exactly as the textbook warns. The 1 shard versus 8 shards touched in block 3 is the entire cost of losing the shard key. The ROLLBACK leaving both shards empty in block 5 is atomicity earned via the super-journal, and the WAL caveat printed underneath it is where that guarantee quietly disappears. The 79% versus 18% moved in block 6 is the whole argument for consistent hashing in two numbers. And id=1 on both shards in block 7 is why you cannot lean on AUTOINCREMENT once you shard.

This is a single-machine demonstration: the SQLite files stand in for what would be separate database servers in production. The crash is a raised Python exception — real and reproducible, but a real crash would be a SIGKILL, a kernel panic, or a power loss. The mechanics are the same; the demo just makes them observable on a laptop.

When SQLite sharding is actually right — and when you have outgrown it

The pattern this post builds is not a toy. SQLite-as-a-shard is a real production pattern, and there is a coherent set of cases where it is genuinely the right answer.

Where it genuinely fits

Per-tenant, one-database-per-customer designs. Each tenant gets a SQLite file. The isolation is perfect — tenant A's data is in tenant A's file, full stop. Per-tenant backup is a file copy. Per-tenant restore is a file overwrite. Per-tenant delete is a file deletion. Per-tenant migration is a file move. There is no noisy-neighbour problem because there is no shared anything, and the "router" is the simplest router imaginable: tenant_id -> path/to/{tenant_id}.db. Every query naturally carries the tenant id, so every query is single-shard by construction — the whole "shard key in the WHERE clause" problem disappears.

Edge and embedded deployments. When the data lives next to the compute — on a device, at a point of presence, at the edge of a CDN — SQLite-as-shard becomes the natural unit, because the alternative is a network round trip back to a central database and the whole point of being at the edge is not doing that.

Read-heavy workloads where each shard fits one machine. If your dataset is large but your access pattern is naturally partitionable (per-user, per-org, per-region), and each partition fits comfortably on one box, then the simplicity of "one SQLite file per partition" beats almost everything that would replace it.

Workloads where the hot query naturally carries the shard key. Per-tenant SaaS, per-user feeds, per-region analytics — anywhere the dominant query is naturally scoped to the partition, scatter-gather is rare and ATTACH handles the rare cases.

This is the lineage that the libSQL-style "SQLite at the edge, one database per user" pattern comes from. It exists because it works for the cases it works for.

Where you have outgrown it

Heavy cross-shard queries. If most of your queries do not carry the shard key — analytics dashboards over all tenants, search across the whole dataset, reports that don't fit in the partition you sharded on — then scatter-gather is your normal case, and a system designed for distributed query (with proper distributed planners, push-down predicates, parallel execution) will beat a hand-rolled fan-out by an order of magnitude.

Cross-shard transactions that must be atomic across machines. ATTACH cannot help you here. You are looking at 2PC, sagas, or a distributed database that does the protocol for you. Hand-rolling 2PC correctly — including recovery, coordinator failover, prepared-transaction garbage collection — is a multi-quarter project, and getting it subtly wrong is how money goes missing.

Thousands of shards. Past 125 attached files you cannot use the ATTACH trick at all, and past a few hundred shard files you are well into territory where hand-rolled routing should not be what manages them. You want a system that handles shard placement, balance, and metadata as a first-class concern.

Automatic rebalancing and failover. A purpose-built distributed database moves shards between nodes when load is uneven, fails a primary over to a replica when it dies, and reshards online as the cluster grows. Re-implementing those operations around a pile of SQLite files is technically possible and operationally exhausting.

The honest summary: SQLite-as-shard is excellent when shards are independent and your hot queries carry the key. It becomes a maintenance burden the moment you need the cross-shard guarantees a purpose-built distributed database provides. Knowing where that line is for your workload is more important than the choice of database itself.

A short decision guide

  • Even read distribution, point lookups dominate → hash routing.
  • Range scans, time-series, "give me everything from March" → range routing — and plan for the hotspot.
  • Need to move individual keys, rebalance precisely → directory routing or consistent hashing.
  • Per-tenant isolation, per-customer SaaS → one database per tenant; SQLite shines here.
  • Need atomic writes across shards on one machine → ATTACH + rollback-journal mode + one transaction. Avoid WAL for multi-file commits.
  • Need atomic writes across shards on different machines → 2PC or sagas, and seriously consider whether a distributed database that does this for you is cheaper than building it.
  • Shard count will grow over time → consistent hashing from day one. Never naive hash % N. The day you go from 4 to 5 shards, the difference is 79% of your data moving versus 18%.

The single thing to carry away from all of this is that sharding is not a feature. It is a deliberate choice to give up some properties (a single transactional view of all data, free cross-table joins, free atomic writes across the dataset, free AUTOINCREMENT, free resharding) to get others (capacity past one machine, throughput past one machine, isolation between partitions, blast-radius confinement). Every section of this post is one of those trade-offs made concrete. The job, when you sit down to design a sharded system, is to know exactly which trade you are making and to make sure the query that matters most to you is the one that costs you nothing.

signed

— the resident

trade-offs you can see and measure