// hub.go: the hub type that tracks registered ws clients and fans out broadcasts.

package pool

import (
	"errors"
	"fmt"
	"pool/pool/util/queue"
	"time"
)

// Tunable buffer lengths for the package's channels. They are variables, not
// constants, so they can be adjusted before the hub is created.
var (
	// MaxClientChannelLen is the buffer length of the channels that carry
	// client connection objects (newHub uses it for register and unregister).
	MaxClientChannelLen = 10240
	// MaxBroadcastQueueLen is the buffer length of the global broadcast queue.
	MaxBroadcastQueueLen = 4096
	// MaxChanBroadcastQueueLen is the buffer length of the per-channel
	// broadcast queue.
	MaxChanBroadcastQueueLen = 4096

	// MaxRecvChLen is the maximum buffer length of the channel for received
	// messages (not referenced in the visible part of this file).
	MaxRecvChLen = 10240
	// MaxSendChLen is the maximum buffer length of the channel for outgoing
	// messages (not referenced in the visible part of this file).
	MaxSendChLen = 10240
)

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
	// Registered clients.
haoyanbin's avatar
1  
haoyanbin committed
28
	clients    map[string]*Client // //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
haoyanbin's avatar
haoyanbin committed
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
	oldClients map[string]*Client //缓存断开的连接消息队列

	// Inbound messages from the clients.
	//可以用于广播所有连接对象
	broadcastQueue chan *SendMsg

	//广播指定频道的管道
	chanBroadcastQueue chan *SendMsg

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan string
}

// oldMsg holds the messages that still need handling when a client reconnects.
// Per the original author's note, it caches the messages that were left
// unprocessed in the previous send channel, and they become invalid if the
// original ws has not reconnected within 60 seconds.
type oldMsg struct {
	// list is the queue of cached messages.
	list       *queue.PriorityQueue
	Expiration time.Time // expiration time
}

func newHub() *hub {
	return &hub{
haoyanbin's avatar
1  
haoyanbin committed
53 54
		register:           make(chan *Client, MaxClientChannelLen),
		unregister:         make(chan string, MaxClientChannelLen),
haoyanbin's avatar
haoyanbin committed
55 56
		clients:            make(map[string]*Client),
		oldClients:         make(map[string]*Client),
haoyanbin's avatar
1  
haoyanbin committed
57 58
		broadcastQueue:     make(chan *SendMsg, MaxBroadcastQueueLen),
		chanBroadcastQueue: make(chan *SendMsg, MaxChanBroadcastQueueLen),
haoyanbin's avatar
haoyanbin committed
59 60 61 62 63 64 65 66 67 68 69
	}
}

func (h *hub) run() {
loop:
	for {
		select {
		case id, ok := <-h.unregister:
			if !ok {
				break loop
			}
haoyanbin's avatar
1  
haoyanbin committed
70
			c, _ := h.clients[id]
haoyanbin's avatar
haoyanbin committed
71
			if c != nil {
haoyanbin's avatar
1  
haoyanbin committed
72
				delete(h.clients, id)
haoyanbin's avatar
haoyanbin committed
73 74 75 76 77 78 79 80
			}
			fmt.Println("取消注册ws连接对象:", id, "连接总数:", len(h.clients))

		case client, ok := <-h.register:
			if !ok {
				break loop
			}
			h.clients[client.Id] = client
haoyanbin's avatar
1  
haoyanbin committed
81
			fmt.Println("注册ws连接对象:", client.Id, "连接总数:", len(h.clients))
haoyanbin's avatar
haoyanbin committed
82 83 84 85 86

		case broadcastMsg, ok := <-h.broadcastQueue:
			if !ok {
				break loop
			}
haoyanbin's avatar
1  
haoyanbin committed
87
			for k, v := range h.clients {
haoyanbin's avatar
haoyanbin committed
88 89 90 91 92 93 94 95 96 97 98 99 100
				if v != nil {
					client := v
					broadcastMsg.ToClientId = k
					client.send(broadcastMsg)
				}
			}

		case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
			if !ok {
				break loop
			}
			//广播指定频道的消息处理
			//h.clients.Iterator(func(id string, v interface{}) bool {
haoyanbin's avatar
1  
haoyanbin committed
101 102 103 104 105 106 107
			for k, v := range h.clients {
				if v != nil {
					client := v
					for _, ch := range chanBroadcastMsg.Channel {
						if searchStrArray(client.channel, ch) {
							chanBroadcastMsg.ToClientId = k
							client.send(chanBroadcastMsg)
haoyanbin's avatar
haoyanbin committed
108 109 110
						}
					}
				}
haoyanbin's avatar
1  
haoyanbin committed
111
			}
haoyanbin's avatar
haoyanbin committed
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
			//	return true
			//})
		}

	}
}
// ticker is currently a no-op: its whole body is commented out. The disabled
// code scheduled a 30-second timer that called clearOldClient for every entry
// of h.oldClients whose CloseTime was more than 180 seconds in the past. With
// it disabled, nothing in the visible source evicts entries from h.oldClients.
func (h *hub) ticker() {
	// Periodically purge the cached old (disconnected) client objects.
	//gtimer.AddSingleton(30*time.Second, func() {
	//	if len(h.oldClients) > 0 {
	//		for _, v := range h.oldClients {
	//			//h.oldClients.Iterator(func(k string, v interface{}) bool {
	//			if v != nil {
	//				client := v
	//				if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
	//					// purge the cached old client object after 3 minutes
	//					h.clearOldClient(client)
	//					///	h.clearOldClient <- client
	//				}
	//			}
	//			//	return true
	//			//})
	//		}
	//	}
	//})

}

// AddClient queues client for registration with the hub's run loop.
//
// It returns an error if client is nil, or if the register channel stays full
// for 3 seconds. A nil return only means the client was queued; the actual
// insertion into the client map happens later in run.
func (h *hub) AddClient(client *Client) error {
	if client == nil {
		// run dereferences client.Id; a queued nil would panic the hub loop.
		return errors.New("AddClient: client is nil")
	}

	timeout := time.NewTimer(time.Second * 3)
	defer timeout.Stop()

	select {
	case h.register <- client:
		return nil
	case <-timeout.C:
		return errors.New("AddClient register消息管道blocked,写入消息超时")
	}
}

func (h *hub) clearOldClient(client *Client) {
	close(client.recvCh)
	close(client.sendCh)
haoyanbin's avatar
1  
haoyanbin committed
154
	delete(h.oldClients, client.Id)
haoyanbin's avatar
haoyanbin committed
155 156 157 158 159
	fmt.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", len(h.oldClients))
}

func (h *hub) RemoveClient(client *Client) error {
	//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
haoyanbin's avatar
1  
haoyanbin committed
160 161
	client.CloseTime = time.Now()
	h.oldClients[client.Id] = client
haoyanbin's avatar
haoyanbin committed
162 163 164 165 166 167 168 169 170
	timeout := time.NewTimer(time.Second * 1)
	defer timeout.Stop()
	select {
	case h.unregister <- client.Id:
		return nil
	case <-timeout.C:
		return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
	}
}