[receiver/postgresql] Add wal metrics and max connection metric (open-telemetry#13400)

Adds the following metrics:
postgresql.replication.data_delay
postgresql.wal.age
postgresql.wal.lag
postgresql.connection.max
schmikei committed Aug 18, 2022
1 parent 1a552bc commit 4ce3fae
Showing 10 changed files with 1,132 additions and 374 deletions.
82 changes: 82 additions & 0 deletions receiver/postgresqlreceiver/client.go
@@ -17,9 +17,11 @@ package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-co
import (
"context"
"database/sql"
"errors"
"fmt"
"net"
"strings"
"time"

"github.com/lib/pq"
"go.opentelemetry.io/collector/config/confignet"
@@ -38,6 +40,10 @@ type tableIdentifier string
// indexIdentifier is a unique string that identifies a particular index and is separated by the "|" character
type indexIdentifer string

// errNoLastArchive is an error that occurs when there is no previous wal archive, so there is no way to compute the
// last archived point
var errNoLastArchive = errors.New("no last archive found, not able to calculate oldest WAL age")

type client interface {
Close() error
getDatabaseStats(ctx context.Context, databases []string) (map[databaseName]databaseStats, error)
@@ -46,6 +52,9 @@ type client interface {
getDatabaseSize(ctx context.Context, databases []string) (map[databaseName]int64, error)
getDatabaseTableMetrics(ctx context.Context, db string) (map[tableIdentifier]tableStats, error)
getBlocksReadByTable(ctx context.Context, db string) (map[tableIdentifier]tableIOStats, error)
getReplicationStats(ctx context.Context) ([]replicationStats, error)
getLatestWalAgeSeconds(ctx context.Context) (int64, error)
getMaxConnections(ctx context.Context) (int64, error)
getIndexStats(ctx context.Context, database string) (map[indexIdentifer]indexStat, error)
listDatabases(ctx context.Context) ([]string, error)
}
@@ -427,6 +436,79 @@ func (c *postgreSQLClient) getBGWriterStats(ctx context.Context) (*bgStat, error
}, nil
}

func (c *postgreSQLClient) getMaxConnections(ctx context.Context) (int64, error) {
query := `SHOW max_connections;`
row := c.client.QueryRowContext(ctx, query)
var maxConns int64
err := row.Scan(&maxConns)
return maxConns, err
}
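
For context, `SHOW max_connections` returns the setting as a single text row, and `database/sql` parses that text when it is scanned into an `int64`. The following standalone sketch (not part of this commit) runs the same query outside the receiver; the connection string is a placeholder assumption.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // same driver the receiver uses
)

func main() {
	// Placeholder DSN; adjust host, user, password, and sslmode for your instance.
	db, err := sql.Open("postgres", "host=localhost port=5432 user=otelu password=otelp dbname=postgres sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var maxConns int64
	// SHOW returns the value as text; Scan converts it into an int64.
	if err := db.QueryRowContext(context.Background(), "SHOW max_connections;").Scan(&maxConns); err != nil {
		log.Fatal(err)
	}
	fmt.Println("max_connections =", maxConns)
}
```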

type replicationStats struct {
clientAddr string
pendingBytes int64
flushLag int64
replayLag int64
writeLag int64
}

func (c *postgreSQLClient) getReplicationStats(ctx context.Context) ([]replicationStats, error) {
query := `SELECT
client_addr,
coalesce(pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn), 0) AS replication_bytes_pending,
write_lag,
flush_lag,
replay_lag
FROM pg_stat_replication;
`
rows, err := c.client.QueryContext(ctx, query)
if err != nil {
return nil, fmt.Errorf("unable to query pg_stat_replication: %w", err)
}
defer rows.Close()
rs := []replicationStats{}
var errors error
for rows.Next() {
var client string
var replicationBytes, writeLag, flushLag, replayLag int64
err = rows.Scan(&client, &replicationBytes, &writeLag, &flushLag, &replayLag)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
rs = append(rs, replicationStats{
clientAddr: client,
pendingBytes: replicationBytes,
replayLag: replayLag,
writeLag: writeLag,
flushLag: flushLag,
})
}

return rs, errors
}
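
The row loop above deliberately keeps going after a bad row: each failed `Scan` is accumulated with `multierr` and returned alongside whatever rows did parse. A minimal sketch of that pattern in isolation (the input slice and error message are made up for illustration):

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	inputs := []string{"10.0.0.1", "", "10.0.0.2"}
	var errs error
	parsed := []string{}
	for _, in := range inputs {
		if in == "" {
			// Collect the failure and keep processing the remaining rows.
			errs = multierr.Append(errs, errors.New("scan failed for row"))
			continue
		}
		parsed = append(parsed, in)
	}
	// Partial results are returned together with the combined error.
	fmt.Println(parsed, errs)
}
```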

func (c *postgreSQLClient) getLatestWalAgeSeconds(ctx context.Context) (int64, error) {
query := `SELECT
coalesce(last_archived_time, CURRENT_TIMESTAMP) AS last_archived_wal,
CURRENT_TIMESTAMP
FROM pg_stat_archiver;
`
row := c.client.QueryRowContext(ctx, query)
var lastArchivedWal, currentInstanceTime time.Time
err := row.Scan(&lastArchivedWal, &currentInstanceTime)
if err != nil {
return 0, err
}

if lastArchivedWal.Equal(currentInstanceTime) {
return 0, errNoLastArchive
}

age := int64(currentInstanceTime.Sub(lastArchivedWal).Seconds())
return age, nil
}
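
Because the query coalesces a missing `last_archived_time` to `CURRENT_TIMESTAMP`, equal timestamps are the signal that nothing has been archived yet, and the function returns `errNoLastArchive` rather than reporting an age of zero. A small illustration of that arithmetic with made-up timestamps (not part of this commit):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// walAge mirrors the calculation above: equal timestamps mean "no archive yet",
// otherwise the age is the whole-second difference between the two clocks.
func walAge(lastArchived, now time.Time) (int64, error) {
	if lastArchived.Equal(now) {
		return 0, errors.New("no last archive found")
	}
	return int64(now.Sub(lastArchived).Seconds()), nil
}

func main() {
	now := time.Date(2022, 8, 18, 12, 5, 30, 0, time.UTC)
	last := now.Add(-330 * time.Second)

	age, err := walAge(last, now)
	fmt.Println(age, err) // 330 <nil>

	_, err = walAge(now, now)
	fmt.Println(err) // no last archive found
}
```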

func (c *postgreSQLClient) listDatabases(ctx context.Context) ([]string, error) {
query := `SELECT datname FROM pg_database
WHERE datistemplate = false;`
8 changes: 8 additions & 0 deletions receiver/postgresqlreceiver/documentation.md
@@ -16,16 +16,22 @@ These are the metrics available for this scraper.
| **postgresql.bgwriter.maxwritten** | Number of times the background writer stopped a cleaning scan because it had written too many buffers. | | Sum(Int) | <ul> </ul> |
| **postgresql.blocks_read** | The number of blocks read. | 1 | Sum(Int) | <ul> <li>database</li> <li>table</li> <li>source</li> </ul> |
| **postgresql.commits** | The number of commits. | 1 | Sum(Int) | <ul> <li>database</li> </ul> |
| **postgresql.connection.max** | Configured maximum number of client connections allowed. | {connections} | Gauge(Int) | <ul> </ul> |
| **postgresql.database.count** | Number of user databases. | {databases} | Sum(Int) | <ul> </ul> |
| **postgresql.db_size** | The database disk usage. | By | Sum(Int) | <ul> <li>database</li> </ul> |
| **postgresql.index.scans** | The number of index scans on a table. | {scans} | Sum(Int) | <ul> </ul> |
| **postgresql.index.size** | The size of the index on disk. | By | Gauge(Int) | <ul> </ul> |
| **postgresql.operations** | The number of db row operations. | 1 | Sum(Int) | <ul> <li>database</li> <li>table</li> <li>operation</li> </ul> |
| **postgresql.replication.data_delay** | The amount of data delayed in replication. | By | Gauge(Int) | <ul> <li>replication_client</li> </ul> |
| **postgresql.rollbacks** | The number of rollbacks. | 1 | Sum(Int) | <ul> <li>database</li> </ul> |
| **postgresql.rows** | The number of rows in the database. | 1 | Sum(Int) | <ul> <li>database</li> <li>table</li> <li>state</li> </ul> |
| **postgresql.table.count** | Number of user tables in a database. | | Sum(Int) | <ul> </ul> |
| **postgresql.table.size** | Disk space used by a table. | By | Sum(Int) | <ul> </ul> |
| **postgresql.table.vacuum.count** | Number of times a table has manually been vacuumed. | {vacuums} | Sum(Int) | <ul> </ul> |
| **postgresql.wal.age** | Age of the oldest WAL file. This metric requires WAL to be enabled with at least one replica. | s | Gauge(Int) | <ul> </ul> |
| **postgresql.wal.lag** | Time between flushing recent WAL locally and receiving notification that the standby server has completed an operation with it. This metric requires WAL to be enabled with at least one replica. | s | Gauge(Int) | <ul> <li>wal_operation_lag</li> <li>replication_client</li> </ul> |

**Highlighted metrics** are emitted by default. Other metrics are optional and not emitted by default.
Any metric can be enabled or disabled with the following scraper configuration:
@@ -53,6 +59,8 @@ metrics:
| bg_duration_type (type) | The type of time spent during the checkpoint. | sync, write |
| database | The name of the database. | |
| operation | The database operation. | ins, upd, del, hot_upd |
| replication_client | The IP address of the client connected to this backend. If this field is "unix", it indicates that the client is connected via a Unix socket. | |
| source | The block read source type. | heap_read, heap_hit, idx_read, idx_hit, toast_read, toast_hit, tidx_read, tidx_hit |
| state | The tuple (row) state. | dead, live |
| table | The schema name followed by the table name. | |
| wal_operation_lag (operation) | The operation which is responsible for the lag. | flush, replay, write |