package pool import ( "errors" "fmt" "net/http" "pool/pool/util/grpool" "strconv" "strings" "sync" "time" ) type SetMsgReq struct { ProcedureType int `json:"procedureType"` ConversationId int `json:"conversationId" db:"conversation_id"` Promoter string `json:"promoter" db:"promoter"` Participant string `json:"participant" db:"participant"` Status string `json:"status" db:"status"` StartTime string `json:"startTime" db:"start_time"` EndTime string `json:"endTime" db:"end_time"` Remark string `json:"remark" db:"remark"` Finish string `json:"finish" db:"finish"` GuideMsg string `json:"guideMsg" db:"guide_msg"` GuideDate string `json:"guideDate" db:"guide_date"` StartReceiveDate string `json:"startReceiveDate" db:"start_receive_date"` SendTime string `json:"sendTime" db:"send_time"` MsgType int `json:"msgType" db:"msg_type"` Content string `json:"content" db:"content"` Sender string `json:"sender" db:"sender"` Receiver string `json:"receiver" db:"receiver"` ReadStatus string `json:"readStatus" db:"read_status"` } // 第一步,实例化连接对像 func NewClient(conf *Config) *Client { if conf.Goroutine < 5 { conf.Goroutine = 200 } var client *Client oldclient := wsSever.hub.oldClients[conf.Id] if oldclient != nil { delete(wsSever.hub.oldClients, conf.Id) c := oldclient client = c } else { client = &Client{ Id: conf.Id, types: conf.Type, hub: wsSever.hub, sendCh: make(chan *SendMsg, MaxRecvChLen), recvCh: make(chan *SendMsg, MaxSendChLen), mux: new(sync.Mutex), } } client.recvPing = make(chan int) client.sendPing = make(chan int) client.channel = conf.Channel client.grpool = grpool.NewPool(conf.Goroutine) client.IsClose = make(chan bool) client.OnError(nil) client.OnOpen(nil) client.OnMessage(nil) client.OnClose(nil) client.OnPing(nil) client.OnPong(nil) wsSever.hub.AddClient(client) AppendClient(client.Id, string(SerializeJson(conf))) return client } // 开启连接 // serveWs handles websocket requests from the peer. func (c *Client) OpenClient(w http.ResponseWriter, r *http.Request, head http.Header) { defer dump() conn, err := upgrader.Upgrade(w, r, head) if err != nil { if c.onError != nil { c.onError(err) } return } r.Close = true c.conn = conn c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.pingPeriodTicker = time.NewTimer(pingPeriod) c.closeTicker = time.NewTimer(closeWait) c.conn.SetPongHandler(func(str string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) fmt.Println("收到pong---", c.Id, str) c.pingPeriodTicker.Reset(pingPeriod) c.onPong() return nil }) c.conn.SetPingHandler(func(str string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.pingPeriodTicker.Reset(pingPeriod) fmt.Println("收到ping---", c.Id, str) c.recvPing <- 1 //if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil { // c.onError(errors.New("回复客户端PongMessage出现异常:"+err.Error())) //} c.onPing() return nil }) c.conn.SetCloseHandler(func(code int, str string) error { //收到客户端连接关闭时的回调 fmt.Println("连接ID:"+c.Id, "SetCloseHandler接收到连接关闭状态:", code, "关闭信息:", str) return nil }) // Allow collection of memory referenced by the caller by doing all work in // new goroutines. //连接开启后瑞添加连接池中 c.openTime = time.Now() c.grpool.Add(c.writePump) c.grpool.Add(c.readPump) //开始接收消息 c.grpool.Add(c.recvMessage) c.grpool.Add(c.Tickers) c.grpool.Add(c.closeTickers) c.onOpen() //客户端建立连接 //user := GetClientInfoById(c.Id) //if user.Promoter == "1" { // mqData := &mq.SetMsgReq{ // ProcedureType: 1, // BusinessId: user.Source, // CustomerId: user.CustomerId, // Status: "0", // StartTime: time.Now().Format("2006-01-02 15:04:05"), // Promoter: user.Promoter, // } // PublishData(mqData) //} } //获取连接对像运行过程中的信息 func (c *Client) GetRuntimeInfo() *RuntimeInfo { return &RuntimeInfo{ Id: c.Id, Type: c.types, Channel: c.channel, OpenTime: c.openTime, LastReceiveTime: c.lastSendTime, LastSendTime: c.lastSendTime, Ip: c.conn.RemoteAddr().String(), } } //回调添加方法 //监听连接对象的连接open成功的事件 func (c *Client) OnOpen(h func()) { if h == nil { c.onOpen = func() { } return } c.onOpen = h } func (c *Client) OnPing(h func()) { if h == nil { c.onPing = func() { } return } c.onPing = h } func (c *Client) OnPong(h func()) { if h == nil { c.onPong = func() { } return } c.onPong = h } //监听连接对象的连接open成功的事件 func (c *Client) OnMessage(h func(msg *SendMsg)) { if h == nil { c.onMessage = func(msg *SendMsg) { } return } c.onMessage = h } //监听连接对象的连接open成功的事件 func (c *Client) OnClose(h func()) { if h == nil { c.onClose = func() { } return } c.onClose = h } //监听连接对象的错误信息 func (c *Client) OnError(h func(err error)) { if h == nil { c.onError = func(err error) { } return } c.onError = h } // 单个连接发送消息 func (c *Client) Send(msg *SendMsg) error { select { case <-c.IsClose: c.close() return errors.New("连接ID:" + c.Id + "ws连接己经断开,无法发送消息") default: msg.ToClientId = c.Id c.send(msg) } return nil } //服务主动关闭连接 func (c *Client) Close() { c.close() } // 包级的公开方法 // 所有包级的发送如果连接断开,消息会丢失 // 发送消息 只从连接池中按指定的toClientId的连接对象发送出消息 // 在此方法中sendMsg.Channel指定的值不会处理 func Send(msg *SendMsg) error { //log.Info("发送指令:",msg.Cmd,msg.ToClientId) fmt.Println(msg) if msg.ToClientId == "" { return errors.New("发送消息的消息体中未指定ToClient目标!") } c := wsSever.hub.clients[msg.ToClientId] if c != nil { client := c if client.Id == msg.ToClientId { msg.ToClientId = client.Id err := client.Send(msg) if err != nil { return errors.New("发送消息出错:" + err.Error() + ",连接对象id=" + client.Id + "。") } } } else { fmt.Println(1) //fmt.Println(wsSever.hub.clients) } return nil } // 通过连接池广播消息,每次广播只能指定一个类型下的一个频道 // 广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像 // 并且只针对频道内的连接进行处理 func Broadcast(msg *SendMsg) error { if len(msg.Channel) == 0 { return errors.New("广播消息的消息体中未指定Channel频道!") } timeout := time.NewTimer(time.Millisecond * 800) defer timeout.Stop() select { case wsSever.hub.chanBroadcastQueue <- msg: return nil case <-timeout.C: return errors.New("hub.chanBroadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.chanBroadcastQueue))) } } //全局广播 //广播一定要设置toClientId,这个对象可以确定广播超时消息回复对像 //通过此方法进行广播的消息体,会对所有的类型和频道都进行广播 func BroadcastAll(msg *SendMsg) error { timeout := time.NewTimer(time.Millisecond * 800) defer timeout.Stop() select { case wsSever.hub.broadcastQueue <- msg: return nil case <-timeout.C: return errors.New("hub.broadcastQueue消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.broadcastQueue))) } } // 根据来源和类型获取客户端列表 func GetList(source, sourceId, customerType string) map[string]*Client { clientData := make(map[string]*Client, 0) for k, v := range wsSever.hub.clients { userInfo := strings.Split(k, "_") if (userInfo[0] == source && userInfo[1] == sourceId) || (source == "") { if customerType != "" { if userInfo[1] == customerType { clientData[k] = v } } else { clientData[k] = v } } } return clientData } type UserInfo struct { Source string `json:"source"` // SourceId string `json:"sourceId"` // CustomerType string `json:"customerType"` CustomerId string `json:"customerId"` ClientId string `json:"clientId"` ToChannel []string `json:"toChannel"` } //根据token获取用户来源信息 func GetClientInfoByToken(token string) (*UserInfo, error) { tokenData := strings.Split(token, "_") fmt.Println(tokenData) if len(tokenData) < 4 { return nil, errors.New("用户数据有误") } //if tokenData[1] == "1" { // source, err := DataAesDecrypt(tokenData[0]) // if err != nil { // return nil, errors.New("用户数据有误") // } // // if len(source) <= 0 { // return nil, errors.New("用户数据有误") // } //} userData := new(UserInfo) userData.Source = tokenData[0] userData.SourceId = tokenData[1] userData.CustomerType = tokenData[2] userData.CustomerId = tokenData[3] userData.ClientId = userData.Source + "_" + userData.SourceId + "_" + userData.CustomerType + "_" + userData.CustomerId return userData, nil } // 会话状态 1 开始导诊 2 等待连接 3 双方建立连接 4 结束会话 5 离线 6 对话 7 已读 8 超时自动结束 func SaveMsg(msg *SendMsg) { if msg.ProcedureType == 9 || msg.ProcedureType == 10 { return } msg.SendTime = time.Now().Format("2006-01-02 15:04:05") user := GetClientInfoById(msg.FromClientId) mqData := &SetMsgReq{} mqData.ProcedureType = msg.ProcedureType mqData.ConversationId = msg.ConversationId if msg.ProcedureType == 2 { mqData.GuideMsg = msg.Msg mqData.GuideDate = msg.SendTime } //建立连接 if msg.ProcedureType == 3 { mqData.StartReceiveDate = msg.SendTime mqData.Participant = msg.FromClientId //连接开启信息存入 //conversation := &Conversation{} //conversation.ConversationId = msg.ConversationId //conversation.Promoter = msg.FromClientId //conversation.Participant = msg.ToClientId //conversation.Status = 3 //conversation.StartReceiveDate = mqData.StartReceiveDate //AppendConversation(conversation) } //结束 if msg.ProcedureType == 4 { mqData.EndTime = msg.SendTime if user.CustomerType == "1" { mqData.Finish = "3" } if user.CustomerType == "2" { mqData.Finish = "1" } } //离线 if msg.ProcedureType == 5 { if user.CustomerType == "1" { mqData.Finish = "1" } if user.CustomerType == "2" { mqData.Finish = "2" } } //对话 if msg.ProcedureType == 6 { mqData.SendTime = msg.SendTime mqData.MsgType = msg.MsgType mqData.Content = msg.Msg mqData.Sender = msg.FromClientId mqData.Receiver = msg.ToClientId mqData.ReadStatus = "0" AppendConversationRecord(mqData) } if msg.ProcedureType == 8 { mqData.Finish = "5" mqData.EndTime = msg.SendTime //DelConversation(msg.ConversationId) } if mqData.ProcedureType != 0 { PublishData(mqData) } return } //clientId转结构体 func GetClientInfoById(clientId string) *UserInfo { data := strings.Split(clientId, "_") userData := new(UserInfo) if len(data) < 4 { fmt.Println("用户数据有误") return userData } userData.Source = data[0] userData.SourceId = data[1] userData.CustomerType = data[2] userData.CustomerId = data[3] return userData } func PublishData(mqData *SetMsgReq) { go RabbitMQ.PublishSimple(SerializeJson(mqData)) return } func GetAllList(key string) map[string]string { jsonData := Redis.HGetAll(key) jsonDataAll := jsonData.Val() return jsonDataAll } func AppendConversationRecord(msg *SetMsgReq) { Redis.HSetNX("msg:"+strconv.Itoa(msg.ConversationId), msg.Sender+":"+msg.Receiver+":"+msg.SendTime, string(SerializeJson(msg))) return } func GetConversationRecord(conversationId int) { //Redis.HGet() } func DelConversation(conversationId int) { Redis.HDel("msg:" + strconv.Itoa(conversationId)) return } func AppendClient(clientId, data string) { Redis.HSet(cliKey, clientId, data) return } func DelClient(clientId string) { Redis.HDel(cliKey, clientId) return } func SetEnd() { conversationList := GetAllList(convKey) for _, vClients := range conversationList { conversationData := &Conversation{} UnserislizeJson([]byte(vClients), conversationData) if conversationData.StartReceiveDate < time.Now().Add(-endDate).Format("2006-01-02 15:04:05") { mqData := &SendMsg{ ProcedureType: 8, ConversationId: conversationData.ConversationId, ToClientId: conversationData.Promoter, FromClientId: conversationData.Participant, SendTime: time.Now().Format("2006-01-02 15:04:05"), } //发送结束会话给客户端 _, isSet := wsSever.hub.clients[conversationData.Promoter] if isSet == true { wsSever.hub.clients[conversationData.Promoter].readMessage(mqData) } toMqData := &SendMsg{ ProcedureType: 8, ConversationId: conversationData.ConversationId, ToClientId: conversationData.Participant, FromClientId: conversationData.Promoter, SendTime: time.Now().Format("2006-01-02 15:04:05"), } _, isSet = wsSever.hub.clients[conversationData.Participant] if isSet == true { wsSever.hub.clients[conversationData.Participant].readMessage(toMqData) } SaveMsg(toMqData) } } return } //func AppendToSendData(clientId, toClientId, sendTime string) { // sendData := ToSendData{toSendTime: sendTime, toSendId: clientId} // toSendDataLock.Lock() // wsSever.hub.clients[toClientId].ToSendData = append(wsSever.hub.clients[toClientId].ToSendData, sendData) // toSendDataLock.Unlock() // return //} // //func DelToSendData(clientId, toClientId string) { // for kclientId, vclientId := range wsSever.hub.clients[clientId].ToSendData { // if vclientId.toSendId == toClientId { // toSendDataLock.Lock() // wsSever.hub.clients[clientId].ToSendData = append(wsSever.hub.clients[clientId].ToSendData[:kclientId], wsSever.hub.clients[clientId].ToSendData[kclientId+1:]...) // toSendDataLock.Unlock() // } // } // return //} // 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return //func (c *Client)SetOffline(user *UserInfo, customerId string) { // // fmt.Println("开始执行定时任务") // count := 0 // for { // // 客服离线则 goroutine结束 // if _, ok := wsSever.hub.clients[customerId]; !ok { // //客户端建立连接后创建数据 // if user.Promoter == "3" { // mqData := &mq.SetMsgReq{ // ProcedureType: 1, // BusinessId: user.Source, // CustomerId: user.CustomerId, // Status: "0", // StartTime: time.Now().Format("2006-01-02 15:04:05"), // Promoter: user.Promoter, // } // PublishData(mqData) // } // return // } // // if count > 300 { // // var conversation ConversationList // global.GVA_DB.Where("id = ?", conversationId).First(&conversation) // if conversation.State == 0 { // global.GVA_DB.Exec(`update conversation_list set state = 1, finish = 3, end_time = ? where id = ?`, time.Now().Format("2006-01-02 15:04:05"), conversationId) // } // return // } // // count++ // // time.Sleep(1 * time.Second) // fmt.Println("开始计时关闭会话: ", count) // } // //}