From bb80325d2cf8e653cb8fc7f8b40e0a64c6b3c071 Mon Sep 17 00:00:00 2001 From: Ray Bellis Date: Wed, 30 Jan 2019 15:27:42 +0000 Subject: [PATCH] PadMessageHandler.js: completed conversion --- src/node/handler/ImportHandler.js | 6 +- src/node/handler/PadMessageHandler.js | 297 ++++++++++---------------- 2 files changed, 115 insertions(+), 188 deletions(-) diff --git a/src/node/handler/ImportHandler.js b/src/node/handler/ImportHandler.js index ec07bf6e5..e92df4434 100644 --- a/src/node/handler/ImportHandler.js +++ b/src/node/handler/ImportHandler.js @@ -289,9 +289,9 @@ exports.doImport = function(req, res, padId) return; } - padMessageHandler.updatePadClients(pad, function(){ - callback(); - }); + // @TODO: not waiting for updatePadClients to finish + padMessageHandler.updatePadClients(pad); + callback(); }); }, diff --git a/src/node/handler/PadMessageHandler.js b/src/node/handler/PadMessageHandler.js index 25f9879bc..b41af86b9 100644 --- a/src/node/handler/PadMessageHandler.js +++ b/src/node/handler/PadMessageHandler.js @@ -19,8 +19,6 @@ */ -var ERR = require("async-stacktrace"); -var async = require("async"); var padManager = require("../db/PadManager"); var Changeset = require("ep_etherpad-lite/static/js/Changeset"); var AttributePool = require("ep_etherpad-lite/static/js/AttributePool"); @@ -38,7 +36,6 @@ var hooks = require("ep_etherpad-lite/static/js/pluginfw/hooks.js"); var channels = require("channels"); var stats = require('../stats'); var remoteAddress = require("../utils/RemoteAddress").remoteAddress; -const thenify = require("thenify").withCallback; const nodeify = require("nodeify"); /** @@ -62,16 +59,14 @@ stats.gauge('totalUsers', function() { /** * A changeset queue per pad that is processed by handleUserChanges() */ -var padChannels = new channels.channels(handleUserChangesCB); - -function handleUserChangesCB(data, callback) { +var padChannels = new channels.channels(function(data, callback) { return nodeify(handleUserChanges(data), callback); -} +}); /** * Saves the Socket class we need to send and receive data from the client */ -var socketio; +let socketio; /** * This Method is called by server.js to tell the message handler on which socket it should send @@ -115,17 +110,17 @@ exports.kickSessionsFromPad = function(padID) * Handles the disconnection of a user * @param client the client that leaves */ -exports.handleDisconnect = function(client) +exports.handleDisconnect = async function(client) { stats.meter('disconnects').mark(); // save the padname of this session - var session = sessioninfos[client.id]; + let session = sessioninfos[client.id]; // if this connection was already etablished with a handshake, send a disconnect message to the others if (session && session.author) { // Get the IP address from our persistant object - var ip = remoteAddress[client.id]; + let ip = remoteAddress[client.id]; // Anonymize the IP address if IP logging is disabled if (settings.disableIPlogging) { @@ -135,29 +130,27 @@ exports.handleDisconnect = function(client) accessLogger.info('[LEAVE] Pad "' + session.padId + '": Author "' + session.author + '" on client ' + client.id + ' with IP "' + ip + '" left the pad'); // get the author color out of the db - authorManager.getAuthorColorId(session.author, function(err, color) { - ERR(err); + let color = await authorManager.getAuthorColorId(session.author); - // prepare the notification for the other users on the pad, that this user left - var messageToTheOtherUsers = { - "type": "COLLABROOM", - "data": { - type: "USER_LEAVE", - userInfo: { - "ip": "127.0.0.1", - "colorId": color, - "userAgent": "Anonymous", - "userId": session.author - } + // prepare the notification for the other users on the pad, that this user left + let messageToTheOtherUsers = { + "type": "COLLABROOM", + "data": { + type: "USER_LEAVE", + userInfo: { + "ip": "127.0.0.1", + "colorId": color, + "userAgent": "Anonymous", + "userId": session.author } - }; + } + }; - // Go through all user that are still on the pad, and send them the USER_LEAVE message - client.broadcast.to(session.padId).json.send(messageToTheOtherUsers); + // Go through all user that are still on the pad, and send them the USER_LEAVE message + client.broadcast.to(session.padId).json.send(messageToTheOtherUsers); - // Allow plugins to hook into users leaving the pad - hooks.callAll("userLeave", session); - }); + // Allow plugins to hook into users leaving the pad + hooks.callAll("userLeave", session); } // Delete the sessioninfos entrys of this session @@ -179,7 +172,7 @@ exports.handleMessage = async function(client, message) return; } - var thisSession = sessioninfos[client.id]; + let thisSession = sessioninfos[client.id]; if (!thisSession) { messageLogger.warn("Dropped message from an unknown connection.") @@ -248,7 +241,7 @@ exports.handleMessage = async function(client, message) /* * In a previous version of this code, an "if (message)" wrapped the - * following async.series(). + * following series of async calls [now replaced with await calls] * This ugly "!Boolean(message)" is a lame way to exactly negate the truthy * condition and replace it with an early return, while being sure to leave * the original behaviour unchanged. @@ -310,15 +303,13 @@ exports.handleMessage = async function(client, message) * @param client the client that send this message * @param message the message from the client */ -function handleSaveRevisionMessage(client, message){ +async function handleSaveRevisionMessage(client, message) +{ var padId = sessioninfos[client.id].padId; var userId = sessioninfos[client.id].author; - padManager.getPad(padId, function(err, pad) { - if (ERR(err)) return; - - pad.addSavedRevision(pad.head, userId); - }); + let pad = await padManager.getPad(padId); + pad.addSavedRevision(pad.head, userId); } /** @@ -328,7 +319,7 @@ function handleSaveRevisionMessage(client, message){ * @param msg {Object} the message we're sending * @param sessionID {string} the socketIO session to which we're sending this message */ -exports.handleCustomObjectMessage = async function(msg, sessionID) { +exports.handleCustomObjectMessage = function(msg, sessionID) { if (msg.data.type === "CUSTOM") { if (sessionID){ // a sessionID is targeted: directly to this sessionID @@ -346,9 +337,9 @@ exports.handleCustomObjectMessage = async function(msg, sessionID) { * @param padID {Pad} the pad to which we're sending this message * @param msgString {String} the message we're sending */ -exports.handleCustomMessage = thenify(function(padID, msgString, cb) { - var time = Date.now(); - var msg = { +exports.handleCustomMessage = function(padID, msgString) { + let time = Date.now(); + let msg = { type: 'COLLABROOM', data: { type: msgString, @@ -356,9 +347,7 @@ exports.handleCustomMessage = thenify(function(padID, msgString, cb) { } }; socketio.sockets.in(padID).json.send(msg); - - cb(null, {}); -}); +} /** * Handles a Chat Message @@ -382,55 +371,24 @@ function handleChatMessage(client, message) * @param text the text of the chat message * @param padId the padId to send the chat message to */ -exports.sendChatMessageToPadClients = function(time, userId, text, padId) { - var pad; - var userName; +exports.sendChatMessageToPadClients = async function(time, userId, text, padId) +{ + // get the pad + let pad = await padManager.getPad(padId); - async.series([ - // get the pad - function(callback) { - padManager.getPad(padId, function(err, _pad) { - if (ERR(err, callback)) return; + // get the author + let userName = await authorManager.getAuthorName(userId); - pad = _pad; - callback(); - }); - }, + // save the chat message + pad.appendChatMessage(text, userId, time); - function(callback) { - authorManager.getAuthorName(userId, function(err, _userName) { - if (ERR(err, callback)) return; + let msg = { + type: "COLLABROOM", + data: { type: "CHAT_MESSAGE", userId, userName, time, text } + }; - userName = _userName; - callback(); - }); - }, - - // save the chat message and broadcast it - function(callback) { - // save the chat message - pad.appendChatMessage(text, userId, time); - - var msg = { - type: "COLLABROOM", - data: { - type: "CHAT_MESSAGE", - userId: userId, - userName: userName, - time: time, - text: text - } - }; - - // broadcast the chat message to everyone on the pad - socketio.sockets.in(padId).json.send(msg); - - callback(); - } - ], - function(err) { - ERR(err); - }); + // broadcast the chat message to everyone on the pad + socketio.sockets.in(padId).json.send(msg); } /** @@ -438,7 +396,7 @@ exports.sendChatMessageToPadClients = function(time, userId, text, padId) { * @param client the client that send this message * @param message the message from the client */ -function handleGetChatMessages(client, message) +async function handleGetChatMessages(client, message) { if (message.data.start == null) { messageLogger.warn("Dropped message, GetChatMessages Message has no start!"); @@ -450,45 +408,29 @@ function handleGetChatMessages(client, message) return; } - var start = message.data.start; - var end = message.data.end; - var count = end - start; + let start = message.data.start; + let end = message.data.end; + let count = end - start; if (count < 0 || count > 100) { messageLogger.warn("Dropped message, GetChatMessages Message, client requested invalid amount of messages!"); return; } - var padId = sessioninfos[client.id].padId; - var pad; + let padId = sessioninfos[client.id].padId; + let pad = await padManager.getPad(padId); - async.series([ - // get the pad - function(callback) { - padManager.getPad(padId, function(err, _pad) { - if (ERR(err, callback)) return; + let chatMessages = await pad.getChatMessages(start, end); + let infoMsg = { + type: "COLLABROOM", + data: { + type: "CHAT_MESSAGES", + messages: chatMessages + } + }; - pad = _pad; - callback(); - }); - }, - - function(callback) { - pad.getChatMessages(start, end, function(err, chatMessages) { - if (ERR(err, callback)) return; - - var infoMsg = { - type: "COLLABROOM", - data: { - type: "CHAT_MESSAGES", - messages: chatMessages - } - }; - - // send the messages back to the client - client.json.send(infoMsg); - }); - }]); + // send the messages back to the client + client.json.send(infoMsg); } /** @@ -763,13 +705,13 @@ async function handleUserChanges(data) stopWatch.end(); } -exports.updatePadClients = thenify(function(pad, callback) +exports.updatePadClients = async function(pad) { // skip this if no-one is on this pad - var roomClients = _getRoomClients(pad.id); + let roomClients = _getRoomClients(pad.id); if (roomClients.length == 0) { - return callback(); + return; } // since all clients usually get the same set of changesets, store them in local cache @@ -778,69 +720,54 @@ exports.updatePadClients = thenify(function(pad, callback) // BEFORE first result will be landed to our cache object. The solution is to replace parallel processing // via async.forEach with sequential for() loop. There is no real benefits of running this in parallel, // but benefit of reusing cached revision object is HUGE - var revCache = {}; + let revCache = {}; // go through all sessions on this pad - async.forEach(roomClients, function(client, callback){ - var sid = client.id; - // https://github.com/caolan/async#whilst + for (let client of roomClients) { + let sid = client.id; + // send them all new changesets - async.whilst( - function() { return sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()}, - function(callback) - { - var r = sessioninfos[sid].rev + 1; + while (sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()) { + let r = sessioninfos[sid].rev + 1; + let revision = revCache[r]; + if (!revision) { + revision = await pad.getRevision(r); + revCache[r] = revision; + } - async.waterfall([ - function(callback) { - if(revCache[r]) { - callback(null, revCache[r]); - } else { - pad.getRevision(r, callback); - } - }, + let author = revision.meta.author, + revChangeset = revision.changeset, + currentTime = revision.meta.timestamp; - function(revision, callback) { - revCache[r] = revision; + // next if session has not been deleted + if (sessioninfos[sid] == null) { + continue; + } - var author = revision.meta.author, - revChangeset = revision.changeset, - currentTime = revision.meta.timestamp; + if (author == sessioninfos[sid].author) { + client.json.send({ "type": "COLLABROOM", "data":{ type: "ACCEPT_COMMIT", newRev: r }}); + } else { + let forWire = Changeset.prepareForWire(revChangeset, pad.pool); + let wireMsg = {"type": "COLLABROOM", + "data": { type:"NEW_CHANGES", + newRev:r, + changeset: forWire.translated, + apool: forWire.pool, + author: author, + currentTime: currentTime, + timeDelta: currentTime - sessioninfos[sid].time + }}; - // next if session has not been deleted - if (sessioninfos[sid] == null) { - return callback(null); - } + client.json.send(wireMsg); + } - if (author == sessioninfos[sid].author) { - client.json.send({"type":"COLLABROOM","data":{type:"ACCEPT_COMMIT", newRev:r}}); - } else { - var forWire = Changeset.prepareForWire(revChangeset, pad.pool); - var wireMsg = {"type":"COLLABROOM", - "data":{type:"NEW_CHANGES", - newRev:r, - changeset: forWire.translated, - apool: forWire.pool, - author: author, - currentTime: currentTime, - timeDelta: currentTime - sessioninfos[sid].time - }}; - - client.json.send(wireMsg); - } - - if (sessioninfos[sid]) { - sessioninfos[sid].time = currentTime; - sessioninfos[sid].rev = r; - } - callback(null); - } - ], callback); - }, - callback - ); - },callback); -}); + if (sessioninfos[sid]) { + sessioninfos[sid].time = currentTime; + sessioninfos[sid].rev = r; + } + } + } +} /** * Copied from the Etherpad Source Code. Don't know what this method does excatly... @@ -893,12 +820,12 @@ function _correctMarkersInPad(atext, apool) { function handleSwitchToPad(client, message) { // clear the session and leave the room - var currentSession = sessioninfos[client.id]; - var padId = currentSession.padId; - var roomClients = _getRoomClients(padId); + let currentSession = sessioninfos[client.id]; + let padId = currentSession.padId; + let roomClients = _getRoomClients(padId); - async.forEach(roomClients, function(client, callback) { - var sinfo = sessioninfos[client.id]; + roomClients.forEach(client => { + let sinfo = sessioninfos[client.id]; if (sinfo && sinfo.author == currentSession.author) { // fix user's counter, works on page refresh or if user closes browser window and then rejoins sessioninfos[client.id] = {};