1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
package pool
import (
"errors"
"fmt"
"net/http"
mq "pool/dao"
"pool/pool/util/grpool"
"strings"
"sync"
"time"
)
// 第一步,实例化连接对像
func NewClient(conf *Config) *Client {
if conf.Goroutine < 5 {
conf.Goroutine = 200
}
var client *Client
oldclient := wsSever.hub.oldClients[conf.Id]
delete(wsSever.hub.oldClients, conf.Id)
if oldclient != nil {
c := oldclient
client = c
} else {
client = &Client{
Id: conf.Id,
types: conf.Type,
hub: wsSever.hub,
sendCh: make(chan *SendMsg, MaxRecvChLen),
recvCh: make(chan *SendMsg, MaxSendChLen),
mux: new(sync.Mutex),
}
}
client.recvPing = make(chan int)
client.sendPing = make(chan int)
client.channel = conf.Channel
client.grpool = grpool.NewPool(conf.Goroutine)
client.IsClose = make(chan bool)
client.OnError(nil)
client.OnOpen(nil)
client.OnMessage(nil)
client.OnClose(nil)
client.OnPing(nil)
client.OnPong(nil)
wsSever.hub.AddClient(client)
return client
}
// 开启连接
// serveWs handles websocket requests from the peer.
func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.Header) {
defer dump()
conn, err := upgrader.Upgrade(w, r, head)
if err != nil {
if c.onError != nil {
c.onError(err)
}
return
}
r.Close = true
c.conn = conn
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker = time.NewTimer(pingPeriod)
c.closeTicker = time.NewTimer(closeWait)
c.conn.SetPongHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
fmt.Println("收到pong---", c.Id, str)
c.pingPeriodTicker.Reset(pingPeriod)
c.onPong()
return nil
})
c.conn.SetPingHandler(func(str string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker.Reset(pingPeriod)
fmt.Println("收到ping---", c.Id, str)
c.recvPing <- 1
//if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
// c.onError(errors.New("回复客户端PongMessage出现异常:"+err.Error()))
//}
c.onPing()
return nil
})
c.conn.SetCloseHandler(func(code int, str string) error {
//收到客户端连接关闭时的回调
fmt.Println("连接ID:"+c.Id, "SetCloseHandler接收到连接关闭状态:", code, "关闭信息:", str)
return nil
})
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
//连接开启后瑞添加连接池中
c.openTime = time.Now()
c.grpool.Add(c.writePump)
c.grpool.Add(c.readPump)
//开始接收消息
c.grpool.Add(c.recvMessage)
c.grpool.Add(c.Tickers)
c.grpool.Add(c.closeTickers)
c.onOpen()
//客户端建立连接
//user := GetClientInfoById(c.Id)
//if user.Promoter == "1" {
// mqData := &mq.SetMsgReq{
// ProcedureType: 1,
// BusinessId: user.Source,
// CustomerId: user.CustomerId,
// Status: "0",
// StartTime: time.Now().Format("2006-01-02 15:04:05"),
// Promoter: user.Promoter,
// }
// PublishData(mqData)
//}
}
//获取连接对像运行过程中的信息
func (c *Client) GetRuntimeInfo() *RuntimeInfo {
return &RuntimeInfo{
Id: c.Id,
Type: c.types,
Channel: c.channel,
OpenTime: c.openTime,
LastReceiveTime: c.lastSendTime,
LastSendTime: c.lastSendTime,
Ip: c.conn.RemoteAddr().String(),
}
}
//回调添加方法
//监听连接对象的连接open成功的事件
func (c *Client) OnOpen(h func()) {
if h == nil {
c.onOpen = func() {
}
return
}
c.onOpen = h
}
func (c *Client) OnPing(h func()) {
if h == nil {
c.onPing = func() {
}
return
}
c.onPing = h
}
func (c *Client) OnPong(h func()) {
if h == nil {
c.onPong = func() {
}
return
}
c.onPong = h
}
//监听连接对象的连接open成功的事件
func (c *Client) OnMessage(h func(msg *SendMsg)) {
if h == nil {
c.onMessage = func(msg *SendMsg) {
}
return
}
c.onMessage = h
}
//监听连接对象的连接open成功的事件
func (c *Client) OnClose(h func()) {
if h == nil {
c.onClose = func() {
}
return
}
c.onClose = h
}
//监听连接对象的错误信息
func (c *Client) OnError(h func(err error)) {
if h == nil {
c.onError = func(err error) {
}
return
}
c.onError = h
}
// 单个连接发送消息
func (c *Client) Send(msg *SendMsg) error {
select {
case <-c.IsClose:
c.close()
return errors.New("连接ID:" + c.Id + "ws连接己经断开,无法发送消息")
default:
msg.ToClientId = c.Id
c.send(msg)
}
return nil
}
//服务主动关闭连接
func (c *Client) Close() {
c.close()
}
// 包级的公开方法
// 所有包级的发送如果连接断开,消息会丢失
// 发送消息 只从连接池中按指定的toClientId的连接对象发送出消息
// 在此方法中sendMsg.Channel指定的值不会处理
func Send(msg *SendMsg) error {
//log.Info("发送指令:",msg.Cmd,msg.ToClientId)
if msg.ToClientId == "" {
return errors.New("发送消息的消息体中未指定ToClient目标!")
}
c := wsSever.hub.clients[msg.ToClientId]
if c != nil {
client := c
if client.Id == msg.ToClientId {
msg.ToClientId = client.Id
err := client.Send(msg)
if err != nil {
return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + client.Id + "。")
}
}
} else {
fmt.Println(1)
fmt.Println(wsSever.hub.clients)
}
return nil
}
// 通过连接池广播消息,每次广播只能指定一个类型下的一个频道
// 广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像
// 并且只针对频道内的连接进行处理
func Broadcast(msg *SendMsg) error {
if len(msg.Channel) == 0 {
return errors.New("广播消息的消息体中未指定Channel频道!")
}
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case wsSever.hub.chanBroadcastQueue <- msg:
return nil
case <-timeout.C:
return errors.New("hub.chanBroadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.chanBroadcastQueue)))
}
}
//全局广播
//广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像
//通过此方法进行广播的消息体,会对所有的类型和频道都进行广播
func BroadcastAll(msg *SendMsg) error {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case wsSever.hub.broadcastQueue <- msg:
return nil
case <-timeout.C:
return errors.New("hub.broadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.broadcastQueue)))
}
}
// 根据来源和类型获取客户端列表
func GetList(source, userType string) map[string]*Client {
clientData := make(map[string]*Client, 0)
for k, v := range wsSever.hub.clients {
userInfo := strings.Split(k, "_")
if userInfo[0] == source {
if userType != "" {
if userInfo[1] == userType {
clientData[k] = v
}
} else {
clientData[k] = v
}
}
}
return clientData
}
type UserInfo struct {
Source string `json:"source"` //
Promoter string `json:"promoter"`
CustomerId string `json:"customerId"`
ClientId string `json:"clientId"`
ToChannel []string `json:"toChannel"`
}
//根据token获取用户来源信息
func GetClientInfoByToken(token string) (*UserInfo, error) {
tokenData := strings.Split(token, "_")
if len(tokenData) < 3 {
return nil, errors.New("用户数据有误")
}
source, err := DataAesDecrypt(tokenData[0])
if err != nil {
return nil, errors.New("用户数据有误")
}
if len(source) <= 0 {
return nil, errors.New("用户数据有误")
}
userData := new(UserInfo)
userData.Source = tokenData[0]
userData.Promoter = tokenData[1]
userData.CustomerId = tokenData[2]
userData.ClientId = userData.Source + "_" + userData.Promoter + "_" + userData.CustomerId
return userData, nil
}
//会话状态 0 新建会话 2 等待连接 3 离线 4 双方建立连接 5 结束会话 6 对话 7 已读
func SaveMsg(msg *SendMsg) {
user := GetClientInfoById(msg.FromClientId)
toUser := &UserInfo{}
if msg.ToClientId != "" {
toUser = GetClientInfoById(msg.ToClientId)
}
mqData := &mq.SetMsgReq{}
if user.Promoter == "1" {
mqData.BusinessId = user.Source
mqData.CustomerId = user.CustomerId
mqData.GroupId = toUser.Source
mqData.UserId = toUser.CustomerId
}
if user.Promoter == "2" {
mqData.BusinessId = toUser.Source
mqData.CustomerId = toUser.CustomerId
mqData.GroupId = user.Source
mqData.UserId = user.CustomerId
}
mqData.BusinessId, _ = DataAesDecrypt(mqData.BusinessId)
mqData.ProcedureType = msg.ProcedureType
mqData.StartTime = msg.SendTime
if msg.ProcedureType == 1 {
mqData.Promoter = user.Promoter
}
if msg.ProcedureType == 2 {
mqData.GuideMsg = msg.Msg
mqData.GuideDate = msg.SendTime
}
if msg.ProcedureType == 4 {
mqData.StartReceiveDate = msg.SendTime
}
if msg.ProcedureType == 5 {
mqData.EndTime = msg.SendTime
mqData.Promoter = user.Promoter
if user.Promoter == "1" {
mqData.Finish = "3"
}
if user.Promoter == "2" {
mqData.Finish = "1"
}
}
if msg.ProcedureType == 6 {
mqData.PromoterType = user.Promoter
mqData.SendTime = msg.SendTime
mqData.MsgType = msg.MsgType
mqData.Content = msg.Msg
mqData.Sender = msg.FromClientId
mqData.Receiver = msg.ToClientId
}
//已读状态
if msg.ProcedureType == 7 {
if user.Promoter == "1" {
mqData.PromoterType = "2"
}
if user.Promoter == "2" {
mqData.PromoterType = "1"
}
mqData.SendTime = msg.SendTime
}
if mqData.ProcedureType != 0 {
PublishData(mqData)
}
return
}
//clientId转结构体
func GetClientInfoById(clientId string) *UserInfo {
data := strings.Split(clientId, "_")
userData := new(UserInfo)
userData.Source = data[0]
userData.Promoter = data[1]
userData.CustomerId = data[2]
return userData
}
func PublishData(mqData *mq.SetMsgReq) {
rabbitMQ.PublishSimple(SerializeJson(mqData))
return
}
// 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return
//func (c *Client)SetOffline(user *UserInfo, customerId string) {
//
// fmt.Println("开始执行定时任务")
// count := 0
// for {
// // 客服离线则 goroutine结束
// if _, ok := wsSever.hub.clients[customerId]; !ok {
// //客户端建立连接后创建数据
// if user.Promoter == "3" {
// mqData := &mq.SetMsgReq{
// ProcedureType: 1,
// BusinessId: user.Source,
// CustomerId: user.CustomerId,
// Status: "0",
// StartTime: time.Now().Format("2006-01-02 15:04:05"),
// Promoter: user.Promoter,
// }
// PublishData(mqData)
// }
// return
// }
//
// if count > 300 {
//
// var conversation ConversationList
// global.GVA_DB.Where("id = ?", conversationId).First(&conversation)
// if conversation.State == 0 {
// global.GVA_DB.Exec(`update conversation_list set state = 1, finish = 3, end_time = ? where id = ?`, time.Now().Format("2006-01-02 15:04:05"), conversationId)
// }
// return
// }
//
// count++
//
// time.Sleep(1 * time.Second)
// fmt.Println("开始计时关闭会话: ", count)
// }
//
//}