src/streamer/streamer.js
/**
* @file Streamer
* @author Alexander Rose <alexander.rose@weirdbyte.de>
* @private
*/
import { DecompressorRegistry } from '../globals.js'
import { uint8ToString, defaults } from '../utils.js'
/**
 * Base class that holds data obtained from a source and hands it out whole,
 * in fixed-size chunks, or split into lines.
 *
 * In this base class the source itself is the data. Subclasses override
 * `_read` when the data has to be fetched from the source, and may override
 * the `type` and `__srcName` getters.
 */
class Streamer {
  /**
   * @param {*} src - the data source; handed to the `_read` callback as-is
   *                  unless `_read` is overridden
   * @param {Object} [params] - options, each resolved through the `defaults`
   *                            helper with `false` as fallback
   * @param {Boolean|String} [params.compressed] - key used to look up a
   *                            decompressor in `DecompressorRegistry`
   * @param {Boolean} [params.binary] - treat the data as binary
   * @param {Boolean} [params.json] - stored on the instance, not used here
   * @param {Boolean} [params.xml] - stored on the instance, not used here
   */
  constructor (src, params) {
    const p = params || {}

    this.compressed = defaults(p.compressed, false)
    this.binary = defaults(p.binary, false)
    this.json = defaults(p.json, false)
    this.xml = defaults(p.xml, false)

    this.src = src
    this.chunkSize = 1024 * 1024 * 10 // 10 MiB worth of elements per chunk
    this.newline = '\n'

    // state of the incremental nextChunk()/nextChunkOfLines() iteration
    this.__pointer = 0
    this.__partialLine = ''

    // optionally expose the source under a second, subclass-chosen name
    if (this.__srcName) {
      this[ this.__srcName ] = src
    }
  }

  /**
   * Type identifier; an empty string in the base class.
   * @type {String}
   */
  get type () { return '' }

  /**
   * Name of an extra property the constructor stores `src` under and
   * `dispose` removes again; `undefined` in the base class.
   * @type {String|undefined}
   */
  get __srcName () { return undefined }

  /**
   * Whether `this.data` is handled as binary, which is the case when either
   * the `binary` or the `compressed` flag is set.
   * @return {Boolean} truthy for binary data
   */
  isBinary () {
    return this.binary || this.compressed
  }

  // no-op hooks; nothing in this class calls them
  onload () {}

  onprogress () {}

  onerror () {}

  /**
   * Obtain the data through `_read`, decompress it when `compressed` is set
   * and a decompressor is registered for it, store it in `this.data` and
   * resolve with it. An `ArrayBuffer` is wrapped in a `Uint8Array` when the
   * streamer is binary.
   *
   * @return {Promise} resolves with `this.data`; rejects with anything
   *                   thrown by `_read` or while processing its result
   */
  read () {
    return new Promise((resolve, reject) => {
      const onData = data => {
        // _read may invoke this callback asynchronously, i.e. after the
        // try block below has been left, so failures are caught here too
        try {
          const decompressFn = DecompressorRegistry.get(this.compressed)

          if (this.compressed && decompressFn) {
            this.data = decompressFn(data)
          } else {
            const wrap = (this.binary || this.compressed) && data instanceof ArrayBuffer
            this.data = wrap ? new Uint8Array(data) : data
          }

          resolve(this.data)
        } catch (e) {
          reject(e)
        }
      }

      try {
        this._read(onData)
      } catch (e) {
        reject(e)
      }
    })
  }

  /**
   * Hand the raw data to `callback`.
   * @param {Function} callback - called with the raw data
   */
  _read (callback) {
    // overwrite this method when this.src does not contain the data
    callback(this.src)
  }

  /**
   * Return the part of `this.data` between `start` and `end`, with `end`
   * clamped to the data length. When the range covers everything, the data
   * object itself is returned.
   *
   * @param {Number} start - inclusive start index
   * @param {Number} end - exclusive end index
   * @return {String|Uint8Array} `subarray` result for binary data,
   *                             `substring` result otherwise
   */
  _chunk (start, end) {
    end = Math.min(this.data.length, end)

    if (start === 0 && this.data.length === end) {
      return this.data
    }

    return this.isBinary()
      ? this.data.subarray(start, end)
      : this.data.substring(start, end)
  }

  /**
   * Return the chunk of at most `chunkSize` elements beginning at `start`.
   * @param {Number} start - inclusive start index
   * @return {String|Uint8Array} the chunk
   */
  chunk (start) {
    const end = start + this.chunkSize

    return this._chunk(start, end)
  }

  /**
   * Return the lines found up to and including the `m`-th newline. When the
   * data holds fewer than `m` newlines, all of its lines are returned.
   *
   * @param {Number} m - number of newlines to scan for
   * @return {String[]} the lines
   */
  peekLines (m) {
    const data = this.data
    const n = data.length

    // FIXME does not work for multi-char newline
    const newline = this.isBinary() ? this.newline.charCodeAt(0) : this.newline

    let i
    let count = 0

    for (i = 0; i < n; ++i) {
      if (data[ i ] === newline) ++count
      if (count === m) break
    }

    const chunk = this._chunk(0, i + 1)

    // i only reaches n when the data ran out before m newlines were seen;
    // an unterminated last line must then be emitted as well
    const d = this.chunkToLines(chunk, '', i >= n)

    return d.lines
  }

