-
Notifications
You must be signed in to change notification settings - Fork 0
/
influx.go
171 lines (149 loc) · 5.04 KB
/
influx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package monitor
import (
"github.com/sirupsen/logrus"
"strings"
"github.com/influxdata/influxdb/client/v2"
"github.com/nextmetaphor/tcp-proxy-pool/log"
"time"
"net"
)
// Messages logged when a step of the monitoring pipeline fails.
const (
	logErrorCreatingMonitorBatch      = "Error creating monitoring batch"
	logErrorCreatingMonitorConnection = "Error creating monitoring connection"
	logErrorCreatingPoint             = "Error creating point"
	logErrorWritingPoint              = "Error writing point"
)

// Measurement and field names for bytes copied between client and server.
const (
	measurementDataTransfer = "data-transfer"
	fieldCopiedToServer     = "copied-to-server"
	fieldCopiedFromServer   = "copied-from-server"
)

// Measurement and field names for connection-pool events and statistics.
const (
	measurementConnectionPool = "connection-pool"
	fieldConnectionsAccepted  = "connections-accepted"
	fieldConnectionsRejected  = "connections-rejected"
	fieldConnectionsInUse     = "connections-in-use"
	fieldConnectionPoolSize   = "connection-pool-size"
)

// Measurement and field names for container-pool events.
const (
	measurementContainerPool = "container-pool"
	fieldContainersCreated   = "container-created"
	fieldContainersDestroyed = "container-destroyed"
)

