Flaky Playwright Tests and Phoenix: A Distributed Systems Problem

The DBConnection.OwnershipError isn't a bug. It's the BEAM telling you you're building a distributed system wrong.

Ever run into that dreaded DBConnection.OwnershipError during your Playwright tests? The one that makes you think, "Great, another flaky test." Let me tell you, this isn’t about flakiness or some elusive race condition. What you’re seeing is the BEAM VM doing exactly what it’s designed to do: enforcing strict process isolation. Remember, this isn’t a monolith you’re testing. You’re working with three fully isolated processes, each with its own boundaries. One of them is stepping over the line, trying to grab a database connection that it’s not supposed to have access to.

Here’s the kind of in-depth explanation I wish I’d had when I first ran into this issue. We’ll dig into how the Ecto SQL Sandbox operates, explore the process dictionary, walk through the ETS permission table lookups, and break down why phoenix_test_playwright leverages the user agent string as a subtle communication channel. By the time we’re done, you won’t just have your tests running smoothly, you’ll also have a solid grasp of why the solution works, right down to the VM internals.

The Problem: Three Processes, Zero Shared Context

When you write what looks like a simple Playwright feature test:

# test/my_app_web/features/user_registration_test.exs
defmodule MyAppWeb.UserRegistrationTest do
  use MyAppWeb.FeatureCase, async: true

  test "user can register with valid data", %{conn: conn} do
    conn
    |> visit("/register")
    |> fill_in("Email", with: "alice@example.com")
    |> fill_in("Password", with: "SecurePass123!")
    |> click_button("Create Account")
    |> assert_has(".alert-success", text: "Welcome!")
  end
end

You're actually orchestrating three distinct processes that share nothing but the BEAM runtime:

P1: The ExUnit Test Process (The Owner)

This is the process that runs your test code. When you use MyAppWeb.FeatureCase, somewhere in the setup it calls:

# What phoenix_test_playwright does for you (simplified);
# start_owner!/2 returns the owner pid directly and raises on failure
owner_pid = Ecto.Adapters.SQL.Sandbox.start_owner!(MyApp.Repo, ownership_timeout: :infinity)

This call is deceptively simple. What's actually happening:

  1. ETS Table Creation: The sandbox's ownership manager maintains a public ETS table (referred to below as :"$db_connection_owners") that will store permission tuples.
  2. Process Dictionary Injection: The DBConnection.Ownership process stores the actual connection reference in P1's process dictionary under the key :"$db_connection". This is crucial - only P1 can see this.
  3. Transaction Wrapping: The connection is wrapped in a SQL BEGIN transaction with ROLLBACK queued for when the owner exits.

The owner process (P1) now holds the database connection. But here's the kicker: when P1 exits, that connection dies with it. This is by design. It's a sandbox, transactions are supposed to be isolated and ephemeral.
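For orientation, here's a minimal sketch of that setup inside a case template; the shared: not tags[:async] option and the stop_owner/1 teardown follow the standard Ecto pattern, and phoenix_test_playwright wires up something very similar for you:

# test/support/feature_case.ex (sketch)
defmodule MyAppWeb.FeatureCase do
  use ExUnit.CaseTemplate

  setup tags do
    # P1 asks the sandbox for a dedicated owner process that holds a
    # transaction-wrapped connection (shared mode only for async: false)
    owner = Ecto.Adapters.SQL.Sandbox.start_owner!(MyApp.Repo, shared: not tags[:async])

    # When P1 finishes, stop the owner: the transaction rolls back
    on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(owner) end)

    :ok
  end
end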

P2: The HTTP Request Handler (The Orphan)

When Playwright's process hits your Phoenix endpoint with an HTTP request, Phoenix spawns a new, short-lived process to handle it. This process is supervised by your Endpoint supervisor, not your test. Its ancestry looks like:

P2 (HTTP Handler)
  ├── Parent: MyAppWeb.Endpoint.ProcessSupervisor
  ├── Grandparent: MyAppWeb.Endpoint
  └── No relation to P1 whatsoever

When your LiveView mounts or controller action runs, it executes in P2's context. If it tries to query the database:

# Inside your LiveView mount/3 (running in P2)
def mount(_params, _session, socket) do
  user_count = MyApp.Repo.aggregate(MyApp.Accounts.User, :count)
  {:ok, assign(socket, user_count: user_count)}
end

The Repo.aggregate/3 call triggers a DBConnection lookup. Here's the sequence, simplified from lib/db_connection.ex:

# Simplified from DBConnection source
def checkout(conn, opts) do
  owner = DBConnection.Ownership.find_owner(self())
  # ...
end

find_owner/1 does a three-step search:

  1. Check its own process dictionary: Process.get(:"$db_connection") → nil
  2. Walk ancestry: Recursively check parent processes → Not found
  3. Query ETS permission table: Look for {self(), :allowed, owner_pid} → Not found

Result: {:error, :not_found} → OwnershipError.
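You can see the bookkeeping behind that ancestry walk yourself: OTP-spawned processes record their ancestors (and Task-spawned processes their callers) in the process dictionary. A quick peek, runnable from IEx or inside any GenServer or LiveView callback; the example values are illustrative:

{:dictionary, dict} = Process.info(self(), :dictionary)

Keyword.get(dict, :"$ancestors")
#=> e.g. [MyAppWeb.Endpoint, MyApp.Supervisor, ...] for OTP-supervised processes

Keyword.get(dict, :"$callers")
#=> set for Task-spawned work; nil otherwise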

P3: The LiveView GenServer (The Long-Lived Ghost)

The real complexity starts after the initial HTTP request completes. When Playwright upgrades to a WebSocket connection, Phoenix spawns yet another process, a Phoenix.LiveView.Socket GenServer, supervised by the Phoenix.Socket.Pool supervisor:

P3 (LiveView Socket)
  ├── Parent: Phoenix.Socket.Pool.Supervisor
  ├── Grandparent: MyAppWeb.Endpoint
  └── Still no relation to P1

This process is long-lived. It persists for the duration of the WebSocket connection, handling handle_event/3, handle_info/2, and handle_params/3 callbacks. If your LiveView does any async work:

def handle_info(:delayed_query, socket) do
  # This runs AFTER your test might have passed
  data = MyApp.Repo.all(MyApp.Analytics.Event)
  {:noreply, assign(socket, data: data)}
end

You're now in a race condition. The test process (P1) might finish, trigger a transaction rollback, and exit, while P3 is still executing. When P3's query hits the database, the connection is gone.

How Ecto SQL Sandbox Actually Works

The Process Dictionary: Connection Storage

When start_owner!/2 is called, it eventually reaches DBConnection.Ownership. Here's the idea, simplified from lib/db_connection/ownership.ex:

def init({pool, owner, tag, timeout}) do
  # The connection ref is stored in THIS process's dictionary
  Process.put(:"$db_connection", {:owner, pool, tag})
  # ...
end

The process dictionary is a key-value store local to each process. It's not shared. It's not a global variable. It's not accessible from other processes. This is fundamental BEAM isolation.

When your test process (P1) owns the connection, only P1 can find it in its dictionary. No amount of send/2 or message passing will give P2 or P3 access. They need explicit permission.
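The isolation is easy to demonstrate in IEx; a value put into one process's dictionary simply doesn't exist in another (the :"$demo_connection" key below is made up for the demo):

parent = self()

# Put something in THIS process's dictionary
Process.put(:"$demo_connection", :my_conn)
Process.get(:"$demo_connection")
#=> :my_conn

# A freshly spawned process has its own, empty dictionary
spawn(fn -> send(parent, {:child_sees, Process.get(:"$demo_connection")}) end)

receive do
  {:child_sees, value} -> value
end
#=> nil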

The ETS Table: The Permission Ledger

The sandbox creates a public ETS table (by default: :"$db_connection_owners"). Let's examine its structure:

# When you call Ecto.Adapters.SQL.Sandbox.allow/3
:ets.insert_new(@ownership_table, {allowed_pid, :allowed, owner_pid, pool})

The table schema is:

  • Key: allowed_pid (the process being granted access)
  • Value: {allowed_pid, :allowed, owner_pid, pool}
  • Access: Public (readable/writable from any process)

When P2 or P3 attempts a database operation, DBConnection.find_owner/1 queries this table:

# From lib/db_connection/ownership.ex
def find_owner(caller_pid) do
  case :ets.lookup(@ownership_table, caller_pid) do
    [{^caller_pid, :allowed, owner_pid, _pool}] -> {:ok, owner_pid}
    [] -> find_owner_by_ancestry(caller_pid)
  end
end

This lookup is constant time, which keeps it fast, but it also means the permission must be inserted before the query happens.
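The ledger idea itself is easy to model. The sketch below uses a made-up table name and a stripped-down tuple, but it shows the shape of the lookup described above: a public ETS table keyed by the allowed pid, readable from any process.

# Any process may create and read a public named table
:ets.new(:demo_permissions, [:set, :public, :named_table])

owner = self()
allowed = spawn(fn -> Process.sleep(:infinity) end)

# Grant: the key is the allowed pid, the value points back at the owner
:ets.insert(:demo_permissions, {allowed, :allowed, owner})

:ets.lookup(:demo_permissions, allowed)
#=> [{allowed_pid, :allowed, owner_pid}]  (permission found)

:ets.lookup(:demo_permissions, spawn(fn -> :ok end))
#=> []  (no permission recorded: the OwnershipError case)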

The Permission Check Flow (Step-by-Step)

Let's trace a Repo.insert/1 call from within a LiveView handle_event/3:

# 1. User clicks button
# 2. P3 (LiveView) receives websocket message
# 3. handle_event/3 executes in P3 context

def handle_event("create_user", params, socket) do
  # This line:
  user = MyApp.Repo.insert!(changeset)

  # Expands to:
  Ecto.Repo.insert!(MyApp.Repo, changeset)

  # Which calls:
  Ecto.Adapters.SQL.insert(adapter_meta, query, opts)

  # Which calls:
  DBConnection.execute(conn, query, opts)

  # Inside DBConnection.execute/4:
  ownership = DBConnection.Ownership.find_owner(self())
  # self() is P3's PID

  # find_owner/1 does:
  # Step 1: Check own process dictionary
  Process.get(:"$db_connection") # => nil

  # Step 2: Check ETS table
  :ets.lookup(:'$db_connection_owners', self()) # => []

  # Step 3: Check ancestry (simplified)
  find_ancestor_owner(self()) # => :error

  # Result: {:error, :not_found}

  # Which raises: DBConnection.OwnershipError
end

The error message is telling you the truth: "cannot find ownership process for #PID<0.x.y>". It searched the process dictionary, the ETS table, and the supervision tree. Nothing.

The User Agent Pattern: A Covert Permission Channel

Why Not Custom Headers?

You might think: "I'll just pass the owner PID in a custom header!" Let's see why that fails:

The browser builds the WebSocket upgrade request itself: custom headers you attach to regular HTTP requests are not carried over, and the browser's WebSocket API gives you no way to set arbitrary headers on the handshake. Only browser-controlled headers such as Cookie and User-Agent come along. Your X-Test-Owner header dies at the upgrade boundary.

The User Agent Survives

The User-Agent header is different. It's part of the browser's persistent identity, not the request. When you configure Playwright's browser context:

// What phoenix_test_playwright does automatically
const browser = await chromium.launch();
const context = await browser.newContext({
  userAgent: "Mozilla/5.0 ... Sandbox: {metadata}"
});

This user agent is sent on every HTTP request and is included in the WebSocket handshake's User-Agent field. It's the only piece of metadata that reliably crosses the HTTP/WebSocket boundary.

How phoenix_test_playwright Encodes Metadata

Let's look at the implementation, simplified from the library:

# In test setup
def start_owner_and_encode_metadata(repo) do
  # 1. Start owner
  owner_pid = Ecto.Adapters.SQL.Sandbox.start_owner!(repo)

  # 2. Generate metadata map
  metadata = %{
    repo: repo,
    owner: owner_pid,
    test_pid: self()
  }

  # 3. Encode with Phoenix's built-in encoder
  encoded = Phoenix.Ecto.SQL.Sandbox.encode_metadata(metadata)
  # Returns a base64-encoded, compressed string

  # 4. Inject into browser context
  set_browser_user_agent(encoded)

  encoded
end

The encode_metadata/1 function produces a string like:

"Phx-Ecto-Sandbox: eJxVjE0KwjAQRfdzin4B0hZc+QAuXLiC5iGppG1I2lSUDqXv7k1c3MzL
m3kTt5T4W6QsQeDJH9JO3Kqt0BVmhXJXJK6VMK0rYVYXwjQvhHl5JO3LO/5O3B8AAAD//wMAJQBLJg=="

This is a Base64-encoded Erlang term (built with :erlang.term_to_binary/1). It's opaque, compact, and survives header parsing.
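If you ever need to build this string yourself (for example, for a browser driver the library doesn't cover), phoenix_ecto exposes the helpers used here; a minimal sketch, with the user-agent prefix chosen arbitrarily:

# Encode sandbox metadata for the current test process
metadata = Phoenix.Ecto.SQL.Sandbox.metadata_for(MyApp.Repo, self())
encoded = Phoenix.Ecto.SQL.Sandbox.encode_metadata(metadata)

# Append it to whatever user agent the browser should present
user_agent = "Mozilla/5.0 (TestBrowser) #{encoded}"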

Decoding and Permission Granting

In your endpoint, the Phoenix.Ecto.SQL.Sandbox plug decodes this:

# Simplified from lib/phoenix_ecto/sql_sandbox.ex
def call(conn, _opts) do
  case get_req_header(conn, "user-agent") do
    [user_agent | _] ->
      case extract_metadata(user_agent) do
        {:ok, %{owner: owner_pid, repo: repo}} ->
          # CRITICAL: Grant permission to THIS process (P2)
          Ecto.Adapters.SQL.Sandbox.allow(repo, owner_pid, self())
        :error ->
          :ok
      end
    [] ->
      :ok
  end
  conn
end

The allow/3 call inserts into the ETS table:

# Conceptually what allow/3 does (simplified from ecto_sql/DBConnection)
def allow(repo, owner_pid, allowed_pid) do
  pool = GenServer.whereis(repo)

  # Check owner actually owns a connection
  case :ets.lookup(@owner_table, owner_pid) do
    [{^owner_pid, :owner, _pool, _tag}] ->
      # Insert permission tuple
      :ets.insert(@ownership_table, {allowed_pid, :allowed, owner_pid, pool})
      :ok
    [] ->
      {:error, :not_found}
  end
end

Now, when P2 queries, the ETS lookup succeeds:

# P2's database query:
:ets.lookup(:'$db_connection_owners', self())
# => [{#PID<0.3421.0>, :allowed, #PID<0.2261.0>, #PID<0.123.0>}]
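Outside the Playwright flow, this is the same allow/3 you call by hand whenever a test spawns its own process; a minimal sketch using the classic checkout-then-allow pattern from the Ecto docs:

test "a spawned process can use the test's sandbox connection" do
  # The test process checks out (and therefore owns) a connection
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(MyApp.Repo)

  {:ok, agent} = Agent.start_link(fn -> nil end)

  # Record the permission tuple for the agent's pid
  :ok = Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, self(), agent)

  # Queries executed inside the agent process now find an owner
  count = Agent.get(agent, fn _ -> MyApp.Repo.aggregate(MyApp.Accounts.User, :count) end)
  assert is_integer(count)
end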

LiveView Permission: The on_mount Hook

The HTTP handler (P2) is short-lived. After it renders the initial HTML, it terminates. But the WebSocket process (P3) is just starting. It needs the same permission granted again.

The get_connect_info/2 function extracts the user agent from the WebSocket handshake:

# lib/my_app_web/live_helpers.ex
def on_mount(:default, _params, _session, socket) do
  if connected?(socket) do  # Only for WebSocket-connected mount
    case get_connect_info(socket, :user_agent) do
      user_agent when is_binary(user_agent) ->
        # Same metadata, same permission grant
        Phoenix.Ecto.SQL.Sandbox.allow(user_agent, Ecto.Adapters.SQL.Sandbox)
      _ -> :ok
    end
  end
  {:cont, socket}
end

Critical detail: connected?(socket) returns false on the initial HTTP render (P2's context, which already received its grant from the plug) and true on the WebSocket mount (P3's context, the long-lived process that actually needs this grant).
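Two bits of wiring make this hook run at all: the endpoint's LiveView socket must expose :user_agent via connect_info, and the hook must be attached to your LiveViews, typically through a live_session. A sketch; the module and route names follow the examples in this article, and @session_options is the attribute the Phoenix generators define in the endpoint:

# lib/my_app_web/endpoint.ex
socket "/live", Phoenix.LiveView.Socket,
  websocket: [connect_info: [:user_agent, session: @session_options]]

# lib/my_app_web/router.ex
live_session :default, on_mount: MyAppWeb.LiveHelpers do
  live "/dashboard", MyAppWeb.DashboardLive
end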

Mox: The Same Isolation, Same Solution

Mox (Elixir's mocking library) has the same problem. Mock expectations are keyed to the process that defines them. Let's trace through:

# test file
test "sends welcome email", %{conn: conn} do
  # Expectation stored in P1's dictionary
  expect(MyApp.MockMailer, :send, fn _email -> {:ok, %{id: "test-123"}} end)

  conn
  |> visit("/register")
  |> click_button("Create Account")  # Triggers mailer in P3

  # P3 cannot see P1's expectation!
end

# Inside the LiveView
def handle_event("create_account", params, socket) do
  # This runs in P3
  MyApp.MockMailer.send(email)  # Mox finds no expectations owned by or allowed for P3
  # Raises: Mox.UnexpectedCallError
end

Mox keeps expectations in its own bookkeeping process, keyed by the owner, i.e. the process that called expect/4. Conceptually:

# Conceptual sketch of Mox's ownership model (not the actual mox source);
# register_expectation/4 stands in for Mox's internal bookkeeping
def expect(mock, name, n \\ 1, code) do
  # The CURRENT process (P1) becomes the owner of this expectation
  register_expectation(self(), mock, name, %{n: n, code: code})
end
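For completeness, the mock modules referenced throughout are defined once against a behaviour, usually in test_helper.exs, and the application is pointed at them in test config; a sketch, with the behaviour module and config key being assumptions:

# test/test_helper.exs
ExUnit.start()
Mox.defmock(MyApp.MockMailer, for: MyApp.Mailer)

# config/test.exs
config :my_app, :mailer, MyApp.MockMailer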

The Mox.allow/3 Solution

Just like Ecto, Mox provides allow/3 for cross-process expectations:

# test/support/live_helpers.ex
def on_mount(:default, _params, _session, socket) do
  if connected?(socket) do
    case get_connect_info(socket, :user_agent) do
      user_agent when is_binary(user_agent) ->
        # Decode metadata to get test PID
        metadata = Phoenix.Ecto.SQL.Sandbox.decode_metadata(user_agent)

        # Allow Mox expectations
        Mox.allow(MyApp.MockMailer, metadata.test_pid, self())
        Mox.allow(MyApp.MockRepo, metadata.test_pid, self())
        Mox.allow(MyApp.MockHTTPClient, metadata.test_pid, self())

        # Also allow Ecto sandbox
        Phoenix.Ecto.SQL.Sandbox.allow(user_agent, Ecto.Adapters.SQL.Sandbox)
      _ -> :ok
    end
  end
  {:cont, socket}
end

Mox Patterns: Stubs vs. Expectations

For async-safe tests, prefer stubs:

# test/support/feature_case.ex
defmodule MyAppWeb.FeatureCase do
  # async: true is passed where the template is used:
  # `use MyAppWeb.FeatureCase, async: true`
  use ExUnit.CaseTemplate

  import Mox

  setup _tags do
    # Stubs carry no call-count expectations, so late calls from
    # async work never fail verification
    stub(MyApp.MockMailer, :send, fn _email ->
      {:ok, %{id: "stubbed-id"}}
    end)

    stub(MyApp.MockHTTPClient, :get, fn _url ->
      {:ok, %{status: 200, body: "ok"}}
    end)

    :ok
  end
end

Stubs defined in the test process are owned by it just like expectations, so the Mox.allow/3 grant in on_mount covers them too. Their advantage is that they impose no call-count constraints: any allowed process can invoke them any number of times (including zero), which makes them far friendlier to async work.

Use expectations only when you need to assert call count or arguments:

# test file
test "charges credit card exactly once", %{conn: conn} do
  expect(MyApp.MockPaymentGateway, :charge, 1, fn _amount ->
    {:ok, %{transaction_id: "tx-123"}}
  end)

  # ... test code ...

  # Verify called exactly once
  verify!(MyApp.MockPaymentGateway)
end

Never use set_mox_global/1:

# DON'T - this forces async: false
setup :set_mox_global  # Creates race conditions between tests

set_mox_global makes expectations global, which means tests can interfere with each other. It's async: false by another name.

The Race Condition: When Tests Finish Too Early

The Problem: Async Work After Test Completion

Here's a real-world scenario that will bite you:

defmodule MyAppWeb.DashboardLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    if connected?(socket) do
      # Simulate delayed analytics loading
      Process.send_after(self(), :load_charts, 500)
      Process.send_after(self(), :load_metrics, 1000)
    end

    {:ok, assign(socket, page_state: :loading, charts: nil, metrics: nil)}
  end

  def handle_info(:load_charts, socket) do
    # This runs 500ms after mount
    charts = MyApp.Analytics.generate_charts()  # DB queries here
    {:noreply, assign(socket, charts: charts)}
  end

  def handle_info(:load_metrics, socket) do
    # This runs 1000ms after mount
    metrics = MyApp.Analytics.get_metrics()  # More DB queries
    {:noreply, assign(socket, metrics: metrics, page_state: :ready)}
  end
end

Your test:

test "shows dashboard", %{conn: conn} do
  conn
  |> visit("/dashboard")
  |> assert_has(".chart-container")  # Charts might not be loaded yet!

  # Test passes as soon as HTML renders...
  # ...but LiveView is still processing :load_charts message
end

Timeline:

t=0ms:  Test starts, visits /dashboard
t=10ms: HTTP request (P2) renders initial HTML, shows loading state
t=15ms: WebSocket connects (P3), mount/3 sends delayed messages
t=20ms: Test asserts on HTML, passes
t=25ms: Test process (P1) exits, transaction rolls back
t=500ms: P3 handles :load_charts, tries to query DB
        → 💥 DBConnection.ConnectionError: owner exited

The Solution: LiveView Async (assign_async/start_async) and Semantic State Tracking

Phoenix LiveView ships async primitives for exactly this problem: assign_async/3 when you only need the data, and start_async/3 with handle_async/3 callbacks when you want to drive your own semantic state. Because this example rolls both results into a single page_state, it uses start_async/3:

def handle_info(:load_data, socket) do
  socket =
    socket
    |> assign(page_state: :loading, charts: :loading, metrics: :loading)
    # Each function runs in a supervised Task; the result is delivered
    # back to this LiveView via handle_async/3
    |> start_async(:charts, fn -> MyApp.Analytics.generate_charts() end)
    |> start_async(:metrics, fn -> MyApp.Analytics.get_metrics() end)

  {:noreply, socket}
end

def handle_async(:charts, {:ok, charts}, socket) do
  {:noreply, socket |> assign(charts: charts) |> maybe_set_complete()}
end

def handle_async(:metrics, {:ok, metrics}, socket) do
  {:noreply, socket |> assign(metrics: metrics) |> maybe_set_complete()}
end

defp maybe_set_complete(%{assigns: assigns} = socket) do
  if assigns.charts != :loading and assigns.metrics != :loading do
    assign(socket, page_state: :complete)
  else
    socket
  end
end

How start_async works, conceptually (a sketch, not the actual LiveView source):

# Conceptual sketch: start_async/3 runs the function in a Task and
# routes the result back to the LiveView process, which then invokes
# the matching handle_async/3 callback.
def start_async(socket, key, func) do
  lv_pid = self()  # capture the LiveView pid before spawning

  Task.start(fn ->
    # Task spawns also record :"$callers", which is how the sandbox
    # allowance granted to the LiveView follows the work into the task
    result =
      try do
        {:ok, func.()}
      catch
        kind, reason -> {:exit, {kind, reason}}
      end

    send(lv_pid, {:async_result, key, result})
  end)

  socket
end

The key insight: the async callbacks let you track the state of the async operation, not just its result (assign_async/3 gives you the same property through its AsyncResult assigns). When you assert on data-page-state="complete", you're waiting for a deterministic state, not racing against a timer.

Testing with Semantic State

# Template
<div data-page-state={@page_state} data-testid="dashboard">
  <%= if @page_state == :loading do %>
    <.spinner />
  <% else %>
    <div class="charts"><%= @charts %></div>
    <div class="metrics"><%= @metrics %></div>
  <% end %>
</div>

# Test
test "loads all analytics completely", %{conn: conn} do
  conn
  |> visit("/dashboard")
  # Wait for the explicit state, not an arbitrary timeout
  |> assert_has("[data-page-state='complete']", timeout: 5_000)
  # Now safe to assert on content
  |> assert_has(".charts")
  |> assert_has(".metrics")
end

Performance win: The test only waits as long as needed, not a fixed Process.sleep/1 duration.

Advanced Patterns and Edge Cases

Testing GenServer Calls from LiveView

What if your LiveView calls a GenServer that queries the database?

defmodule MyApp.PriceCalculator do
  use GenServer

  def calculate(product_id) do
    GenServer.call(__MODULE__, {:calculate, product_id})
  end

  def handle_call({:calculate, product_id}, _from, state) do
    # This runs in the GenServer process (P4)
    product = MyApp.Repo.get!(MyApp.Catalog.Product, product_id)
    price = compute_price(product)
    {:reply, price, state}
  end
end

defmodule MyAppWeb.ProductLive do
  def handle_event("calculate", %{"id" => id}, socket) do
    price = MyApp.PriceCalculator.calculate(id)  # Calls P4
    {:noreply, assign(socket, price: price)}
  end
end

Now you have four processes in play: P1 (test), P2 (HTTP handler), P3 (LiveView), and P4 (GenServer). You need to allow P4 too:

# In your LiveView on_mount
def on_mount(:default, _params, _session, socket) do
  if connected?(socket) do
    case get_connect_info(socket, :user_agent) do
      user_agent when is_binary(user_agent) ->
        metadata = decode_metadata(user_agent)

        # Allow the GenServer too
        Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, metadata.owner, MyApp.PriceCalculator)

        # Also need to allow Mox if GenServer uses mocks
        Mox.allow(MyApp.MockAPI, metadata.test_pid, MyApp.PriceCalculator)

        # Grant to current process (P3)
        Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, metadata.owner, self())
      _ -> :ok
    end
  end
  {:cont, socket}
end

Better approach: Pass the caller's PID explicitly:

def handle_event("calculate", %{"id" => id}, socket) do
  # Tell the GenServer to use our permissions
  price = MyApp.PriceCalculator.calculate(id, caller_pid: self())
  {:noreply, assign(socket, price: price)}
end

# GenServer
def handle_call({:calculate, id, caller_pid}, _from, state) do
  # Allow this specific call
  Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, caller_pid, self())
  # ... query database ...
end

Testing Oban Jobs Triggered by LiveView

def handle_event("bulk_import", %{"file" => file}, socket) do
  # Enqueues job that runs in separate process
  %{id: job_id} = Oban.insert!(MyApp.Workers.ImportJob.new(%{file: file}))

  # Track job in socket for testing
  {:noreply, assign(socket, import_job_id: job_id)}
end

The Oban worker runs in yet another process (P5), outside the sandbox. You have three options:

  1. Disable Oban in tests (simplest):
# config/test.exs
config :my_app, Oban, testing: :inline  # Jobs run synchronously, in whatever process inserts them
  2. Allow the worker (complex):
defmodule MyApp.Workers.ImportJob do
  use Oban.Worker

  @impl true
  def perform(%Oban.Job{args: args}) do
    # The test must put the encoded sandbox metadata into the job args;
    # the worker then grants itself access the same way the plug does
    if metadata = args["sandbox_metadata"] do
      Phoenix.Ecto.SQL.Sandbox.allow(metadata, Ecto.Adapters.SQL.Sandbox)
    end

    # ... perform work ...
  end
end
  3. Test the effect, not the job (recommended):
test "bulk imports users", %{conn: conn} do
  conn
  |> visit("/import")
  |> upload_file("input[type=file]", "users.csv")
  |> click_button("Import")

  # Wait for LiveView to signal completion
  |> assert_has("[data-import-state='complete']")

  # Assert on final state, not job internals
  assert MyApp.Repo.aggregate(MyApp.Accounts.User, :count) == 100
end
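A middle ground between these options: with testing: :manual the job is recorded but never executed, and Oban.Testing lets you assert it was enqueued with the right worker, all inside the sandbox transaction. A sketch, reusing the worker and markup from the example above:

# config/test.exs
config :my_app, Oban, testing: :manual

# in the test module
use Oban.Testing, repo: MyApp.Repo

test "enqueues the import job", %{conn: conn} do
  conn
  |> visit("/import")
  |> upload_file("input[type=file]", "users.csv")
  |> click_button("Import")
  # Sync point: wait until the LiveView has handled the event
  |> assert_has("[data-import-state]")

  # The inserted job row is visible through the shared sandbox transaction
  assert_enqueued worker: MyApp.Workers.ImportJob
end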

Memory Implications

Each sandbox connection holds a PostgreSQL backend process open. With async: true, you can have N connections simultaneously:

# config/test.exs
config :my_app, MyApp.Repo,
  pool_size: 20,  # Up to 20 parallel tests
  ownership_timeout: :infinity

Monitor with :observer.start():

  • P1 (test): ~2MB each
  • PostgreSQL backend: ~5MB each
  • Total for 20 tests: ~140MB

On modern hardware, this is trivial. The parallelism gain far outweighs the memory cost.

Debugging: When It Still Doesn't Work

Tool 1: Trace the Permission Flow

# In iex -S mix test
:dbg.tracer()
:dbg.p(:all, [:call])

# Trace all Sandbox.allow calls
:dbg.tp(Ecto.Adapters.SQL.Sandbox, :allow, 3, [])

# Run test, watch output

You'll see exactly which processes are calling allow/3 and when.

Tool 2: Inspect the ETS Table

# In a test helper module (or paste the body into IEx)
def inspect_sandbox do
  table = :"$db_connection_owners"

  # All permissions
  :ets.tab2list(table)
  |> Enum.each(fn {allowed, :allowed, owner, _pool} ->
    IO.puts("#{inspect(allowed)} ← allowed by #{inspect(owner)}")
  end)

  # Owner connections
  owner_table = :"$db_connection_owner_table"

  :ets.tab2list(owner_table)
  |> Enum.each(fn {owner, :owner, pool, _tag} ->
    IO.puts("Owner: #{inspect(owner)} → Pool: #{inspect(pool)}")
  end)
end

Tool 3: Trace Process Exits

# A small helper: call it from your test, after the test logic, with any
# process you suspect is exiting early (e.g. the owner pid returned by
# start_owner!, or a LiveView pid). Monitoring self() would never fire.
def trace_exit(pid, timeout \\ 5_000) do
  ref = Process.monitor(pid)

  receive do
    {:DOWN, ^ref, :process, ^pid, reason} ->
      IO.inspect(reason, label: "Process exited")
  after
    timeout -> :ok
  end
end

Common Error #1: "owner exited" After Test Passes

Symptom: Test passes, then you see a database error in the logs.

Cause: LiveView is still processing async work.

Fix: Use assign_async and assert on the semantic state, not just UI presence.

Common Error #2: "cannot find ownership process" on Initial Page Load

Symptom: OwnershipError on first HTTP request.

Cause: The Phoenix.Ecto.SQL.Sandbox plug is missing from the endpoint, or not enabled for the :test environment.

Fix: Ensure the plug runs in the endpoint before the router:

plug Plug.Parsers, ...
plug Phoenix.Ecto.SQL.Sandbox  # Here!
plug MyAppWeb.Router

Common Error #3: Mox.UnexpectedCallError in LiveView

Symptom: Mock works in test but fails in LiveView.

Cause: Didn't call Mox.allow/3 in on_mount.

Fix: Add Mox.allow(mock, test_pid, self()) alongside sandbox allow.

Common Error #4: Intermittent Failures with assign_async

Symptom: Tests pass locally, fail in CI.

Cause: CI machines are slower, so the async work hasn't finished before assert_has gives up.

Fix: Increase assert_has timeout or optimize async functions.

# Increase timeout for CI
|> assert_has("[data-page-state='complete']", timeout: 10_000)

The Internal Implementation: Reading the Source

Let's walk through the permission check, paraphrased from the DBConnection source:

# Simplified and paraphrased from db_connection's lib/db_connection/ownership.ex
defmodule DBConnection.Ownership do
  @moduledoc """
  DBConnection plugin for ownership.
  """

  @ownership_table :"$db_connection_owners"

  def find_owner(pid) when is_pid(pid) do
    case Process.get(:"$db_connection") do
      {:owner, pool, tag} ->
        {:ok, {pool, tag}}

      _ ->
        case :ets.lookup(@ownership_table, pid) do
          [{^pid, :allowed, owner, _pool}] ->
            # Recursively find the owner's connection
            case find_owner(owner) do
              {:ok, {pool, tag}} -> {:ok, {pool, tag}}
              error -> error
            end

          [] ->
            # Walk supervision tree
            find_owner_by_ancestry(pid)
        end
    end
  end

  defp find_owner_by_ancestry(pid) do
    case Process.info(pid, :dictionary) do
      {:dictionary, dict} ->
        case dict[:"$db_connection_parent"] do
          nil -> :error
          parent -> find_owner(parent)
        end
      _ -> :error
    end
  end
end

Key insights from source:

  1. Transitive permissions are not automatic: If A allows B, B cannot allow C. Only the original owner can grant permissions.
  2. Ancestry check uses $db_connection_parent: This is how allow/3 with {:process, parent_pid} works.
  3. ETS lookup is recursive: It finds the owner, then finds the owner's connection.

Alternative Approaches: Trade-offs

Approach 1: Global Sandbox Mode

# config/test.exs
config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox

# test setup: share the test process's connection with every other process
Ecto.Adapters.SQL.Sandbox.mode(MyApp.Repo, {:shared, self()})

Pros: No allow/3 needed, all processes share one connection.

Cons:

  • Forces async: false (global state)
  • Tests can interfere (uncommitted data visible across tests)
  • PostgreSQL deadlock risk with parallel tests

Verdict: Only for legacy suites you can't refactor.

Approach 2: Transaction Isolation in Tests

# Don't use sandbox, manage transactions manually
setup do
  :ok = Ecto.Adapters.SQL.begin_test_transaction(MyApp.Repo)
  on_exit(fn -> Ecto.Adapters.SQL.rollback_test_transaction(MyApp.Repo) end)
end

Pros: No process isolation issues.

Cons:

  • Still requires async: false (shared transaction)
  • Manual cleanup is error-prone
  • Doesn't work with LiveView (process dies before cleanup)

Verdict: A pre-sandbox era pattern (these begin/rollback_test_transaction helpers no longer ship with modern ecto_sql); don't use it.

Approach 3: Roll Your Own Permission Channel

# Pass PID through application environment
setup do
  owner = self()
  Application.put_env(:my_app, :test_owner, owner)

  on_exit(fn -> Application.delete_env(:my_app, :test_owner) end)
end

# In LiveView
def on_mount(_, _, _, socket) do
  if owner = Application.get_env(:my_app, :test_owner) do
    Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, owner, self())
  end
  {:cont, socket}
end

Pros: Simple, no user-agent magic.

Cons:

  • Global state (breaks async)
  • Race conditions between tests
  • Application env is a single, VM-wide key-value store, so there is only one :test_owner slot for the whole suite

Verdict: Anti-pattern. Global state is death to async tests.

The Correct Mental Model: Distributed Erlang

At its core, the ownership system models distributed Erlang. Each test process is like a remote node that owns resources. The allow/3 function is like granting RPC permissions.

When you think in these terms, the solution is obvious: explicit, fine-grained permission grants, exactly what you'd do in a real distributed system.

Final Architecture: The Complete Picture

This is what you're building: not a simple test, but a microservices architecture compressed into a single VM, with a test process, HTTP handlers, LiveView sockets, GenServers, and job workers all granted explicit access to one sandboxed connection.

TL;DR: Key Takeaways

  1. DBConnection.OwnershipError is correct behavior - The BEAM is protecting you from shared-state bugs.
  2. Three processes, three contexts - Test, HTTP, and WebSocket processes are isolated by design.
  3. User agent is the only reliable metadata channel - It survives HTTP → WebSocket upgrade. Headers don't.
  4. allow/3 is a permission grant, not a connection transfer - The owner still controls the connection; others get temporary access.
  5. async: false is admitting defeat - Use Phoenix.Ecto.SQL.Sandbox plug and on_mount hooks instead.
  6. Race conditions are real - Use assign_async and assert on the semantic state like data-page-state="complete".
  7. Mox needs the same treatment - Call Mox.allow/3 alongside your sandbox allow.
  8. This is distributed systems 101 - The same patterns apply to Node.connect/1 and :rpc.multicall/4.

The ownership system isn't a limitation to work around—it's a correct design for concurrent, isolated tests. Once you stop fighting it and start leveraging it, your tests become reliable, parallel, and fast.

