Skip to content

Commit

Permalink
services/horizon: Lock ingestion while applying DB migrations (#4587)
Browse files Browse the repository at this point in the history
Acquire ingestion lock before applying DB migrations to stop ingestion.

When applying `58_add_index_by_id_optimization.sql` migration file we
experienced a deadlock. It can happen when ingestion is running because
ingestion by inserting or updating certain rows can acquire `RowExclusiveLock`
which conflicts with other locks (like `ShareLock` when creating a new index).

We lock ingestion only for `up` migrations. `down` migrations can remove
key-value table which can lead to deadlocks.
  • Loading branch information
bartekn committed Sep 27, 2022
1 parent 621d634 commit 1584aa3
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion services/horizon/internal/db2/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package schema

import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
stdLog "log"
"text/tabwriter"
"time"

migrate "github.com/rubenv/sql-migrate"
"github.com/stellar/go/support/errors"
)

//go:generate go run github.com/kevinburke/go-bindata/[email protected]+incompatible -nometadata -pkg schema -o bindata.go migrations/
Expand Down Expand Up @@ -46,6 +47,51 @@ var Migrations migrate.MigrationSource = &migrate.AssetMigrationSource{
// upward back to the current version at the start of the process. If count is
// 0, a count of 1 will be assumed.
func Migrate(db *sql.DB, dir MigrateDir, count int) (int, error) {
if dir == MigrateUp {
// The code below locks ingestion to apply DB migrations. This works
// for MigrateUp migrations only because it's possible that MigrateDown
// can remove `key_value_store` table and it will deadlock the process.
txConn, err := db.Conn(context.Background())
if err != nil {
return 0, err
}

defer txConn.Close()

tx, err := txConn.BeginTx(context.Background(), nil)
if err != nil {
return 0, err
}

// Unlock ingestion when done. DB migrations run in a separate DB connection
// so no need to Commit().
defer tx.Rollback()

// Check if table exists
row := tx.QueryRow(`select exists (
select from information_schema.tables where table_schema = 'public' and table_name = 'key_value_store'
)`)
err = row.Err()
if err != nil {
return 0, err
}

var tableExists bool
err = row.Scan(&tableExists)
if err != nil {
return 0, err
}

if tableExists {
// Lock ingestion
row := tx.QueryRow("select value from key_value_store where key = 'exp_ingest_last_ledger' for update")
err = row.Err()
if err != nil {
return 0, err
}
}
}

switch dir {
case MigrateUp:
return migrate.ExecMax(db, "postgres", Migrations, migrate.Up, count)
Expand Down

0 comments on commit 1584aa3

Please sign in to comment.