// priorityQueue.go (committed by haoyanbin)

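// bill 2018.1.8
// Priority queue: items with a larger priority weight are served first, and
// items of equal priority are served first-in, first-out.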
package queue

import (
	"container/list"
	"log"
	"sync"
	"time"
)

// Item is one queue entry: a payload plus the metadata the queue uses to
// order it and to decide whether it has expired.
type Item struct {
	Data       interface{} // payload carried by the entry
	Priority   int32       // priority weight used to order entries
	AddTime    time.Time   // time the entry was queued; Push does not set it, the caller must
	Expiration int64       // lifetime in seconds, measured from AddTime; 0 is skipped by Expirations
}

// PriorityQueue keeps its items in a linked list together with per-priority
// bookkeeping. NewPriorityQueue returns a value with both fields allocated.
type PriorityQueue struct {
	Data        *list.List // queued *Item values; Pop removes from the back of this list
	PriorityMap map[int32]*pqmap // bookkeeping record for each priority tracked by Push and Pop
}

// pqmap is the bookkeeping record stored in PriorityQueue.PriorityMap for one
// priority value.
type pqmap struct {
	element *list.Element // list element Push uses as the anchor when placing a higher-priority item
	totle   int // number of queued items counted for this priority ("total")
}

// lock is a single package-level mutex. The PriorityQueue methods in this file
// that lock all use it, so every queue instance shares the same lock.
var lock sync.RWMutex

// NewPriorityQueue returns an empty queue whose list and priority map are
// already allocated.
func NewPriorityQueue() *PriorityQueue {
	return &PriorityQueue{
		Data:        list.New(),
		PriorityMap: make(map[int32]*pqmap),
	}
}

// Len reports the number of items currently in the queue. The package-level
// read lock is held while the list length is read.
func (pq *PriorityQueue) Len() int {
	lock.RLock()
	defer lock.RUnlock()

	return pq.Data.Len()
}

// Push inserts v into the queue. Items with a larger Priority are popped
// first; items that share a priority are popped in insertion order. Any int32
// priority is accepted, including zero and negative values.
//
// A nil v is ignored, so a bad call cannot leave a nil entry in the list.
// The package-level write lock is held for the duration of the call.
func (pq *PriorityQueue) Push(v *Item) {
	if v == nil {
		return
	}

	lock.Lock()
	defer lock.Unlock()

	// New items start at the front of the list, the end that Pop reaches last.
	newElement := pq.Data.PushFront(v)
	if group, ok := pq.PriorityMap[v.Priority]; ok {
		group.totle++
	} else {
		pq.PriorityMap[v.Priority] = &pqmap{
			element: newElement,
			totle:   1,
		}
	}

	// Find the largest priority that is strictly lower than v's. A nil
	// pointer, rather than a sentinel priority value, marks "none found", so
	// priorities below 1 are handled like any other.
	var (
		below    *pqmap
		belowKey int32
	)
	for p, group := range pq.PriorityMap {
		if p < v.Priority && (below == nil || p > belowKey) {
			below, belowKey = group, p
		}
	}

	// With no lower priority present, the front is already the right place.
	// Otherwise move v directly behind the lower group's anchor, which puts it
	// after every lower-priority item and before older items of its own
	// priority.
	if below != nil {
		pq.Data.MoveAfter(newElement, below.element)
	}
}

// Pop removes and returns the item at the back of the list, or nil when the
// queue is empty. The package-level write lock is held for the duration of
// the call.
//
// If the removed element was the anchor recorded for its priority and other
// items of that priority remain, the anchor is moved to the neighbouring
// element. Otherwise the record would keep pointing at an element that is no
// longer in the list, and List.MoveAfter silently ignores such a mark, which
// would leave a later higher-priority Push stuck at the front.
func (pq *PriorityQueue) Pop() *Item {
	lock.Lock()
	defer lock.Unlock()

	back := pq.Data.Back()
	if back == nil || back.Value == nil {
		return nil
	}
	v := back.Value.(*Item)

	// Read the neighbour before Remove, which unlinks the element.
	prev := back.Prev()
	pq.Data.Remove(back)

	group, ok := pq.PriorityMap[v.Priority]
	if !ok {
		// No bookkeeping record for this priority; nothing to adjust.
		return v
	}
	if group.totle > 1 {
		group.totle--
		if group.element == back && prev != nil {
			group.element = prev
		}
	} else {
		delete(pq.PriorityMap, v.Priority)
	}
	return v
}

// Dump logs every queued item, starting at the back of the list (the end Pop
// reads from) and walking toward the front. It does not acquire the package
// lock itself.
func (pq *PriorityQueue) Dump() {
	e := pq.Data.Back()
	for e != nil {
		item := e.Value.(*Item)
		log.Println("队列信息:", item)
		e = e.Prev()
	}
}
// Clear removes every item from the queue and drops all per-priority
// bookkeeping. The existing list and map values are emptied in place, so
// references to pq.Data and pq.PriorityMap held elsewhere stay valid.
//
// The package-level write lock is held because both structures are mutated.
func (pq *PriorityQueue) Clear() {
	lock.Lock()
	defer lock.Unlock()

	// Removing elements one at a time while stepping with Prev() does not
	// work: a removed element no longer has a previous element, so the walk
	// stops after the first removal. Reset the whole list instead.
	pq.Data.Init()
	for p := range pq.PriorityMap {
		delete(pq.PriorityMap, p)
	}
}

// Expirations removes every item whose lifetime has elapsed and then passes
// each removed item to expriCallback, in back-to-front list order.
//
// An item is expired when AddTime plus Expiration seconds lies before the
// current time. Items with Expiration == 0 never expire.
//
// The scan and removal run under the package-level write lock. The callbacks
// run after the lock has been released, so a callback may call back into the
// queue. A nil expriCallback is allowed; expired items are still removed.
func (pq *PriorityQueue) Expirations(expriCallback func(item *Item)) {
	var expired []*Item

	func() {
		lock.Lock()
		defer lock.Unlock()

		now := time.Now()
		for iter := pq.Data.Back(); iter != nil; {
			// Capture the neighbour first: once an element is removed its
			// Prev() is nil, which would end the walk early.
			prev := iter.Prev()

			v := iter.Value.(*Item)
			if v.Expiration != 0 &&
				v.AddTime.Add(time.Duration(v.Expiration)*time.Second).Before(now) {
				pq.Data.Remove(iter)
				if group, ok := pq.PriorityMap[v.Priority]; ok {
					if group.totle > 1 {
						group.totle--
						// Keep the priority's anchor on an element that is
						// still in the list.
						if group.element == iter && prev != nil {
							group.element = prev
						}
					} else {
						delete(pq.PriorityMap, v.Priority)
					}
				}
				expired = append(expired, v)
			}

			iter = prev
		}
	}()

	if expriCallback == nil {
		return
	}
	for _, item := range expired {
		expriCallback(item)
	}
}