Skip to content

Commit

Permalink
Use TCP for statsd and poll for metric arrival
Browse files Browse the repository at this point in the history
Have you heard the joke about UDP? Never mind. You probably won't get
it.

The integration tests seem to be flaking out while making assertions on
statsd metrics. There are two likely causes:

1. We wait for some sentinel and assume that when it arrives, all of the
   metrics we're looking for are ready. That's not necessarily true.
   This change replaces the single sentinel with assertions that all
   metrics are eventually available.
2. Even on good days and over the loopback adapter, UDP datagrams may
   not get to the intended target. To take that out of the equation,
   we'll move to TCP.

Signed-off-by: Matthew Sykes <[email protected]>
  • Loading branch information
sykesm authored and mastersingh24 committed Jul 2, 2020
1 parent 83b29a3 commit 74dd4dc
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 85 deletions.
87 changes: 47 additions & 40 deletions integration/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package e2e

import (
"bufio"
"encoding/json"
"io"
"net"
Expand Down Expand Up @@ -53,74 +54,80 @@ func StartPort() int {
return integration.E2EBasePort.StartPortForNode()
}

type DatagramReader struct {
type MetricsReader struct {
buffer *gbytes.Buffer
errCh chan error
sock *net.UDPConn
listener net.Listener
doneCh chan struct{}
closeOnce sync.Once
err error
}

func NewDatagramReader() *DatagramReader {
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())
sock, err := net.ListenUDP("udp", udpAddr)
Expect(err).NotTo(HaveOccurred())
err = sock.SetReadBuffer(1024 * 1024)
func NewMetricsReader() *MetricsReader {
listener, err := net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())

return &DatagramReader{
buffer: gbytes.NewBuffer(),
sock: sock,
errCh: make(chan error, 1),
doneCh: make(chan struct{}),
return &MetricsReader{
buffer: gbytes.NewBuffer(),
listener: listener,
errCh: make(chan error, 1),
doneCh: make(chan struct{}),
}
}

func (dr *DatagramReader) Buffer() *gbytes.Buffer {
return dr.buffer
func (mr *MetricsReader) Buffer() *gbytes.Buffer {
return mr.buffer
}

func (dr *DatagramReader) Address() string {
return dr.sock.LocalAddr().String()
func (mr *MetricsReader) Address() string {
return mr.listener.Addr().String()
}

func (dr *DatagramReader) String() string {
return string(dr.buffer.Contents())
func (mr *MetricsReader) String() string {
return string(mr.buffer.Contents())
}

func (dr *DatagramReader) Start() {
buf := make([]byte, 1024*1024)
func (mr *MetricsReader) Start() {
for {
select {
case <-dr.doneCh:
dr.errCh <- nil
conn, err := mr.listener.Accept()
if err != nil {
mr.errCh <- err
return
}
go mr.handleConnection(conn)
}
}

func (mr *MetricsReader) handleConnection(c net.Conn) {
defer GinkgoRecover()
defer c.Close()

br := bufio.NewReader(c)
for {
select {
case <-mr.doneCh:
c.Close()
default:
n, _, err := dr.sock.ReadFrom(buf)
if err != nil {
dr.errCh <- err
return
}
_, err = dr.buffer.Write(buf[0:n])
if err != nil {
dr.errCh <- err
data, err := br.ReadBytes('\n')
if err == io.EOF {
return
}
Expect(err).NotTo(HaveOccurred())

_, err = mr.buffer.Write(data)
Expect(err).NotTo(HaveOccurred())
}
}
}

func (dr *DatagramReader) Close() error {
dr.closeOnce.Do(func() {
close(dr.doneCh)
err := dr.sock.Close()
dr.err = <-dr.errCh
if dr.err == nil && err != nil && err != io.EOF {
dr.err = err
func (mr *MetricsReader) Close() error {
mr.closeOnce.Do(func() {
close(mr.doneCh)
err := mr.listener.Close()
mr.err = <-mr.errCh
if mr.err == nil && err != nil && err != io.EOF {
mr.err = err
}
})
return dr.err
return mr.err
}
89 changes: 46 additions & 43 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ var _ = Describe("EndToEnd", func() {

Describe("basic solo network with 2 orgs and no docker", func() {
var (
datagramReader *DatagramReader
metricsReader *MetricsReader
runArtifactsFilePath string
)

BeforeEach(func() {
datagramReader = NewDatagramReader()
go datagramReader.Start()
metricsReader = NewMetricsReader()
go metricsReader.Start()

network = nwo.New(nwo.BasicSolo(), testDir, nil, StartPort(), components)
network.MetricsProvider = "statsd"
network.StatsdEndpoint = datagramReader.Address()
network.StatsdEndpoint = metricsReader.Address()
network.ChannelParticipationEnabled = true
network.Profiles = append(network.Profiles, &nwo.Profile{
Name: "TwoOrgsBaseProfileChannel",
Expand Down Expand Up @@ -131,8 +131,8 @@ var _ = Describe("EndToEnd", func() {
})

AfterEach(func() {
if datagramReader != nil {
datagramReader.Close()
if metricsReader != nil {
metricsReader.Close()
}

// Terminate the processes but defer the network cleanup to the outer
Expand Down Expand Up @@ -198,14 +198,14 @@ var _ = Describe("EndToEnd", func() {
RunQueryInvokeQuery(network, orderer, peer, "testchannel")
RunRespondWith(network, orderer, peer, "testchannel")

By("waiting for DeliverFiltered stats to be emitted")
By("evaluating statsd metrics")
metricsWriteInterval := 5 * time.Second
Eventually(datagramReader, 2*metricsWriteInterval).Should(gbytes.Say("stream_request_duration.protos_Deliver.DeliverFiltered."))
CheckPeerStatsdStreamMetrics(metricsReader, 2*metricsWriteInterval)
CheckPeerStatsdMetrics("org1_peer0", metricsReader, 2*metricsWriteInterval)
CheckPeerStatsdMetrics("org2_peer0", metricsReader, 2*metricsWriteInterval)

CheckPeerStatsdStreamMetrics(datagramReader.String())
CheckPeerStatsdMetrics(datagramReader.String(), "org1_peer0")
CheckPeerStatsdMetrics(datagramReader.String(), "org2_peer0")
CheckOrdererStatsdMetrics(datagramReader.String(), "ordererorg_orderer")
By("checking for orderer metrics")
CheckOrdererStatsdMetrics("ordererorg_orderer", metricsReader, 2*metricsWriteInterval)

By("setting up a channel from a base profile")
additionalPeer := network.Peer("Org2", "peer0")
Expand Down Expand Up @@ -687,43 +687,46 @@ func RunRespondWith(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channe
Expect(sess.Err).To(gbytes.Say(`Error: endorsement failure during invoke.`))
}

func CheckPeerStatsdMetrics(contents, prefix string) {
func CheckPeerStatsdMetrics(prefix string, mr *MetricsReader, timeout time.Duration) {
By("checking for peer statsd metrics")
Expect(contents).To(ContainSubstring(prefix + ".logging.entries_checked.info:"))
Expect(contents).To(ContainSubstring(prefix + ".logging.entries_written.info:"))
Expect(contents).To(ContainSubstring(prefix + ".go.mem.gc_completed_count:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_requests_received.protos_Endorser.ProcessProposal:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_requests_completed.protos_Endorser.ProcessProposal.OK:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_request_duration.protos_Endorser.ProcessProposal.OK:"))
Expect(contents).To(ContainSubstring(prefix + ".ledger.blockchain_height"))
Expect(contents).To(ContainSubstring(prefix + ".ledger.blockstorage_commit_time"))
Expect(contents).To(ContainSubstring(prefix + ".ledger.blockstorage_and_pvtdata_commit_time"))
Eventually(mr.String, timeout).Should(SatisfyAll(
ContainSubstring(prefix+".logging.entries_checked.info:"),
ContainSubstring(prefix+".logging.entries_written.info:"),
ContainSubstring(prefix+".go.mem.gc_completed_count:"),
ContainSubstring(prefix+".grpc.server.unary_requests_received.protos_Endorser.ProcessProposal:"),
ContainSubstring(prefix+".grpc.server.unary_requests_completed.protos_Endorser.ProcessProposal.OK:"),
ContainSubstring(prefix+".grpc.server.unary_request_duration.protos_Endorser.ProcessProposal.OK:"),
ContainSubstring(prefix+".ledger.blockchain_height"),
ContainSubstring(prefix+".ledger.blockstorage_commit_time"),
ContainSubstring(prefix+".ledger.blockstorage_and_pvtdata_commit_time"),
))
}

func CheckPeerStatsdStreamMetrics(contents string) {
func CheckPeerStatsdStreamMetrics(mr *MetricsReader, timeout time.Duration) {
By("checking for stream metrics")
Expect(contents).To(ContainSubstring(".grpc.server.stream_requests_received.protos_Deliver.DeliverFiltered:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_requests_completed.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_request_duration.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_messages_received.protos_Deliver.DeliverFiltered"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_messages_sent.protos_Deliver.DeliverFiltered"))
Eventually(mr.String, timeout).Should(SatisfyAll(
ContainSubstring(".grpc.server.stream_requests_received.protos_Deliver.DeliverFiltered:"),
ContainSubstring(".grpc.server.stream_requests_completed.protos_Deliver.DeliverFiltered.Unknown:"),
ContainSubstring(".grpc.server.stream_request_duration.protos_Deliver.DeliverFiltered.Unknown:"),
ContainSubstring(".grpc.server.stream_messages_received.protos_Deliver.DeliverFiltered"),
ContainSubstring(".grpc.server.stream_messages_sent.protos_Deliver.DeliverFiltered"),
))
}

func CheckOrdererStatsdMetrics(contents, prefix string) {
By("checking for AtomicBroadcast")
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Broadcast.OK"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Deliver."))

By("checking for orderer metrics")
Expect(contents).To(ContainSubstring(prefix + ".logging.entries_checked.info:"))
Expect(contents).To(ContainSubstring(prefix + ".logging.entries_written.info:"))
Expect(contents).To(ContainSubstring(prefix + ".go.mem.gc_completed_count:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_requests_received.orderer_AtomicBroadcast.Deliver:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_requests_completed.orderer_AtomicBroadcast.Deliver."))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_messages_received.orderer_AtomicBroadcast.Deliver"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_messages_sent.orderer_AtomicBroadcast.Deliver"))
Expect(contents).To(ContainSubstring(prefix + ".ledger.blockchain_height"))
Expect(contents).To(ContainSubstring(prefix + ".ledger.blockstorage_commit_time"))
func CheckOrdererStatsdMetrics(prefix string, mr *MetricsReader, timeout time.Duration) {
Eventually(mr.String, timeout).Should(SatisfyAll(
ContainSubstring(prefix+".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Broadcast.OK"),
ContainSubstring(prefix+".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Deliver."),
ContainSubstring(prefix+".logging.entries_checked.info:"),
ContainSubstring(prefix+".logging.entries_written.info:"),
ContainSubstring(prefix+".go.mem.gc_completed_count:"),
ContainSubstring(prefix+".grpc.server.stream_requests_received.orderer_AtomicBroadcast.Deliver:"),
ContainSubstring(prefix+".grpc.server.stream_requests_completed.orderer_AtomicBroadcast.Deliver."),
ContainSubstring(prefix+".grpc.server.stream_messages_received.orderer_AtomicBroadcast.Deliver"),
ContainSubstring(prefix+".grpc.server.stream_messages_sent.orderer_AtomicBroadcast.Deliver"),
ContainSubstring(prefix+".ledger.blockchain_height"),
ContainSubstring(prefix+".ledger.blockstorage_commit_time"),
))
}

func OrdererOperationalClients(network *nwo.Network, orderer *nwo.Orderer) (authClient, unauthClient *http.Client) {
Expand Down
7 changes: 6 additions & 1 deletion integration/nwo/core_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,13 @@ operations:
metrics:
provider: {{ .MetricsProvider }}
statsd:
{{- if .StatsdEndpoint }}
network: tcp
address: {{ .StatsdEndpoint }}
{{- else }}
network: udp
address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
address: 127.0.0.1:8125
{{- end }}
writeInterval: 5s
prefix: {{ ReplaceAll (ToLower Peer.ID) "." "_" }}
`
7 changes: 6 additions & 1 deletion integration/nwo/orderer_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ Operations:
Metrics:
Provider: {{ .MetricsProvider }}
Statsd:
{{- if .StatsdEndpoint }}
Network: tcp
Address: {{ .StatsdEndpoint }}
{{- else }}
Network: udp
Address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
Address: 127.0.0.1:8125
{{- end }}
WriteInterval: 5s
Prefix: {{ ReplaceAll (ToLower Orderer.ID) "." "_" }}
{{- end }}
Expand Down

0 comments on commit 74dd4dc

Please sign in to comment.