// 219 lines · 4.9 KiB · JavaScript
'use strict'

// Type predicates and assertions; `match` uses them to classify and validate
// its `selector` argument.
const check = require('check-types')
// Constructor for the stream that `match` returns to its caller.
const DataStream = require('./datastream')
// Names of the events emitted by the walker.
const events = require('./events')
// Constructor for the buffer that holds matched items until they are pushed.
const Hoopy = require('hoopy')
// Starts parsing the incoming stream and returns the event emitter.
const walk = require('./walk')

// Length of the match buffer when `options.bufferLength` is not set.
const DEFAULT_BUFFER_LENGTH = 1024

module.exports = match

/**
 * Public function `match`.
 *
 * Asynchronously parses a stream of JSON data, returning a stream of items
 * that match the argument. Note that if a value is `null`, it won't be matched
 * because `null` is used to signify end-of-stream in node.
 *
 * @param stream:         Readable instance representing the incoming JSON.
 *
 * @param selector:       Regular expression, string or predicate function used to
 *                        identify matches. If a regular expression or string is
 *                        passed, only property keys are tested. If a predicate is
 *                        passed, it is called with the key, the value and the
 *                        number of scopes (arrays/objects) that are still open
 *                        around the value; a truthy return marks a match.
 *                        Regular expressions carrying the `g` or `y` flag have
 *                        their `lastIndex` reset to 0 before every test, so
 *                        each key is always tested from its first character.
 *
 * @option numbers:       Boolean, indicating whether numerical keys (e.g. array
 *                        indices) should be coerced to strings before testing the
 *                        match. Only applies if the `selector` argument is a string
 *                        or regular expression.
 *
 * @option ndjson:        Set this to true to parse newline-delimited JSON,
 *                        default is `false`.
 *
 * @option yieldRate:     The number of data items to process per timeslice,
 *                        default is 16384.
 *
 * @option bufferLength:  The length of the match buffer, default is 1024.
 *
 * @option highWaterMark: If set, will be passed to the readable stream constructor
 *                        as the value for the highWaterMark option.
 *
 * @option Promise:       The promise constructor to use, defaults to bluebird.
 *
 * The whole `options` object is also handed to `walk` unchanged.
 *
 * @returns The `DataStream` instance that matched items are pushed to. `error`
 *          and `dataError` events from the walker are re-emitted on it.
 *
 * @throws  Whatever `check.assert` throws when `selector` is an empty string or
 *          is neither a function, a string nor a `RegExp` instance.
 **/
function match (stream, selector, options = {}) {
  // Stack of the arrays/objects currently being assembled, innermost last.
  const scopes = []
  // Stack of property names that are still waiting for their value.
  const properties = []
  const emitter = walk(stream, options)
  // Matched items wait here until `results` accepts them; `index` is the
  // position of the oldest pending item and `length` the number pending.
  // `index + length` is used as a subscript without any wrapping here, so
  // presumably Hoopy wraps out-of-range indices itself. TODO confirm.
  const matches = new Hoopy(options.bufferLength || DEFAULT_BUFFER_LENGTH)
  let streamOptions
  const { highWaterMark } = options
  if (highWaterMark) {
    streamOptions = { highWaterMark }
  }
  const results = new DataStream(read, streamOptions)

  let selectorFunction, selectorString, resume
  let coerceNumbers = false
  // True when `selector` is a regex whose `test` method is stateful.
  let rewindSelector = false
  // True until `read` has been called for the first time.
  let awaitPush = true
  let isEnded = false
  let length = 0
  let index = 0

  // Exactly one of `selectorFunction`, `selectorString` and `selector`
  // (a RegExp) is set once this block has run.
  if (check.function(selector)) {
    selectorFunction = selector
    selector = null
  } else {
    coerceNumbers = !! options.numbers

    if (check.string(selector)) {
      check.assert.nonEmptyString(selector)
      selectorString = selector
      selector = null
    } else {
      check.assert.instanceStrict(selector, RegExp)
      // Global and sticky regexes remember `lastIndex` between calls to
      // `test`, which would make consecutive identical keys alternate
      // between matching and not matching.
      rewindSelector = selector.global || selector.sticky
    }
  }

  emitter.on(events.array, array)
  emitter.on(events.object, object)
  emitter.on(events.property, property)
  emitter.on(events.endArray, endScope)
  emitter.on(events.endObject, endScope)
  emitter.on(events.string, value)
  emitter.on(events.number, value)
  emitter.on(events.literal, value)
  emitter.on(events.end, end)
  emitter.on(events.error, error)
  emitter.on(events.dataError, dataError)

  return results

  // Read implementation handed to `DataStream`: flushes pending matches and
  // resumes the walker if it had been paused.
  function read () {
    if (awaitPush) {
      awaitPush = false

      if (isEnded) {
        if (length > 0) {
          after()
        }

        // NOTE(review): if `after()` stopped early because `results.push`
        // returned a falsy value, `length` is still greater than zero when
        // `null` is pushed here. Confirm the remaining matches are not lost.
        return endResults()
      }
    }

    if (resume) {
      // Clear `resume` before calling it so that `after` is not blocked.
      const resumeCopy = resume
      resume = null
      resumeCopy()
      after()
    }
  }

  // Pushes pending matches to `results`, oldest first, until the buffer is
  // empty or the walker gets paused.
  function after () {
    if (awaitPush || resume) {
      return
    }

    let i

    for (i = 0; i < length && ! resume; ++i) {
      if (! results.push(matches[i + index])) {
        // Sets `resume`, which terminates the loop after this iteration.
        pause()
      }
    }

    if (i === length) {
      index = length = 0
    } else {
      length -= i
      index += i
    }
  }

  // Pauses the walker and keeps the function that resumes it.
  function pause () {
    resume = emitter.pause()
  }

  // Signals end-of-stream on `results`, but only once `read` has been called.
  function endResults () {
    if (! awaitPush) {
      results.push(null)
    }
  }

  function array () {
    scopes.push([])
  }

  function object () {
    scopes.push({})
  }

  function property (name) {
    properties.push(name)
  }

  // A finished array/object is itself a value of its parent scope.
  function endScope () {
    value(scopes.pop())
  }

  // Stores `v` in the innermost open scope, then tests it against the selector.
  function value (v) {
    let key

    if (scopes.length > 0) {
      const scope = scopes[scopes.length - 1]

      if (Array.isArray(scope)) {
        // The next free index of the array.
        key = scope.length
      } else {
        key = properties.pop()
      }

      scope[key] = v
    }

    // `null` can never be a match because pushing it would end `results`.
    if (v === null) {
      return
    }

    if (selectorFunction) {
      if (selectorFunction(key, v, scopes.length)) {
        push(v)
      }
    } else {
      if (coerceNumbers && typeof key === 'number') {
        key = key.toString()
      }

      if (rewindSelector) {
        selector.lastIndex = 0
      }

      // NOTE(review): `key` is undefined for a value outside any scope and
      // `RegExp.prototype.test` coerces it to the string "undefined", so a
      // regex matching that text also matches root values. Confirm intended.
      if ((selectorString && selectorString === key) || (selector && selector.test(key))) {
        push(v)
      }
    }
  }

  // Appends a match to the buffer and tries to flush it straight away.
  function push (v) {
    // Pause the walker when this item takes the last free slot but one.
    if (length + 1 === matches.length) {
      pause()
    }

    matches[index + length++] = v

    after()
  }

  function end () {
    isEnded = true
    endResults()
  }

  function error (e) {
    results.emit('error', e)
  }

  function dataError (e) {
    results.emit('dataError', e)
  }
}