  /**
   * Number of chunks `nextChunk` yields for the current data. When the data
   * length is zero or an exact multiple of `chunkSize` this is one more than
   * the number of chunks `eachChunk` visits.
   *
   * @return {Number} `floor(length / chunkSize) + 1`
   */
  chunkCount () {
    return Math.floor(this.data.length / this.chunkSize) + 1
  }

  /**
   * Return the data as text; binary data is converted with `uint8ToString`.
   * @return {String} the data as text
   */
  asText () {
    return this.isBinary() ? uint8ToString(this.data) : this.data
  }

  /**
   * Split a chunk into complete lines.
   *
   * Text left over from the previous chunk is prepended to the first line,
   * and whatever follows the last newline of this chunk is returned as the
   * new leftover. A non-binary chunk as long as the whole data is simply
   * split at every newline, ignoring `partialLine` and `isLast`.
   *
   * @param {String|Uint8Array} chunk - the chunk to split
   * @param {String} partialLine - leftover text from the previous chunk
   * @param {Boolean} isLast - when set, a non-empty leftover is appended as
   *                           a final line
   * @return {{lines: String[], partialLine: String}} lines and new leftover
   */
  chunkToLines (chunk, partialLine, isLast) {
    const newline = this.newline

    if (!this.isBinary() && chunk.length === this.data.length) {
      return {
        lines: chunk.split(newline),
        partialLine: ''
      }
    }

    const str = this.isBinary() ? uint8ToString(chunk) : chunk
    const idx = str.lastIndexOf(newline)

    let lines = []
    let rest

    if (idx === -1) {
      // no newline at all: the whole chunk extends the pending line
      rest = partialLine + str
    } else {
      lines = (partialLine + str.substring(0, idx)).split(newline)
      // yields '' when the chunk ends exactly with the newline
      rest = str.substring(idx + newline.length)
    }

    if (isLast && rest !== '') {
      lines.push(rest)
    }

    return {
      lines: lines,
      partialLine: rest
    }
  }

  /**
   * Return the next chunk of the incremental iteration and advance the
   * internal pointer by `chunkSize`.
   *
   * When the data length is an exact multiple of `chunkSize` a final empty
   * chunk is returned before the iteration ends; `nextChunkOfLines` uses it
   * to flush the pending partial line.
   *
   * @return {String|Uint8Array|undefined} the chunk, or `undefined` once the
   *                                       pointer is past the end of the data
   */
  nextChunk () {
    const start = this.__pointer

    if (start > this.data.length) {
      return undefined
    }

    this.__pointer += this.chunkSize

    return this.chunk(start)
  }

  /**
   * Return the lines of the next chunk of the incremental iteration.
   * @return {String[]|undefined} the lines, or `undefined` when `nextChunk`
   *                              has no more chunks
   */
  nextChunkOfLines () {
    const chunk = this.nextChunk()

    if (chunk === undefined) {
      return undefined
    }

    // the pointer has already been advanced past the chunk just fetched
    const isLast = this.__pointer > this.data.length
    const d = this.chunkToLines(chunk, this.__partialLine, isLast)

    this.__partialLine = d.partialLine

    return d.lines
  }

  /**
   * Call `callback` for every non-empty chunk of the data, in order.
   * @param {Function} callback - called with `(chunk, chunkNo, chunkCount)`
   *                              where `chunkCount` comes from `chunkCount()`
   */
  eachChunk (callback) {
    const chunkSize = this.chunkSize
    const n = this.data.length
    const chunkCount = this.chunkCount()

    for (let i = 0; i < n; i += chunkSize) {
      const chunk = this.chunk(i)
      const chunkNo = Math.round(i / chunkSize)

      callback(chunk, chunkNo, chunkCount)
    }
  }

  /**
   * Call `callback` with the lines of every chunk of the data, in order.
   * A line spanning a chunk boundary is delivered with the chunk in which
   * it ends; an unterminated last line is delivered with the last chunk.
   *
   * @param {Function} callback - called with `(lines, chunkNo, chunkCount)`
   */
  eachChunkOfLines (callback) {
    const chunkSize = this.chunkSize
    const n = this.data.length

    // always start from a clean state so that a fragment left behind by an
    // earlier pass cannot leak into the first line of this one
    this.__partialLine = ''

    this.eachChunk((chunk, chunkNo, chunkCount) => {
      // decide by position: chunkCount can exceed the number of chunks
      // eachChunk actually visits, so it cannot identify the last one
      const isLast = (chunkNo + 1) * chunkSize >= n
      const d = this.chunkToLines(chunk, this.__partialLine, isLast)

      this.__partialLine = d.partialLine

      callback(d.lines, chunkNo, chunkCount)
    })
  }

  /**
   * Remove the references to the source held by this instance.
   */
  dispose () {
    delete this.src

    if (this.__srcName) {
      delete this[ this.__srcName ]
    }
  }
}
export default Streamer