mangos

Version: v3.4.2
Published: Aug 11, 2022 License: Apache-2.0 Imports: 3 Imported by: 60

README

mangos™ v3

Please see here for an important message for the people of Russia.

Mangos™ is an implementation in pure Go of the SP (Scalability Protocols) messaging system. These are colloquially known as nanomsg.

NOTE: The import path has changed! Please update any references to use go.nanomsg.org/mangos/v3. The old v2 imports will still work for old applications, provided that a sufficiently modern version of Go is used. However, no further work will be done on earlier versions. Earlier versions will still inter-operate with this version, except that within the same process the inproc transport can only be used by consumers using the same version of mangos.

The modern C implementation of the SP protocols is available as NNG™.

The original implementation of the SP protocols is available as nanomsg™.

Generally (modulo a few caveats) all of these implementations can inter-operate.

The design is intended to make it easy to add new transports, as well as new topologies (protocols, in SP parlance).

At present, all the Req/Rep, Pub/Sub, Pair, Bus, Push/Pull, and Surveyor/Respondent patterns are supported. This project also supports an experimental protocol called Star.

Supported transports include TCP, inproc, IPC, WebSocket, WebSocket/TLS and TLS.

Basic interoperability with nanomsg and NNG has been verified (you can do so yourself with nanocat and macat) for all protocols and transports that NNG and nanomsg support, except for the ZeroTier transport and the PAIRv1 protocol, which are only supported in NNG at this time.

There are a number of projects that use these products together.

Documentation

For API documentation, see https://pkg.go.dev/go.nanomsg.org/mangos/v3.

Testing

This package supports internal self tests, which can be run in the idiomatic Go way. (Note that most of the tests are in a test subdirectory.)

$ go test go.nanomsg.org/mangos/v3/...

There are also internal benchmarks available:

$ go test -bench=. go.nanomsg.org/mangos/v3/test

Commercial Support

Staysail Systems, Inc. offers commercial support for mangos.

Examples

Some examples are posted in the directories under examples/ in this project.

These examples are rewrites (in Go) of Tim Dysinger's Getting Started with Nanomsg.

Running go doc in the example directories will yield information about how to run each example program.

Enjoy!


Copyright 2021 The Mangos Authors

mangos™, Nanomsg™ and NNG™ are trademarks of Garrett D'Amore.

Documentation

Overview

Package mangos provides a pure Go implementation of the Scalability Protocols. These are more commonly known as "nanomsg", which is the C-based software package that is also their reference implementation.

These protocols facilitate the rapid creation of applications which rely on multiple participants in sometimes complex communications topologies, including Request/Reply, Publish/Subscribe, Push/Pull, Surveyor/Respondent, etc.

For more information, see www.nanomsg.org.

Index

Constants

const (
	ErrBadAddr     = errors.ErrBadAddr
	ErrBadHeader   = errors.ErrBadHeader
	ErrBadVersion  = errors.ErrBadVersion
	ErrTooShort    = errors.ErrTooShort
	ErrTooLong     = errors.ErrTooLong
	ErrClosed      = errors.ErrClosed
	ErrConnRefused = errors.ErrConnRefused
	ErrSendTimeout = errors.ErrSendTimeout
	ErrRecvTimeout = errors.ErrRecvTimeout
	ErrProtoState  = errors.ErrProtoState
	ErrProtoOp     = errors.ErrProtoOp
	ErrBadTran     = errors.ErrBadTran
	ErrBadProto    = errors.ErrBadProto
	ErrBadOption   = errors.ErrBadOption
	ErrBadValue    = errors.ErrBadValue
	ErrGarbled     = errors.ErrGarbled
	ErrAddrInUse   = errors.ErrAddrInUse
	ErrBadProperty = errors.ErrBadProperty
	ErrTLSNoConfig = errors.ErrTLSNoConfig
	ErrTLSNoCert   = errors.ErrTLSNoCert
	ErrNotRaw      = errors.ErrNotRaw
	ErrCanceled    = errors.ErrCanceled
	ErrNoContext   = errors.ErrNoContext
	ErrNoPeers     = errors.ErrNoPeers
)

Various error codes.

const (
	// OptionRaw is used to test if the socket is in RAW mode.  The details
	// of how this differs from normal mode vary from protocol to protocol,
	// but RAW mode generally means minimal, stateless protocol
	// processing.  RAW mode sockets are constructed with a different
	// protocol constructor.  Raw mode is generally used with Device()
	// or similar proxy configurations.
	OptionRaw = "RAW"

	// OptionRecvDeadline is the time until the next Recv times out.  The
	// value is a time.Duration.  Zero value may be passed to indicate that
	// no timeout should be applied.  A negative value indicates a
	// non-blocking operation.  By default there is no timeout.
	OptionRecvDeadline = "RECV-DEADLINE"

	// OptionSendDeadline is the time until the next Send times out.  The
	// value is a time.Duration.  Zero value may be passed to indicate that
	// no timeout should be applied.  A negative value indicates a
	// non-blocking operation.  By default there is no timeout.
	OptionSendDeadline = "SEND-DEADLINE"

	// OptionRetryTime is used by REQ.  The argument is a time.Duration.
	// When a request has not been replied to within the given duration,
	// the request will automatically be resent to an available peer.
	// This value should be longer than the maximum possible processing
	// and transport time.  The value zero indicates that no automatic
	// retries should be sent.  The default value is one minute.
	//
	// Note that changing this option is only guaranteed to affect requests
	// sent after the option is set.  Changing the value while a request
	// is outstanding may not have the desired effect.
	OptionRetryTime = "RETRY-TIME"

	// OptionSubscribe is used by SUB/XSUB.  The argument is a []byte.
	// The application will receive messages that start with this prefix.
	// Multiple subscriptions may be in effect on a given socket.  The
	// application will not receive messages that do not match any current
	// subscriptions.  (If there are no subscriptions for a SUB/XSUB
	// socket, then the application will not receive any messages.  An
	// empty prefix can be used to subscribe to all messages.)
	OptionSubscribe = "SUBSCRIBE"

	// OptionUnsubscribe is used by SUB/XSUB.  The argument is a []byte,
	// representing a previously established subscription, which will be
	// removed from the socket.
	OptionUnsubscribe = "UNSUBSCRIBE"

	// OptionSurveyTime is used to indicate the deadline for survey
	// responses, when used with a SURVEYOR socket.  Messages arriving
	// after this will be discarded.  Additionally, this will set the
	// OptionRecvDeadline when starting the survey, so that attempts to
	// receive messages fail with ErrRecvTimeout when the survey is
	// concluded.  The value is a time.Duration.  Zero can be passed to
	// indicate an infinite time.  Default is 1 second.
	OptionSurveyTime = "SURVEY-TIME"

	// OptionTLSConfig is used to supply TLS configuration details. It
	// can be set using the ListenOptions or DialOptions.
	// The parameter is a tls.Config pointer.
	OptionTLSConfig = "TLS-CONFIG"

	// OptionWriteQLen is used to set the size, in messages, of the write
	// queue channel. By default, it's 128. This option cannot be set if
	// Dial or Listen has been called on the socket.
	OptionWriteQLen = "WRITEQ-LEN"

	// OptionReadQLen is used to set the size, in messages, of the read
	// queue channel. By default, it's 128. This option cannot be set if
	// Dial or Listen has been called on the socket.
	OptionReadQLen = "READQ-LEN"

	// OptionKeepAlive is used to set TCP KeepAlive.  Value is a boolean.
	// Default is true.
	OptionKeepAlive = "KEEPALIVE"

	// OptionKeepAliveTime is used to set the TCP KeepAlive time.
	// Value is a time.Duration.  Default is OS dependent.
	OptionKeepAliveTime = "KEEPALIVETIME"

	// OptionNoDelay is used to configure Nagle -- when true messages are
	// sent as soon as possible, otherwise some buffering may occur.
	// Value is a boolean.  Default is true.
	OptionNoDelay = "NO-DELAY"

	// OptionLinger is used to set the linger property.  This is the amount
	// of time to wait for send queues to drain when Close() is called.
	// Close() may block for up to this long if there is unsent data, but
	// will return as soon as all data is delivered to the transport.
	// Value is a time.Duration.  Default is one second.
	OptionLinger = "LINGER"

	// OptionTTL is used to set the maximum time-to-live for messages.
	// Note that not all protocols can honor this at this time, but for
	// those that do, if a message traverses more than this many devices,
	// it will be dropped.  This is used to provide protection against
	// loops in the topology.  The default is protocol specific.
	OptionTTL = "TTL"

	// OptionMaxRecvSize supplies the maximum receive size for inbound
	// messages.  This option exists because the wire protocol allows
	// the sender to specify the size of the incoming message, and
	// if the size were overly large, a bad remote actor could perform a
	// remote Denial-Of-Service by requesting ridiculously large message
	// sizes and then stalling on send.  The default value is 1MB.
	//
	// A value of 0 removes the limit, but should not be used unless
	// absolutely sure that the peer is trustworthy.
	//
	// Not all transports honor this limit.  For example, this limit
	// makes no sense when used with inproc.
	//
	// Note that the size includes any Protocol specific header.  It is
	// better to pick a value that is a little too big, than too small.
	//
	// This option is only intended to prevent gross abuse of the system,
	// and is not a substitute for proper application message verification.
	//
	// This option is type int.
	OptionMaxRecvSize = "MAX-RCV-SIZE"

	// OptionReconnectTime is the initial interval used for connection
	// attempts.  If a connection attempt does not succeed, then the socket
	// will wait this long before trying again.  An optional exponential
	// backoff may cause this value to grow.  See OptionMaxReconnectTime
	// for more details.   This is a time.Duration whose default value is
	// 100msec.  This option must be set before starting any dialers.
	OptionReconnectTime = "RECONNECT-TIME"

	// OptionMaxReconnectTime is the maximum value of the time between
	// connection attempts, when an exponential backoff is used.  If this
	// value is zero, then exponential backoff is disabled, otherwise
	// the value to wait between attempts is doubled until it hits this
	// limit.  This value is a time.Duration, with initial value 0.
	// This option must be set before starting any dialers.
	OptionMaxReconnectTime = "MAX-RECONNECT-TIME"

	// OptionBestEffort enables non-blocking send operations on the
	// socket. Normally (for some socket types), a socket will block if
	// there are no receivers, or the receivers are unable to keep up
	// with the sender. (Multicast socket types like Bus or Star do not
	// behave this way.)  If this option is set, instead of blocking, the
	// message will be silently discarded.  The value is a boolean, and
	// defaults to false.
	OptionBestEffort = "BEST-EFFORT"

	// OptionLocalAddr expresses a local address.  For dialers, this is
	// the (often random) address that was locally bound.  For listeners,
	// it is usually the service address.  The value is a net.Addr.  This
	// is generally a read-only value for pipes, though it might sometimes
	// be available on dialers or listeners.
	OptionLocalAddr = "LOCAL-ADDR"

	// OptionRemoteAddr expresses a remote address.  For dialers, this is
	// the service address.  For listeners, it's the address of the far
	// end dialer.  The value is a net.Addr.  It is generally read-only
	// and available only on pipes and dialers.
	OptionRemoteAddr = "REMOTE-ADDR"

	// OptionTLSConnState is used to supply TLS connection details. The
	// value is a tls.ConnectionState.  It is only valid when TLS is used.
	// This is available on pipes that are using TLS.
	OptionTLSConnState = "TLS-STATE"

	// OptionHTTPRequest conveys an *http.Request.  This read-only option
	// only exists for Pipes using websocket connections.
	OptionHTTPRequest = "HTTP-REQUEST"

	// OptionDialAsynch (used on a Dialer) causes the Dial() operation
	// to run in the background.  Further, the Dialer will always redial,
	// even if the first attempt fails.  (Normally dialing is performed
	// synchronously, so that if the remote peer is unavailable at first
	// the caller can learn of the error and handle or report it.)
	// Note that mangos v1 behavior is the same as if this option is
	// set to true.
	OptionDialAsynch = "DIAL-ASYNCH"

	// OptionPeerPID is the peer process ID.  This is only implemented for
	// transports that support it, and it is a read-only option for pipes
	// only.  It may require cgo on some platforms.  The value is an int.
	OptionPeerPID = "PEER-PID"

	// OptionPeerUID is the effective peer user ID, typically obtained via
	// SO_PEERCRED.  It is only available on transports that support it, and is
	// a read-only option for pipes.  It may require cgo on some platforms.
	// The value is an int.
	OptionPeerUID = "PEER-UID"

	// OptionPeerGID is the effective peer group ID, typically obtained via
	// SO_PEERCRED.  It is only available on transports that support it, and is
	// a read-only option for pipes.  It may require cgo on some platforms.
	// The value is an int.
	OptionPeerGID = "PEER-GID"

	// OptionPeerZone is the peer's zone ID.  This is only supported on
	// Solaris platforms at present, and only when cgo support is enabled.
	// The value is an int.
	OptionPeerZone = "PEER-ZONE"

	// OptionFailNoPeers causes send or receive operations to fail
	// immediately rather than waiting for a timeout if there are no
	// connected peers.  This helps discriminate between cases involving
	// flow control, from those where we have no peers.  Use of this
	// option may make applications more brittle, as a temporary disconnect
	// that may otherwise self-heal quickly will now create an immediate
	// failure.  Applications using this should be prepared to deal with
	// such failures.  Note that not all protocols respect this; in
	// particular, best-effort protocols will not support it.
	OptionFailNoPeers = "FAIL-NO-PEERS"
)
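
The interaction between OptionReconnectTime and OptionMaxReconnectTime described above can be sketched as a small function. This is only a model of the documented rule (double the wait until the limit is reached, or never grow it when the limit is zero); it does not use mangos itself, and the names nextDelay, initial and limit are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay returns how long to wait before the next connection attempt.
// cur is the delay that was just used, initial models OptionReconnectTime,
// and limit models OptionMaxReconnectTime. A zero limit disables backoff,
// so the initial interval is reused for every attempt.
func nextDelay(cur, initial, limit time.Duration) time.Duration {
	if limit == 0 {
		return initial
	}
	next := cur * 2
	if next > limit {
		next = limit
	}
	return next
}

func main() {
	initial := 100 * time.Millisecond // the documented default
	limit := time.Second              // hypothetical cap chosen for the demo

	delay := initial
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d failed, waiting %v\n", attempt, delay)
		delay = nextDelay(delay, initial, limit)
	}
}
```

With these values the waits grow 100ms, 200ms, 400ms, 800ms and then stay at 1s.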
const (
	// PipeEventAttaching is called before the Pipe is registered with the
	// socket.  The intention is to permit the application to reject
	// a pipe before it is attached.
	PipeEventAttaching = iota

	// PipeEventAttached occurs after the Pipe is attached.
	// Consequently, it is possible to use the Pipe for delivering
	// events to sockets, etc.
	PipeEventAttached

	// PipeEventDetached occurs after the Pipe has been detached
	// from the socket.
	PipeEventDetached
)
const (
	ProtoPair       = 0x10
	ProtoPair1      = 0x11
	ProtoPub        = 0x20
	ProtoSub        = 0x21
	ProtoReq        = 0x30
	ProtoRep        = 0x31
	ProtoPush       = 0x50
	ProtoPull       = 0x51
	ProtoSurveyor   = 0x62
	ProtoRespondent = 0x63
	ProtoBus        = 0x70
	ProtoStar       = 0x640 // Experimental!
)

Useful constants for protocol numbers. Note that the major protocol number is stored in the upper 12 bits, and the minor (subprotocol) is located in the bottom 4 bits.
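
As a quick illustration of that bit layout, the following sketch splits a protocol number into its major and minor parts. The helper name splitProto is hypothetical and not part of the package; the numeric values are copied from the constant block above.

```go
package main

import "fmt"

// splitProto separates an SP protocol number into its major part
// (upper 12 bits) and its minor part (lower 4 bits).
func splitProto(p uint16) (major, minor uint16) {
	return p >> 4, p & 0xf
}

func main() {
	// Values copied from the protocol constants listed above.
	protos := []struct {
		name string
		num  uint16
	}{
		{"ProtoReq", 0x30},
		{"ProtoRep", 0x31},
		{"ProtoSurveyor", 0x62},
		{"ProtoStar", 0x640},
	}
	for _, p := range protos {
		major, minor := splitProto(p.num)
		fmt.Printf("%-14s major=%d minor=%d\n", p.name, major, minor)
	}
}
```

Paired protocols such as Req and Rep share a major number and differ only in the minor number.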

Variables

This section is empty.

Functions

func Device

func Device(s1 Socket, s2 Socket) error

Device is used to create a forwarding loop between two sockets. If the same socket is listed twice (or either socket is nil), then a loopback device is established instead. Note that the single socket case is only valid for protocols where the underlying protocol can peer with itself (e.g. PAIR or BUS, but not REQ/REP or PUB/SUB!)

If the plumbing is successful, nil will be returned. Two goroutines will be established to forward messages in each direction. If either socket returns an error on receive or send, the goroutine doing the forwarding will exit. This means that closing either socket will generally cause the goroutines to exit. Apart from closing the socket(s), no further operations should be performed against the sockets.

Both sockets should be RAW; use of a "cooked" socket will result in ErrNotRaw.
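
The forwarding behaviour described here (one goroutine per direction, each exiting on the first error) can be modelled without mangos. The sketch below is an assumption-laden stand-in: endpoint, chanEnd, forward and device are hypothetical names, and channels play the role of raw sockets.

```go
package main

import (
	"errors"
	"fmt"
)

var errClosed = errors.New("endpoint closed")

// endpoint is a hypothetical stand-in for a raw socket.
type endpoint interface {
	Recv() ([]byte, error)
	Send([]byte) error
}

// chanEnd is an in-memory endpoint: in carries messages arriving from its
// peers, out carries messages forwarded to them.
type chanEnd struct {
	in, out chan []byte
}

func newChanEnd() *chanEnd {
	return &chanEnd{in: make(chan []byte, 1), out: make(chan []byte, 1)}
}

func (c *chanEnd) Recv() ([]byte, error) {
	m, ok := <-c.in
	if !ok {
		return nil, errClosed
	}
	return m, nil
}

func (c *chanEnd) Send(m []byte) error {
	c.out <- m
	return nil
}

// forward copies messages one way until either side reports an error.
func forward(from, to endpoint, done chan<- struct{}) {
	defer func() { done <- struct{}{} }()
	for {
		m, err := from.Recv()
		if err != nil {
			return
		}
		if err := to.Send(m); err != nil {
			return
		}
	}
}

// device starts one forwarding goroutine per direction. The returned
// channel receives one value as each goroutine exits.
func device(a, b endpoint) <-chan struct{} {
	done := make(chan struct{}, 2)
	go forward(a, b, done)
	go forward(b, a, done)
	return done
}

func main() {
	a, b := newChanEnd(), newChanEnd()
	done := device(a, b)

	a.in <- []byte("ping")
	fmt.Printf("b got %s\n", <-b.out)
	b.in <- []byte("pong")
	fmt.Printf("a got %s\n", <-a.out)

	// Closing the inputs makes Recv fail, which ends both goroutines.
	close(a.in)
	close(b.in)
	<-done
	<-done
}
```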

Types

type Context

type Context interface {

	// Close closes the Context.  Further operations on the context
	// will return ErrClosed.
	Close() error

	// GetOption is used to retrieve an option for a socket.
	GetOption(name string) (interface{}, error)

	// SetOption is used to set an option for a socket.
	SetOption(name string, value interface{}) error

	// Send puts the message on the outbound send queue.  It blocks
	// until the message can be queued, or the send deadline expires.
	// If a queued message is later dropped for any reason,
	// there will be no notification back to the application.
	Send([]byte) error

	// Recv receives a complete message.  The entire message is received.
	Recv() ([]byte, error)

	// SendMsg puts the message on the outbound send queue.  It works
	// like Send, but allows the caller to supply message headers.  NOTE:
	// on success the Context ASSUMES OWNERSHIP OF THE MESSAGE.
	SendMsg(*Message) error

	// RecvMsg receives a complete message, including the message header,
	// which is useful for protocols in raw mode.
	RecvMsg() (*Message, error)
}

Context is a protocol context, and represents the upper side operations that applications will want to use. Every socket has a default context, but only certain protocols will allow the creation of additional Context instances (only if separate stateful contexts make sense for a given protocol).

type Dialer

type Dialer interface {
	// Close closes the dialer, and removes it from any active socket.
	// Further operations on the Dialer will return ErrClosed.
	Close() error

	// Dial starts connecting on the address.  If a connection fails,
	// it will restart.
	Dial() error

	// Address returns the string (full URL) of the Dialer.
	Address() string

	// SetOption sets an option on the Dialer. Setting options
	// can only be done before Dial() has been called.
	SetOption(name string, value interface{}) error

	// GetOption gets an option value from the Dialer.
	GetOption(name string) (interface{}, error)
}

Dialer is an interface to the underlying dialer for a transport and address.

type Listener

type Listener interface {
	// Close closes the listener, and removes it from any active socket.
	// Further operations on the Listener will return ErrClosed.
	Close() error

	// Listen starts listening for new connections on the address.
	Listen() error

	// Address returns the string (full URL) of the Listener.
	Address() string

	// SetOption sets an option on the Listener. Setting options
	// can only be done before Listen() has been called.
	SetOption(name string, value interface{}) error

	// GetOption gets an option value from the Listener.
	GetOption(name string) (interface{}, error)
}

Listener is an interface to the underlying listener for a transport and address.

type Message

type Message struct {
	// Header carries any protocol (SP) specific header.  Applications
	// should not modify or use this unless they are using Raw mode.
	// No user data may be placed here.
	Header []byte

	// Body carries the body of the message.  This can also be thought
	// of as the message "payload".
	Body []byte

	// Pipe may be set on message receipt, to indicate the Pipe from
	// which the Message was received.  There are no guarantees that the
	// Pipe is still active, and applications should only use this for
	// informational purposes.
	Pipe Pipe
	// contains filtered or unexported fields
}

Message encapsulates the messages that we exchange back and forth. The meaning of the Header and Body fields, and where the splits occur, will vary depending on the protocol. Note however that any headers applied by transport layers (including TCP/ethernet headers, and SP protocol independent length headers), are *not* included in the Header.

func NewMessage

func NewMessage(sz int) *Message

NewMessage is the supported way to obtain a new Message. This makes use of a "cache" which greatly reduces the load on the garbage collector.

func (*Message) Clone

func (m *Message) Clone()

Clone bumps the reference count on the message, allowing it to be shared. Callers of this MUST ensure that the message is never modified. If a read-only copy needs to be made "unique", callers can do so by using the MakeUnique function.

func (*Message) Dup

func (m *Message) Dup() *Message

Dup creates a "duplicate" message. The message is made as a deep copy, so the resulting message is safe to modify.

func (*Message) Free

func (m *Message) Free()

Free releases the message to the pool from which it was allocated. While this is not strictly necessary thanks to GC, doing so allows for the resources to be recycled without engaging GC. This can have rather substantial benefits for performance.

func (*Message) MakeUnique

func (m *Message) MakeUnique() *Message

MakeUnique ensures that the message is not shared. If the reference count on the message is one, then the message is returned as is. Otherwise a new copy of the message is made, and the reference count on the original is dropped. Note that it is an error for the caller to use the original message after this function; the caller should always do `m = m.MakeUnique()`. This function should be called whenever the message is leaving the control of the caller, such as when passing it to a user program.

Note that transports always should call this on their transmit path if they are going to modify the message. (Most do not.)
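
The sharing rules for Clone and MakeUnique can be modelled with a toy reference-counted type. This is a sketch of the documented semantics only, not the mangos implementation: msg, newMsg, clone and makeUnique are hypothetical names, and the counter here is not safe for concurrent use.

```go
package main

import "fmt"

// msg is a toy stand-in for a reference-counted message. It is not the
// mangos Message type; it only models the documented sharing rules.
type msg struct {
	body []byte
	refs int
}

func newMsg(body string) *msg {
	return &msg{body: []byte(body), refs: 1}
}

// clone shares the message by bumping its reference count.
func (m *msg) clone() { m.refs++ }

// makeUnique returns m itself when it is the only reference; otherwise it
// drops one reference on m and returns a private deep copy.
func (m *msg) makeUnique() *msg {
	if m.refs == 1 {
		return m
	}
	m.refs--
	return &msg{body: append([]byte(nil), m.body...), refs: 1}
}

func main() {
	shared := newMsg("hello")
	shared.clone() // a second holder now shares the same bytes

	mine := shared.makeUnique() // always reassign the result
	mine.body[0] = 'J'          // safe: mine is a private copy

	fmt.Println(string(shared.body), shared.refs) // hello 1
	fmt.Println(string(mine.body), mine.refs)     // Jello 1
}
```

The reassignment matters because the returned pointer may or may not be the original, depending on whether the message was shared at the time of the call.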

type Pipe

type Pipe interface {

	// ID returns the numeric ID for this Pipe.  This will be a
	// 31 bit (bit 32 is clear) value for the Pipe, which is unique
	// across all other Pipe instances in the application, while
	// this Pipe exists.  (IDs are recycled on Close, but only after
	// all other Pipe values are used.)
	ID() uint32

	// Address returns the address (URL form) associated with the Pipe.
	// This matches the string passed to Dial() or Listen().
	Address() string

	// GetOption returns an arbitrary option.  The details will vary
	// for different transport types.
	GetOption(name string) (interface{}, error)

	// Listener returns the Listener for this Pipe, or nil if none.
	Listener() Listener

	// Dialer returns the Dialer for this Pipe, or nil if none.
	Dialer() Dialer

	// Close closes the Pipe.  This does a disconnect, or something similar.
	// Note that if a dialer is present and active, it will redial.
	Close() error
}

Pipe represents the high level interface to a low level communications channel. There is one of these associated with a given TCP connection, for example. This interface is intended for application use.

Note that applications cannot send or receive data on a Pipe directly.

type PipeEvent

type PipeEvent int

PipeEvent determines what is actually transpiring on the Pipe.

type PipeEventHook

type PipeEventHook func(PipeEvent, Pipe)

PipeEventHook is an application supplied function to be called when events occur relating to a Pipe.

type ProtocolBase

type ProtocolBase interface {
	ProtocolContext

	// Info returns the information describing this protocol.
	Info() ProtocolInfo

	// AddPipe is called when a new Pipe is added to the socket.
	// Typically, this is as a result of connect or accept completing.
	// The pipe ID will be unique for the socket at this time.
	// The implementation must not call back into the socket, but it
	// may reject the pipe by returning a non-nil result.
	AddPipe(ProtocolPipe) error

	// RemovePipe is called when a Pipe is removed from the socket.
	// Typically, this indicates a disconnected or closed connection.
	// This is called exactly once, after the underlying transport pipe
	// is closed.  The Pipe ID will still be valid.
	RemovePipe(ProtocolPipe)

	// OpenContext is a request to create a unique instance of the
	// protocol state machine, allowing concurrent use of states on
	// a given protocol socket.  Protocols that don't support this
	// should return ErrProtoOp.
	OpenContext() (ProtocolContext, error)
}

ProtocolBase provides the protocol-specific handling for sockets. This is the new style API for sockets, and is how protocols provide their specific handling.

type ProtocolContext

type ProtocolContext interface {
	// Close closes the context.
	Close() error

	// SendMsg sends the message.  The message may be queued, or
	// may be delivered immediately, depending on the nature of
	// the protocol.  On success, the context assumes ownership
	// of the message.  On error, the caller retains ownership,
	// and may either resend the message or dispose of it otherwise.
	SendMsg(*Message) error

	// RecvMsg receives a complete message, including the message header,
	// which is useful for protocols in raw mode.
	RecvMsg() (*Message, error)

	// GetOption is used to retrieve the current value of an option.
	// If the protocol doesn't recognize the option, ErrBadOption should
	// be returned.
	GetOption(string) (interface{}, error)

	// SetOption is used to set an option.  ErrBadOption is returned if
	// the option name is not recognized, ErrBadValue if the value is
	// invalid.
	SetOption(string, interface{}) error
}

ProtocolContext is a "context" for a protocol, which contains the various stateful operations such as timers, etc. necessary for running the protocol. This is separable from the protocol itself as the protocol may permit the creation of multiple contexts.

type ProtocolInfo

type ProtocolInfo struct {
	Self     uint16
	Peer     uint16
	SelfName string
	PeerName string
}

ProtocolInfo is a description of the protocol.

type ProtocolPipe

type ProtocolPipe interface {
	// ID returns a unique 31-bit value associated with this pipe.
	// The value is unique for a given socket, at a given time.
	ID() uint32

	// Close does what you think.
	Close() error

	// SendMsg sends a message.  On success it returns nil. This is a
	// blocking call.
	SendMsg(*Message) error

	// RecvMsg receives a message.  It blocks until the message is
	// received.  On error, the pipe is closed and nil is returned.
	RecvMsg() *Message

	// SetPrivate is used to set protocol private data.
	SetPrivate(interface{})

	// GetPrivate returns the previously stored protocol private data.
	GetPrivate() interface{}
}

ProtocolPipe represents the handle that a Protocol implementation has to the underlying stream transport. It can be thought of as one side of a TCP, IPC, or other type of connection.

type Socket

type Socket interface {
	// Info returns information about the protocol (numbers and names)
	// and peer protocol.
	Info() ProtocolInfo

	// Close closes the open Socket.  Further operations on the socket
	// will return ErrClosed.
	Close() error

	// Send puts the message on the outbound send queue.  It blocks
	// until the message can be queued, or the send deadline expires.
	// If a queued message is later dropped for any reason,
	// there will be no notification back to the application.
	Send([]byte) error

	// Recv receives a complete message.  The entire message is received.
	Recv() ([]byte, error)

	// SendMsg puts the message on the outbound send queue.  It works
	// like Send, but allows the caller to supply message headers.  NOTE:
	// on success the Socket ASSUMES OWNERSHIP OF THE MESSAGE.
	SendMsg(*Message) error

	// RecvMsg receives a complete message, including the message header,
	// which is useful for protocols in raw mode.
	RecvMsg() (*Message, error)

	// Dial connects a remote endpoint to the Socket.  The function
	// returns immediately, and an asynchronous goroutine is started to
	// establish and maintain the connection, reconnecting as needed.
	// If the address is invalid, then an error is returned.
	Dial(addr string) error

	DialOptions(addr string, options map[string]interface{}) error

	// NewDialer returns a Dialer object which can be used to get
	// access to the underlying configuration for dialing.
	NewDialer(addr string, options map[string]interface{}) (Dialer, error)

	// Listen connects a local endpoint to the Socket.  Remote peers
	// may connect (e.g. with Dial) and will each be "connected" to
	// the Socket.  The accepter logic is run in a separate goroutine.
	// The only error possible is if the address is invalid.
	Listen(addr string) error

	ListenOptions(addr string, options map[string]interface{}) error

	NewListener(addr string, options map[string]interface{}) (Listener, error)

	// GetOption is used to retrieve an option for a socket.
	GetOption(name string) (interface{}, error)

	// SetOption is used to set an option for a socket.
	SetOption(name string, value interface{}) error

	// OpenContext creates a new Context.  If a protocol does not
	// support separate contexts, this will return an error.
	OpenContext() (Context, error)

	// SetPipeEventHook sets a PipeEventHook function to be called when a
	// Pipe is added or removed from this socket (connect/disconnect).
	// The previous hook is returned (nil if none.)  (Only one hook can
	// be used at a time.)
	SetPipeEventHook(PipeEventHook) PipeEventHook
}

Socket is the main access handle applications use to access the SP system. It is an abstraction of an application's "connection" to a messaging topology. Applications can have more than one Socket open at a time.

type TranDialer

type TranDialer interface {
	// Dial is used to initiate a connection to a remote peer.
	Dial() (TranPipe, error)

	// SetOption sets a local option on the dialer.
	// ErrBadOption can be returned for unrecognized options.
	// ErrBadValue can be returned for incorrect value types.
	SetOption(name string, value interface{}) error

	// GetOption gets a local option from the dialer.
	// ErrBadOption can be returned for unrecognized options.
	GetOption(name string) (value interface{}, err error)
}

TranDialer represents the client side of a connection. Clients initiate the connection.

TranDialer is only intended for use by transport implementors, and should not be directly used in applications.

type TranListener

type TranListener interface {

	// Listen actually begins listening on the interface.  It is
	// called just prior to the Accept() routine normally. It is
	// the socket equivalent of bind()+listen().
	Listen() error

	// Accept completes the server side of a connection.  Once the
	// connection is established and initial handshaking is complete,
	// the resulting connection is returned to the caller.
	Accept() (TranPipe, error)

	// Close ceases any listening activity, and will specifically close
	// any underlying file descriptor.  Once this is done, the only way
	// to resume listening is to create a new Server instance.  Presumably
	// this function is only called when the last reference to the server
	// is about to go away.  Established connections are unaffected.
	Close() error

	// SetOption sets a local option on the listener.
	// ErrBadOption can be returned for unrecognized options.
	// ErrBadValue can be returned for incorrect value types.
	SetOption(name string, value interface{}) error

	// GetOption gets a local option from the listener.
	// ErrBadOption can be returned for unrecognized options.
	GetOption(name string) (value interface{}, err error)

	// Address gets the local address.  The value may not be meaningful
	// until Listen() has been called.
	Address() string
}

TranListener represents the server side of a connection. Servers respond to a connection request from clients.

TranListener is only intended for use by transport implementors, and should not be directly used in applications.

type TranPipe

type TranPipe interface {

	// Send sends a complete message.  In the event of a partial send,
	// the Pipe will be closed, and an error is returned.  For reasons
	// of efficiency, we allow the message to be sent in a scatter/gather
	// list.
	Send(*Message) error

	// Recv receives a complete message.  If a complete message
	// cannot be received, an error is returned to the caller and
	// the Pipe is closed.
	//
	// To mitigate Denial-of-Service attacks, we limit the max message
	// size to 1M.
	Recv() (*Message, error)

	// Close closes the underlying transport.  Further operations on
	// the Pipe will result in errors.  Note that messages that are
	// queued in transport buffers may still be received by the remote
	// peer.
	Close() error

	// GetOption returns an arbitrary transport specific option on a
	// pipe.  Options for pipes are read-only and specific to that
	// particular connection. If the property doesn't exist, then
	// ErrBadOption should be returned.
	GetOption(string) (interface{}, error)
}

TranPipe behaves like a full-duplex message-oriented connection between two peers. Callers may call operations on a Pipe simultaneously from different goroutines. (These are different from net.Conn because they provide message oriented semantics.)

TranPipe is only intended for use by transport implementors, and should not be directly used in applications.
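
To illustrate what "message oriented" means in contrast to the byte stream of a net.Conn, here is a minimal sketch that layers whole-message Send and Recv on top of a net.Conn. It is not the mangos implementation: framedPipe and its helpers are hypothetical, the 8-byte big-endian length prefix is an assumption chosen for the sketch, messages are plain byte slices rather than *Message, and the sketch does not serialize concurrent senders. The 1 MiB receive limit mirrors the limit mentioned in the Recv comment.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"net"
)

const maxRecvSize = 1 << 20 // 1 MiB receive limit

var errTooBig = errors.New("message too big")

// framedPipe is a hypothetical message-oriented wrapper around a net.Conn.
type framedPipe struct {
	conn net.Conn
}

// Send writes one length-prefixed message; on failure the conn is closed.
func (p *framedPipe) Send(body []byte) error {
	var hdr [8]byte
	binary.BigEndian.PutUint64(hdr[:], uint64(len(body)))
	if _, err := p.conn.Write(append(hdr[:], body...)); err != nil {
		p.conn.Close()
		return err
	}
	return nil
}

// Recv reads exactly one message, or closes the conn and returns an error.
func (p *framedPipe) Recv() ([]byte, error) {
	var hdr [8]byte
	if _, err := io.ReadFull(p.conn, hdr[:]); err != nil {
		p.conn.Close()
		return nil, err
	}
	n := binary.BigEndian.Uint64(hdr[:])
	if n > maxRecvSize {
		p.conn.Close() // refuse to allocate for an oversized claim
		return nil, errTooBig
	}
	body := make([]byte, n)
	if _, err := io.ReadFull(p.conn, body); err != nil {
		p.conn.Close()
		return nil, err
	}
	return body, nil
}

// roundTrip sends msgs over an in-memory connection and returns what the
// other end received, one slice per message.
func roundTrip(msgs ...[]byte) ([][]byte, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()
	sender, receiver := &framedPipe{conn: a}, &framedPipe{conn: b}
	errc := make(chan error, 1)
	go func() {
		for _, m := range msgs {
			if err := sender.Send(m); err != nil {
				errc <- err
				return
			}
		}
		errc <- nil
	}()
	var got [][]byte
	for range msgs {
		m, err := receiver.Recv()
		if err != nil {
			return nil, err
		}
		got = append(got, m)
	}
	return got, <-errc
}

// recvOversized feeds Recv a header that claims more than maxRecvSize bytes.
func recvOversized() error {
	a, b := net.Pipe()
	defer a.Close()
	go func() {
		var hdr [8]byte
		binary.BigEndian.PutUint64(hdr[:], maxRecvSize+1)
		a.Write(hdr[:])
	}()
	_, err := (&framedPipe{conn: b}).Recv()
	return err
}

func main() {
	got, err := roundTrip([]byte("hello"), []byte("world"))
	fmt.Println(len(got), err)
	fmt.Println(recvOversized())
}
```

Message boundaries survive the trip regardless of how the underlying stream splits its reads, which is the property a protocol implementation relies on when it calls Recv.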

type Transport

type Transport interface {
	// Scheme returns a string used as the prefix for SP "addresses".
	// This is similar to a URI scheme.  For example, schemes can be
	// "tcp" (for "tcp://xxx..."), "ipc", "inproc", etc.
	Scheme() string

	// NewDialer creates a new Dialer for this Transport.
	NewDialer(url string, sock Socket) (TranDialer, error)

	// NewListener creates a new Listener for this Transport.
	// This generally also arranges for an OS-level file descriptor to be
	// opened, and bound to the given address, as well as establishing
	// any "listen" backlog.
	NewListener(url string, sock Socket) (TranListener, error)
}

Transport is the interface for transport suppliers to implement.
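
Since Scheme returns the prefix of an SP address, a transport can be selected by looking at the part of the address before "://". The following is a minimal sketch of that idea only: the transport interface here is a trimmed, hypothetical stand-in that keeps just Scheme, and registry, schemeOnly, newDemoRegistry and the two error values are names invented for the illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// transport is a trimmed stand-in for the Transport interface; NewDialer
// and NewListener are left out to keep the sketch self-contained.
type transport interface {
	Scheme() string
}

// schemeOnly is a hypothetical transport that knows nothing but its scheme.
type schemeOnly string

func (s schemeOnly) Scheme() string { return string(s) }

var (
	errBadAddr     = errors.New("address has no scheme")
	errNoTransport = errors.New("no transport registered for scheme")
)

// registry maps a scheme to the transport that handles it.
type registry map[string]transport

func (r registry) register(t transport) { r[t.Scheme()] = t }

// lookup extracts the scheme from addr and returns the matching transport.
func (r registry) lookup(addr string) (transport, error) {
	i := strings.Index(addr, "://")
	if i <= 0 {
		return nil, errBadAddr
	}
	t, ok := r[addr[:i]]
	if !ok {
		return nil, errNoTransport
	}
	return t, nil
}

// newDemoRegistry registers the three schemes named in the Scheme comment.
func newDemoRegistry() registry {
	r := registry{}
	for _, s := range []string{"tcp", "ipc", "inproc"} {
		r.register(schemeOnly(s))
	}
	return r
}

func main() {
	r := newDemoRegistry()
	for _, addr := range []string{"tcp://127.0.0.1:5555", "udp://host:1", "nonsense"} {
		t, err := r.lookup(addr)
		if err != nil {
			fmt.Println(addr, "->", err)
			continue
		}
		fmt.Println(addr, "->", t.Scheme())
	}
}
```

Keying the table on Scheme keeps address parsing out of the individual transports: each one only has to report its own prefix.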

Directories

Path Synopsis
Package errors just defines some constant error codes, and is intended to be directly imported.
examples
bus
bus implements a bus example.
context
context implements a request/reply server that utilizes a pool of worker goroutines to service multiple requests simultaneously.
pair
pair implements a pair example.
pipeline
pipeline implements a one-way pipe example.
pubsub
pubsub implements a publish/subscribe example.
raw
raw implements an example concurrent request/reply server, using the raw server socket.
reqrep
reqrep implements a request/reply example.
survey
survey implements a survey example.
websocket
websocket implements a simple websocket server for mangos, demonstrating how to multiplex multiple sockets on a single HTTP server instance.
internal
macat implements a nanocat(1) work-alike command.
Package protocol implements some common things protocol implementors need.
bus
Package bus implements the BUS protocol.
pair
Package pair implements the PAIR protocol.
pair1
Package pair1 implements the PAIRv1 protocol.
pub
Package pub implements the PUB protocol.
pull
Package pull implements the PULL protocol, which is the read side of the pipeline pattern.
push
Package push implements the PUSH protocol, which is the write side of the pipeline pattern.
rep
Package rep implements the REP protocol, which is the response side of the request/response pattern.
req
Package req implements the REQ protocol, which is the request side of the request/response pattern.
respondent
Package respondent implements the RESPONDENT protocol, which is the response side of the survey pattern.
star
Package star implements a new, experimental protocol called "STAR".
sub
Package sub implements the SUB protocol.
surveyor
Package surveyor implements the SURVEYOR protocol.
xbus
Package xbus implements the BUS protocol.
xpair
Package xpair implements the PAIR protocol.
xpair1
Package xpair1 implements the PAIRv1 protocol in monogamous mode only.
xpub
Package xpub implements the PUB protocol.
xpull
Package xpull implements the PULL protocol.
xpush
Package xpush implements the raw PUSH protocol.
xrep
Package xrep implements the raw REP protocol, which is the response side of the request/response pattern.
xreq
Package xreq implements the raw REQ protocol, which is the request side of the request/response pattern.
xrespondent
Package xrespondent implements the raw RESPONDENT protocol, which is the response side of the survey pattern.
xstar
Package xstar implements the experimental star protocol.
xsub
Package xsub implements the raw SUB protocol.
xsurveyor
Package xsurveyor implements the SURVEYOR protocol.
Package test contains support code for various mangos tests.
all
Package all is used to register all transports.
inproc
Package inproc implements a simple inproc transport for mangos.
ipc
Package ipc implements the IPC transport on top of UNIX domain sockets.
tcp
Package tcp implements the TCP transport for mangos.
tlstcp
Package tlstcp implements the TLS over TCP transport for mangos.
ws
Package ws implements a simple WebSocket transport for mangos.
wss
Package wss implements a secure WebSocket transport for mangos.
