forked from zhurui/management
349 lines
9.5 KiB
JavaScript
349 lines
9.5 KiB
JavaScript
|
'use strict'
|
||
|
|
||
|
// Default values for every farm option. The Farm constructor copies these
// into a fresh object and then applies the caller's options on top.
const DEFAULT_OPTIONS = {
  // passed to fork() as its second argument whenever a worker is started
  workerOptions: {},
  maxCallsPerWorker: Infinity,
  // One worker per reported CPU. os.cpus() may yield undefined or an empty
  // array when CPU information is unavailable; clamp to 1 so that the farm
  // is always allowed to start a worker and queued calls cannot stall.
  maxConcurrentWorkers: Math.max(1, (require('os').cpus() || []).length),
  maxConcurrentCallsPerWorker: 10,
  maxConcurrentCalls: Infinity,
  maxCallTime: Infinity, // exceed this and the whole worker is terminated
  maxRetries: Infinity,
  // delay (ms) between asking a worker to die and sending it SIGKILL
  forcedKillTime: 100,
  // when true, setup() starts workers up to maxConcurrentWorkers right away
  autoStart: false,
  // invoked with the forked child process each time a worker is started
  onChild: function () {}
};
|
||
|
|
||
|
const fork = require('./fork')
|
||
|
, TimeoutError = require('errno').create('TimeoutError')
|
||
|
, ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
|
||
|
, MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
|
||
|
|
||
|
|
||
|
/**
 * Farm constructor: records the configuration for a set of worker processes.
 *
 * @param {Object} [options] - overrides applied on top of DEFAULT_OPTIONS
 * @param {*} path - stored as-is and handed to fork() when a worker starts
 */
function Farm (options, path) {
  // farm-wide count of calls currently dispatched to workers
  this.activeCalls = 0;
  this.path = path;
  // merge into a fresh object so DEFAULT_OPTIONS itself is never modified
  this.options = Object.assign({}, DEFAULT_OPTIONS, options);
}
|
||
|
|
||
|
|
||
|
/**
 * Create the function handed back to users as the external API for one
 * worker method.
 *
 * The returned function treats its last argument as the callback and queues
 * the remaining arguments as a call. When the number of active plus queued
 * calls has reached options.maxConcurrentCalls, a MaxConcurrentCallsError is
 * delivered to the trailing callback on the next tick, or thrown when the
 * last argument is not a function.
 *
 * @param {string} [method] - name of the worker method; omitted for a
 *   single-function export
 * @returns {Function} the handle
 */
Farm.prototype.mkhandle = function (method) {
  const farm = this;

  return function () {
    const params = Array.from(arguments);
    const queued = farm.callQueue.length;

    if (farm.activeCalls + queued >= farm.options.maxConcurrentCalls) {
      const tooMany = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + farm.activeCalls + ', queued: ' + queued + ')');
      const last = params[params.length - 1];

      if (typeof last !== 'function') {
        throw tooMany;
      }
      return process.nextTick(last.bind(null, tooMany));
    }

    farm.addCall({
      method: method,
      // pop() strips the callback, leaving only the call's arguments in params
      callback: params.pop(),
      args: params,
      retries: 0
    });
  };
};
|
||
|
|
||
|
|
||
|
/**
 * A constructor of sorts: builds the public interface and resets the farm's
 * bookkeeping.
 *
 * @param {string[]} [methods] - names of the functions exported by the worker
 *   module; omit for a single-function export
 * @returns {Function|Object} one handle, or an object mapping each method
 *   name to its handle
 */
