1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package pool
import (
	"container/list"
	"errors"
	"sync"
)
// Pool is a goroutine pool: jobs handed to Add are queued and executed
// asynchronously by worker goroutines, whose number can be capped.
//
// Workers are created on demand by Add and exit as soon as the queue is
// empty, so an idle pool holds no goroutines. A Pool must be created with New,
// is safe for concurrent use, and must not be copied after first use.
type Pool struct {
	// mu guards count, list and closed.
	mu sync.Mutex
	// limit is the max worker goroutine count, or -1 for no limit. It is never
	// modified after New returns, so it may be read without holding mu.
	limit int
	// count is the number of worker goroutines currently alive.
	count int
	// list is the FIFO queue of pending jobs: Add pushes at the front and
	// workers pop from the back.
	list *list.List
	// closed reports whether Close has been called.
	closed bool
}

// New creates and returns a new goroutine pool object.
// The optional parameter <limit> caps the number of worker goroutines; when it
// is omitted or not positive the pool is unlimited (Cap returns -1).
func New(limit ...int) *Pool {
	p := &Pool{
		limit: -1,
		list:  list.New(),
	}
	if len(limit) > 0 && limit[0] > 0 {
		p.limit = limit[0]
	}
	return p
}

// Package-level convenience wrappers around a shared default pool; currently
// disabled.
/*
// Default goroutine pool.
var pool = New()
// Add pushes a new job to the pool using default goroutine pool.
// The job will be executed asynchronously.
func Add(f func()) error {
return pool.Add(f)
}
// Size returns current goroutine count of default goroutine pool.
func Size() int {
return pool.Size()
}
// Jobs returns current job count of default goroutine pool.
func Jobs() int {
return pool.Jobs()
}
*/

// Add pushes a new job to the pool.
// The job will be executed asynchronously, in FIFO order relative to other
// queued jobs. It returns an error if f is nil or the pool has been closed.
func (p *Pool) Add(f func()) error {
	if f == nil {
		return errors.New("nil job")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("pool closed")
	}
	p.list.PushFront(f)
	// At the limit: an existing worker is guaranteed to pick the job up,
	// because workers only retire under mu after seeing an empty queue.
	if p.limit != -1 && p.count >= p.limit {
		return nil
	}
	p.count++
	p.fork()
	return nil
}

// Cap returns the capacity of the pool.
// This capacity is defined when pool is created.
// If it returns -1 means no limit.
func (p *Pool) Cap() int {
	return p.limit
}

// Size returns current goroutine count of the pool.
func (p *Pool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.count
}

// Jobs returns current job count of the pool, i.e. the number of jobs that
// are queued and not yet picked up by a worker.
func (p *Pool) Jobs() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.list.Len()
}

// fork starts a new worker goroutine. The caller must already have accounted
// for the worker in p.count; the worker un-counts itself in takeJob when it
// retires.
func (p *Pool) fork() {
	go func() {
		for {
			job := p.takeJob()
			if job == nil {
				return
			}
			// Run outside the lock so jobs may call back into the pool.
			job()
		}
	}()
}

// takeJob pops the oldest queued job for the calling worker. If the pool is
// closed or the queue is empty it retires the worker instead: count is
// decremented and nil is returned. Doing both under one lock keeps Add's
// "is a worker still alive?" decision consistent.
func (p *Pool) takeJob() func() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.closed {
		if e := p.list.Back(); e != nil {
			return p.list.Remove(e).(func())
		}
	}
	p.count--
	return nil
}

// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.closed
}

// Close closes the goroutine pool, which makes all goroutines exit once their
// current job returns. Jobs still queued are not executed, and further calls
// to Add fail. Close is idempotent and safe to call concurrently with Add.
func (p *Pool) Close() {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()
}