forked from karalef/coincap
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
159 lines (143 loc) · 3.46 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package coincap
import (
"errors"
"io"
"strconv"
"strings"
"sync"
"github.com/gorilla/websocket"
)
// Trade is a single trade message decoded from the trades websocket stream
// (see Client.Trades).
type Trade struct {
	Exchange  string  `json:"exchange"`
	Base      string  `json:"base"`
	Quote     string  `json:"quote"`
	Direction string  `json:"direction"`
	Price     float64 `json:"price"`
	Volume    float64 `json:"volume"`
	Timestamp int64   `json:"timestamp"`
	// PriceUSD is decoded from the "priceUsd" key. The tag previously read
	// "priceUds", which never matches that key, so the field stayed zero.
	PriceUSD float64 `json:"priceUsd"`
}
// Trades streams trades from other cryptocurrency exchange websockets.
// Users must select a specific exchange. In the /exchanges endpoint users
// can determine if an exchange has a socket available by noting
// response 'socket':true/false.
// The trades websocket is the only way to receive individual
// trade data through CoinCap.
//
// The exchange is looked up with ExchangeByID before dialing. An error is
// returned if the lookup fails, if no exchange is found, if the exchange
// reports no socket support, or if the websocket connection cannot be
// established.
func (c *Client) Trades(exchange string) (*Stream[*Trade], error) {
	e, _, err := c.ExchangeByID(exchange)
	if err != nil {
		return nil, err
	}
	if e == nil {
		return nil, errors.New("exchange not found")
	}
	if !e.Socket {
		return nil, errors.New("exchange '" + exchange + "' does not support websockets")
	}

	// A websocket URL needs the "wss://host/path" form. The previous
	// "wss:https://..." value has no authority part after the scheme.
	const u = "wss://ws.coincap.io/trades/"
	return dial[*Trade](c.ws, u+exchange)
}
// Price is a float64 that can be decoded from a JSON string such as "1.23"
// as well as from a bare JSON number.
type Price float64

// UnmarshalJSON implements the json.Unmarshaler interface.
//
// A single pair of surrounding double quotes is stripped when present, and
// the remainder is parsed with strconv.ParseFloat. A JSON null leaves the
// value untouched, following the encoding/json convention for Unmarshalers.
// The receiver is only modified when parsing succeeds.
func (p *Price) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		return nil
	}
	// Strip quotes only when both are really there: slicing unconditionally
	// panics on one-byte input and chops digits off unquoted numbers.
	if n := len(data); n >= 2 && data[0] == '"' && data[n-1] == '"' {
		data = data[1 : n-1]
	}
	v, err := strconv.ParseFloat(string(data), 64)
	if err != nil {
		return err
	}
	*p = Price(v)
	return nil
}
// Prices is the most accurate source of real-time changes to the global price
// of an asset. Each time the system receives data that moves the global price
// in one direction or another, this change is immediately published through
// the websocket.
// These prices correspond with the values shown in /assets - a value that may
// change several times per second based on market activity.
//
// Empty 'assets' means prices for all assets.
//
// When asset ids are given they are first checked with AssetsSearchByIDs. An
// error is returned if that lookup fails, if it yields a different number of
// assets than were requested, or if the websocket connection cannot be
// established.
func (c *Client) Prices(assets ...string) (*Stream[map[string]Price], error) {
	a := "ALL"
	if len(assets) > 0 {
		t, _, err := c.AssetsSearchByIDs(assets)
		if err != nil {
			return nil, err
		}
		if len(t) != len(assets) {
			return nil, errors.New("incorrect assets ids")
		}
		a = strings.Join(assets, ",")
	}

	// A websocket URL needs the "wss://host/path" form. The previous
	// "wss:https://..." value has no authority part after the scheme.
	const u = "wss://ws.coincap.io/prices?assets="
	return dial[map[string]Price](c.ws, u+a)
}
// Stream streams data of type T from a websocket connection.
// Values are delivered through DataChannel; a terminal error, if any, is
// reported by Err.
type Stream[T any] struct {
	// conn is the websocket connection messages are read from.
	conn *websocket.Conn
	// ch is the data channel handed out (receive-only) by DataChannel.
	ch chan T
	// stop is closed by Close to signal that the stream was stopped.
	stop chan struct{}
	// mut guards err and serializes the shutdown of the stream.
	mut sync.Mutex
	// err is the error that ended the stream; it is read through Err.
	err error
}
// DataChannel exposes the stream's data channel as receive-only.
// The channel is closed when the stream fails with an error or when the
// stream is closed.
func (s *Stream[T]) DataChannel() <-chan T {
	var recv <-chan T = s.ch
	return recv
}
// Close stops the stream and closes the underlying connection.
//
// Close is idempotent and may be called from several goroutines: only the
// first call has an effect. The data channel is not closed here but by the
// reading goroutine once it observes the shutdown, because closing it from
// this side could race with a pending send and panic.
func (s *Stream[T]) Close() {
	s.mut.Lock()
	defer s.mut.Unlock()

	select {
	case <-s.stop:
		// Already closed; closing s.stop a second time would panic.
		return
	default:
	}
	close(s.stop)
	// Close has no error result, so the connection's close error is
	// deliberately discarded (as before).
	_ = s.conn.Close()
}

// Err returns the error that ended the stream, or nil if no error has been
// recorded. Errors observed after Close was called are not recorded.
func (s *Stream[T]) Err() error {
	s.mut.Lock()
	defer s.mut.Unlock()
	return s.err
}

// dial is the read loop of the stream; it is meant to run in its own
// goroutine. It reads messages from the connection, decodes them with
// decodeJSON and sends them on the data channel until reading or decoding
// fails or the stream is closed.
func (s *Stream[T]) dial() {
	// This goroutine is the only sender on s.ch, so it is the only place
	// where the channel is closed. The deferred close runs after setErr, so
	// a receiver that sees the channel closed also sees the error via Err.
	defer close(s.ch)

	for {
		var r io.Reader
		_, r, err := s.conn.NextReader()
		if err != nil {
			s.setErr(err)
			return
		}

		v, b, err := decodeJSON[T](r)
		if err != nil {
			// decodeJSON is defined elsewhere; b is presumably the raw
			// payload, which is reported as the error text. Fall back to
			// the decode error when there is no payload to report.
			if len(b) > 0 {
				err = errors.New(string(b))
			}
			s.setErr(err)
			return
		}

		select {
		case <-s.stop:
			return
		case s.ch <- *v:
		}
	}
}

// setErr records err as the stream's terminal error unless the stream has
// already been closed with Close.
func (s *Stream[T]) setErr(err error) {
	s.mut.Lock()
	defer s.mut.Unlock()

	select {
	case <-s.stop:
	default:
		s.err = err
	}
}
// dial opens a websocket connection to u using ws and returns a Stream whose
// read loop is already running in a separate goroutine. The data channel is
// unbuffered. If the connection cannot be established the dial error is
// returned and no stream is created.
func dial[T any](ws *websocket.Dialer, u string) (*Stream[T], error) {
	conn, _, dialErr := ws.Dial(u, nil)
	if dialErr != nil {
		return nil, dialErr
	}

	stream := &Stream[T]{
		stop: make(chan struct{}),
		ch:   make(chan T),
		conn: conn,
	}
	go stream.dial()

	return stream, nil
}