Skip to content

go library to manage shutdown of coordinated resources

License

Notifications You must be signed in to change notification settings

glowlabs-org/threadgroup

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

threadgroup

This code is from an old open source library I made at Nebulous Labs, which has since gone out of business. I wasn't sure how long the repo was going to stay online, so I made a new home for the code so that it could be used by Glow software.

threadgroup is a utility used to facilitate the coordinated shutdown of a set of resources. Instead of individually managing the shutdown of each resource, a single call to tg.Stop() will gracefully clean up everything all at once in a coordinate way that respects resource dependencies.

Resources can be added to a ThreadGroup by calling tg.Add(). They can then use the ThreadGroup to watch for a stop signal to determine that all resources related to the ThreadGroup are being shut down. A resource signals to the ThreadGroup that it has completed shutdown by calling tg.Done().

If a resource needs cleanup code to run when it gets shut down, it can schedule cleanup code by calling tg.OnStop() and tg.AfterStop(). All functions added with tg.OnStop() will be called after the ThreadGroup has stopped, but without blocking to wait until all of the resources have signaled that shutdown is complete. All functions added with tg.AfterStop() will only be called after all resources connected to the ThreadGroup have signaled that they have completed shutdown.

A net.Listener for example can use tg.OnStop() to have the ThreadGroup call listener.Close(). A logger on the other hand will probably handle its own shutdown in tg.AfterStop(), because the logger will not want to close itself until after all other functions have had a chance to log any errors they encountered during shutdown.

The ThreadGroup will call all OnStop and AfterStop functions in reverse order to how they were added, and the ThreadGroup will block on each one before calling the next one. Every function will be called regardless of any errors that are returned. Once every function in the OnStop and AfterStop queue has been called, tg.Stop() will return a single error that contains all of the errors produced during shutdown.

Long running background threads should be created by calling tg.Launch(). This will add the thread to the ThreadGroup before creating the thread, and will schedule a call to tg.Done() when the thread returns. Launch will return an error if Launch() is called after the ThreadGroup has already been stopped.

The ThreadGroup offers two helper functions for performing long sleeps. There's tg.Sleep(), which will sleep like time.Sleep but will exit early when tg.Stop() is called, and there is tg.AfterFunc() which will call a function after an amount of time has elapsed like time.AfterFunc(), except that it will exit early without calling the function if tg.Stop() is called.

Finally, the ThreadGroup offers a StopChan() function and an IsStopped() function. tg.StopChan() returns a channel which will be closed when tg.Stop() is called. IsStoppped() will immediately return true or false depending on whether tg.Stop() has been called yet.

Example:

var tg threadgroup.ThreadGroup

// Create a logger and set it to shutdown upon closing. Because log.Close() is
// the first call added to AfterStop, it will be the last function called when the
// ThreadGroup shuts down.
log := NewLogger()
tg.AfterStop(func() error {
	return log.Close()
})

// Create a background thread that will call `net.Dial()` once an hour. Use the
// ThreadGroup to ensure that the Dial call will be canceled upon shutdown, and
// also to ensure that the hour long sleeps between calls will be interrupted.
func InfrequentDial() {
	// Repeatedly perform a dial. Latency means the dial could take up to a
	// minute, which would delay shutdown without a cancel chan.
	for {
		// Perform the dial, but abort quickly if 'Stop' is called.
		dialer := &net.Dialer{
			Cancel:  tg.StopChan(), // This will cause the Dial to be cancelled
			Timeout: time.Minute,
		}
		conn, err := dialer.Dial("tcp", 8.8.8.8)
		if err == nil {
			conn.Close()
		}

        // Sleep for an hour, using the ThreadGroup's sleep to exit early if
        // tg.Stop() is called.
        if tg.Sleep(time.Hour) {
            return
        }
	}

    // Log that the dial thread has shut down cleanly. The logger does not
    // close until the `AfterStop()` functions are called in the threadgroup, and
    // those functions will be blocked until this thread has exited because this
    // thread was created using `tg.Launch()`. Therefore, the logger is guaranteed
    // to still be working at the moment this line is called, even though it is called
    // during shutdown.
	log.Println("InfrequentDial closed cleanly")
}()
err = tg.Launch(InfrequentDial)
if err != nil {
    fmt.Println("ThreadGroup was stopped before the InfrequentDial thread could be launched")
}

// Create a background thread that will listen for new connections but also
shut down cleanly when tg.Stop() is called.
func LongListener() {
	// Create the listener.
	listener, err := net.Listen("tcp", ":12345")
	if err != nil {
		return
	}
    // Close the listener as soon as 'Stop' is called. OnStop is used here,
    // because the listener should be closed immediately and does not need to wait for
    // other resources to finish shutting down first.
    //
    // In this case, OnStop is actually necessary, because the OnStop call here is what
    // will cause the infinite loop in the listener to break. Using AfterStop would be a
    // mistake because the infinite loop would not break until AfterStop is called,
    // and AfterStop would not be called until this thread returns, which would block
    // until listener.Close is called, resulting in deadlock.
	tg.OnStop(func() error {
		return listener.Close()
	})

	for {
		conn, err := listener.Accept()
		if err != nil {
            // Accept will return an error as soon as the listener is closed,
            // which will happen immediately after tg.Stop() is called.
			return
		}
		conn.Close()
	}

    log.Println("LongListener closed cleanly")
}()
err = tg.Launch(LongListener)
if err != nil {
    fmt.Println("ThreadGroup was stopped before the LongListener could be launched")
}

// Calling Stop will result in a quick, organized shutdown that closes all
// long-running resources.
err := tg.Stop()
if err != nil {
	fmt.Println(err)
}

About

go library to manage shutdown of coordinated resources

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Languages