Skip to content

Commit

Permalink
feat: show siderolink status on dashboard
Browse files Browse the repository at this point in the history
Add a new resource, `SiderolinkStatus`, which combines the following info:
- The Siderolink API endpoint without the query parameters or fragments (potentially sensitive info due to the join token)
- The status of the Siderolink connection

This resource is not set as sensitive, so it can be retrieved by the users with `os:operator` role (e.g., using `talosctl dashboard` through Omni).

Make use of this resource in the dashboard to display the status of the Siderolink connection.

Additionally, rework the status columns in the dashboard to:
- Display a Linux terminal compatible "tick" or a "cross" prefix for statuses in addition to the red/green color coding.
- Move and combine some statuses to save rows and make them more even.

Closes #8643.

Signed-off-by: Utku Ozdemir <[email protected]>
  • Loading branch information
utkuozdemir committed Jun 18, 2024
1 parent 6f6a5d1 commit 5ffc3f1
Show file tree
Hide file tree
Showing 21 changed files with 758 additions and 123 deletions.
6 changes: 6 additions & 0 deletions api/resource/definitions/siderolink/siderolink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ message ConfigSpec {
bool tunnel = 5;
}

// StatusSpec describes Siderolink status.
message StatusSpec {
string host = 1;
bool connected = 2;
}

