forked from zelenin/go-tdlib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
139 lines (111 loc) · 2.65 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package client
import (
"context"
"errors"
"sync"
"time"
)
type Client struct {
jsonClient *JsonClient
extraGenerator ExtraGenerator
responses chan *Response
listenerStore *listenerStore
catchersStore *sync.Map
updatesTimeout time.Duration
catchTimeout time.Duration
}
type Option func(*Client)
func WithExtraGenerator(extraGenerator ExtraGenerator) Option {
return func(client *Client) {
client.extraGenerator = extraGenerator
}
}
func WithCatchTimeout(timeout time.Duration) Option {
return func(client *Client) {
client.catchTimeout = timeout
}
}
func WithProxy(req *AddProxyRequest) Option {
return func(client *Client) {
client.AddProxy(req)
}
}
func WithLogVerbosity(req *SetLogVerbosityLevelRequest) Option {
return func(client *Client) {
client.SetLogVerbosityLevel(req)
}
}
func NewClient(authorizationStateHandler AuthorizationStateHandler, options ...Option) (*Client, error) {
client := &Client{
jsonClient: NewJsonClient(),
responses: make(chan *Response, 1000),
listenerStore: newListenerStore(),
catchersStore: &sync.Map{},
}
client.extraGenerator = UuidV4Generator()
client.catchTimeout = 60 * time.Second
for _, option := range options {
option(client)
}
tdlibInstance.addClient(client)
go client.receiver()
err := Authorize(client, authorizationStateHandler)
if err != nil {
return nil, err
}
return client, nil
}
func (client *Client) receiver() {
for response := range client.responses {
if response.Extra != "" {
value, ok := client.catchersStore.Load(response.Extra)
if ok {
value.(chan *Response) <- response
}
}
typ, err := UnmarshalType(response.Data)
if err != nil {
continue
}
needGc := false
for _, listener := range client.listenerStore.Listeners() {
if listener.IsActive() {
listener.Updates <- typ
} else {
needGc = true
}
}
if needGc {
client.listenerStore.gc()
}
}
}
func (client *Client) Send(req Request) (*Response, error) {
req.Extra = client.extraGenerator()
catcher := make(chan *Response, 1)
client.catchersStore.Store(req.Extra, catcher)
defer func() {
client.catchersStore.Delete(req.Extra)
close(catcher)
}()
client.jsonClient.Send(req)
ctx, cancel := context.WithTimeout(context.Background(), client.catchTimeout)
defer cancel()
select {
case response := <-catcher:
return response, nil
case <-ctx.Done():
return nil, errors.New("response catching timeout")
}
}
func (client *Client) GetListener() *Listener {
listener := &Listener{
isActive: true,
Updates: make(chan Type, 1000),
}
client.listenerStore.Add(listener)
return listener
}
func (client *Client) Stop() {
client.Destroy()
}