1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package pool
import (
"errors"
"fmt"
"pool/pool/util/queue"
"time"
)
var (
//最大连接池缓冲处理连接对像管道长度
MaxClientChannelLen = 10240
//最大全局广播缓冲处理管道长度
MaxBroadcastQueueLen = 4096
//最大频道广播缓冲处理管道长度
MaxChanBroadcastQueueLen = 4096
//最大接收消息缓冲处理管道长度
MaxRecvChLen = 10240
//最大发送消息缓冲处理管道长度
MaxSendChLen = 10240
//最大会话数量
MaxConversationNum = 10240
)
// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
// Registered clients.
clients map[string]map[string]*Client //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients map[string]map[string]*Client //缓存断开的连接消息队列
// Inbound messages from the clients.
//可以用于广播所有连接对象
broadcastQueue chan *SendMsg
//广播指定频道的管道
chanBroadcastQueue chan *SendMsg
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan string
//conversation []*Conversation
}
//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效)
type oldMsg struct {
list *queue.PriorityQueue
Expiration time.Time //过期时间
}
func newHub() *hub {
return &hub{
//conversation: make([]*Conversation, MaxConversationNum),
register: make(chan *Client, MaxClientChannelLen),
unregister: make(chan string, MaxClientChannelLen),
clients: make(map[string]map[string]*Client),
oldClients: make(map[string]map[string]*Client),
broadcastQueue: make(chan *SendMsg, MaxBroadcastQueueLen),
chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen),
}
}
func (h *hub) run() {
loop:
for {
select {
case id, ok := <-h.unregister:
if !ok {
break loop
}
userInfo, _ := GetClientInfo(id)
c, _ := h.clients[userInfo.ClientId][userInfo.User]
if c != nil {
delete(h.clients[userInfo.ClientId], userInfo.User)
}
fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))
if len(h.clients[userInfo.ClientId]) == 0 {
delete(h.clients, userInfo.ClientId)
}
case client, ok := <-h.register:
if !ok {
break loop
}
if len(h.clients[client.Id]) > 0 {
h.clients[client.Id][client.User] = client
}else{
h.clients[client.Id] = map[string]*Client{client.User: client}
}
fmt.Println("注册ws连接对象:", client.User, "连接总数:", len(h.clients))
case broadcastMsg, ok := <-h.broadcastQueue:
if !ok {
break loop
}
for _, v := range h.clients {
if v != nil {
for k, vv := range v {
client := vv
broadcastMsg.ToClientId = k
client.send(broadcastMsg)
}
}
}
case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
if !ok {
break loop
}
//广播指定频道的消息处理
//h.clients.Iterator(func(id string, v interface{}) bool {
for _, v := range h.clients {
if v != nil {
for k, vv := range v {
client := vv
for _, ch := range chanBroadcastMsg.Channel {
if searchStrArray(client.channel, ch) {
chanBroadcastMsg.ToClientId = k
client.send(chanBroadcastMsg)
}
}
}
}
}
// return true
//})
}
}
}
func (h *hub) ticker() {
//定时清理清理缓存的旧的连接对像
//gtimer.AddSingleton(30*time.Second, func() {
// if len(h.oldClients) > 0 {
// for _, v := range h.oldClients {
// //h.oldClients.Iterator(func(k string, v interface{}) bool {
// if v != nil {
// client := v
// if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
// //3分钟后清理组存中的旧连接对像
// h.clearOldClient(client)
// /// h.clearOldClient <- client
// }
// }
// // return true
// //})
// }
// }
//})
}
func (h *hub) AddClient(client *Client) error {
timeout := time.NewTimer(time.Second * 3)
defer timeout.Stop()
select {
case h.register <- client:
return nil
case <-timeout.C:
return errors.New("AddClient register消息管道blocked,写入消息超时")
}
}
func (h *hub) clearOldClient(client *Client) {
close(client.recvCh)
close(client.sendCh)
delete(h.oldClients[client.Id], client.User)
if len(h.oldClients[client.Id]) == 0 {
delete(h.oldClients, client.Id)
}
fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
}
func (h *hub) RemoveClient(client *Client) error {
//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
client.CloseTime = time.Now()
//h.oldClients[client.Id] = client
timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop()
select {
case h.unregister <- client.User:
return nil
case <-timeout.C:
return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
}
}