// TunnelSpec describes Siderolink GRPC Tunnel configuration.
message TunnelSpec {
string api_endpoint = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (ctrl *DiagnosticsController) Run(ctx context.Context, r controller.Runtime
return nil
}

return safe.WriterModify(ctx, r, runtime.NewDiagnstic(runtime.NamespaceName, checkDescription.ID), func(res *runtime.Diagnostic) error {
return safe.WriterModify(ctx, r, runtime.NewDiagnostic(runtime.NamespaceName, checkDescription.ID), func(res *runtime.Diagnostic) error {
*res.TypedSpec() = *warning

return nil
Expand Down
28 changes: 6 additions & 22 deletions internal/app/machined/pkg/controllers/siderolink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
case <-ctx.Done():
return nil
case <-ticker.C:
reconnect, err := ctrl.shouldReconnect(wgClient)
reconnect, err := peerDown(wgClient)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// no Wireguard device, so no need to reconnect
continue
}

return err
}

Expand Down Expand Up @@ -476,27 +481,6 @@ func (ctrl *ManagerController) cleanupAddressSpecs(ctx context.Context, r contro
return nil
}

func (ctrl *ManagerController) shouldReconnect(wgClient *wgctrl.Client) (bool, error) {
wgDevice, err := wgClient.Device(constants.SideroLinkName)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// no Wireguard device, so no need to reconnect
return false, nil
}

return false, fmt.Errorf("error reading Wireguard device: %w", err)
}

if len(wgDevice.Peers) != 1 {
return false, fmt.Errorf("unexpected number of Wireguard peers: %d", len(wgDevice.Peers))
}

peer := wgDevice.Peers[0]
since := time.Since(peer.LastHandshakeTime)

return since >= wireguard.PeerDownInterval, nil
}

func withTransportCredentials(insec bool) grpc.DialOption {
var transportCredentials credentials.TransportCredentials

Expand Down
32 changes: 32 additions & 0 deletions internal/app/machined/pkg/controllers/siderolink/siderolink.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,35 @@

// Package siderolink provides controllers which manage file resources.
package siderolink

import (
"fmt"
"time"

"github.com/siderolabs/siderolink/pkg/wireguard"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

"github.com/siderolabs/talos/pkg/machinery/constants"
)

// WireguardClient allows mocking Wireguard client.
type WireguardClient interface {
Device(string) (*wgtypes.Device, error)
Close() error
}

func peerDown(wgClient WireguardClient) (bool, error) {
wgDevice, err := wgClient.Device(constants.SideroLinkName)
if err != nil {
return false, fmt.Errorf("error reading Wireguard device: %w", err)
}

if len(wgDevice.Peers) != 1 {
return false, fmt.Errorf("unexpected number of Wireguard peers: %d", len(wgDevice.Peers))
}

peer := wgDevice.Peers[0]
since := time.Since(peer.LastHandshakeTime)

return since >= wireguard.PeerDownInterval, nil
}
158 changes: 158 additions & 0 deletions internal/app/machined/pkg/controllers/siderolink/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

package siderolink

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"os"
"time"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
"golang.zx2c4.com/wireguard/wgctrl"

"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)

// DefaultStatusUpdateInterval is the default interval between status updates.
const DefaultStatusUpdateInterval = 30 * time.Second

// StatusController reports siderolink status.
type StatusController struct {
// WGClientFunc is a function that returns a WireguardClient.
//
// When nil, it defaults to an actual Wireguard client.
WGClientFunc func() (WireguardClient, error)

// Interval is the time between peer status checks.
//
// When zero, it defaults to DefaultStatusUpdateInterval.
Interval time.Duration
}

// Name implements controller.Controller interface.
func (ctrl *StatusController) Name() string {
return "siderolink.StatusController"
}

// Inputs implements controller.Controller interface.
func (ctrl *StatusController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: siderolink.ConfigType,
ID: optional.Some(siderolink.ConfigID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *StatusController) Outputs() []controller.Output {
return []controller.Output{
{
Type: siderolink.StatusType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
func (ctrl *StatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
interval := ctrl.Interval
if interval == 0 {
interval = DefaultStatusUpdateInterval
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

wgClientFunc := ctrl.WGClientFunc
if wgClientFunc == nil {
wgClientFunc = func() (WireguardClient, error) {
return wgctrl.New()
}
}

wgClient, err := wgClientFunc()
if err != nil {
return fmt.Errorf("failed to create wireguard client: %w", err)
}

for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
case <-ticker.C:
}

r.StartTrackingOutputs()

if err = ctrl.reconcileStatus(ctx, r, wgClient); err != nil {
return err
}

if err = safe.CleanupOutputs[*siderolink.Status](ctx, r); err != nil {
return err
}

r.ResetRestartBackoff()
}
}

func (ctrl *StatusController) reconcileStatus(ctx context.Context, r controller.Runtime, wgClient WireguardClient) (err error) {
cfg, err := safe.ReaderGetByID[*siderolink.Config](ctx, r, siderolink.ConfigID)
if err != nil {
if state.IsNotFoundError(err) {
return nil
}

return err
}

if cfg.TypedSpec().APIEndpoint == "" {
return nil
}

var parsed *url.URL

if parsed, err = url.Parse(cfg.TypedSpec().APIEndpoint); err != nil {
return fmt.Errorf("failed to parse siderolink API endpoint: %w", err)
}

host, _, err := net.SplitHostPort(parsed.Host)
if err != nil {
host = parsed.Host
}

down, err := peerDown(wgClient)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}

down = true // wireguard device does not exist, we mark it as down
}

if err = safe.WriterModify(ctx, r, siderolink.NewStatus(), func(status *siderolink.Status) error {
status.TypedSpec().Host = host
status.TypedSpec().Connected = !down

return nil
}); err != nil {
return fmt.Errorf("failed to update status: %w", err)
}

return nil
}
132 changes: 132 additions & 0 deletions internal/app/machined/pkg/controllers/siderolink/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

package siderolink_test

import (
"os"
"sync"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
siderolinkctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/siderolink"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)

type StatusSuite struct {
ctest.DefaultSuite
}

func TestStatusSuite(t *testing.T) {
suite.Run(t, &StatusSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 3 * time.Second,
},
})
}

func (suite *StatusSuite) TestStatus() {
wgClient := &mockWgClient{
device: &wgtypes.Device{
Peers: []wgtypes.Peer{
{
LastHandshakeTime: time.Now().Add(-time.Minute),
},
},
},
}

suite.Require().NoError(suite.Runtime().RegisterController(&siderolinkctrl.StatusController{
WGClientFunc: func() (siderolinkctrl.WireguardClient, error) {
return wgClient, nil
},
Interval: 100 * time.Millisecond,
}))

rtestutils.AssertNoResource[*siderolink.Status](suite.Ctx(), suite.T(), suite.State(), siderolink.StatusID)

siderolinkConfig := siderolink.NewConfig(config.NamespaceName, siderolink.ConfigID)

siderolinkConfig.TypedSpec().APIEndpoint = "https://siderolink.example.org:1234?jointoken=supersecret&foo=bar#some=fragment"

suite.Require().NoError(suite.State().Create(suite.Ctx(), siderolinkConfig))

suite.assertStatus("siderolink.example.org", true)

// disconnect the peer

wgClient.setDevice(&wgtypes.Device{
Peers: []wgtypes.Peer{
{LastHandshakeTime: time.Now().Add(-time.Hour)},
},
})

// no device
wgClient.setDevice(nil)
suite.assertStatus("siderolink.example.org", false)

// reconnect the peer
wgClient.setDevice(&wgtypes.Device{
Peers: []wgtypes.Peer{
{LastHandshakeTime: time.Now().Add(-5 * time.Second)},
},
})

suite.assertStatus("siderolink.example.org", true)

// update API endpoint

siderolinkConfig.TypedSpec().APIEndpoint = "https://new.example.org?jointoken=supersecret"

suite.Require().NoError(suite.State().Update(suite.Ctx(), siderolinkConfig))
suite.assertStatus("new.example.org", true)

// no config

suite.Require().NoError(suite.State().Destroy(suite.Ctx(), siderolinkConfig.Metadata()))
rtestutils.AssertNoResource[*siderolink.Status](suite.Ctx(), suite.T(), suite.State(), siderolink.StatusID)
}

func (suite *StatusSuite) assertStatus(endpoint string, connected bool) {
rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(), []resource.ID{siderolink.StatusID},
func(c *siderolink.Status, assert *assert.Assertions) {
assert.Equal(endpoint, c.TypedSpec().Host)
assert.Equal(connected, c.TypedSpec().Connected)
})
}

type mockWgClient struct {
mu sync.Mutex
device *wgtypes.Device
}

func (m *mockWgClient) setDevice(device *wgtypes.Device) {
m.mu.Lock()
defer m.mu.Unlock()

m.device = device
}

func (m *mockWgClient) Device(string) (*wgtypes.Device, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.device == nil {
return nil, os.ErrNotExist
}

return m.device, nil
}

func (m *mockWgClient) Close() error {
return nil
}
Loading

0 comments on commit 5ffc3f1

Please sign in to comment.