Skip to content

Commit

Permalink
Add interface to get retry statistics (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat committed Oct 18, 2021
1 parent bc8af34 commit 3b3e9f7
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 1 deletion.
8 changes: 7 additions & 1 deletion reconnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ type reconnectClient struct {
disconnected chan struct{}
}

// ReconnectClient is a Client with reconnect and retry features.
// It embeds both the Client and the Retryer interfaces, so a value of this
// type exposes the methods of each.
type ReconnectClient interface {
Client
Retryer
}

// NewReconnectClient creates a MQTT client with re-connect/re-publish/re-subscribe features.
func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (Client, error) {
func NewReconnectClient(dialer Dialer, opts ...ReconnectOption) (ReconnectClient, error) {
options := &ReconnectOptions{
ReconnectWaitBase: time.Second,
ReconnectWaitMax: 10 * time.Second,
Expand Down
53 changes: 53 additions & 0 deletions retryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,37 @@ type RetryClient struct {
chTask chan struct{}
stopped bool
taskQueue []func(ctx context.Context, cli *BaseClient)

muStats sync.RWMutex
stats RetryStats
}

// Retryer is an interface to control message retrying.
type Retryer interface {
// SetClient sets the new BaseClient.
// Call Retry() and Resubscribe() to process queued messages and subscriptions.
// The BaseClient must be unconnected when it is passed to the RetryClient.
SetClient(ctx context.Context, cli *BaseClient)
// Client returns the base client.
Client() *BaseClient
// Resubscribe subscribes all established subscriptions.
Resubscribe(ctx context.Context)
// Retry retries all queued publish/subscribe requests.
Retry(ctx context.Context)
// Stats returns retry stats.
Stats() RetryStats
}

// RetryStats stores retry statistics.
type RetryStats struct {
// QueuedTasks is the number of tasks currently waiting in the task queue.
QueuedTasks int
// QueuedRetries is the number of queued messages and subscriptions
// currently waiting to be retried.
QueuedRetries int
// TotalTasks is the total number of processed tasks.
TotalTasks int
// TotalRetries is the total number of retries attempted.
TotalRetries int
}

// Handle registers the message handler.
Expand Down Expand Up @@ -266,6 +297,10 @@ func (c *RetryClient) SetClient(ctx context.Context, cli *BaseClient) {
c.taskQueue = c.taskQueue[1:]
c.mu.Unlock()

c.muStats.Lock()
c.stats.TotalTasks++
c.muStats.Unlock()

task(ctx, cli)
}
}()
Expand Down Expand Up @@ -325,6 +360,10 @@ func (c *RetryClient) Retry(ctx context.Context) {
c.retryQueue = nil

for _, retry := range oldRetryQueue {
c.muStats.Lock()
c.stats.TotalRetries++
c.muStats.Unlock()

err := retry(ctx, cli)
if retryErr, ok := err.(ErrorWithRetry); ok {
c.retryQueue = append(c.retryQueue, retryErr.Retry)
Expand All @@ -334,3 +373,17 @@ func (c *RetryClient) Retry(ctx context.Context) {
}
})
}

// Stats returns a snapshot of the retry statistics.
//
// The cumulative counters are copied while holding muStats, and the queue
// lengths are read afterwards while holding mu. The two locks are taken one
// after the other, never together, so the counters and the queue lengths are
// not guaranteed to describe the exact same instant.
func (c *RetryClient) Stats() RetryStats {
	// Copy the totals first; QueuedTasks/QueuedRetries in this copy are
	// overwritten below with the live queue lengths.
	c.muStats.RLock()
	snapshot := c.stats
	c.muStats.RUnlock()

	c.mu.RLock()
	queuedTasks, queuedRetries := len(c.taskQueue), len(c.retryQueue)
	c.mu.RUnlock()

	snapshot.QueuedTasks = queuedTasks
	snapshot.QueuedRetries = queuedRetries
	return snapshot
}
65 changes: 65 additions & 0 deletions retryclient_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ import (
"github.com/at-wat/mqtt-go/internal/filteredpipe"
)

// expectRetryStats compares every field of actual against expected and
// reports one test error per mismatching field. It never stops the test, so
// all differing fields are reported in a single run. It is marked as a test
// helper so failures are attributed to the caller's line.
func expectRetryStats(t *testing.T, expected, actual RetryStats) {
	t.Helper()

	// One row per compared field: the message format plus the two values.
	checks := []struct {
		format    string
		want, got int
	}{
		{"Expected queued tasks: %d, actual: %d", expected.QueuedTasks, actual.QueuedTasks},
		{"Expected queued retries: %d, actual: %d", expected.QueuedRetries, actual.QueuedRetries},
		{"Expected total tasks: %d, actual: %d", expected.TotalTasks, actual.TotalTasks},
		{"Expected total retries: %d, actual: %d", expected.TotalRetries, actual.TotalRetries},
	}
	for _, check := range checks {
		if check.want == check.got {
			continue
		}
		t.Errorf(check.format, check.want, check.got)
	}
}

func TestIntegration_RetryClient(t *testing.T) {
for name, url := range urls {
t.Run(name, func(t *testing.T) {
Expand All @@ -53,6 +69,11 @@ func TestIntegration_RetryClient(t *testing.T) {
t.Fatalf("Unexpected error: '%v'", err)
}

time.Sleep(50 * time.Millisecond)
expectRetryStats(t, RetryStats{
TotalTasks: 1,
}, cli.Stats())

if err := cli.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
}
Expand Down Expand Up @@ -198,6 +219,7 @@ func TestIntegration_RetryClient_TaskQueue(t *testing.T) {
if withWait {
time.Sleep(50 * time.Millisecond)
}

cli.SetClient(ctx, cliBase)

if withWait {
Expand All @@ -211,6 +233,12 @@ func TestIntegration_RetryClient_TaskQueue(t *testing.T) {
}
if withWait {
time.Sleep(50 * time.Millisecond)

if pubAt == pubBeforeSetClient || pubAt == pubBeforeConnect {
expectRetryStats(t, RetryStats{
QueuedTasks: 100,
}, cli.Stats())
}
}

if _, err := cli.Connect(ctx, "RetryClientQueue"); err != nil {
Expand All @@ -227,6 +255,10 @@ func TestIntegration_RetryClient_TaskQueue(t *testing.T) {
case <-ctxDone.Done():
}

expectRetryStats(t, RetryStats{
TotalTasks: 100,
}, cli.Stats())

if err := cli.Disconnect(ctx); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
}
Expand Down Expand Up @@ -274,10 +306,24 @@ func TestIntegration_RetryClient_RetryInitialRequest(t *testing.T) {
}
time.Sleep(100 * time.Millisecond)

expectRetryStats(t, RetryStats{
QueuedTasks: 1,
}, cli.Stats())

// Disconnect
atomic.StoreInt32(&sw, 1)
go func() {
time.Sleep(300 * time.Millisecond)

if name == "MQTT" || name == "MQTTs" {
// Mosquitto WebSocket sometimes requires extra time to connect
// and retry number may be increased.
expectRetryStats(t, RetryStats{
TotalTasks: 1, // first try to subscribe (failed)
QueuedRetries: 1,
}, cli.Stats())
}

// Connect
atomic.StoreInt32(&sw, 0)
}()
Expand All @@ -290,6 +336,25 @@ func TestIntegration_RetryClient_RetryInitialRequest(t *testing.T) {
t.Fatalf("Unexpected error: '%v'", err)
}

if name == "MQTT" || name == "MQTTs" {
// Mosquitto WebSocket sometimes requires extra time to connect
// and retry number may be increased.
time.Sleep(50 * time.Millisecond)
stats := cli.Stats()
if stats.QueuedTasks != 0 {
t.Errorf("Expected no queued tasks, actual: %d", stats.QueuedTasks)
}
if stats.QueuedRetries != 0 {
t.Errorf("Expected no queued retries, actual: %d", stats.QueuedRetries)
}
if stats.TotalTasks < 2 {
t.Errorf("Expected total tasks: at least 2, actual: %d", stats.TotalTasks)
}
if stats.TotalRetries < 1 {
t.Errorf("Expected total retries: at least 1, actual: %d", stats.TotalRetries)
}
}

cli.Disconnect(ctx)
})
}
Expand Down

0 comments on commit 3b3e9f7

Please sign in to comment.