// Tag keys identifying the two ends of a proxied connection.
const (
	tagTCPProxyPoolClientConn = "client-conn"
	tagTCPProxyPoolServerConn = "server-conn"
)
// influxClient is the package-level InfluxDB client shared by every Client.
// It is assigned by CreateMonitor and stays nil until then (or when client
// creation failed).
var influxClient client.Client
// CreateMonitor builds a monitoring Client for the supplied settings.
//
// It returns nil when ms.Address is blank. Otherwise it creates an InfluxDB
// UDP client for ms.Address and stores it in the package-level influxClient.
// If the UDP client cannot be created the error is logged through l and a
// Client is still returned.
// TODO return error
func CreateMonitor(ms Settings, l *logrus.Logger) *Client {
	if addr := strings.TrimSpace(ms.Address); addr == "" {
		return nil
	}

	udpConfig := client.UDPConfig{Addr: ms.Address}
	udpClient, err := client.NewUDPClient(udpConfig)
	if err != nil {
		log.Error(logErrorCreatingMonitorConnection, err, l)
	}
	// Assigned even after a failure, exactly as the result was returned.
	influxClient = udpClient

	mon := &Client{
		settings: ms,
		logger:   l,
	}
	return mon
}
// writePoint records a single point, timestamped with the current time, under
// measurementName with the given tags and fields, and writes it using the
// package-level influxClient.
//
// The call is a no-op when mon is nil, when no monitoring address is
// configured, or when influxClient is nil. Failures while building the batch,
// building the point or writing are logged through mon.logger and otherwise
// ignored; nothing is returned to the caller.
func (mon *Client) writePoint(measurementName string, tags map[string]string, fields map[string]interface{}) {
	// CreateMonitor returns a nil *Client when no address is configured, so a
	// nil receiver must be tolerated here rather than dereferenced.
	if mon == nil || strings.TrimSpace(mon.settings.Address) == "" {
		return
	}

	// Without a client there is nowhere to send the point, so skip building it.
	c := influxClient
	if c == nil {
		return
	}

	// TODO - new batch every time???
	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  mon.settings.Database,
		Precision: "ns",
	})
	if err != nil {
		log.Error(logErrorCreatingMonitorBatch, err, mon.logger)
		return
	}

	pt, err := client.NewPoint(measurementName, tags, fields, time.Now())
	if err != nil {
		log.Error(logErrorCreatingPoint, err, mon.logger)
		return
	}
	bp.AddPoint(pt)

	if err := c.Write(bp); err != nil {
		log.Error(logErrorWritingPoint, err, mon.logger)
	}
}
// WriteBytesCopied records totalBytesCopied in the data-transfer measurement.
//
// When srcIsServer is true the value is stored in the copied-from-server field
// and dst is treated as the client connection; otherwise it is stored in the
// copied-to-server field and src is treated as the client connection. Both
// connection tags are taken from each connection's LocalAddr. The point is
// written on a separate goroutine.
func (mon *Client) WriteBytesCopied(srcIsServer bool, totalBytesCopied int64, dst, src net.Conn) {
	// Default to the client -> server direction and flip when src is the server.
	fieldName, clientConn, serverConn := fieldCopiedToServer, src, dst
	if srcIsServer {
		fieldName, clientConn, serverConn = fieldCopiedFromServer, dst, src
	}

	fields := map[string]interface{}{fieldName: totalBytesCopied}
	tags := map[string]string{
		tagTCPProxyPoolClientConn: clientConn.LocalAddr().String(),
		tagTCPProxyPoolServerConn: serverConn.LocalAddr().String(),
	}

	go mon.writePoint(measurementDataTransfer, tags, fields)
}
// WriteConnectionAccepted records one accepted connection in the
// connection-pool measurement, tagged with src's local and remote addresses.
// The point is written on a separate goroutine.
func (mon *Client) WriteConnectionAccepted(src net.Conn) {
	tags := map[string]string{
		tagTCPProxyPoolClientConn: src.LocalAddr().String(),
		tagTCPProxyPoolServerConn: src.RemoteAddr().String(),
	}
	fields := map[string]interface{}{fieldConnectionsAccepted: 1}

	go mon.writePoint(measurementConnectionPool, tags, fields)
}
// WriteConnectionRejected records one rejected connection in the
// connection-pool measurement, tagged with src's local and remote addresses.
// The point is written on a separate goroutine.
func (mon *Client) WriteConnectionRejected(src net.Conn) {
	tags := map[string]string{
		tagTCPProxyPoolClientConn: src.LocalAddr().String(),
		tagTCPProxyPoolServerConn: src.RemoteAddr().String(),
	}
	fields := map[string]interface{}{fieldConnectionsRejected: 1}

	go mon.writePoint(measurementConnectionPool, tags, fields)
}
// WriteConnectionPoolStats records the number of connections in use and the
// pool size in the connection-pool measurement, tagged with src's local and
// remote addresses. The point is written on a separate goroutine.
func (mon *Client) WriteConnectionPoolStats(src net.Conn, connectionsInUse, connectionPoolSize int) {
	tags := map[string]string{
		tagTCPProxyPoolClientConn: src.LocalAddr().String(),
		tagTCPProxyPoolServerConn: src.RemoteAddr().String(),
	}
	fields := map[string]interface{}{
		fieldConnectionsInUse:   connectionsInUse,
		fieldConnectionPoolSize: connectionPoolSize,
	}

	go mon.writePoint(measurementConnectionPool, tags, fields)
}
// WriteContainerCreated records numContainersCreated in the container-pool
// measurement. The point carries no tags and is written on a separate
// goroutine.
func (mon *Client) WriteContainerCreated(numContainersCreated int) {
	noTags := map[string]string{}
	fields := map[string]interface{}{fieldContainersCreated: numContainersCreated}

	go mon.writePoint(measurementContainerPool, noTags, fields)
}
// WriteContainerDestroyed records numContainersDestroyed in the container-pool
// measurement. The point carries no tags and is written on a separate
// goroutine.
func (mon *Client) WriteContainerDestroyed(numContainersDestroyed int) {
	noTags := map[string]string{}
	fields := map[string]interface{}{fieldContainersDestroyed: numContainersDestroyed}

	go mon.writePoint(measurementContainerPool, noTags, fields)
}
// CloseMonitorConnection closes the package-level InfluxDB client, if one has
// been created. It does nothing when influxClient is nil.
func (mon *Client) CloseMonitorConnection() {
	if influxClient == nil {
		return
	}
	// The result of Close is deliberately discarded, as it always has been.
	_ = influxClient.Close()
}