// Package pool: hub.go defines the hub that tracks connected clients and broadcasts messages to them.
package pool

import (
	"errors"
	"fmt"
	"pool/pool/util/queue"
	"time"
)

// Tunable sizes for the pool's buffered channels and limits. They are
// variables rather than constants so they can be adjusted before use.
var (
	// MaxClientChannelLen is the maximum buffer length of the channels that
	// carry client connection objects to the pool.
	MaxClientChannelLen = 10240
	// MaxBroadcastQueueLen is the maximum buffer length of the global
	// broadcast queue.
	MaxBroadcastQueueLen = 4096
	// MaxChanBroadcastQueueLen is the maximum buffer length of the
	// per-channel broadcast queue.
	MaxChanBroadcastQueueLen = 4096

	// MaxRecvChLen is the maximum buffer length of the channel for received
	// messages.
	MaxRecvChLen = 10240
	// MaxSendChLen is the maximum buffer length of the channel for outgoing
	// messages.
	MaxSendChLen = 10240
	// MaxConversationNum is the maximum number of conversations.
	MaxConversationNum = 10240
)

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
	// Registered clients.
haoyanbin's avatar
1  
haoyanbin committed
30
	clients    map[string]*Client //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
haoyanbin's avatar
haoyanbin committed
31 32 33 34 35 36 37 38 39 40 41 42 43 44
	oldClients map[string]*Client //缓存断开的连接消息队列

	// Inbound messages from the clients.
	//可以用于广播所有连接对象
	broadcastQueue chan *SendMsg

	//广播指定频道的管道
	chanBroadcastQueue chan *SendMsg

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan string
haoyanbin's avatar
haoyanbin committed
45 46

	//conversation []*Conversation
haoyanbin's avatar
haoyanbin committed
47 48 49 50 51 52 53 54 55 56
}

// oldMsg holds the messages that must be handled after a reconnect. Per the
// original author's note, it caches the messages left unprocessed in the send
// channel of the previous connection; if the original ws connection has not
// reconnected after 60 seconds, the messages expire.
type oldMsg struct {
	list       *queue.PriorityQueue
	Expiration time.Time // expiration time
}

func newHub() *hub {
	return &hub{
haoyanbin's avatar
haoyanbin committed
57
		//conversation:       make([]*Conversation, MaxConversationNum),
haoyanbin's avatar
1  
haoyanbin committed
58 59
		register:           make(chan *Client, MaxClientChannelLen),
		unregister:         make(chan string, MaxClientChannelLen),
haoyanbin's avatar
haoyanbin committed
60 61
		clients:            make(map[string]*Client),
		oldClients:         make(map[string]*Client),
haoyanbin's avatar
1  
haoyanbin committed
62 63
		broadcastQueue:     make(chan *SendMsg, MaxBroadcastQueueLen),
		chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen),
haoyanbin's avatar
haoyanbin committed
64 65 66 67 68 69 70 71 72 73 74
	}
}

func (h *hub) run() {
loop:
	for {
		select {
		case id, ok := <-h.unregister:
			if !ok {
				break loop
			}
haoyanbin's avatar
1  
haoyanbin committed
75
			c, _ := h.clients[id]
haoyanbin's avatar
haoyanbin committed
76
			if c != nil {
haoyanbin's avatar
1  
haoyanbin committed
77
				delete(h.clients, id)
haoyanbin's avatar
haoyanbin committed
78 79 80 81 82 83 84 85
			}
			fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))

		case client, ok := <-h.register:
			if !ok {
				break loop
			}
			h.clients[client.Id] = client
haoyanbin's avatar
1  
haoyanbin committed
86
			fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients))
haoyanbin's avatar
haoyanbin committed
87 88 89 90 91

		case broadcastMsg, ok := <-h.broadcastQueue:
			if !ok {
				break loop
			}
haoyanbin's avatar
1  
haoyanbin committed
92
			for k, v := range h.clients {
haoyanbin's avatar
haoyanbin committed
93 94 95 96 97 98 99 100 101 102 103 104 105
				if v != nil {
					client := v
					broadcastMsg.ToClientId = k
					client.send(broadcastMsg)
				}
			}

		case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
			if !ok {
				break loop
			}
			//广播指定频道的消息处理
			//h.clients.Iterator(func(id string, v interface{}) bool {
haoyanbin's avatar
1  
haoyanbin committed
106 107 108 109 110 111 112
			for k, v := range h.clients {
				if v != nil {
					client := v
					for _, ch := range chanBroadcastMsg.Channel {
						if searchStrArray(client.channel, ch) {
							chanBroadcastMsg.ToClientId = k
							client.send(chanBroadcastMsg)
haoyanbin's avatar
haoyanbin committed
113 114 115
						}
					}
				}
haoyanbin's avatar
1  
haoyanbin committed
116
			}
haoyanbin's avatar
haoyanbin committed
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
			//	return true
			//})
		}

	}
}
// ticker currently does nothing: the periodic cleanup of cached old
// connection objects below is commented out.
func (h *hub) ticker() {
	// Periodically clean up the cached old connection objects.
	//gtimer.AddSingleton(30*time.Second, func() {
	//	if len(h.oldClients) > 0 {
	//		for _, v := range h.oldClients {
	//			//h.oldClients.Iterator(func(k string, v interface{}) bool {
	//			if v != nil {
	//				client := v
	//				if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
	//					// after 3 minutes, clear the old connection object from the cache
	//					h.clearOldClient(client)
	//					///	h.clearOldClient <- client
	//				}
	//			}
	//			//	return true
	//			//})
	//		}
	//	}
	//})

}

// AddClient queues client on the hub's register channel.
//
// It returns nil once the client has been queued. It returns an error if
// client is nil, or if the register channel stays blocked for 3 seconds.
func (h *hub) AddClient(client *Client) error {
	if client == nil {
		// A nil client would be dereferenced by the consumer of the register
		// channel; reject it here where the caller can handle the error.
		return errors.New("AddClient: client is nil")
	}
	timeout := time.NewTimer(3 * time.Second)
	defer timeout.Stop()
	select {
	case h.register <- client:
		return nil
	case <-timeout.C:
		return errors.New("AddClient register消息管道blocked,写入消息超时")
	}
}

func (h *hub) clearOldClient(client *Client) {
	close(client.recvCh)
	close(client.sendCh)
haoyanbin's avatar
1  
haoyanbin committed
159
	delete(h.oldClients, client.Id)
haoyanbin's avatar
haoyanbin committed
160 161 162 163 164
	fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
}

func (h *hub) RemoveClient(client *Client) error {
	//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
haoyanbin's avatar
1  
haoyanbin committed
165
	client.CloseTime = time.Now()
haoyanbin's avatar
haoyanbin committed
166
	//h.oldClients[client.Id] = client
haoyanbin's avatar
haoyanbin committed
167 168 169 170 171 172 173 174 175
	timeout := time.NewTimer(time.Second * 1)
	defer timeout.Stop()
	select {
	case h.unregister <- client.Id:
		return nil
	case <-timeout.C:
		return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
	}
}