Farm.prototype.setup = function (methods) {
  let api;

  if (methods) { // multiple functions on the export
    api = {};
    methods.forEach((name) => {
      api[name] = this.mkhandle(name);
    });
  } else { // single-function export
    api = this.mkhandle();
  }

  Object.assign(this, {
    searchStart: -1, // rotating offset used by childKeys()
    childId: -1, // incremented by startChild(), so the first worker gets id 0
    children: {},
    activeChildren: 0,
    callQueue: []
  });

  if (this.options.autoStart) {
    while (this.activeChildren < this.options.maxConcurrentWorkers) {
      this.startChild();
    }
  }

  return api;
};
|
||
|
|
||
|
|
||
|
/**
 * When a child exits, requeue or fail whatever calls it still had
 * outstanding.
 *
 * Calls that have already been retried options.maxRetries times are failed
 * with a ProcessTerminatedError; all others have their retry count bumped and
 * are put back at the front of the queue.
 *
 * @param {number} childId - id of the worker that exited
 */
Farm.prototype.onExit = function (childId) {
  // delay this to give any sends a chance to finish
  setTimeout(function () {
    const child = this.children[childId];
    // calls is sparse (receive() deletes finished slots); keep live ones only
    const orphaned = child && child.activeCalls ? child.calls.filter(Boolean) : [];

    // Unregister the dead worker before touching the queue, so that nothing
    // below can dispatch new work to a process that no longer exists.
    this.stopChild(childId);

    orphaned.forEach(function (call) {
      // The call is no longer in flight: drop its timeout and release its
      // slot in the farm-wide counter (send() counts it again on a retry).
      clearTimeout(call.timer);
      this.activeCalls--;

      if (call.retries >= this.options.maxRetries) {
        const err = new ProcessTerminatedError('cancel after ' + call.retries + ' retries!');
        process.nextTick(function () {
          call.callback.apply(null, [ err ]);
        });
      } else {
        call.retries++;
        this.callQueue.unshift(call);
      }
    }, this);

    if (orphaned.length) {
      this.processQueue();
    }
  }.bind(this), 10);
};
|
||
|
|
||
|
|
||
|
/**
 * Start a new worker: fork it, register it under the next child id and wire
 * up its 'message' and 'exit' events.
 */
Farm.prototype.startChild = function () {
  this.childId++;

  const forked = fork(this.path, this.options.workerOptions);
  const id = this.childId;
  const c = {
    send: forked.send,
    child: forked.child,
    calls: [],
    activeCalls: 0,
    exitCode: null
  };

  this.options.onChild(forked.child);

  forked.child.on('message', function (data) {
    // Only messages tagged with owner 'farm' belong to the farm. Anything
    // else, including null or primitive messages, is ignored rather than
    // dereferenced.
    if (!data || data.owner !== 'farm') {
      return;
    }
    this.receive(data);
  }.bind(this));

  forked.child.once('exit', function (code) {
    // remembered so stopChild() can tell whether a forced kill is needed
    c.exitCode = code;
    this.onExit(id);
  }.bind(this));

  this.activeChildren++;
  this.children[id] = c;
};
|
||
|
|
||
|
|
||
|
/**
 * Stop a worker, identified by id. Does nothing when no worker is registered
 * under that id.
 *
 * @param {number|string} childId - key of the worker in this.children
 */
Farm.prototype.stopChild = function (childId) {
  const worker = this.children[childId];
  if (!worker) {
    return;
  }

  // ask the worker to shut itself down
  worker.send({ owner: 'farm', event: 'die' });

  // if no exit code has been recorded once forcedKillTime has elapsed, kill it
  const killTimer = setTimeout(() => {
    if (worker.exitCode === null) {
      worker.child.kill('SIGKILL');
    }
  }, this.options.forcedKillTime);
  // unref so this pending timer does not keep the process alive on its own
  killTimer.unref();

  delete this.children[childId];
  this.activeChildren--;
};
|
||
|
|
||
|
|
||
|
/**
 * Handle a result coming back for a call. The data carries what is needed to
 * look up the worker and the original call so its callback can be invoked.
 *
 * @param {Object} data
 * @param {number|string} data.idx - slot of the call in the worker's calls array
 * @param {number|string} data.child - id of the worker that ran the call
 * @param {Array} data.args - arguments for the callback; a first element
 *   marked with $error is rebuilt into an Error instance
 */
Farm.prototype.receive = function (data) {
  const { idx, child: childId, args } = data;
  const worker = this.children[childId];

  if (!worker) {
    return console.error(
      'Worker Farm: Received message for unknown child. ' +
      'This is likely as a result of premature child death, ' +
      'the operation will have been re-queued.'
    );
  }

  const call = worker.calls[idx];
  if (!call) {
    return console.error(
      'Worker Farm: Received message for unknown index for existing child. ' +
      'This should not happen!'
    );
  }

  if (this.options.maxCallTime !== Infinity) {
    clearTimeout(call.timer);
  }

  const wire = args[0];
  if (wire && wire.$error == '$error') {
    // rebuild an Error of the matching built-in type, falling back to Error
    const builtins = new Map([
      ['TypeError', TypeError],
      ['RangeError', RangeError],
      ['EvalError', EvalError],
      ['ReferenceError', ReferenceError],
      ['SyntaxError', SyntaxError],
      ['URIError', URIError]
    ]);
    const ErrorType = builtins.get(wire.type) || Error;
    const rebuilt = new ErrorType(wire.message);

    rebuilt.type = wire.type;
    rebuilt.stack = wire.stack;

    // Copy any custom properties to pass it on.
    for (const key of Object.keys(wire)) {
      rebuilt[key] = wire[key];
    }
    args[0] = rebuilt;
  }

  process.nextTick(() => {
    call.callback.apply(null, args);
  });

  // delete (not splice) so the slots of the other calls keep their indices
  delete worker.calls[idx];
  worker.activeCalls--;
  this.activeCalls--;

  const quotaReached = worker.calls.length >= this.options.maxCallsPerWorker;
  if (quotaReached && Object.keys(worker.calls).length === 0) {
    // this child has finished its run, kill it
    this.stopChild(childId);
  }

  // allow any outstanding calls to be processed
  this.processQueue();
};
|
||
|
|
||
|
|
||
|
/**
 * Fail every outstanding call of a worker with a TimeoutError and stop the
 * worker. Scheduled by send() when options.maxCallTime is finite.
 *
 * The worker is unregistered before its calls are released and before the
 * queue is processed, so queued calls can never be handed to the worker that
 * is being stopped.
 *
 * @param {number} childId - id of the worker whose call timed out
 */
Farm.prototype.childTimeout = function (childId) {
  const child = this.children[childId];
  if (!child) {
    return;
  }

  // Collect the calls still in flight; forEach skips the holes left behind
  // by calls that have already completed.
  const pending = [];
  child.calls.forEach(function (call, idx) {
    if (!call) return;
    pending.push(call);
    delete child.calls[idx];
  });
  child.activeCalls -= pending.length;

  this.stopChild(childId);

  pending.forEach(function (call) {
    clearTimeout(call.timer);
    this.activeCalls--;
    const err = new TimeoutError('worker call timed out!');
    process.nextTick(function () {
      call.callback.apply(null, [ err ]);
    });
  }, this);

  // allow any outstanding calls to be processed by the remaining workers
  this.processQueue();
};
|
||
|
|
||
|
|
||
|
/**
 * Send a call to a worker, identified by id.
 *
 * @param {number} childId - id of the target worker in this.children
 * @param {Object} call - queued call; its method and args are sent over
 */
Farm.prototype.send = function (childId, call) {
  const worker = this.children[childId];
  // push() returns the new length, so the call's slot is that minus one
  const idx = worker.calls.push(call) - 1;

  worker.activeCalls++;
  this.activeCalls++;

  worker.send({
    owner: 'farm',
    idx: idx,
    child: childId,
    method: call.method,
    args: call.args
  });

  if (this.options.maxCallTime === Infinity) {
    return;
  }

  // arm the timeout; it is cleared in receive() once a result arrives
  const onTimeout = this.childTimeout.bind(this, childId);
  call.timer = setTimeout(onTimeout, this.options.maxCallTime);
};
|
||
|
|
||
|
|
||
|
/**
 * List the ids of the registered workers, in order, rotated by an offset
 * that advances on every call so that jobs are handed out across all workers.
 *
 * @returns {string[]} worker ids starting at the current offset and wrapping
 *   around to the beginning
 */
