Commit bd8767f2 authored by haoyanbin's avatar haoyanbin

end_time

parent c690c961
...@@ -209,14 +209,14 @@ loop: ...@@ -209,14 +209,14 @@ loop:
break loop break loop
} }
/* //ToClientId与Channel不能同时存在!!!注意!!!!
if message.ToClientId!="" {
Send(message)
}
//ToClientId与Channel不能同时存在!!!注意!!!! //ToClientId与Channel不能同时存在!!!注意!!!!
if message.Channel!="" { //if message.ToClientId != "" {
Broadcast(message) // Send(message)
}*/ //}
//ToClientId与Channel不能同时存在!!!注意!!!!
//if message.Channel != "" {
// Broadcast(message)
//}
//收到消息触发回调 //收到消息触发回调
//c.onMessage(data) //c.onMessage(data)
...@@ -266,19 +266,19 @@ Loop: ...@@ -266,19 +266,19 @@ Loop:
c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error())) c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error()))
goto Loop1 goto Loop1
} }
/*// Add queued chat messages to the current websocket message. // Add queued chat messages to the current websocket message.
n := len(c.sendCh) //n := len(c.sendCh)
if n > 0 { //if n > 0 {
for i := 0; i < n; i++ { // for i := 0; i < n; i++ {
//
// _, err = w.Write(<-c.sendCh)
// if err != nil {
// c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error()))
// return
// }
// }
//}
_, err = w.Write(<-c.sendCh)
if err != nil {
c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error()))
return
}
}
}
*/
//关闭写入io对象 //关闭写入io对象
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error())) c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
......
...@@ -15,8 +15,6 @@ type Pool struct { ...@@ -15,8 +15,6 @@ type Pool struct {
rJobsChan chan func() //读工作方法 rJobsChan chan func() //读工作方法
} }
// New creates and returns a new goroutine pool object. // New creates and returns a new goroutine pool object.
// The parameter <limit> is used to limit the max goroutine count, // The parameter <limit> is used to limit the max goroutine count,
// which is not limited in default. // which is not limited in default.
...@@ -26,8 +24,8 @@ func New(limit ...int) *Pool { ...@@ -26,8 +24,8 @@ func New(limit ...int) *Pool {
count: 0, count: 0,
list: list.New(), list: list.New(),
closed: false, closed: false,
wJobsChan:make(chan func()), wJobsChan: make(chan func()),
rJobsChan:make(chan func()), rJobsChan: make(chan func()),
} }
if len(limit) > 0 && limit[0] > 0 { if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0] p.limit = limit[0]
...@@ -38,31 +36,28 @@ func New(limit ...int) *Pool { ...@@ -38,31 +36,28 @@ func New(limit ...int) *Pool {
return p return p
} }
/* //// Default goroutine pool.
// Default goroutine pool. //var pool = New()
var pool = New() //// Add pushes a new job to the pool using default goroutine pool.
// Add pushes a new job to the pool using default goroutine pool. //// The job will be executed asynchronously.
// The job will be executed asynchronously. //func Add(f func()) error {
func Add(f func()) error { // return pool.Add(f)
return pool.Add(f) //}
} //
//// Size returns current goroutine count of default goroutine pool.
// Size returns current goroutine count of default goroutine pool. //func Size() int {
func Size() int { // return pool.Size()
return pool.Size() //}
} //
//// Jobs returns current job count of default goroutine pool.
// Jobs returns current job count of default goroutine pool. //func Jobs() int {
func Jobs() int { // return pool.Jobs()
return pool.Jobs() //}
}
*/ func (p *Pool) runWrite() {
func (p *Pool) runWrite(){
for !p.closed { for !p.closed {
select { select {
case f,ok:=<-p.wJobsChan: case f, ok := <-p.wJobsChan:
if !ok { if !ok {
break break
} }
...@@ -71,11 +66,11 @@ func (p *Pool) runWrite(){ ...@@ -71,11 +66,11 @@ func (p *Pool) runWrite(){
} }
} }
func (p *Pool) runRead(){ func (p *Pool) runRead() {
for !p.closed { for !p.closed {
if job := p.list.Back(); job != nil { if job := p.list.Back(); job != nil {
value := p.list.Remove(job) value := p.list.Remove(job)
p.rJobsChan<-value.(func()) p.rJobsChan <- value.(func())
} else { } else {
return return
} }
...@@ -85,17 +80,17 @@ func (p *Pool) runRead(){ ...@@ -85,17 +80,17 @@ func (p *Pool) runRead(){
// Add pushes a new job to the pool. // Add pushes a new job to the pool.
// The job will be executed asynchronously. // The job will be executed asynchronously.
func (p *Pool) Add(f func()) error { func (p *Pool) Add(f func()) error {
for p.closed{ for p.closed {
return errors.New("pool closed") return errors.New("pool closed")
} }
p.wJobsChan<-f p.wJobsChan <- f
var n int var n int
n = p.count n = p.count
if p.limit != -1 && n >= p.limit { if p.limit != -1 && n >= p.limit {
return nil return nil
} }
p.count=n+1 p.count = n + 1
p.fork() p.fork()
return nil return nil
} }
...@@ -125,7 +120,7 @@ func (p *Pool) fork() { ...@@ -125,7 +120,7 @@ func (p *Pool) fork() {
}() }()
for !p.closed { for !p.closed {
select { select {
case job,ok:=<-p.rJobsChan: case job, ok := <-p.rJobsChan:
if !ok { if !ok {
break break
} }
...@@ -142,7 +137,7 @@ func (p *Pool) IsClosed() bool { ...@@ -142,7 +137,7 @@ func (p *Pool) IsClosed() bool {
// Close closes the goroutine pool, which makes all goroutines exit. // Close closes the goroutine pool, which makes all goroutines exit.
func (p *Pool) Close() { func (p *Pool) Close() {
p.closed=true p.closed = true
close(p.wJobsChan) close(p.wJobsChan)
close(p.rJobsChan) close(p.rJobsChan)
} }
...@@ -434,6 +434,14 @@ func PublishData(mqData *mq.SetMsgReq) { ...@@ -434,6 +434,14 @@ func PublishData(mqData *mq.SetMsgReq) {
return return
} }
func SetEndTime(){
mqData := &mq.SetMsgReq{
ProcedureType:8,
}
PublishData(mqData)
return
}
// 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return // 五分钟内 用户未连接、则判定为用户离线、这时候把会话置为离线状态、如果中间顾客 连线了则return、 如果中间客服离线了、则return
//func (c *Client)SetOffline(user *UserInfo, customerId string) { //func (c *Client)SetOffline(user *UserInfo, customerId string) {
// //
......
...@@ -34,6 +34,14 @@ func main() { ...@@ -34,6 +34,14 @@ func main() {
fmt.Println("wsPool.InitWsPool error-------------", err) fmt.Println("wsPool.InitWsPool error-------------", err)
}) })
ticker := time.NewTicker(time.Minute)
go func() {
for range ticker.C {
pool.SetEndTime()
}
}()
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", serveHome) mux.HandleFunc("/", serveHome)
...@@ -105,18 +113,18 @@ func ws(w http.ResponseWriter, r *http.Request) { ...@@ -105,18 +113,18 @@ func ws(w http.ResponseWriter, r *http.Request) {
// fmt.Println("client.Send(msg):", err.Error()) // fmt.Println("client.Send(msg):", err.Error())
//} //}
} }
/*if len(msg.Channel)>0{ //if len(msg.Channel)>0{
//按频道广播,可指定多个频道[]string // //按频道广播,可指定多个频道[]string
err:=pool.Broadcast(msg) //或者 wsPool.Broadcast(msg) // err:=pool.Broadcast(msg) //或者 wsPool.Broadcast(msg)
if err!=nil { // if err!=nil {
fmt.Println("pool.Broadcast(msg)", err.Error()) // fmt.Println("pool.Broadcast(msg)", err.Error())
} // }
} //}
//或都全局广播,所有连接都进行发送 ////或都全局广播,所有连接都进行发送
err:=pool.BroadcastAll(msg) //err:=pool.BroadcastAll(msg)
if err!=nil { //if err!=nil {
fmt.Println("pool.BroadcastAll(msg)", err.Error()) // fmt.Println("pool.BroadcastAll(msg)", err.Error())
}*/ //}
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment