Odoo GraphQL Subscription using Node, Express JS for Sample
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

122 lines
6.0 KiB

import { handleProtocols, makeServer } from '../../server.mjs';
import { DEPRECATED_GRAPHQL_WS_PROTOCOL, CloseCode, } from '../../common.mjs';
import { limitCloseReason } from '../../utils.mjs';
/**
* Make a handler to use on a [@fastify/websocket](https://github.com/fastify/fastify-websocket) route.
* This is a basic starter, feel free to copy the code over and adjust it to your needs
*
* @category Server/@fastify/websocket
*/
export function makeHandler(options,
/**
* The timout between dispatched keep-alive messages. Internally uses the [ws Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/wss_API/Writing_ws_servers#Pings_and_Pongs_The_Heartbeat_of_wss))
* to check that the link between the clients and the server is operating and to prevent the link
* from being broken due to idling.
*
* @default 12_000 // 12 seconds
*/
keepAlive = 12000) {
const isProd = process.env.NODE_ENV === 'production';
const server = makeServer(options);
// we dont have access to the fastify-websocket server instance yet,
// register an error handler on first connection ONCE only
let handlingServerEmittedErrors = false;
return function handler(connection, request) {
const { socket } = connection;
// might be too late, but meh
this.websocketServer.options.handleProtocols = handleProtocols;
// handle server emitted errors only if not already handling
if (!handlingServerEmittedErrors) {
handlingServerEmittedErrors = true;
this.websocketServer.once('error', (err) => {
console.error('Internal error emitted on the WebSocket server. ' +
'Please check your implementation.', err);
// catch the first thrown error and re-throw it once all clients have been notified
let firstErr = null;
// report server errors by erroring out all clients with the same error
for (const client of this.websocketServer.clients) {
try {
client.close(CloseCode.InternalServerError, isProd
? 'Internal server error'
: limitCloseReason(err instanceof Error ? err.message : String(err), 'Internal server error'));
}
catch (err) {
firstErr = firstErr !== null && firstErr !== void 0 ? firstErr : err;
}
}
if (firstErr)
throw firstErr;
});
}
// used as listener on two streams, prevent superfluous calls on close
let emittedErrorHandled = false;
function handleEmittedError(err) {
if (emittedErrorHandled)
return;
emittedErrorHandled = true;
console.error('Internal error emitted on a WebSocket socket. ' +
'Please check your implementation.', err);
socket.close(CloseCode.InternalServerError, isProd
? 'Internal server error'
: limitCloseReason(err instanceof Error ? err.message : String(err), 'Internal server error'));
}
// fastify-websocket uses the WebSocket.createWebSocketStream,
// therefore errors get emitted on both the connection and the socket
connection.once('error', handleEmittedError);
socket.once('error', handleEmittedError);
// keep alive through ping-pong messages
let pongWait = null;
const pingInterval = keepAlive > 0 && isFinite(keepAlive)
? setInterval(() => {
// ping pong on open sockets only
if (socket.readyState === socket.OPEN) {
// terminate the connection after pong wait has passed because the client is idle
pongWait = setTimeout(() => {
socket.terminate();
}, keepAlive);
// listen for client's pong and stop socket termination
socket.once('pong', () => {
if (pongWait) {
clearTimeout(pongWait);
pongWait = null;
}
});
socket.ping();
}
}, keepAlive)
: null;
const closed = server.opened({
protocol: socket.protocol,
send: (data) => new Promise((resolve, reject) => {
if (socket.readyState !== socket.OPEN)
return resolve();
socket.send(data, (err) => (err ? reject(err) : resolve()));
}),
close: (code, reason) => socket.close(code, reason),
onMessage: (cb) => socket.on('message', async (event) => {
try {
await cb(String(event));
}
catch (err) {
console.error('Internal error occurred during message handling. ' +
'Please check your implementation.', err);
socket.close(CloseCode.InternalServerError, isProd
? 'Internal server error'
: limitCloseReason(err instanceof Error ? err.message : String(err), 'Internal server error'));
}
}),
}, { connection, request });
socket.once('close', (code, reason) => {
if (pongWait)
clearTimeout(pongWait);
if (pingInterval)
clearInterval(pingInterval);
if (!isProd &&
code === CloseCode.SubprotocolNotAcceptable &&
socket.protocol === DEPRECATED_GRAPHQL_WS_PROTOCOL)
console.warn(`Client provided the unsupported and deprecated subprotocol "${socket.protocol}" used by subscriptions-transport-ws.` +
'Please see https://www.apollographql.com/docs/apollo-server/data/subscriptions/#switching-from-subscriptions-transport-ws.');
closed(code, String(reason));
});
};
}