Farm.prototype.childKeys = function () {
  const ids = Object.keys(this.children);

  // advance the offset, wrapping to 0 once it reaches the last worker
  this.searchStart = this.searchStart >= ids.length - 1
    ? 0
    : this.searchStart + 1;

  const offset = this.searchStart;
  return ids.slice(offset).concat(ids.slice(0, offset));
};
|
||
|
|
||
|
|
||
|
/**
 * Calls are added to a queue; this works through the queue and is invoked
 * whenever there might be a chance to send more calls to the workers. The
 * options decide when a call can be sent, so calls may have to wait in the
 * queue until a worker is ready.
 *
 * Each invocation starts at most one new worker and hands at most one call
 * to each worker that has capacity.
 */
Farm.prototype.processQueue = function () {
  if (this.callQueue.length === 0) {
    return this.ending && this.end();
  }

  if (this.activeChildren < this.options.maxConcurrentWorkers) {
    this.startChild();
  }

  for (const key of this.childKeys()) {
    const childId = +key;
    const worker = this.children[childId];
    const hasCapacity =
      worker.activeCalls < this.options.maxConcurrentCallsPerWorker &&
      worker.calls.length < this.options.maxCallsPerWorker;

    if (!hasCapacity) {
      continue;
    }

    this.send(childId, this.callQueue.shift());

    if (this.callQueue.length === 0) {
      return this.ending && this.end();
    }
  }

  if (this.ending) {
    this.end();
  }
};
|
||
|
|
||
|
|
||
|
/**
 * Add a new call to the call queue, then trigger a pass over the queue.
 * While the farm is ending the call is not queued; end() is invoked instead.
 *
 * @param {Object} call - call record built by a handle
 */
Farm.prototype.addCall = function (call) {
  if (!this.ending) {
    this.callQueue.push(call);
    this.processQueue();
    return;
  }

  // don't add anything new to the queue
  return this.end();
};
|
||
|
|
||
|
|
||
|
/**
 * Kill child workers when they're all done.
 *
 * Workers without active calls are stopped immediately; busy workers are left
 * running, and end() is invoked again from processQueue()/addCall() while
 * this.ending is set. Once no worker has active calls, the callback (if one
 * was given) is invoked on the next tick and this.ending becomes false.
 *
 * @param {Function} [callback] - invoked with the farm as `this` once every
 *   worker has been stopped
 */
Farm.prototype.end = function (callback) {
  // false means a previous end() has already completed
  if (this.ending === false) {
    return;
  }

  if (callback) {
    this.ending = callback;
  } else if (this.ending == null) {
    this.ending = true;
  }

  let complete = true;
  Object.keys(this.children).forEach(function (id) {
    const child = this.children[id];
    if (!child) {
      return;
    }
    if (!child.activeCalls) {
      this.stopChild(id);
    } else {
      complete = false;
    }
  }, this);

  if (complete && typeof this.ending == 'function') {
    process.nextTick(function () {
      // end() may be re-entered before this tick runs, which schedules
      // several ticks; only the first one may fire the callback, the rest
      // find this.ending already reset and must do nothing.
      if (typeof this.ending != 'function') {
        return;
      }
      this.ending();
      this.ending = false;
    }.bind(this));
  }
};
|
||
|
|
||
|
|
||
|
// The Farm constructor is the module's export; TimeoutError is attached to
// it as a property so it is reachable from the exported value.
Farm.TimeoutError = TimeoutError;
module.exports = Farm;
|