ws_server.go 4.35 KB
Newer Older
haoyanbin's avatar
haoyanbin committed
1 2 3 4 5 6 7
package main

import (
	"flag"
	"fmt"
	"net/http"
	"net/http/pprof"
haoyanbin's avatar
haoyanbin committed
8 9
	"pool/dao/mq"
	"pool/dao/redis"
haoyanbin's avatar
haoyanbin committed
10 11
	"pool/pool"
	"runtime"
haoyanbin's avatar
1  
haoyanbin committed
12
	"time"
haoyanbin's avatar
haoyanbin committed
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
)

func serveHome(w http.ResponseWriter, r *http.Request) {
	fmt.Println(r.URL)
	if r.URL.Path != "/" {
		http.Error(w, "Not found", http.StatusNotFound)
		return
	}
	if r.Method != "GET" {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	http.ServeFile(w, r, "home.html")
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())

	flag.Parse()
haoyanbin's avatar
haoyanbin committed
32 33 34

	pool.RabbitMQ = mq.NewRabbitMQSimple("im")
	fmt.Println("rabbitMq start success")
haoyanbin's avatar
haoyanbin committed
35 36 37 38

	pool.Redis = redis.Init()
	fmt.Println("redis start success")

haoyanbin's avatar
haoyanbin committed
39 40 41 42 43 44
	//初骀化连接池
	pool.InitWsPool(func(err interface{}) {
		//接收连接池中的运行时错误信息
		fmt.Println("wsPool.InitWsPool error-------------", err)
	})

haoyanbin's avatar
haoyanbin committed
45 46
	//自动结束会话
	ticker := time.NewTicker(time.Second * 10)
haoyanbin's avatar
haoyanbin committed
47 48
	go func() {
		for range ticker.C {
haoyanbin's avatar
haoyanbin committed
49
			pool.SetEnd()
haoyanbin's avatar
haoyanbin committed
50 51 52
		}
	}()

haoyanbin's avatar
haoyanbin committed
53 54 55 56 57 58 59 60 61 62
	mux := http.NewServeMux()
	mux.HandleFunc("/", serveHome)

	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

	mux.HandleFunc("/ws", ws)
haoyanbin's avatar
1  
haoyanbin committed
63
	mux.HandleFunc("/ws-list", GetClientList)
haoyanbin's avatar
haoyanbin committed
64 65 66 67 68 69 70 71
	err := http.ListenAndServe(":11001", mux)
	if err != nil {
		fmt.Printf("ListenAndServe: %s", err.Error())
	}
}

