PadMessageHandler.js: completed conversion

This commit is contained in:
Ray Bellis 2019-01-30 15:27:42 +00:00
parent 9246a1de26
commit bb80325d2c
2 changed files with 115 additions and 188 deletions

View file

@ -289,9 +289,9 @@ exports.doImport = function(req, res, padId)
return;
}
padMessageHandler.updatePadClients(pad, function(){
callback();
});
// @TODO: not waiting for updatePadClients to finish
padMessageHandler.updatePadClients(pad);
callback();
});
},

View file

@ -19,8 +19,6 @@
*/
var ERR = require("async-stacktrace");
var async = require("async");
var padManager = require("../db/PadManager");
var Changeset = require("ep_etherpad-lite/static/js/Changeset");
var AttributePool = require("ep_etherpad-lite/static/js/AttributePool");
@ -38,7 +36,6 @@ var hooks = require("ep_etherpad-lite/static/js/pluginfw/hooks.js");
var channels = require("channels");
var stats = require('../stats');
var remoteAddress = require("../utils/RemoteAddress").remoteAddress;
const thenify = require("thenify").withCallback;
const nodeify = require("nodeify");
/**
@ -62,16 +59,14 @@ stats.gauge('totalUsers', function() {
/**
* A changeset queue per pad that is processed by handleUserChanges()
*/
var padChannels = new channels.channels(handleUserChangesCB);
function handleUserChangesCB(data, callback) {
var padChannels = new channels.channels(function(data, callback) {
return nodeify(handleUserChanges(data), callback);
}
});
/**
* Saves the Socket class we need to send and receive data from the client
*/
var socketio;
let socketio;
/**
* This Method is called by server.js to tell the message handler on which socket it should send
@ -115,17 +110,17 @@ exports.kickSessionsFromPad = function(padID)
* Handles the disconnection of a user
* @param client the client that leaves
*/
exports.handleDisconnect = function(client)
exports.handleDisconnect = async function(client)
{
stats.meter('disconnects').mark();
// save the padname of this session
var session = sessioninfos[client.id];
let session = sessioninfos[client.id];
// if this connection was already etablished with a handshake, send a disconnect message to the others
if (session && session.author) {
// Get the IP address from our persistant object
var ip = remoteAddress[client.id];
let ip = remoteAddress[client.id];
// Anonymize the IP address if IP logging is disabled
if (settings.disableIPlogging) {
@ -135,29 +130,27 @@ exports.handleDisconnect = function(client)
accessLogger.info('[LEAVE] Pad "' + session.padId + '": Author "' + session.author + '" on client ' + client.id + ' with IP "' + ip + '" left the pad');
// get the author color out of the db
authorManager.getAuthorColorId(session.author, function(err, color) {
ERR(err);
let color = await authorManager.getAuthorColorId(session.author);
// prepare the notification for the other users on the pad, that this user left
var messageToTheOtherUsers = {
"type": "COLLABROOM",
"data": {
type: "USER_LEAVE",
userInfo: {
"ip": "127.0.0.1",
"colorId": color,
"userAgent": "Anonymous",
"userId": session.author
}
// prepare the notification for the other users on the pad, that this user left
let messageToTheOtherUsers = {
"type": "COLLABROOM",
"data": {
type: "USER_LEAVE",
userInfo: {
"ip": "127.0.0.1",
"colorId": color,
"userAgent": "Anonymous",
"userId": session.author
}
};
}
};
// Go through all user that are still on the pad, and send them the USER_LEAVE message
client.broadcast.to(session.padId).json.send(messageToTheOtherUsers);
// Go through all user that are still on the pad, and send them the USER_LEAVE message
client.broadcast.to(session.padId).json.send(messageToTheOtherUsers);
// Allow plugins to hook into users leaving the pad
hooks.callAll("userLeave", session);
});
// Allow plugins to hook into users leaving the pad
hooks.callAll("userLeave", session);
}
// Delete the sessioninfos entrys of this session
@ -179,7 +172,7 @@ exports.handleMessage = async function(client, message)
return;
}
var thisSession = sessioninfos[client.id];
let thisSession = sessioninfos[client.id];
if (!thisSession) {
messageLogger.warn("Dropped message from an unknown connection.")
@ -248,7 +241,7 @@ exports.handleMessage = async function(client, message)
/*
* In a previous version of this code, an "if (message)" wrapped the
* following async.series().
* following series of async calls [now replaced with await calls]
* This ugly "!Boolean(message)" is a lame way to exactly negate the truthy
* condition and replace it with an early return, while being sure to leave
* the original behaviour unchanged.
@ -310,15 +303,13 @@ exports.handleMessage = async function(client, message)
* @param client the client that send this message
* @param message the message from the client
*/
function handleSaveRevisionMessage(client, message){
async function handleSaveRevisionMessage(client, message)
{
var padId = sessioninfos[client.id].padId;
var userId = sessioninfos[client.id].author;
padManager.getPad(padId, function(err, pad) {
if (ERR(err)) return;
pad.addSavedRevision(pad.head, userId);
});
let pad = await padManager.getPad(padId);
pad.addSavedRevision(pad.head, userId);
}
/**
@ -328,7 +319,7 @@ function handleSaveRevisionMessage(client, message){
* @param msg {Object} the message we're sending
* @param sessionID {string} the socketIO session to which we're sending this message
*/
exports.handleCustomObjectMessage = async function(msg, sessionID) {
exports.handleCustomObjectMessage = function(msg, sessionID) {
if (msg.data.type === "CUSTOM") {
if (sessionID){
// a sessionID is targeted: directly to this sessionID
@ -346,9 +337,9 @@ exports.handleCustomObjectMessage = async function(msg, sessionID) {
* @param padID {Pad} the pad to which we're sending this message
* @param msgString {String} the message we're sending
*/
exports.handleCustomMessage = thenify(function(padID, msgString, cb) {
var time = Date.now();
var msg = {
exports.handleCustomMessage = function(padID, msgString) {
let time = Date.now();
let msg = {
type: 'COLLABROOM',
data: {
type: msgString,
@ -356,9 +347,7 @@ exports.handleCustomMessage = thenify(function(padID, msgString, cb) {
}
};
socketio.sockets.in(padID).json.send(msg);
cb(null, {});
});
}
/**
* Handles a Chat Message
@ -382,55 +371,24 @@ function handleChatMessage(client, message)
* @param text the text of the chat message
* @param padId the padId to send the chat message to
*/
exports.sendChatMessageToPadClients = function(time, userId, text, padId) {
var pad;
var userName;
exports.sendChatMessageToPadClients = async function(time, userId, text, padId)
{
// get the pad
let pad = await padManager.getPad(padId);
async.series([
// get the pad
function(callback) {
padManager.getPad(padId, function(err, _pad) {
if (ERR(err, callback)) return;
// get the author
let userName = await authorManager.getAuthorName(userId);
pad = _pad;
callback();
});
},
// save the chat message
pad.appendChatMessage(text, userId, time);
function(callback) {
authorManager.getAuthorName(userId, function(err, _userName) {
if (ERR(err, callback)) return;
let msg = {
type: "COLLABROOM",
data: { type: "CHAT_MESSAGE", userId, userName, time, text }
};
userName = _userName;
callback();
});
},
// save the chat message and broadcast it
function(callback) {
// save the chat message
pad.appendChatMessage(text, userId, time);
var msg = {
type: "COLLABROOM",
data: {
type: "CHAT_MESSAGE",
userId: userId,
userName: userName,
time: time,
text: text
}
};
// broadcast the chat message to everyone on the pad
socketio.sockets.in(padId).json.send(msg);
callback();
}
],
function(err) {
ERR(err);
});
// broadcast the chat message to everyone on the pad
socketio.sockets.in(padId).json.send(msg);
}
/**
@ -438,7 +396,7 @@ exports.sendChatMessageToPadClients = function(time, userId, text, padId) {
* @param client the client that send this message
* @param message the message from the client
*/
function handleGetChatMessages(client, message)
async function handleGetChatMessages(client, message)
{
if (message.data.start == null) {
messageLogger.warn("Dropped message, GetChatMessages Message has no start!");
@ -450,45 +408,29 @@ function handleGetChatMessages(client, message)
return;
}
var start = message.data.start;
var end = message.data.end;
var count = end - start;
let start = message.data.start;
let end = message.data.end;
let count = end - start;
if (count < 0 || count > 100) {
messageLogger.warn("Dropped message, GetChatMessages Message, client requested invalid amount of messages!");
return;
}
var padId = sessioninfos[client.id].padId;
var pad;
let padId = sessioninfos[client.id].padId;
let pad = await padManager.getPad(padId);
async.series([
// get the pad
function(callback) {
padManager.getPad(padId, function(err, _pad) {
if (ERR(err, callback)) return;
let chatMessages = await pad.getChatMessages(start, end);
let infoMsg = {
type: "COLLABROOM",
data: {
type: "CHAT_MESSAGES",
messages: chatMessages
}
};
pad = _pad;
callback();
});
},
function(callback) {
pad.getChatMessages(start, end, function(err, chatMessages) {
if (ERR(err, callback)) return;
var infoMsg = {
type: "COLLABROOM",
data: {
type: "CHAT_MESSAGES",
messages: chatMessages
}
};
// send the messages back to the client
client.json.send(infoMsg);
});
}]);
// send the messages back to the client
client.json.send(infoMsg);
}
/**
@ -763,13 +705,13 @@ async function handleUserChanges(data)
stopWatch.end();
}
exports.updatePadClients = thenify(function(pad, callback)
exports.updatePadClients = async function(pad)
{
// skip this if no-one is on this pad
var roomClients = _getRoomClients(pad.id);
let roomClients = _getRoomClients(pad.id);
if (roomClients.length == 0) {
return callback();
return;
}
// since all clients usually get the same set of changesets, store them in local cache
@ -778,69 +720,54 @@ exports.updatePadClients = thenify(function(pad, callback)
// BEFORE first result will be landed to our cache object. The solution is to replace parallel processing
// via async.forEach with sequential for() loop. There is no real benefits of running this in parallel,
// but benefit of reusing cached revision object is HUGE
var revCache = {};
let revCache = {};
// go through all sessions on this pad
async.forEach(roomClients, function(client, callback){
var sid = client.id;
// https://github.com/caolan/async#whilst
for (let client of roomClients) {
let sid = client.id;
// send them all new changesets
async.whilst(
function() { return sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()},
function(callback)
{
var r = sessioninfos[sid].rev + 1;
while (sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()) {
let r = sessioninfos[sid].rev + 1;
let revision = revCache[r];
if (!revision) {
revision = await pad.getRevision(r);
revCache[r] = revision;
}
async.waterfall([
function(callback) {
if(revCache[r]) {
callback(null, revCache[r]);
} else {
pad.getRevision(r, callback);
}
},
let author = revision.meta.author,
revChangeset = revision.changeset,
currentTime = revision.meta.timestamp;
function(revision, callback) {
revCache[r] = revision;
// next if session has not been deleted
if (sessioninfos[sid] == null) {
continue;
}
var author = revision.meta.author,
revChangeset = revision.changeset,
currentTime = revision.meta.timestamp;
if (author == sessioninfos[sid].author) {
client.json.send({ "type": "COLLABROOM", "data":{ type: "ACCEPT_COMMIT", newRev: r }});
} else {
let forWire = Changeset.prepareForWire(revChangeset, pad.pool);
let wireMsg = {"type": "COLLABROOM",
"data": { type:"NEW_CHANGES",
newRev:r,
changeset: forWire.translated,
apool: forWire.pool,
author: author,
currentTime: currentTime,
timeDelta: currentTime - sessioninfos[sid].time
}};
// next if session has not been deleted
if (sessioninfos[sid] == null) {
return callback(null);
}
client.json.send(wireMsg);
}
if (author == sessioninfos[sid].author) {
client.json.send({"type":"COLLABROOM","data":{type:"ACCEPT_COMMIT", newRev:r}});
} else {
var forWire = Changeset.prepareForWire(revChangeset, pad.pool);
var wireMsg = {"type":"COLLABROOM",
"data":{type:"NEW_CHANGES",
newRev:r,
changeset: forWire.translated,
apool: forWire.pool,
author: author,
currentTime: currentTime,
timeDelta: currentTime - sessioninfos[sid].time
}};
client.json.send(wireMsg);
}
if (sessioninfos[sid]) {
sessioninfos[sid].time = currentTime;
sessioninfos[sid].rev = r;
}
callback(null);
}
], callback);
},
callback
);
},callback);
});
if (sessioninfos[sid]) {
sessioninfos[sid].time = currentTime;
sessioninfos[sid].rev = r;
}
}
}
}
/**
* Copied from the Etherpad Source Code. Don't know what this method does excatly...
@ -893,12 +820,12 @@ function _correctMarkersInPad(atext, apool) {
function handleSwitchToPad(client, message)
{
// clear the session and leave the room
var currentSession = sessioninfos[client.id];
var padId = currentSession.padId;
var roomClients = _getRoomClients(padId);
let currentSession = sessioninfos[client.id];
let padId = currentSession.padId;
let roomClients = _getRoomClients(padId);
async.forEach(roomClients, function(client, callback) {
var sinfo = sessioninfos[client.id];
roomClients.forEach(client => {
let sinfo = sessioninfos[client.id];
if (sinfo && sinfo.author == currentSession.author) {
// fix user's counter, works on page refresh or if user closes browser window and then rejoins
sessioninfos[client.id] = {};