// File: ws_server.go

package main

import (
	"flag"
	"fmt"
	"net/http"
	"net/http/pprof"
haoyanbin's avatar
haoyanbin committed
8 9
	"pool/dao/mq"
	"pool/dao/redis"
haoyanbin's avatar
haoyanbin committed
10 11
	"pool/pool"
	"runtime"
haoyanbin's avatar
1  
haoyanbin committed
12
	"time"
haoyanbin's avatar
haoyanbin committed
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
)

// serveHome serves the static file "home.html" for GET requests to the exact
// path "/". Any other path is answered with 404 and any other method with 405.
// The request URL is printed to stdout on every call.
func serveHome(w http.ResponseWriter, r *http.Request) {
	fmt.Println(r.URL)

	switch {
	case r.URL.Path != "/":
		// The "/" mux pattern also catches every unregistered path.
		http.Error(w, "Not found", http.StatusNotFound)
	case r.Method != http.MethodGet:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	default:
		http.ServeFile(w, r, "home.html")
	}
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())

	flag.Parse()
haoyanbin's avatar
haoyanbin committed
32 33 34

	pool.RabbitMQ = mq.NewRabbitMQSimple("im")
	fmt.Println("rabbitMq start success")
haoyanbin's avatar
haoyanbin committed
35 36 37 38

	pool.Redis = redis.Init()
	fmt.Println("redis start success")

haoyanbin's avatar
haoyanbin committed
39 40 41 42 43 44
	//初骀化连接池
	pool.InitWsPool(func(err interface{}) {
		//接收连接池中的运行时错误信息
		fmt.Println("wsPool.InitWsPool error-------------", err)
	})

haoyanbin's avatar
haoyanbin committed
45
	//自动结束会话
haoyanbin's avatar
1  
haoyanbin committed
46 47 48
	//ticker := time.NewTicker(time.Second * 10)
	//go func() {
	//	for range ticker.C {
haoyanbin's avatar
haoyanbin committed
49
	//		pool.SetDelConversation()
haoyanbin's avatar
1  
haoyanbin committed
50 51
	//	}
	//}()
haoyanbin's avatar
haoyanbin committed
52

haoyanbin's avatar
haoyanbin committed
53 54 55 56 57 58 59 60 61 62
	mux := http.NewServeMux()
	mux.HandleFunc("/", serveHome)

	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
	mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

	mux.HandleFunc("/ws", ws)
haoyanbin's avatar
1  
haoyanbin committed
63
	mux.HandleFunc("/ws-list", GetClientList)
haoyanbin's avatar
haoyanbin committed
64 65
	mux.HandleFunc("/ws-conv-list", GetConversationList)
	mux.HandleFunc("/ws-record-list", GetConversationRecordList)
haoyanbin's avatar
haoyanbin committed
66 67 68 69 70 71 72 73
	err := http.ListenAndServe(":11001", mux)
	if err != nil {
		fmt.Printf("ListenAndServe: %s", err.Error())
	}
}

