// Package migrate reads migrations from sources and runs them against databases. // Sources are defined by the `source.Driver` and databases by the `database.Driver` // interface. The driver interfaces are kept "dump", all migration logic is kept // in this package. package migrate import ( "errors" "fmt" "os" "sync" "time" "github.com/hashicorp/go-multierror" "github.com/golang-migrate/migrate/v4/database" iurl "github.com/golang-migrate/migrate/v4/internal/url" "github.com/golang-migrate/migrate/v4/source" ) // DefaultPrefetchMigrations sets the number of migrations to pre-read // from the source. This is helpful if the source is remote, but has little // effect for a local source (i.e. file system). // Please note that this setting has a major impact on the memory usage, // since each pre-read migration is buffered in memory. See DefaultBufferSize. var DefaultPrefetchMigrations = uint(10) // DefaultLockTimeout sets the max time a database driver has to acquire a lock. var DefaultLockTimeout = 15 * time.Second var ( ErrNoChange = errors.New("no change") ErrNilVersion = errors.New("no migration") ErrInvalidVersion = errors.New("version must be >= -1") ErrLocked = errors.New("database locked") ErrLockTimeout = errors.New("timeout: can't acquire database lock") ) // ErrShortLimit is an error returned when not enough migrations // can be returned by a source for a given limit. type ErrShortLimit struct { Short uint } // Error implements the error interface. func (e ErrShortLimit) Error() string { return fmt.Sprintf("limit %v short", e.Short) } type ErrDirty struct { Version int } func (e ErrDirty) Error() string { return fmt.Sprintf("Dirty database version %v. Fix and force version.", e.Version) } type Migrate struct { sourceName string sourceDrv source.Driver databaseName string databaseDrv database.Driver // Log accepts a Logger interface Log Logger // GracefulStop accepts `true` and will stop executing migrations // as soon as possible at a safe break point, so that the database // is not corrupted. GracefulStop chan bool isLockedMu *sync.Mutex isGracefulStop bool isLocked bool // PrefetchMigrations defaults to DefaultPrefetchMigrations, // but can be set per Migrate instance. PrefetchMigrations uint // LockTimeout defaults to DefaultLockTimeout, // but can be set per Migrate instance. LockTimeout time.Duration } // New returns a new Migrate instance from a source URL and a database URL. // The URL scheme is defined by each driver. func New(sourceURL, databaseURL string) (*Migrate, error) { m := newCommon() sourceName, err := iurl.SchemeFromURL(sourceURL) if err != nil { return nil, err } m.sourceName = sourceName databaseName, err := iurl.SchemeFromURL(databaseURL) if err != nil { return nil, err } m.databaseName = databaseName sourceDrv, err := source.Open(sourceURL) if err != nil { return nil, err } m.sourceDrv = sourceDrv databaseDrv, err := database.Open(databaseURL) if err != nil { return nil, err } m.databaseDrv = databaseDrv return m, nil } // NewWithDatabaseInstance returns a new Migrate instance from a source URL // and an existing database instance. The source URL scheme is defined by each driver. // Use any string that can serve as an identifier during logging as databaseName. // You are responsible for closing the underlying database client if necessary. func NewWithDatabaseInstance(sourceURL string, databaseName string, databaseInstance database.Driver) (*Migrate, error) { m := newCommon() sourceName, err := iurl.SchemeFromURL(sourceURL) if err != nil { return nil, err } m.sourceName = sourceName m.databaseName = databaseName sourceDrv, err := source.Open(sourceURL) if err != nil { return nil, err } m.sourceDrv = sourceDrv m.databaseDrv = databaseInstance return m, nil } // NewWithSourceInstance returns a new Migrate instance from an existing source instance // and a database URL. The database URL scheme is defined by each driver. // Use any string that can serve as an identifier during logging as sourceName. // You are responsible for closing the underlying source client if necessary. func NewWithSourceInstance(sourceName string, sourceInstance source.Driver, databaseURL string) (*Migrate, error) { m := newCommon() databaseName, err := iurl.SchemeFromURL(databaseURL) if err != nil { return nil, err } m.databaseName = databaseName m.sourceName = sourceName databaseDrv, err := database.Open(databaseURL) if err != nil { return nil, err } m.databaseDrv = databaseDrv m.sourceDrv = sourceInstance return m, nil } // NewWithInstance returns a new Migrate instance from an existing source and // database instance. Use any string that can serve as an identifier during logging // as sourceName and databaseName. You are responsible for closing down // the underlying source and database client if necessary. func NewWithInstance(sourceName string, sourceInstance source.Driver, databaseName string, databaseInstance database.Driver) (*Migrate, error) { m := newCommon() m.sourceName = sourceName m.databaseName = databaseName m.sourceDrv = sourceInstance m.databaseDrv = databaseInstance return m, nil } func newCommon() *Migrate { return &Migrate{ GracefulStop: make(chan bool, 1), PrefetchMigrations: DefaultPrefetchMigrations, LockTimeout: DefaultLockTimeout, isLockedMu: &sync.Mutex{}, } } // Close closes the source and the database. func (m *Migrate) Close() (source error, database error) { databaseSrvClose := make(chan error) sourceSrvClose := make(chan error) m.logVerbosePrintf("Closing source and database\n") go func() { databaseSrvClose <- m.databaseDrv.Close() }() go func() { sourceSrvClose <- m.sourceDrv.Close() }() return <-sourceSrvClose, <-databaseSrvClose } // Migrate looks at the currently active migration version, // then migrates either up or down to the specified version. func (m *Migrate) Migrate(version uint) error { if err := m.lock(); err != nil { return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { return m.unlockErr(err) } if dirty { return m.unlockErr(ErrDirty{curVersion}) } ret := make(chan interface{}, m.PrefetchMigrations) go m.read(curVersion, int(version), ret) return m.unlockErr(m.runMigrations(ret)) } // Steps looks at the currently active migration version. // It will migrate up if n > 0, and down if n < 0. func (m *Migrate) Steps(n int) error { if n == 0 { return ErrNoChange } if err := m.lock(); err != nil { return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { return m.unlockErr(err) } if dirty { return m.unlockErr(ErrDirty{curVersion}) } ret := make(chan interface{}, m.PrefetchMigrations) if n > 0 { go m.readUp(curVersion, n, ret) } else { go m.readDown(curVersion, -n, ret) } return m.unlockErr(m.runMigrations(ret)) } // Up looks at the currently active migration version // and will migrate all the way up (applying all up migrations). func (m *Migrate) Up() error { if err := m.lock(); err != nil { return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { return m.unlockErr(err) } if dirty { return m.unlockErr(ErrDirty{curVersion}) } ret := make(chan interface{}, m.PrefetchMigrations) go m.readUp(curVersion, -1, ret) return m.unlockErr(m.runMigrations(ret)) } // Down looks at the currently active migration version // and will migrate all the way down (applying all down migrations). func (m *Migrate) Down() error { if err := m.lock(); err != nil { return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { return m.unlockErr(err) } if dirty { return m.unlockErr(ErrDirty{curVersion}) } ret := make(chan interface{}, m.PrefetchMigrations) go m.readDown(curVersion, -1, ret) return m.unlockErr(m.runMigrations(ret)) } // Drop deletes everything in the database. func (m *Migrate) Drop() error { if err := m.lock(); err != nil { return err } if err := m.databaseDrv.Drop(); err != nil { return m.unlockErr(err) } return m.unlock() } // Run runs any migration provided by you against the database. // It does not check any currently active version in database. // Usually you don't need this function at all. Use Migrate, // Steps, Up or Down instead. func (m *Migrate) Run(migration ...*Migration) error { if len(migration) == 0 { return ErrNoChange } if err := m.lock(); err != nil { return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { return m.unlockErr(err) } if dirty { return m.unlockErr(ErrDirty{curVersion}) } ret := make(chan interface{}, m.PrefetchMigrations) go func() { defer close(ret) for _, migr := range migration { if m.PrefetchMigrations > 0 && migr.Body != nil { m.logVerbosePrintf("Start buffering %v\n", migr.LogString()) } else { m.logVerbosePrintf("Scheduled %v\n", migr.LogString()) } ret <- migr go func(migr *Migration) { if err := migr.Buffer(); err != nil { m.logErr(err) } }(migr) } }() return m.unlockErr(m.runMigrations(ret)) } // Force sets a migration version. // It does not check any currently active version in database. // It resets the dirty state to false. func (m *Migrate) Force(version int) error { if version < -1 { return ErrInvalidVersion } if err := m.lock(); err != nil { return err } if err := m.databaseDrv.SetVersion(version, false); err != nil { return m.unlockErr(err) } return m.unlock() } // Version returns the currently active migration version. // If no migration has been applied, yet, it will return ErrNilVersion. func (m *Migrate) Version() (version uint, dirty bool, err error) { v, d, err := m.databaseDrv.Version() if err != nil { return 0, false, err } if v == database.NilVersion { return 0, false, ErrNilVersion } return suint(v), d, nil } // read reads either up or down migrations from source `from` to `to`. // Each migration is then written to the ret channel. // If an error occurs during reading, that error is written to the ret channel, too. // Once read is done reading it will close the ret channel. func (m *Migrate) read(from int, to int, ret chan<- interface{}) { defer close(ret) // check if from version exists if from >= 0 { if err := m.versionExists(suint(from)); err != nil { ret <- err return } } // check if to version exists if to >= 0 { if err := m.versionExists(suint(to)); err != nil { ret <- err return } } // no change? if from == to { ret <- ErrNoChange return } if from < to { // it's going up // apply first migration if from is nil version if from == -1 { firstVersion, err := m.sourceDrv.First() if err != nil { ret <- err return } migr, err := m.newMigration(firstVersion, int(firstVersion)) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() from = int(firstVersion) } // run until we reach target ... for from < to { if m.stop() { return } next, err := m.sourceDrv.Next(suint(from)) if err != nil { ret <- err return } migr, err := m.newMigration(next, int(next)) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() from = int(next) } } else { // it's going down // run until we reach target ... for from > to && from >= 0 { if m.stop() { return } prev, err := m.sourceDrv.Prev(suint(from)) if os.IsNotExist(err) && to == -1 { // apply nil migration migr, err := m.newMigration(suint(from), -1) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() return } else if err != nil { ret <- err return } migr, err := m.newMigration(suint(from), int(prev)) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() from = int(prev) } } } // readUp reads up migrations from `from` limitted by `limit`. // limit can be -1, implying no limit and reading until there are no more migrations. // Each migration is then written to the ret channel. // If an error occurs during reading, that error is written to the ret channel, too. // Once readUp is done reading it will close the ret channel. func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { defer close(ret) // check if from version exists if from >= 0 { if err := m.versionExists(suint(from)); err != nil { ret <- err return } } if limit == 0 { ret <- ErrNoChange return } count := 0 for count < limit || limit == -1 { if m.stop() { return } // apply first migration if from is nil version if from == -1 { firstVersion, err := m.sourceDrv.First() if err != nil { ret <- err return } migr, err := m.newMigration(firstVersion, int(firstVersion)) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() from = int(firstVersion) count++ continue } // apply next migration next, err := m.sourceDrv.Next(suint(from)) if os.IsNotExist(err) { // no limit, but no migrations applied? if limit == -1 && count == 0 { ret <- ErrNoChange return } // no limit, reached end if limit == -1 { return } // reached end, and didn't apply any migrations if limit > 0 && count == 0 { ret <- os.ErrNotExist return } // applied less migrations than limit? if count < limit { ret <- ErrShortLimit{suint(limit - count)} return } } if err != nil { ret <- err return } migr, err := m.newMigration(next, int(next)) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() from = int(next) count++ } } // readDown reads down migrations from `from` limitted by `limit`. // limit can be -1, implying no limit and reading until there are no more migrations. // Each migration is then written to the ret channel. // If an error occurs during reading, that error is written to the ret channel, too. // Once readDown is done reading it will close the ret channel. func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) { defer close(ret) // check if from version exists if from >= 0 { if err := m.versionExists(suint(from)); err != nil { ret <- err return } } if limit == 0 { ret <- ErrNoChange return } // no change if already at nil version if from == -1 && limit == -1 { ret <- ErrNoChange return } // can't go over limit if already at nil version if from == -1 && limit > 0 { ret <- os.ErrNotExist return } count := 0 for count < limit || limit == -1 { if m.stop() { return } prev, err := m.sourceDrv.Prev(suint(from)) if os.IsNotExist(err) { // no limit or haven't reached limit, apply "first" migration if limit == -1 || limit-count > 0 { firstVersion, err := m.sourceDrv.First() if err != nil { ret <- err return } migr, err := m.newMigration(firstVersion, -1) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() count++ } if count < limit { ret <- ErrShortLimit{suint(limit - count)} } return } if err != nil { ret <- err return } migr, err := m.newMigration(suint(from), int(prev)) if err != nil { ret <- err return } ret <- migr go func() { if err := migr.Buffer(); err != nil { m.logErr(err) } }() from = int(prev) count++ } } // runMigrations reads *Migration and error from a channel. Any other type // sent on this channel will result in a panic. Each migration is then // proxied to the database driver and run against the database. // Before running a newly received migration it will check if it's supposed // to stop execution because it might have received a stop signal on the // GracefulStop channel. func (m *Migrate) runMigrations(ret <-chan interface{}) error { for r := range ret { if m.stop() { return nil } switch r := r.(type) { case error: return r case *Migration: migr := r // set version with dirty state if err := m.databaseDrv.SetVersion(migr.TargetVersion, true); err != nil { return err } if migr.Body != nil { m.logVerbosePrintf("Read and execute %v\n", migr.LogString()) if err := m.databaseDrv.Run(migr.BufferedBody); err != nil { return err } } // set clean state if err := m.databaseDrv.SetVersion(migr.TargetVersion, false); err != nil { return err } endTime := time.Now() readTime := migr.FinishedReading.Sub(migr.StartedBuffering) runTime := endTime.Sub(migr.FinishedReading) // log either verbose or normal if m.Log != nil { if m.Log.Verbose() { m.logPrintf("Finished %v (read %v, ran %v)\n", migr.LogString(), readTime, runTime) } else { m.logPrintf("%v (%v)\n", migr.LogString(), readTime+runTime) } } default: return fmt.Errorf("unknown type: %T with value: %+v", r, r) } } return nil } // versionExists checks the source if either the up or down migration for // the specified migration version exists. func (m *Migrate) versionExists(version uint) (result error) { // try up migration first up, _, err := m.sourceDrv.ReadUp(version) if err == nil { defer func() { if errClose := up.Close(); errClose != nil { result = multierror.Append(result, errClose) } }() } if os.IsExist(err) { return nil } else if !os.IsNotExist(err) { return err } // then try down migration down, _, err := m.sourceDrv.ReadDown(version) if err == nil { defer func() { if errClose := down.Close(); errClose != nil { result = multierror.Append(result, errClose) } }() } if os.IsExist(err) { return nil } else if !os.IsNotExist(err) { return err } m.logErr(fmt.Errorf("no migration found for version %d", version)) return os.ErrNotExist } // stop returns true if no more migrations should be run against the database // because a stop signal was received on the GracefulStop channel. // Calls are cheap and this function is not blocking. func (m *Migrate) stop() bool { if m.isGracefulStop { return true } select { case <-m.GracefulStop: m.isGracefulStop = true return true default: return false } } // newMigration is a helper func that returns a *Migration for the // specified version and targetVersion. func (m *Migrate) newMigration(version uint, targetVersion int) (*Migration, error) { var migr *Migration if targetVersion >= int(version) { r, identifier, err := m.sourceDrv.ReadUp(version) if os.IsNotExist(err) { // create "empty" migration migr, err = NewMigration(nil, "", version, targetVersion) if err != nil { return nil, err } } else if err != nil { return nil, err } else { // create migration from up source migr, err = NewMigration(r, identifier, version, targetVersion) if err != nil { return nil, err } } } else { r, identifier, err := m.sourceDrv.ReadDown(version) if os.IsNotExist(err) { // create "empty" migration migr, err = NewMigration(nil, "", version, targetVersion) if err != nil { return nil, err } } else if err != nil { return nil, err } else { // create migration from down source migr, err = NewMigration(r, identifier, version, targetVersion) if err != nil { return nil, err } } } if m.PrefetchMigrations > 0 && migr.Body != nil { m.logVerbosePrintf("Start buffering %v\n", migr.LogString()) } else { m.logVerbosePrintf("Scheduled %v\n", migr.LogString()) } return migr, nil } // lock is a thread safe helper function to lock the database. // It should be called as late as possible when running migrations. func (m *Migrate) lock() error { m.isLockedMu.Lock() defer m.isLockedMu.Unlock() if m.isLocked { return ErrLocked } // create done channel, used in the timeout goroutine done := make(chan bool, 1) defer func() { done <- true }() // use errchan to signal error back to this context errchan := make(chan error, 2) // start timeout goroutine timeout := time.After(m.LockTimeout) go func() { for { select { case <-done: return case <-timeout: errchan <- ErrLockTimeout return } } }() // now try to acquire the lock go func() { if err := m.databaseDrv.Lock(); err != nil { errchan <- err } else { errchan <- nil } }() // wait until we either receive ErrLockTimeout or error from Lock operation err := <-errchan if err == nil { m.isLocked = true } return err } // unlock is a thread safe helper function to unlock the database. // It should be called as early as possible when no more migrations are // expected to be executed. func (m *Migrate) unlock() error { m.isLockedMu.Lock() defer m.isLockedMu.Unlock() if err := m.databaseDrv.Unlock(); err != nil { // BUG: Can potentially create a deadlock. Add a timeout. return err } m.isLocked = false return nil } // unlockErr calls unlock and returns a combined error // if a prevErr is not nil. func (m *Migrate) unlockErr(prevErr error) error { if err := m.unlock(); err != nil { return multierror.Append(prevErr, err) } return prevErr } // logPrintf writes to m.Log if not nil func (m *Migrate) logPrintf(format string, v ...interface{}) { if m.Log != nil { m.Log.Printf(format, v...) } } // logVerbosePrintf writes to m.Log if not nil. Use for verbose logging output. func (m *Migrate) logVerbosePrintf(format string, v ...interface{}) { if m.Log != nil && m.Log.Verbose() { m.Log.Printf(format, v...) } } // logErr writes error to m.Log if not nil func (m *Migrate) logErr(err error) { if m.Log != nil { m.Log.Printf("error: %v", err) } }