You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1336 lines
34 KiB
1336 lines
34 KiB
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex|Readable$" }] */ |
|
|
|
'use strict'; |
|
|
|
const EventEmitter = require('events'); |
|
const https = require('https'); |
|
const http = require('http'); |
|
const net = require('net'); |
|
const tls = require('tls'); |
|
const { randomBytes, createHash } = require('crypto'); |
|
const { Duplex, Readable } = require('stream'); |
|
const { URL } = require('url'); |
|
|
|
const PerMessageDeflate = require('./permessage-deflate'); |
|
const Receiver = require('./receiver'); |
|
const Sender = require('./sender'); |
|
const { |
|
BINARY_TYPES, |
|
EMPTY_BUFFER, |
|
GUID, |
|
kForOnEventAttribute, |
|
kListener, |
|
kStatusCode, |
|
kWebSocket, |
|
NOOP |
|
} = require('./constants'); |
|
const { |
|
EventTarget: { addEventListener, removeEventListener } |
|
} = require('./event-target'); |
|
const { format, parse } = require('./extension'); |
|
const { toBuffer } = require('./buffer-util'); |
|
|
|
const closeTimeout = 30 * 1000; |
|
const kAborted = Symbol('kAborted'); |
|
const protocolVersions = [8, 13]; |
|
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED']; |
|
const subprotocolRegex = /^[!#$%&'*+\-.0-9A-Z^_`|a-z~]+$/; |
|
|
|
/** |
|
* Class representing a WebSocket. |
|
* |
|
* @extends EventEmitter |
|
*/ |
|
class WebSocket extends EventEmitter { |
|
/** |
|
* Create a new `WebSocket`. |
|
* |
|
* @param {(String|URL)} address The URL to which to connect |
|
* @param {(String|String[])} [protocols] The subprotocols |
|
* @param {Object} [options] Connection options |
|
*/ |
|
constructor(address, protocols, options) { |
|
super(); |
|
|
|
this._binaryType = BINARY_TYPES[0]; |
|
this._closeCode = 1006; |
|
this._closeFrameReceived = false; |
|
this._closeFrameSent = false; |
|
this._closeMessage = EMPTY_BUFFER; |
|
this._closeTimer = null; |
|
this._extensions = {}; |
|
this._paused = false; |
|
this._protocol = ''; |
|
this._readyState = WebSocket.CONNECTING; |
|
this._receiver = null; |
|
this._sender = null; |
|
this._socket = null; |
|
|
|
if (address !== null) { |
|
this._bufferedAmount = 0; |
|
this._isServer = false; |
|
this._redirects = 0; |
|
|
|
if (protocols === undefined) { |
|
protocols = []; |
|
} else if (!Array.isArray(protocols)) { |
|
if (typeof protocols === 'object' && protocols !== null) { |
|
options = protocols; |
|
protocols = []; |
|
} else { |
|
protocols = [protocols]; |
|
} |
|
} |
|
|
|
initAsClient(this, address, protocols, options); |
|
} else { |
|
this._autoPong = options.autoPong; |
|
this._isServer = true; |
|
} |
|
} |
|
|
|
/** |
|
* This deviates from the WHATWG interface since ws doesn't support the |
|
* required default "blob" type (instead we define a custom "nodebuffer" |
|
* type). |
|
* |
|
* @type {String} |
|
*/ |
|
get binaryType() { |
|
return this._binaryType; |
|
} |
|
|
|
set binaryType(type) { |
|
if (!BINARY_TYPES.includes(type)) return; |
|
|
|
this._binaryType = type; |
|
|
|
// |
|
// Allow to change `binaryType` on the fly. |
|
// |
|
if (this._receiver) this._receiver._binaryType = type; |
|
} |
|
|
|
/** |
|
* @type {Number} |
|
*/ |
|
get bufferedAmount() { |
|
if (!this._socket) return this._bufferedAmount; |
|
|
|
return this._socket._writableState.length + this._sender._bufferedBytes; |
|
} |
|
|
|
/** |
|
* @type {String} |
|
*/ |
|
get extensions() { |
|
return Object.keys(this._extensions).join(); |
|
} |
|
|
|
/** |
|
* @type {Boolean} |
|
*/ |
|
get isPaused() { |
|
return this._paused; |
|
} |
|
|
|
/** |
|
* @type {Function} |
|
*/ |
|
/* istanbul ignore next */ |
|
get onclose() { |
|
return null; |
|
} |
|
|
|
/** |
|
* @type {Function} |
|
*/ |
|
/* istanbul ignore next */ |
|
get onerror() { |
|
return null; |
|
} |
|
|
|
/** |
|
* @type {Function} |
|
*/ |
|
/* istanbul ignore next */ |
|
get onopen() { |
|
return null; |
|
} |
|
|
|
/** |
|
* @type {Function} |
|
*/ |
|
/* istanbul ignore next */ |
|
get onmessage() { |
|
return null; |
|
} |
|
|
|
/** |
|
* @type {String} |
|
*/ |
|
get protocol() { |
|
return this._protocol; |
|
} |
|
|
|
/** |
|
* @type {Number} |
|
*/ |
|
get readyState() { |
|
return this._readyState; |
|
} |
|
|
|
/** |
|
* @type {String} |
|
*/ |
|
get url() { |
|
return this._url; |
|
} |
|
|
|
/** |
|
* Set up the socket and the internal resources. |
|
* |
|
* @param {Duplex} socket The network socket between the server and client |
|
* @param {Buffer} head The first packet of the upgraded stream |
|
* @param {Object} options Options object |
|
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether |
|
* any of the `'message'`, `'ping'`, and `'pong'` events can be emitted |
|
* multiple times in the same tick |
|
* @param {Function} [options.generateMask] The function used to generate the |
|
* masking key |
|
* @param {Number} [options.maxPayload=0] The maximum allowed message size |
|
* @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or |
|
* not to skip UTF-8 validation for text and close messages |
|
* @private |
|
*/ |
|
setSocket(socket, head, options) { |
|
const receiver = new Receiver({ |
|
allowSynchronousEvents: options.allowSynchronousEvents, |
|
binaryType: this.binaryType, |
|
extensions: this._extensions, |
|
isServer: this._isServer, |
|
maxPayload: options.maxPayload, |
|
skipUTF8Validation: options.skipUTF8Validation |
|
}); |
|
|
|
this._sender = new Sender(socket, this._extensions, options.generateMask); |
|
this._receiver = receiver; |
|
this._socket = socket; |
|
|
|
receiver[kWebSocket] = this; |
|
socket[kWebSocket] = this; |
|
|
|
receiver.on('conclude', receiverOnConclude); |
|
receiver.on('drain', receiverOnDrain); |
|
receiver.on('error', receiverOnError); |
|
receiver.on('message', receiverOnMessage); |
|
receiver.on('ping', receiverOnPing); |
|
receiver.on('pong', receiverOnPong); |
|
|
|
// |
|
// These methods may not be available if `socket` is just a `Duplex`. |
|
// |
|
if (socket.setTimeout) socket.setTimeout(0); |
|
if (socket.setNoDelay) socket.setNoDelay(); |
|
|
|
if (head.length > 0) socket.unshift(head); |
|
|
|
socket.on('close', socketOnClose); |
|
socket.on('data', socketOnData); |
|
socket.on('end', socketOnEnd); |
|
socket.on('error', socketOnError); |
|
|
|
this._readyState = WebSocket.OPEN; |
|
this.emit('open'); |
|
} |
|
|
|
/** |
|
* Emit the `'close'` event. |
|
* |
|
* @private |
|
*/ |
|
emitClose() { |
|
if (!this._socket) { |
|
this._readyState = WebSocket.CLOSED; |
|
this.emit('close', this._closeCode, this._closeMessage); |
|
return; |
|
} |
|
|
|
if (this._extensions[PerMessageDeflate.extensionName]) { |
|
this._extensions[PerMessageDeflate.extensionName].cleanup(); |
|
} |
|
|
|
this._receiver.removeAllListeners(); |
|
this._readyState = WebSocket.CLOSED; |
|
this.emit('close', this._closeCode, this._closeMessage); |
|
} |
|
|
|
/** |
|
* Start a closing handshake. |
|
* |
|
* +----------+ +-----------+ +----------+ |
|
* - - -|ws.close()|-->|close frame|-->|ws.close()|- - - |
|
* | +----------+ +-----------+ +----------+ | |
|
* +----------+ +-----------+ | |
|
* CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING |
|
* +----------+ +-----------+ | |
|
* | | | +---+ | |
|
* +------------------------+-->|fin| - - - - |
|
* | +---+ | +---+ |
|
* - - - - -|fin|<---------------------+ |
|
* +---+ |
|
* |
|
* @param {Number} [code] Status code explaining why the connection is closing |
|
* @param {(String|Buffer)} [data] The reason why the connection is |
|
* closing |
|
* @public |
|
*/ |
|
close(code, data) { |
|
if (this.readyState === WebSocket.CLOSED) return; |
|
if (this.readyState === WebSocket.CONNECTING) { |
|
const msg = 'WebSocket was closed before the connection was established'; |
|
abortHandshake(this, this._req, msg); |
|
return; |
|
} |
|
|
|
if (this.readyState === WebSocket.CLOSING) { |
|
if ( |
|
this._closeFrameSent && |
|
(this._closeFrameReceived || this._receiver._writableState.errorEmitted) |
|
) { |
|
this._socket.end(); |
|
} |
|
|
|
return; |
|
} |
|
|
|
this._readyState = WebSocket.CLOSING; |
|
this._sender.close(code, data, !this._isServer, (err) => { |
|
// |
|
// This error is handled by the `'error'` listener on the socket. We only |
|
// want to know if the close frame has been sent here. |
|
// |
|
if (err) return; |
|
|
|
this._closeFrameSent = true; |
|
|
|
if ( |
|
this._closeFrameReceived || |
|
this._receiver._writableState.errorEmitted |
|
) { |
|
this._socket.end(); |
|
} |
|
}); |
|
|
|
// |
|
// Specify a timeout for the closing handshake to complete. |
|
// |
|
this._closeTimer = setTimeout( |
|
this._socket.destroy.bind(this._socket), |
|
closeTimeout |
|
); |
|
} |
|
|
|
/** |
|
* Pause the socket. |
|
* |
|
* @public |
|
*/ |
|
pause() { |
|
if ( |
|
this.readyState === WebSocket.CONNECTING || |
|
this.readyState === WebSocket.CLOSED |
|
) { |
|
return; |
|
} |
|
|
|
this._paused = true; |
|
this._socket.pause(); |
|
} |
|
|
|
/** |
|
* Send a ping. |
|
* |
|
* @param {*} [data] The data to send |
|
* @param {Boolean} [mask] Indicates whether or not to mask `data` |
|
* @param {Function} [cb] Callback which is executed when the ping is sent |
|
* @public |
|
*/ |
|
ping(data, mask, cb) { |
|
if (this.readyState === WebSocket.CONNECTING) { |
|
throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); |
|
} |
|
|
|
if (typeof data === 'function') { |
|
cb = data; |
|
data = mask = undefined; |
|
} else if (typeof mask === 'function') { |
|
cb = mask; |
|
mask = undefined; |
|
} |
|
|
|
if (typeof data === 'number') data = data.toString(); |
|
|
|
if (this.readyState !== WebSocket.OPEN) { |
|
sendAfterClose(this, data, cb); |
|
return; |
|
} |
|
|
|
if (mask === undefined) mask = !this._isServer; |
|
this._sender.ping(data || EMPTY_BUFFER, mask, cb); |
|
} |
|
|
|
/** |
|
* Send a pong. |
|
* |
|
* @param {*} [data] The data to send |
|
* @param {Boolean} [mask] Indicates whether or not to mask `data` |
|
* @param {Function} [cb] Callback which is executed when the pong is sent |
|
* @public |
|
*/ |
|
pong(data, mask, cb) { |
|
if (this.readyState === WebSocket.CONNECTING) { |
|
throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); |
|
} |
|
|
|
if (typeof data === 'function') { |
|
cb = data; |
|
data = mask = undefined; |
|
} else if (typeof mask === 'function') { |
|
cb = mask; |
|
mask = undefined; |
|
} |
|
|
|
if (typeof data === 'number') data = data.toString(); |
|
|
|
if (this.readyState !== WebSocket.OPEN) { |
|
sendAfterClose(this, data, cb); |
|
return; |
|
} |
|
|
|
if (mask === undefined) mask = !this._isServer; |
|
this._sender.pong(data || EMPTY_BUFFER, mask, cb); |
|
} |
|
|
|
/** |
|
* Resume the socket. |
|
* |
|
* @public |
|
*/ |
|
resume() { |
|
if ( |
|
this.readyState === WebSocket.CONNECTING || |
|
this.readyState === WebSocket.CLOSED |
|
) { |
|
return; |
|
} |
|
|
|
this._paused = false; |
|
if (!this._receiver._writableState.needDrain) this._socket.resume(); |
|
} |
|
|
|
/** |
|
* Send a data message. |
|
* |
|
* @param {*} data The message to send |
|
* @param {Object} [options] Options object |
|
* @param {Boolean} [options.binary] Specifies whether `data` is binary or |
|
* text |
|
* @param {Boolean} [options.compress] Specifies whether or not to compress |
|
* `data` |
|
* @param {Boolean} [options.fin=true] Specifies whether the fragment is the |
|
* last one |
|
* @param {Boolean} [options.mask] Specifies whether or not to mask `data` |
|
* @param {Function} [cb] Callback which is executed when data is written out |
|
* @public |
|
*/ |
|
send(data, options, cb) { |
|
if (this.readyState === WebSocket.CONNECTING) { |
|
throw new Error('WebSocket is not open: readyState 0 (CONNECTING)'); |
|
} |
|
|
|
if (typeof options === 'function') { |
|
cb = options; |
|
options = {}; |
|
} |
|
|
|
if (typeof data === 'number') data = data.toString(); |
|
|
|
if (this.readyState !== WebSocket.OPEN) { |
|
sendAfterClose(this, data, cb); |
|
return; |
|
} |
|
|
|
const opts = { |
|
binary: typeof data !== 'string', |
|
mask: !this._isServer, |
|
compress: true, |
|
fin: true, |
|
...options |
|
}; |
|
|
|
if (!this._extensions[PerMessageDeflate.extensionName]) { |
|
opts.compress = false; |
|
} |
|
|
|
this._sender.send(data || EMPTY_BUFFER, opts, cb); |
|
} |
|
|
|
/** |
|
* Forcibly close the connection. |
|
* |
|
* @public |
|
*/ |
|
terminate() { |
|
if (this.readyState === WebSocket.CLOSED) return; |
|
if (this.readyState === WebSocket.CONNECTING) { |
|
const msg = 'WebSocket was closed before the connection was established'; |
|
abortHandshake(this, this._req, msg); |
|
return; |
|
} |
|
|
|
if (this._socket) { |
|
this._readyState = WebSocket.CLOSING; |
|
this._socket.destroy(); |
|
} |
|
} |
|
} |
|
|
|
/** |
|
* @constant {Number} CONNECTING |
|
* @memberof WebSocket |
|
*/ |
|
Object.defineProperty(WebSocket, 'CONNECTING', { |
|
enumerable: true, |
|
value: readyStates.indexOf('CONNECTING') |
|
}); |
|
|
|
/** |
|
* @constant {Number} CONNECTING |
|
* @memberof WebSocket.prototype |
|
*/ |
|
Object.defineProperty(WebSocket.prototype, 'CONNECTING', { |
|
enumerable: true, |
|
value: readyStates.indexOf('CONNECTING') |
|
}); |
|
|
|
/** |
|
* @constant {Number} OPEN |
|
* @memberof WebSocket |
|
*/ |
|
Object.defineProperty(WebSocket, 'OPEN', { |
|
enumerable: true, |
|
value: readyStates.indexOf('OPEN') |
|
}); |
|
|
|
/** |
|
* @constant {Number} OPEN |
|
* @memberof WebSocket.prototype |
|
*/ |
|
Object.defineProperty(WebSocket.prototype, 'OPEN', { |
|
enumerable: true, |
|
value: readyStates.indexOf('OPEN') |
|
}); |
|
|
|
/** |
|
* @constant {Number} CLOSING |
|
* @memberof WebSocket |
|
*/ |
|
Object.defineProperty(WebSocket, 'CLOSING', { |
|
enumerable: true, |
|
value: readyStates.indexOf('CLOSING') |
|
}); |
|
|
|
/** |
|
* @constant {Number} CLOSING |
|
* @memberof WebSocket.prototype |
|
*/ |
|
Object.defineProperty(WebSocket.prototype, 'CLOSING', { |
|
enumerable: true, |
|
value: readyStates.indexOf('CLOSING') |
|
}); |
|
|
|
/** |
|
* @constant {Number} CLOSED |
|
* @memberof WebSocket |
|
*/ |
|
Object.defineProperty(WebSocket, 'CLOSED', { |
|
enumerable: true, |
|
value: readyStates.indexOf('CLOSED') |
|
}); |
|
|
|
/** |
|
* @constant {Number} CLOSED |
|
* @memberof WebSocket.prototype |
|
*/ |
|
Object.defineProperty(WebSocket.prototype, 'CLOSED', { |
|
enumerable: true, |
|
value: readyStates.indexOf('CLOSED') |
|
}); |
|
|
|
[ |
|
'binaryType', |
|
'bufferedAmount', |
|
'extensions', |
|
'isPaused', |
|
'protocol', |
|
'readyState', |
|
'url' |
|
].forEach((property) => { |
|
Object.defineProperty(WebSocket.prototype, property, { enumerable: true }); |
|
}); |
|
|
|
// |
|
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes. |
|
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface |
|
// |
|
['open', 'error', 'close', 'message'].forEach((method) => { |
|
Object.defineProperty(WebSocket.prototype, `on${method}`, { |
|
enumerable: true, |
|
get() { |
|
for (const listener of this.listeners(method)) { |
|
if (listener[kForOnEventAttribute]) return listener[kListener]; |
|
} |
|
|
|
return null; |
|
}, |
|
set(handler) { |
|
for (const listener of this.listeners(method)) { |
|
if (listener[kForOnEventAttribute]) { |
|
this.removeListener(method, listener); |
|
break; |
|
} |
|
} |
|
|
|
if (typeof handler !== 'function') return; |
|
|
|
this.addEventListener(method, handler, { |
|
[kForOnEventAttribute]: true |
|
}); |
|
} |
|
}); |
|
}); |
|
|
|
WebSocket.prototype.addEventListener = addEventListener; |
|
WebSocket.prototype.removeEventListener = removeEventListener; |
|
|
|
module.exports = WebSocket; |
|
|
|
/** |
|
* Initialize a WebSocket client. |
|
* |
|
* @param {WebSocket} websocket The client to initialize |
|
* @param {(String|URL)} address The URL to which to connect |
|
* @param {Array} protocols The subprotocols |
|
* @param {Object} [options] Connection options |
|
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether any |
|
* of the `'message'`, `'ping'`, and `'pong'` events can be emitted multiple |
|
* times in the same tick |
|
* @param {Boolean} [options.autoPong=true] Specifies whether or not to |
|
* automatically send a pong in response to a ping |
|
* @param {Function} [options.finishRequest] A function which can be used to |
|
* customize the headers of each http request before it is sent |
|
* @param {Boolean} [options.followRedirects=false] Whether or not to follow |
|
* redirects |
|
* @param {Function} [options.generateMask] The function used to generate the |
|
* masking key |
|
* @param {Number} [options.handshakeTimeout] Timeout in milliseconds for the |
|
* handshake request |
|
* @param {Number} [options.maxPayload=104857600] The maximum allowed message |
|
* size |
|
* @param {Number} [options.maxRedirects=10] The maximum number of redirects |
|
* allowed |
|
* @param {String} [options.origin] Value of the `Origin` or |
|
* `Sec-WebSocket-Origin` header |
|
* @param {(Boolean|Object)} [options.perMessageDeflate=true] Enable/disable |
|
* permessage-deflate |
|
* @param {Number} [options.protocolVersion=13] Value of the |
|
* `Sec-WebSocket-Version` header |
|
* @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or |
|
* not to skip UTF-8 validation for text and close messages |
|
* @private |
|
*/ |
|
function initAsClient(websocket, address, protocols, options) { |
|
const opts = { |
|
allowSynchronousEvents: false, |
|
autoPong: true, |
|
protocolVersion: protocolVersions[1], |
|
maxPayload: 100 * 1024 * 1024, |
|
skipUTF8Validation: false, |
|
perMessageDeflate: true, |
|
followRedirects: false, |
|
maxRedirects: 10, |
|
...options, |
|
createConnection: undefined, |
|
socketPath: undefined, |
|
hostname: undefined, |
|
protocol: undefined, |
|
timeout: undefined, |
|
method: 'GET', |
|
host: undefined, |
|
path: undefined, |
|
port: undefined |
|
}; |
|
|
|
websocket._autoPong = opts.autoPong; |
|
|
|
if (!protocolVersions.includes(opts.protocolVersion)) { |
|
throw new RangeError( |
|
`Unsupported protocol version: ${opts.protocolVersion} ` + |
|
`(supported versions: ${protocolVersions.join(', ')})` |
|
); |
|
} |
|
|
|
let parsedUrl; |
|
|
|
if (address instanceof URL) { |
|
parsedUrl = address; |
|
} else { |
|
try { |
|
parsedUrl = new URL(address); |
|
} catch (e) { |
|
throw new SyntaxError(`Invalid URL: ${address}`); |
|
} |
|
} |
|
|
|
if (parsedUrl.protocol === 'http:') { |
|
parsedUrl.protocol = 'ws:'; |
|
} else if (parsedUrl.protocol === 'https:') { |
|
parsedUrl.protocol = 'wss:'; |
|
} |
|
|
|
websocket._url = parsedUrl.href; |
|
|
|
const isSecure = parsedUrl.protocol === 'wss:'; |
|
const isIpcUrl = parsedUrl.protocol === 'ws+unix:'; |
|
let invalidUrlMessage; |
|
|
|
if (parsedUrl.protocol !== 'ws:' && !isSecure && !isIpcUrl) { |
|
invalidUrlMessage = |
|
'The URL\'s protocol must be one of "ws:", "wss:", ' + |
|
'"http:", "https", or "ws+unix:"'; |
|
} else if (isIpcUrl && !parsedUrl.pathname) { |
|
invalidUrlMessage = "The URL's pathname is empty"; |
|
} else if (parsedUrl.hash) { |
|
invalidUrlMessage = 'The URL contains a fragment identifier'; |
|
} |
|
|
|
if (invalidUrlMessage) { |
|
const err = new SyntaxError(invalidUrlMessage); |
|
|
|
if (websocket._redirects === 0) { |
|
throw err; |
|
} else { |
|
emitErrorAndClose(websocket, err); |
|
return; |
|
} |
|
} |
|
|
|
const defaultPort = isSecure ? 443 : 80; |
|
const key = randomBytes(16).toString('base64'); |
|
const request = isSecure ? https.request : http.request; |
|
const protocolSet = new Set(); |
|
let perMessageDeflate; |
|
|
|
opts.createConnection = isSecure ? tlsConnect : netConnect; |
|
opts.defaultPort = opts.defaultPort || defaultPort; |
|
opts.port = parsedUrl.port || defaultPort; |
|
opts.host = parsedUrl.hostname.startsWith('[') |
|
? parsedUrl.hostname.slice(1, -1) |
|
: parsedUrl.hostname; |
|
opts.headers = { |
|
...opts.headers, |
|
'Sec-WebSocket-Version': opts.protocolVersion, |
|
'Sec-WebSocket-Key': key, |
|
Connection: 'Upgrade', |
|
Upgrade: 'websocket' |
|
}; |
|
opts.path = parsedUrl.pathname + parsedUrl.search; |
|
opts.timeout = opts.handshakeTimeout; |
|
|
|
if (opts.perMessageDeflate) { |
|
perMessageDeflate = new PerMessageDeflate( |
|
opts.perMessageDeflate !== true ? opts.perMessageDeflate : {}, |
|
false, |
|
opts.maxPayload |
|
); |
|
opts.headers['Sec-WebSocket-Extensions'] = format({ |
|
[PerMessageDeflate.extensionName]: perMessageDeflate.offer() |
|
}); |
|
} |
|
if (protocols.length) { |
|
for (const protocol of protocols) { |
|
if ( |
|
typeof protocol !== 'string' || |
|
!subprotocolRegex.test(protocol) || |
|
protocolSet.has(protocol) |
|
) { |
|
throw new SyntaxError( |
|
'An invalid or duplicated subprotocol was specified' |
|
); |
|
} |
|
|
|
protocolSet.add(protocol); |
|
} |
|
|
|
opts.headers['Sec-WebSocket-Protocol'] = protocols.join(','); |
|
} |
|
if (opts.origin) { |
|
if (opts.protocolVersion < 13) { |
|
opts.headers['Sec-WebSocket-Origin'] = opts.origin; |
|
} else { |
|
opts.headers.Origin = opts.origin; |
|
} |
|
} |
|
if (parsedUrl.username || parsedUrl.password) { |
|
opts.auth = `${parsedUrl.username}:${parsedUrl.password}`; |
|
} |
|
|
|
if (isIpcUrl) { |
|
const parts = opts.path.split(':'); |
|
|
|
opts.socketPath = parts[0]; |
|
opts.path = parts[1]; |
|
} |
|
|
|
let req; |
|
|
|
if (opts.followRedirects) { |
|
if (websocket._redirects === 0) { |
|
websocket._originalIpc = isIpcUrl; |
|
websocket._originalSecure = isSecure; |
|
websocket._originalHostOrSocketPath = isIpcUrl |
|
? opts.socketPath |
|
: parsedUrl.host; |
|
|
|
const headers = options && options.headers; |
|
|
|
// |
|
// Shallow copy the user provided options so that headers can be changed |
|
// without mutating the original object. |
|
// |
|
options = { ...options, headers: {} }; |
|
|
|
if (headers) { |
|
for (const [key, value] of Object.entries(headers)) { |
|
options.headers[key.toLowerCase()] = value; |
|
} |
|
} |
|
} else if (websocket.listenerCount('redirect') === 0) { |
|
const isSameHost = isIpcUrl |
|
? websocket._originalIpc |
|
? opts.socketPath === websocket._originalHostOrSocketPath |
|
: false |
|
: websocket._originalIpc |
|
? false |
|
: parsedUrl.host === websocket._originalHostOrSocketPath; |
|
|
|
if (!isSameHost || (websocket._originalSecure && !isSecure)) { |
|
// |
|
// Match curl 7.77.0 behavior and drop the following headers. These |
|
// headers are also dropped when following a redirect to a subdomain. |
|
// |
|
delete opts.headers.authorization; |
|
delete opts.headers.cookie; |
|
|
|
if (!isSameHost) delete opts.headers.host; |
|
|
|
opts.auth = undefined; |
|
} |
|
} |
|
|
|
// |
|
// Match curl 7.77.0 behavior and make the first `Authorization` header win. |
|
// If the `Authorization` header is set, then there is nothing to do as it |
|
// will take precedence. |
|
// |
|
if (opts.auth && !options.headers.authorization) { |
|
options.headers.authorization = |
|
'Basic ' + Buffer.from(opts.auth).toString('base64'); |
|
} |
|
|
|
req = websocket._req = request(opts); |
|
|
|
if (websocket._redirects) { |
|
// |
|
// Unlike what is done for the `'upgrade'` event, no early exit is |
|
// triggered here if the user calls `websocket.close()` or |
|
// `websocket.terminate()` from a listener of the `'redirect'` event. This |
|
// is because the user can also call `request.destroy()` with an error |
|
// before calling `websocket.close()` or `websocket.terminate()` and this |
|
// would result in an error being emitted on the `request` object with no |
|
// `'error'` event listeners attached. |
|
// |
|
websocket.emit('redirect', websocket.url, req); |
|
} |
|
} else { |
|
req = websocket._req = request(opts); |
|
} |
|
|
|
if (opts.timeout) { |
|
req.on('timeout', () => { |
|
abortHandshake(websocket, req, 'Opening handshake has timed out'); |
|
}); |
|
} |
|
|
|
req.on('error', (err) => { |
|
if (req === null || req[kAborted]) return; |
|
|
|
req = websocket._req = null; |
|
emitErrorAndClose(websocket, err); |
|
}); |
|
|
|
req.on('response', (res) => { |
|
const location = res.headers.location; |
|
const statusCode = res.statusCode; |
|
|
|
if ( |
|
location && |
|
opts.followRedirects && |
|
statusCode >= 300 && |
|
statusCode < 400 |
|
) { |
|
if (++websocket._redirects > opts.maxRedirects) { |
|
abortHandshake(websocket, req, 'Maximum redirects exceeded'); |
|
return; |
|
} |
|
|
|
req.abort(); |
|
|
|
let addr; |
|
|
|
try { |
|
addr = new URL(location, address); |
|
} catch (e) { |
|
const err = new SyntaxError(`Invalid URL: ${location}`); |
|
emitErrorAndClose(websocket, err); |
|
return; |
|
} |
|
|
|
initAsClient(websocket, addr, protocols, options); |
|
} else if (!websocket.emit('unexpected-response', req, res)) { |
|
abortHandshake( |
|
websocket, |
|
req, |
|
`Unexpected server response: ${res.statusCode}` |
|
); |
|
} |
|
}); |
|
|
|
req.on('upgrade', (res, socket, head) => { |
|
websocket.emit('upgrade', res); |
|
|
|
// |
|
// The user may have closed the connection from a listener of the |
|
// `'upgrade'` event. |
|
// |
|
if (websocket.readyState !== WebSocket.CONNECTING) return; |
|
|
|
req = websocket._req = null; |
|
|
|
if (res.headers.upgrade.toLowerCase() !== 'websocket') { |
|
abortHandshake(websocket, socket, 'Invalid Upgrade header'); |
|
return; |
|
} |
|
|
|
const digest = createHash('sha1') |
|
.update(key + GUID) |
|
.digest('base64'); |
|
|
|
if (res.headers['sec-websocket-accept'] !== digest) { |
|
abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header'); |
|
return; |
|
} |
|
|
|
const serverProt = res.headers['sec-websocket-protocol']; |
|
let protError; |
|
|
|
if (serverProt !== undefined) { |
|
if (!protocolSet.size) { |
|
protError = 'Server sent a subprotocol but none was requested'; |
|
} else if (!protocolSet.has(serverProt)) { |
|
protError = 'Server sent an invalid subprotocol'; |
|
} |
|
} else if (protocolSet.size) { |
|
protError = 'Server sent no subprotocol'; |
|
} |
|
|
|
if (protError) { |
|
abortHandshake(websocket, socket, protError); |
|
return; |
|
} |
|
|
|
if (serverProt) websocket._protocol = serverProt; |
|
|
|
const secWebSocketExtensions = res.headers['sec-websocket-extensions']; |
|
|
|
if (secWebSocketExtensions !== undefined) { |
|
if (!perMessageDeflate) { |
|
const message = |
|
'Server sent a Sec-WebSocket-Extensions header but no extension ' + |
|
'was requested'; |
|
abortHandshake(websocket, socket, message); |
|
return; |
|
} |
|
|
|
let extensions; |
|
|
|
try { |
|
extensions = parse(secWebSocketExtensions); |
|
} catch (err) { |
|
const message = 'Invalid Sec-WebSocket-Extensions header'; |
|
abortHandshake(websocket, socket, message); |
|
return; |
|
} |
|
|
|
const extensionNames = Object.keys(extensions); |
|
|
|
if ( |
|
extensionNames.length !== 1 || |
|
extensionNames[0] !== PerMessageDeflate.extensionName |
|
) { |
|
const message = 'Server indicated an extension that was not requested'; |
|
abortHandshake(websocket, socket, message); |
|
return; |
|
} |
|
|
|
try { |
|
perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]); |
|
} catch (err) { |
|
const message = 'Invalid Sec-WebSocket-Extensions header'; |
|
abortHandshake(websocket, socket, message); |
|
return; |
|
} |
|
|
|
websocket._extensions[PerMessageDeflate.extensionName] = |
|
perMessageDeflate; |
|
} |
|
|
|
websocket.setSocket(socket, head, { |
|
allowSynchronousEvents: opts.allowSynchronousEvents, |
|
generateMask: opts.generateMask, |
|
maxPayload: opts.maxPayload, |
|
skipUTF8Validation: opts.skipUTF8Validation |
|
}); |
|
}); |
|
|
|
if (opts.finishRequest) { |
|
opts.finishRequest(req, websocket); |
|
} else { |
|
req.end(); |
|
} |
|
} |
|
|
|
/** |
|
* Emit the `'error'` and `'close'` events. |
|
* |
|
* @param {WebSocket} websocket The WebSocket instance |
|
* @param {Error} The error to emit |
|
* @private |
|
*/ |
|
function emitErrorAndClose(websocket, err) { |
|
websocket._readyState = WebSocket.CLOSING; |
|
websocket.emit('error', err); |
|
websocket.emitClose(); |
|
} |
|
|
|
/** |
|
* Create a `net.Socket` and initiate a connection. |
|
* |
|
* @param {Object} options Connection options |
|
* @return {net.Socket} The newly created socket used to start the connection |
|
* @private |
|
*/ |
|
function netConnect(options) { |
|
options.path = options.socketPath; |
|
return net.connect(options); |
|
} |
|
|
|
/** |
|
* Create a `tls.TLSSocket` and initiate a connection. |
|
* |
|
* @param {Object} options Connection options |
|
* @return {tls.TLSSocket} The newly created socket used to start the connection |
|
* @private |
|
*/ |
|
function tlsConnect(options) { |
|
options.path = undefined; |
|
|
|
if (!options.servername && options.servername !== '') { |
|
options.servername = net.isIP(options.host) ? '' : options.host; |
|
} |
|
|
|
return tls.connect(options); |
|
} |
|
|
|
/** |
|
* Abort the handshake and emit an error. |
|
* |
|
* @param {WebSocket} websocket The WebSocket instance |
|
* @param {(http.ClientRequest|net.Socket|tls.Socket)} stream The request to |
|
* abort or the socket to destroy |
|
* @param {String} message The error message |
|
* @private |
|
*/ |
|
function abortHandshake(websocket, stream, message) { |
|
websocket._readyState = WebSocket.CLOSING; |
|
|
|
const err = new Error(message); |
|
Error.captureStackTrace(err, abortHandshake); |
|
|
|
if (stream.setHeader) { |
|
stream[kAborted] = true; |
|
stream.abort(); |
|
|
|
if (stream.socket && !stream.socket.destroyed) { |
|
// |
|
// On Node.js >= 14.3.0 `request.abort()` does not destroy the socket if |
|
// called after the request completed. See |
|
// https://github.com/websockets/ws/issues/1869. |
|
// |
|
stream.socket.destroy(); |
|
} |
|
|
|
process.nextTick(emitErrorAndClose, websocket, err); |
|
} else { |
|
stream.destroy(err); |
|
stream.once('error', websocket.emit.bind(websocket, 'error')); |
|
stream.once('close', websocket.emitClose.bind(websocket)); |
|
} |
|
} |
|
|
|
/** |
|
* Handle cases where the `ping()`, `pong()`, or `send()` methods are called |
|
* when the `readyState` attribute is `CLOSING` or `CLOSED`. |
|
* |
|
* @param {WebSocket} websocket The WebSocket instance |
|
* @param {*} [data] The data to send |
|
* @param {Function} [cb] Callback |
|
* @private |
|
*/ |
|
function sendAfterClose(websocket, data, cb) { |
|
if (data) { |
|
const length = toBuffer(data).length; |
|
|
|
// |
|
// The `_bufferedAmount` property is used only when the peer is a client and |
|
// the opening handshake fails. Under these circumstances, in fact, the |
|
// `setSocket()` method is not called, so the `_socket` and `_sender` |
|
// properties are set to `null`. |
|
// |
|
if (websocket._socket) websocket._sender._bufferedBytes += length; |
|
else websocket._bufferedAmount += length; |
|
} |
|
|
|
if (cb) { |
|
const err = new Error( |
|
`WebSocket is not open: readyState ${websocket.readyState} ` + |
|
`(${readyStates[websocket.readyState]})` |
|
); |
|
process.nextTick(cb, err); |
|
} |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'conclude'` event. |
|
* |
|
* @param {Number} code The status code |
|
* @param {Buffer} reason The reason for closing |
|
* @private |
|
*/ |
|
function receiverOnConclude(code, reason) { |
|
const websocket = this[kWebSocket]; |
|
|
|
websocket._closeFrameReceived = true; |
|
websocket._closeMessage = reason; |
|
websocket._closeCode = code; |
|
|
|
if (websocket._socket[kWebSocket] === undefined) return; |
|
|
|
websocket._socket.removeListener('data', socketOnData); |
|
process.nextTick(resume, websocket._socket); |
|
|
|
if (code === 1005) websocket.close(); |
|
else websocket.close(code, reason); |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'drain'` event. |
|
* |
|
* @private |
|
*/ |
|
function receiverOnDrain() { |
|
const websocket = this[kWebSocket]; |
|
|
|
if (!websocket.isPaused) websocket._socket.resume(); |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'error'` event. |
|
* |
|
* @param {(RangeError|Error)} err The emitted error |
|
* @private |
|
*/ |
|
function receiverOnError(err) { |
|
const websocket = this[kWebSocket]; |
|
|
|
if (websocket._socket[kWebSocket] !== undefined) { |
|
websocket._socket.removeListener('data', socketOnData); |
|
|
|
// |
|
// On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See |
|
// https://github.com/websockets/ws/issues/1940. |
|
// |
|
process.nextTick(resume, websocket._socket); |
|
|
|
websocket.close(err[kStatusCode]); |
|
} |
|
|
|
websocket.emit('error', err); |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'finish'` event. |
|
* |
|
* @private |
|
*/ |
|
function receiverOnFinish() { |
|
this[kWebSocket].emitClose(); |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'message'` event. |
|
* |
|
* @param {Buffer|ArrayBuffer|Buffer[])} data The message |
|
* @param {Boolean} isBinary Specifies whether the message is binary or not |
|
* @private |
|
*/ |
|
function receiverOnMessage(data, isBinary) { |
|
this[kWebSocket].emit('message', data, isBinary); |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'ping'` event. |
|
* |
|
* @param {Buffer} data The data included in the ping frame |
|
* @private |
|
*/ |
|
function receiverOnPing(data) { |
|
const websocket = this[kWebSocket]; |
|
|
|
if (websocket._autoPong) websocket.pong(data, !this._isServer, NOOP); |
|
websocket.emit('ping', data); |
|
} |
|
|
|
/** |
|
* The listener of the `Receiver` `'pong'` event. |
|
* |
|
* @param {Buffer} data The data included in the pong frame |
|
* @private |
|
*/ |
|
function receiverOnPong(data) { |
|
this[kWebSocket].emit('pong', data); |
|
} |
|
|
|
/** |
|
* Resume a readable stream |
|
* |
|
* @param {Readable} stream The readable stream |
|
* @private |
|
*/ |
|
function resume(stream) { |
|
stream.resume(); |
|
} |
|
|
|
/** |
|
* The listener of the socket `'close'` event. |
|
* |
|
* @private |
|
*/ |
|
function socketOnClose() { |
|
const websocket = this[kWebSocket]; |
|
|
|
this.removeListener('close', socketOnClose); |
|
this.removeListener('data', socketOnData); |
|
this.removeListener('end', socketOnEnd); |
|
|
|
websocket._readyState = WebSocket.CLOSING; |
|
|
|
let chunk; |
|
|
|
// |
|
// The close frame might not have been received or the `'end'` event emitted, |
|
// for example, if the socket was destroyed due to an error. Ensure that the |
|
// `receiver` stream is closed after writing any remaining buffered data to |
|
// it. If the readable side of the socket is in flowing mode then there is no |
|
// buffered data as everything has been already written and `readable.read()` |
|
// will return `null`. If instead, the socket is paused, any possible buffered |
|
// data will be read as a single chunk. |
|
// |
|
if ( |
|
!this._readableState.endEmitted && |
|
!websocket._closeFrameReceived && |
|
!websocket._receiver._writableState.errorEmitted && |
|
(chunk = websocket._socket.read()) !== null |
|
) { |
|
websocket._receiver.write(chunk); |
|
} |
|
|
|
websocket._receiver.end(); |
|
|
|
this[kWebSocket] = undefined; |
|
|
|
clearTimeout(websocket._closeTimer); |
|
|
|
if ( |
|
websocket._receiver._writableState.finished || |
|
websocket._receiver._writableState.errorEmitted |
|
) { |
|
websocket.emitClose(); |
|
} else { |
|
websocket._receiver.on('error', receiverOnFinish); |
|
websocket._receiver.on('finish', receiverOnFinish); |
|
} |
|
} |
|
|
|
/** |
|
* The listener of the socket `'data'` event. |
|
* |
|
* @param {Buffer} chunk A chunk of data |
|
* @private |
|
*/ |
|
function socketOnData(chunk) { |
|
if (!this[kWebSocket]._receiver.write(chunk)) { |
|
this.pause(); |
|
} |
|
} |
|
|
|
/** |
|
* The listener of the socket `'end'` event. |
|
* |
|
* @private |
|
*/ |
|
function socketOnEnd() { |
|
const websocket = this[kWebSocket]; |
|
|
|
websocket._readyState = WebSocket.CLOSING; |
|
websocket._receiver.end(); |
|
this.end(); |
|
} |
|
|
|
/** |
|
* The listener of the socket `'error'` event. |
|
* |
|
* @private |
|
*/ |
|
function socketOnError() { |
|
const websocket = this[kWebSocket]; |
|
|
|
this.removeListener('error', socketOnError); |
|
this.on('error', NOOP); |
|
|
|
if (websocket) { |
|
websocket._readyState = WebSocket.CLOSING; |
|
this.destroy(); |
|
} |
|
}
|
|
|