func ws(w http.ResponseWriter, r *http.Request) {
	headData := r.Header.Get("Sec-Websocket-Protocol")
haoyanbin's avatar
1  
haoyanbin committed
74

haoyanbin's avatar
haoyanbin committed
75 76 77
	head := http.Header{}
	head.Add("Sec-Websocket-Protocol", headData)

haoyanbin's avatar
haoyanbin committed
78
	userInfo, err := pool.GetClientInfo(headData)
haoyanbin's avatar
1  
haoyanbin committed
79 80

	if err != nil {
haoyanbin's avatar
haoyanbin committed
81
		fmt.Println("用户信息报错:", err)
haoyanbin's avatar
1  
haoyanbin committed
82 83 84
		return
	}

haoyanbin's avatar
haoyanbin committed
85 86
	//实例化连接对象
	client := pool.NewClient(&pool.Config{
haoyanbin's avatar
haoyanbin committed
87 88
		Id:        userInfo.ClientId, //连接标识
		User:      userInfo.User,
haoyanbin's avatar
1  
haoyanbin committed
89 90
		Type:      "ws",               //连接类型
		Channel:   userInfo.ToChannel, //指定频道
haoyanbin's avatar
haoyanbin committed
91 92
		Goroutine: 100,
	})
haoyanbin's avatar
1  
haoyanbin committed
93

haoyanbin's avatar
haoyanbin committed
94
	fmt.Println(client.User, "实例化连接对象完成")
haoyanbin's avatar
haoyanbin committed
95 96 97

	//连接成功回调
	client.OnOpen(func() {
haoyanbin's avatar
haoyanbin committed
98
		fmt.Println("连接开启回调:", client.User)
haoyanbin's avatar
haoyanbin committed
99 100 101 102
	})

	//接收消息
	client.OnMessage(func(msg *pool.SendMsg) {
haoyanbin's avatar
1  
haoyanbin committed
103

haoyanbin's avatar
haoyanbin committed
104 105 106 107 108 109 110 111
		if msg.Status == 3 {
			fmt.Println("OnMessage:收到出错消息=》", client.Id, msg.Desc)
			return
		}
		if msg.ToClientId != "" {
			//发送消息给指定的ToClientID连接
			err := pool.Send(msg)
			if err != nil {
haoyanbin's avatar
1  
haoyanbin committed
112
				fmt.Println("pool.Send(msg):", err.Error())
haoyanbin's avatar
haoyanbin committed
113 114 115 116 117 118
			}
			//发送消息给当前连接对象
			//err = client.Send(msg)
			//if err != nil {
			//}
		}
haoyanbin's avatar
haoyanbin committed
119 120 121 122 123 124 125 126 127 128
		//if len(msg.Channel)>0{
		//	//按频道广播,可指定多个频道[]string
		//	err:=pool.Broadcast(msg) //或者 wsPool.Broadcast(msg)
		//	if err!=nil {
		//	}
		//}
		////或都全局广播,所有连接都进行发送
		//err:=pool.BroadcastAll(msg)
		//if err!=nil {
		//}
haoyanbin's avatar
haoyanbin committed
129 130

	})
haoyanbin's avatar
1  
haoyanbin committed
131

haoyanbin's avatar
haoyanbin committed
132 133
	//连接断开回调
	client.OnClose(func() {
haoyanbin's avatar
haoyanbin committed
134
		user, _ := pool.GetClientInfo(client.User)
haoyanbin's avatar
1  
haoyanbin committed
135

haoyanbin's avatar
haoyanbin committed
136
		closeMsg := &pool.SetMsgReq{}
haoyanbin's avatar
1  
haoyanbin committed
137 138 139
		closeMsg.ProcedureType = 5
		closeMsg.EndTime = time.Now().Format("2006-01-02 15:04:05")

haoyanbin's avatar
1  
haoyanbin committed
140
		if user.CustomerType == "1" {
haoyanbin's avatar
haoyanbin committed
141
			closeMsg.Promoter = client.User
haoyanbin's avatar
haoyanbin committed
142

haoyanbin's avatar
haoyanbin committed
143
			pool.PublishData(closeMsg)
haoyanbin's avatar
haoyanbin committed
144
			fmt.Println("用户关闭连接", client.User)
haoyanbin's avatar
1  
haoyanbin committed
145 146
		}

haoyanbin's avatar
1  
haoyanbin committed
147
		if user.CustomerType == "2" {
haoyanbin's avatar
haoyanbin committed
148
			closeMsg.Participant = client.User
haoyanbin's avatar
1  
haoyanbin committed
149

haoyanbin's avatar
haoyanbin committed
150
			pool.PublishData(closeMsg)
haoyanbin's avatar
haoyanbin committed
151
			fmt.Println("专家关闭连接", client.User)
haoyanbin's avatar
1  
haoyanbin committed
152
		}
haoyanbin's avatar
haoyanbin committed
153
		pool.DelClient(client.User)
haoyanbin's avatar
1  
haoyanbin committed
154

haoyanbin's avatar
haoyanbin committed
155 156 157 158 159 160 161 162 163 164 165 166 167
		fmt.Printf("连接己经关闭%s", client.Id)
	})
	client.OnError(func(err error) {
		fmt.Printf("连接%s错误信息:%s", client.Id, err.Error())
	})

	//开启连接
	client.OpenClient(w, r, head)
	fmt.Println(client.Id, "开启连接")

	r.Close = true
	return
}
haoyanbin's avatar
1  
haoyanbin committed
168 169 170

func GetClientList(w http.ResponseWriter, r *http.Request) {
	data := r.URL.Query()
haoyanbin's avatar
1  
haoyanbin committed
171
	list := pool.GetList(data["source"][0], data["sourceId"][0], data["customerType"][0])
haoyanbin's avatar
haoyanbin committed
172 173 174 175 176 177

	reply := make([]string, 0)
	for k := range list {
		reply = append(reply, k)
	}

haoyanbin's avatar
1  
haoyanbin committed
178
	resp(w, reply)
haoyanbin's avatar
haoyanbin committed
179 180
}

haoyanbin's avatar
haoyanbin committed
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
func GetConversationList(w http.ResponseWriter, r *http.Request) {
	//data := r.URL.Query()

	reply := pool.GetAllList("msg:*")

	resp(w, reply)
}

func GetConversationRecordList(w http.ResponseWriter, r *http.Request) {
	data := r.URL.Query()

	reply := pool.GetAllList("msg:" + data["conversationId"][0])

	resp(w, reply)
}

haoyanbin's avatar
1  
haoyanbin committed
197
func resp(w http.ResponseWriter, data interface{}) {
haoyanbin's avatar
haoyanbin committed
198 199 200
	w.Header().Set("content-type", "text/json")
	w.WriteHeader(200)
	w.Write(pool.SerializeJson(data))
haoyanbin's avatar
1  
haoyanbin committed
201 202 203 204 205 206
}

// GetClientListReq carries two string fields, Source and Promoter, each
// tagged for both "json" and "form" decoding.
//
// NOTE(review): nothing in the visible part of this file uses this type; the
// GetClientList handler reads "source", "sourceId" and "customerType"
// directly from the URL query instead — confirm whether this struct is still
// needed.
type GetClientListReq struct {
	Source   string `json:"source" form:"source"`
	Promoter string `json:"promoter" form:"promoter"`
}