func ws(w http.ResponseWriter, r *http.Request) {
	headData := r.Header.Get("Sec-Websocket-Protocol")
haoyanbin's avatar
1  
haoyanbin committed
72

haoyanbin's avatar
haoyanbin committed
73 74 75
	head := http.Header{}
	head.Add("Sec-Websocket-Protocol", headData)

haoyanbin's avatar
1  
haoyanbin committed
76 77 78
	userInfo, err := pool.GetClientInfoByToken(headData)

	if err != nil {
haoyanbin's avatar
haoyanbin committed
79
		fmt.Println("用户信息报错:", err)
haoyanbin's avatar
1  
haoyanbin committed
80 81 82
		return
	}

haoyanbin's avatar
haoyanbin committed
83 84
	//实例化连接对象
	client := pool.NewClient(&pool.Config{
haoyanbin's avatar
1  
haoyanbin committed
85 86 87
		Id:        userInfo.ClientId,  //连接标识
		Type:      "ws",               //连接类型
		Channel:   userInfo.ToChannel, //指定频道
haoyanbin's avatar
haoyanbin committed
88 89
		Goroutine: 100,
	})
haoyanbin's avatar
1  
haoyanbin committed
90

haoyanbin's avatar
haoyanbin committed
91 92 93 94
	fmt.Println(client.Id, "实例化连接对象完成")

	//连接成功回调
	client.OnOpen(func() {
haoyanbin's avatar
1  
haoyanbin committed
95
		fmt.Println("连接开启回调:", client.Id)
haoyanbin's avatar
haoyanbin committed
96 97 98 99
	})

	//接收消息
	client.OnMessage(func(msg *pool.SendMsg) {
haoyanbin's avatar
1  
haoyanbin committed
100

haoyanbin's avatar
haoyanbin committed
101 102 103 104
		if msg.Status == 3 {
			fmt.Println("OnMessage:收到出错消息=》", client.Id, msg.Desc)
			return
		}
haoyanbin's avatar
1  
haoyanbin committed
105
		//fmt.Println(msg.Msg)
haoyanbin's avatar
haoyanbin committed
106 107 108 109
		if msg.ToClientId != "" {
			//发送消息给指定的ToClientID连接
			err := pool.Send(msg)
			if err != nil {
haoyanbin's avatar
1  
haoyanbin committed
110
				fmt.Println("pool.Send(msg):", err.Error())
haoyanbin's avatar
haoyanbin committed
111 112 113 114 115 116 117
			}
			//发送消息给当前连接对象
			//err = client.Send(msg)
			//if err != nil {
			//	fmt.Println("client.Send(msg):", err.Error())
			//}
		}
haoyanbin's avatar
haoyanbin committed
118 119 120 121 122 123 124 125 126 127 128 129
		//if len(msg.Channel)>0{
		//	//按频道广播,可指定多个频道[]string
		//	err:=pool.Broadcast(msg) //或者 wsPool.Broadcast(msg)
		//	if err!=nil {
		//		fmt.Println("pool.Broadcast(msg)", err.Error())
		//	}
		//}
		////或都全局广播,所有连接都进行发送
		//err:=pool.BroadcastAll(msg)
		//if err!=nil {
		//	fmt.Println("pool.BroadcastAll(msg)", err.Error())
		//}
haoyanbin's avatar
haoyanbin committed
130 131

	})
haoyanbin's avatar
1  
haoyanbin committed
132

haoyanbin's avatar
haoyanbin committed
133 134
	//连接断开回调
	client.OnClose(func() {
haoyanbin's avatar
1  
haoyanbin committed
135 136
		user := pool.GetClientInfoById(client.Id)

haoyanbin's avatar
haoyanbin committed
137
		closeMsg := &pool.SetMsgReq{}
haoyanbin's avatar
1  
haoyanbin committed
138 139 140
		closeMsg.ProcedureType = 5
		closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05")

haoyanbin's avatar
1  
haoyanbin committed
141 142
		if user.CustomerType == "1" {
			closeMsg.Promoter = client.Id
haoyanbin's avatar
haoyanbin committed
143

haoyanbin's avatar
haoyanbin committed
144 145
			pool.PublishData(closeMsg)
			fmt.Println("用户关闭连接", client.Id)
haoyanbin's avatar
1  
haoyanbin committed
146 147
		}

haoyanbin's avatar
1  
haoyanbin committed
148 149 150
		if user.CustomerType == "2" {
			closeMsg.Participant = client.Id

haoyanbin's avatar
haoyanbin committed
151 152
			pool.PublishData(closeMsg)
			fmt.Println("专家关闭连接", client.Id)
haoyanbin's avatar
1  
haoyanbin committed
153
		}
haoyanbin's avatar
1  
haoyanbin committed
154
		pool.DelClient(client.Id)
haoyanbin's avatar
1  
haoyanbin committed
155

haoyanbin's avatar
haoyanbin committed
156 157 158 159 160 161 162 163 164 165 166 167 168
		fmt.Printf("连接己经关闭%s", client.Id)
	})
	client.OnError(func(err error) {
		fmt.Printf("连接%s错误信息:%s", client.Id, err.Error())
	})

	//开启连接
	client.OpenClient(w, r, head)
	fmt.Println(client.Id, "开启连接")

	r.Close = true
	return
}
haoyanbin's avatar
1  
haoyanbin committed
169 170 171

func GetClientList(w http.ResponseWriter, r *http.Request) {
	data := r.URL.Query()
haoyanbin's avatar
1  
haoyanbin committed
172
	list := pool.GetList(data["source"][0], data["sourceId"][0], data["customerType"][0])
haoyanbin's avatar
haoyanbin committed
173 174 175 176 177 178

	reply := make([]string, 0)
	for k := range list {
		reply = append(reply, k)
	}

haoyanbin's avatar
1  
haoyanbin committed
179
	resp(w, reply)
haoyanbin's avatar
haoyanbin committed
180 181
}

haoyanbin's avatar
1  
haoyanbin committed
182
func resp(w http.ResponseWriter, data interface{}) {
haoyanbin's avatar
haoyanbin committed
183 184 185
	w.Header().Set("content-type", "text/json")
	w.WriteHeader(200)
	w.Write(pool.SerializeJson(data))
haoyanbin's avatar
1  
haoyanbin committed
186 187 188 189 190 191
}

type GetClientListReq struct {
	Source   string `json:"source" form:"source"`
	Promoter string `json:"promoter" form:"promoter"`
}