package main import ( "flag" "fmt" "net/http" "net/http/pprof" "pool/dao/mq" "pool/dao/redis" "pool/pool" "runtime" "time" ) func serveHome(w http.ResponseWriter, r *http.Request) { fmt.Println(r.URL) if r.URL.Path != "/" { http.Error(w, "Not found", http.StatusNotFound) return } if r.Method != "GET" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } http.ServeFile(w, r, "home.html") } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) flag.Parse() pool.RabbitMQ = mq.NewRabbitMQSimple("im") fmt.Println("rabbitMq start success") pool.Redis = redis.Init() fmt.Println("redis start success") //初骀化连接池 pool.InitWsPool(func(err interface{}) { //接收连接池中的运行时错误信息 fmt.Println("wsPool.InitWsPool error-------------", err) }) //自动结束会话 ticker := time.NewTicker(time.Second * 10) go func() { for range ticker.C { pool.SetEnd() } }() mux := http.NewServeMux() mux.HandleFunc("/", serveHome) mux.HandleFunc("/debug/pprof/", pprof.Index) mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) mux.HandleFunc("/debug/pprof/profile", pprof.Profile) mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) mux.HandleFunc("/ws", ws) mux.HandleFunc("/ws-list", GetClientList) err := http.ListenAndServe(":11001", mux) if err != nil { fmt.Printf("ListenAndServe: %s", err.Error()) } } func ws(w http.ResponseWriter, r *http.Request) { headData := r.Header.Get("Sec-Websocket-Protocol") head := http.Header{} head.Add("Sec-Websocket-Protocol", headData) userInfo, err := pool.GetClientInfoByToken(headData) if err != nil { fmt.Println("用户信息报错:", err) return } //实例化连接对象 client := pool.NewClient(&pool.Config{ Id: userInfo.ClientId, //连接标识 Type: "ws", //连接类型 Channel: userInfo.ToChannel, //指定频道 Goroutine: 100, }) fmt.Println(client.Id, "实例化连接对象完成") //连接成功回调 client.OnOpen(func() { fmt.Println("连接开启回调:", client.Id) }) //接收消息 client.OnMessage(func(msg *pool.SendMsg) { if msg.Status == 3 { fmt.Println("OnMessage:收到出错消息=》", client.Id, msg.Desc) return } //fmt.Println(msg.Msg) if msg.ToClientId != "" { //发送消息给指定的ToClientID连接 err := pool.Send(msg) if err != nil { fmt.Println("pool.Send(msg):", err.Error()) } //发送消息给当前连接对象 //err = client.Send(msg) //if err != nil { // fmt.Println("client.Send(msg):", err.Error()) //} } //if len(msg.Channel)>0{ // //按频道广播,可指定多个频道[]string // err:=pool.Broadcast(msg) //或者 wsPool.Broadcast(msg) // if err!=nil { // fmt.Println("pool.Broadcast(msg)", err.Error()) // } //} ////或都全局广播,所有连接都进行发送 //err:=pool.BroadcastAll(msg) //if err!=nil { // fmt.Println("pool.BroadcastAll(msg)", err.Error()) //} }) //连接断开回调 client.OnClose(func() { user := pool.GetClientInfoById(client.Id) closeMsg := &pool.SetMsgReq{} closeMsg.ProcedureType = 5 closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05") if user.CustomerType == "1" { closeMsg.Promoter = client.Id pool.PublishData(closeMsg) fmt.Println("用户关闭连接", client.Id) } if user.CustomerType == "2" { closeMsg.Participant = client.Id pool.PublishData(closeMsg) fmt.Println("专家关闭连接", client.Id) } pool.DelClient(client.Id) fmt.Printf("连接己经关闭%s", client.Id) }) client.OnError(func(err error) { fmt.Printf("连接%s错误信息:%s", client.Id, err.Error()) }) //开启连接 client.OpenClient(w, r, head) fmt.Println(client.Id, "开启连接") r.Close = true return } func GetClientList(w http.ResponseWriter, r *http.Request) { data := r.URL.Query() list := pool.GetList(data["source"][0], data["sourceId"][0], data["customerType"][0]) reply := make([]string, 0) for k := range list { reply = append(reply, k) } resp(w, reply) } func resp(w http.ResponseWriter, data interface{}) { w.Header().Set("content-type", "text/json") w.WriteHeader(200) w.Write(pool.SerializeJson(data)) } type GetClientListReq struct { Source string `json:"source" form:"source"` Promoter string `json:"promoter" form:"promoter"` }