-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
99 lines (75 loc) · 1.78 KB
/
index.js
File metadata and controls
99 lines (75 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
const EventEmitter = require('events')
function qWorker (options = {}) {
const _delay = options.delay || 0
const _concurrent = options.concurrent || 0
const _queue = new Map()
const _processing = new Map()
const _emitter = new EventEmitter()
let _i = 0
function _addQueueTask (fn) {
_i++
_queue.set(_i, fn)
return _i
}
function _addProcessingTask (i, fn) {
_processing.set(i, fn)
return i
}
function _removeQueuedTask (i) {
return _queue.delete(i)
}
function _removeProcessingTask (i) {
_processing.delete(i)
if (_processing.size === 0) {
return _emitter.emit('done')
}
}
async function _handleTask (i, fn) {
let res
_removeQueuedTask(i)
_addProcessingTask(i, fn)
try {
res = await fn()
return _emitter.emit(`done:${i}`, res)
} catch (err) {
return _emitter.emit(`error:${i}`, err)
} finally {
setTimeout(() => _removeProcessingTask(i), _delay)
}
}
function _work () {
setTimeout(() => {
let current = 0
for (const [i, fn] of _queue) {
if (_concurrent && current === _concurrent) {
break
}
_handleTask(i, fn)
current++
}
_work()
}, _delay)
}
async function _add (fn) {
if (typeof fn !== 'function') {
throw new TypeError('Function required.')
}
const i = _addQueueTask(fn)
return new Promise((resolve, reject) => {
_emitter.on(`done:${i}`, (res) => {
_emitter.removeAllListeners(`done:${i}`)
return resolve(res)
})
_emitter.on(`error:${i}`, (err) => {
_emitter.removeAllListeners(`error:${i}`)
return reject(err)
})
})
}
_work()
return {
add: _add,
events: _emitter
}
}
module.exports = qWorker