hub.go 4.98 KB
package pool

import (
	"errors"
	"fmt"
	"pool/pool/util/queue"
	"time"
)

var (
	//最大连接池缓冲处理连接对像管道长度
	MaxClientChannelLen = 10240
	//最大全局广播缓冲处理管道长度
	MaxBroadcastQueueLen = 4096
	//最大频道广播缓冲处理管道长度
	MaxChanBroadcastQueueLen = 4096

	//最大接收消息缓冲处理管道长度
	MaxRecvChLen = 10240
	//最大发送消息缓冲处理管道长度
	MaxSendChLen = 10240
	//最大会话数量
	MaxConversationNum = 10240
)

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
	// Registered clients.
	clients    map[string]map[string]*Client //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
	oldClients map[string]map[string]*Client //缓存断开的连接消息队列

	// Inbound messages from the clients.
	//可以用于广播所有连接对象
	broadcastQueue chan *SendMsg

	//广播指定频道的管道
	chanBroadcastQueue chan *SendMsg

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan string

	//conversation []*Conversation
}

//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效)
type oldMsg struct {
	list       *queue.PriorityQueue
	Expiration time.Time //过期时间
}

func newHub() *hub {
	return &hub{
		//conversation:       make([]*Conversation, MaxConversationNum),
		register:           make(chan *Client, MaxClientChannelLen),
		unregister:         make(chan string, MaxClientChannelLen),
		clients:            make(map[string]map[string]*Client),
		oldClients:         make(map[string]map[string]*Client),
		broadcastQueue:     make(chan *SendMsg, MaxBroadcastQueueLen),
		chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen),
	}
}

func (h *hub) run() {
loop:
	for {
		select {
		case id, ok := <-h.unregister:
			if !ok {
				break loop
			}
			userInfo, _ := GetClientInfo(id)
			c, _ := h.clients[userInfo.ClientId][userInfo.User]
			if c != nil {
				delete(h.clients[userInfo.ClientId], userInfo.User)
			}
			fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))
			if len(h.clients[userInfo.ClientId]) == 0 {
				delete(h.clients, userInfo.ClientId)
			}

		case client, ok := <-h.register:
			if !ok {
				break loop
			}
			if len(h.clients[client.Id]) > 0 {
				h.clients[client.Id][client.User] = client
			}else{
				h.clients[client.Id] = map[string]*Client{client.User: client}
			}
			fmt.Println("注册ws连接对象:", client.User, "连接总数:", len(h.clients))

		case broadcastMsg, ok := <-h.broadcastQueue:
			if !ok {
				break loop
			}
			for _, v := range h.clients {
				if v != nil {
					for k, vv := range v {
						client := vv
						broadcastMsg.ToClientId = k
						client.send(broadcastMsg)
					}
				}
			}

		case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
			if !ok {
				break loop
			}
			//广播指定频道的消息处理
			//h.clients.Iterator(func(id string, v interface{}) bool {
			for _, v := range h.clients {
				if v != nil {
					for k, vv := range v {
						client := vv
						for _, ch := range chanBroadcastMsg.Channel {
							if searchStrArray(client.channel, ch) {
								chanBroadcastMsg.ToClientId = k
								client.send(chanBroadcastMsg)
							}
						}
					}
				}
			}
			//	return true
			//})
		}
	}
}
func (h *hub) ticker() {
	//定时清理清理缓存的旧的连接对像
	//gtimer.AddSingleton(30*time.Second, func() {
	//	if len(h.oldClients) > 0 {
	//		for _, v := range h.oldClients {
	//			//h.oldClients.Iterator(func(k string, v interface{}) bool {
	//			if v != nil {
	//				client := v
	//				if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
	//					//3分钟后清理组存中的旧连接对像
	//					h.clearOldClient(client)
	//					///	h.clearOldClient <- client
	//				}
	//			}
	//			//	return true
	//			//})
	//		}
	//	}
	//})

}

func (h *hub) AddClient(client *Client) error {
	timeout := time.NewTimer(time.Second * 3)
	defer timeout.Stop()
	select {
	case h.register <- client:
		return nil
	case <-timeout.C:
		return errors.New("AddClient register消息管道blocked,写入消息超时")
	}
}

func (h *hub) clearOldClient(client *Client) {
	close(client.recvCh)
	close(client.sendCh)
	delete(h.oldClients[client.Id], client.User)
	if len(h.oldClients[client.Id]) == 0 {
		delete(h.oldClients, client.Id)
	}
	fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
}

func (h *hub) RemoveClient(client *Client) error {
	//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
	client.CloseTime = time.Now()
	//h.oldClients[client.Id] = client
	timeout := time.NewTimer(time.Second * 1)
	defer timeout.Stop()
	select {
	case h.unregister <- client.User:
		return nil
	case <-timeout.C:
		return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
	}
}