diff --git a/src/node/handler/PadMessageHandler.js b/src/node/handler/PadMessageHandler.js index 748b8382a..90053b416 100644 --- a/src/node/handler/PadMessageHandler.js +++ b/src/node/handler/PadMessageHandler.js @@ -1,6 +1,6 @@ /** - * The MessageHandler handles all Messages that comes from Socket.IO and controls the sessions - */ + * The MessageHandler handles all Messages that comes from Socket.IO and controls the sessions + */ /* * Copyright 2009 Google Inc., 2011 Peter 'Pita' Martischka (Primary Technology Ltd) @@ -106,12 +106,12 @@ exports.kickSessionsFromPad = function(padID) * @param client the client that leaves */ exports.handleDisconnect = function(client) -{ +{ stats.meter('disconnects').mark(); - + //save the padname of this session var session = sessioninfos[client.id]; - + //if this connection was already etablished with a handshake, send a disconnect message to the others if(session && session.author) { @@ -128,7 +128,7 @@ exports.handleDisconnect = function(client) authorManager.getAuthorColorId(session.author, function(err, color) { ERR(err); - + //prepare the notification for the other users on the pad, that this user left var messageToTheOtherUsers = { "type": "COLLABROOM", @@ -142,14 +142,14 @@ exports.handleDisconnect = function(client) } } }; - + //Go trough all user that are still on the pad, and send them the USER_LEAVE message client.broadcast.to(session.padId).json.send(messageToTheOtherUsers); - }); + }); } - + //Delete the sessioninfos entrys of this session - delete sessioninfos[client.id]; + delete sessioninfos[client.id]; } /** @@ -158,7 +158,7 @@ exports.handleDisconnect = function(client) * @param message the message from the client */ exports.handleMessage = function(client, message) -{ +{ if(message == null) { return; @@ -174,7 +174,7 @@ exports.handleMessage = function(client, message) var handleMessageHook = function(callback){ var dropMessage = false; - // Call handleMessage hook. If a plugin returns null, the message will be dropped. Note that for all messages + // Call handleMessage hook. If a plugin returns null, the message will be dropped. Note that for all messages // handleMessage will be called, even if the client is not authorized hooks.aCallAll("handleMessage", { client: client, message: message }, function ( err, messages ) { if(ERR(err, callback)) return; @@ -183,7 +183,7 @@ exports.handleMessage = function(client, message) dropMessage = true; } }); - + // If no plugins explicitly told us to drop the message, its ok to proceed if(!dropMessage){ callback() }; }); @@ -259,7 +259,7 @@ exports.handleMessage = function(client, message) var checkAccessCallback = function(err, statusObject) { if(ERR(err, callback)) return; - + //access was granted if(statusObject.accessStatus == "grant") { @@ -297,17 +297,17 @@ exports.handleMessage = function(client, message) function handleSaveRevisionMessage(client, message){ var padId = sessioninfos[client.id].padId; var userId = sessioninfos[client.id].author; - + padManager.getPad(padId, function(err, pad) { if(ERR(err)) return; - + pad.addSavedRevision(pad.head, userId); }); } /** - * Handles a custom message, different to the function below as it handles objects not strings and you can + * Handles a custom message, different to the function below as it handles objects not strings and you can * direct the message to specific sessionID * * @param msg {Object} the message we're sending @@ -356,10 +356,10 @@ function handleChatMessage(client, message) var userId = sessioninfos[client.id].author; var text = message.data.text; var padId = sessioninfos[client.id].padId; - + var pad; var userName; - + async.series([ //get the pad function(callback) @@ -385,7 +385,7 @@ function handleChatMessage(client, message) { //save the chat message pad.appendChatMessage(text, userId, time); - + var msg = { type: "COLLABROOM", data: { @@ -396,10 +396,10 @@ function handleChatMessage(client, message) text: text } }; - + //broadcast the chat message to everyone on the pad socketio.sockets.in(padId).json.send(msg); - + callback(); } ], function(err) @@ -425,20 +425,20 @@ function handleGetChatMessages(client, message) messageLogger.warn("Dropped message, GetChatMessages Message has no start!"); return; } - + var start = message.data.start; var end = message.data.end; var count = start - count; - + if(count < 0 && count > 100) { messageLogger.warn("Dropped message, GetChatMessages Message, client requested invalid amout of messages!"); return; } - + var padId = sessioninfos[client.id].padId; var pad; - + async.series([ //get the pad function(callback) @@ -488,10 +488,10 @@ function handleSuggestUserName(client, message) messageLogger.warn("Dropped message, suggestUserName Message has no unnamedId!"); return; } - + var padId = sessioninfos[client.id].padId, clients = socketio.sockets.clients(padId); - + //search the author and send him this message for(var i = 0; i < clients.length; i++) { var session = sessioninfos[clients[i].id]; @@ -520,14 +520,14 @@ function handleUserInfoUpdate(client, message) messageLogger.warn("Dropped message, USERINFO_UPDATE Message has no colorId!"); return; } - + //Find out the author name of this session var author = sessioninfos[client.id].author; - + //Tell the authorManager about the new attributes authorManager.setAuthorColorId(author, message.data.userInfo.colorId); authorManager.setAuthorName(author, message.data.userInfo.name); - + var padId = sessioninfos[client.id].padId; var infoMsg = { @@ -545,7 +545,7 @@ function handleUserInfoUpdate(client, message) } } }; - + //Send the other clients on the pad the update message client.broadcast.to(padId).json.send(infoMsg); } @@ -588,7 +588,7 @@ function handleUserChanges(data, cb) messageLogger.warn("Dropped message, USER_CHANGES Message has no changeset!"); return cb(); } - + //get all Vars we need var baseRev = message.data.baseRev; var wireApool = (new AttributePool()).fromJsonable(message.data.apool); @@ -596,12 +596,12 @@ function handleUserChanges(data, cb) // The client might disconnect between our callbacks. We should still // finish processing the changeset, so keep a reference to the session. var thisSession = sessioninfos[client.id]; - + var r, apool, pad; // Measure time to process edit var stopWatch = stats.timer('edits').start(); - + async.series([ //get the pad function(callback) @@ -617,7 +617,7 @@ function handleUserChanges(data, cb) function(callback) { //ex. _checkChangesetAndPool - + try { // Verify that the changeset has valid syntax and is in canonical form @@ -644,9 +644,9 @@ function handleUserChanges(data, cb) if('author' == attr[0] && attr[1] != thisSession.author) throw new Error("Trying to submit changes as another author in changeset "+changeset); }) } - + //ex. adoptChangesetAttribs - + //Afaik, it copies the new attributes from the changeset, to the global Attribute Pool changeset = Changeset.moveOpsToNewPool(changeset, wireApool, pad.pool); } @@ -657,7 +657,7 @@ function handleUserChanges(data, cb) stats.meter('failedChangesets').mark(); return callback(new Error("Can't apply USER_CHANGES, because "+e.message)); } - + //ex. applyUserChanges apool = pad.pool; r = baseRev; @@ -671,7 +671,7 @@ function handleUserChanges(data, cb) function(callback) { r++; - + pad.getRevisionChangeset(r, function(err, c) { if(ERR(err, callback)) return; @@ -704,16 +704,16 @@ function handleUserChanges(data, cb) function (callback) { var prevText = pad.text(); - - if (Changeset.oldLen(changeset) != prevText.length) + + if (Changeset.oldLen(changeset) != prevText.length) { client.json.send({disconnect:"badChangeset"}); stats.meter('failedChangesets').mark(); return callback(new Error("Can't apply USER_CHANGES "+changeset+" with oldLen " + Changeset.oldLen(changeset) + " to document of length " + prevText.length)); } - + pad.appendRevision(changeset, thisSession.author); - + var correctionChangeset = _correctMarkersInPad(pad.atext, pad.pool); if (correctionChangeset) { pad.appendRevision(correctionChangeset); @@ -724,7 +724,7 @@ function handleUserChanges(data, cb) var nlChangeset = Changeset.makeSplice(pad.text(), pad.text().length-1, 0, "\n"); pad.appendRevision(nlChangeset); } - + exports.updatePadClients(pad, function(er) { ERR(er) }); @@ -739,16 +739,16 @@ function handleUserChanges(data, cb) } exports.updatePadClients = function(pad, callback) -{ +{ //skip this step if noone is on this pad var roomClients = socketio.sockets.clients(pad.id); if(roomClients.length==0) return callback(); - + // since all clients usually get the same set of changesets, store them in local cache // to remove unnecessary roundtrip to the datalayer // TODO: in REAL world, if we're working without datalayer cache, all requests to revisions will be fired - // BEFORE first result will be landed to our cache object. The solution is to replace parallel processing + // BEFORE first result will be landed to our cache object. The solution is to replace parallel processing // via async.forEach with sequential for() loop. There is no real benefits of running this in parallel, // but benefit of reusing cached revision object is HUGE var revCache = {}; @@ -763,7 +763,7 @@ exports.updatePadClients = function(pad, callback) async.whilst( function (){ return sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()}, function(callback) - { + { var r = sessioninfos[sid].rev + 1; async.waterfall([ @@ -772,7 +772,7 @@ exports.updatePadClients = function(pad, callback) callback(null, revCache[r]); else pad.getRevision(r, callback); - }, + }, function(revision, callback) { revCache[r] = revision; @@ -800,8 +800,8 @@ exports.updatePadClients = function(pad, callback) author: author, currentTime: currentTime, timeDelta: currentTime - sessioninfos[sid].time - }}; - + }}; + client.json.send(wireMsg); } @@ -814,7 +814,7 @@ exports.updatePadClients = function(pad, callback) }, callback ); - },callback); + },callback); } /** @@ -830,11 +830,11 @@ function _correctMarkersInPad(atext, apool) { var offset = 0; while (iter.hasNext()) { var op = iter.next(); - + var hasMarker = _.find(AttributeManager.lineAttributes, function(attribute){ return Changeset.opAttributeValue(op, attribute, apool); }) !== undefined; - + if (hasMarker) { for(var i=0;i 0 && text.charAt(offset-1) != '\n') { @@ -864,7 +864,7 @@ function _correctMarkersInPad(atext, apool) { } /** - * Handles a CLIENT_READY. A CLIENT_READY is the first message from the client to the server. The Client sends his token + * Handles a CLIENT_READY. A CLIENT_READY is the first message from the client to the server. The Client sends his token * and the pad it wants to enter. The Server answers with the inital values (clientVars) of the pad * @param client the client that send this message * @param message the message from the client @@ -922,7 +922,7 @@ function handleClientReady(client, message) securityManager.checkAccess (padIds.padId, message.sessionID, message.token, message.password, function(err, statusObject) { if(ERR(err, callback)) return; - + //access was granted if(statusObject.accessStatus == "grant") { @@ -935,7 +935,7 @@ function handleClientReady(client, message) client.json.send({accessStatus: statusObject.accessStatus}) } }); - }, + }, //get all authordata of this new user, and load the pad-object from the database function(callback) { @@ -967,7 +967,7 @@ function handleClientReady(client, message) function(callback) { var authors = pad.getAllAuthors(); - + async.parallel([ //get timestamp of latest revission needed for timeslider function(callback) @@ -993,7 +993,7 @@ function handleClientReady(client, message) }, callback); } ], callback); - + }, //glue the clientVars together, send them and tell the other clients that a new one is there function(callback) @@ -1013,12 +1013,12 @@ function handleClientReady(client, message) roomClients[i].json.send({disconnect:"userdup"}); } } - + //Save in sessioninfos that this session belonges to this pad sessioninfos[client.id].padId = padIds.padId; sessioninfos[client.id].readOnlyPadId = padIds.readOnlyPadId; sessioninfos[client.id].readonly = padIds.readonly; - + //Log creation/(re-)entering of a pad client.get('remoteAddress', function(er, ip) { //Anonymize the IP address if IP logging is disabled @@ -1056,7 +1056,7 @@ function handleClientReady(client, message) client.json.send({disconnect:"corruptPad"});// pull the breaks return callback(); } - + // Warning: never ever send padIds.padId to the client. If the // client is read only you would open a security hole 1 swedish // mile wide... @@ -1085,7 +1085,7 @@ function handleClientReady(client, message) "padId": message.padId, "initialTitle": "Pad: " + message.padId, "opts": {}, - // tell the client the number of the latest chat-message, which will be + // tell the client the number of the latest chat-message, which will be // used to request the latest 100 chat-messages later (GET_CHAT_MESSAGES) "chatHead": pad.chatHead, "numConnectedUsers": roomClients.length, @@ -1093,7 +1093,7 @@ function handleClientReady(client, message) "readonly": padIds.readonly, "serverTimestamp": new Date().getTime(), "userId": author, - "abiwordAvailable": settings.abiwordAvailable(), + "abiwordAvailable": settings.abiwordAvailable(), "plugins": { "plugins": plugins.plugins, "parts": plugins.parts, @@ -1106,18 +1106,18 @@ function handleClientReady(client, message) { clientVars.userName = authorName; } - + //call the clientVars-hook so plugins can modify them before they get sent to the client hooks.aCallAll("clientVars", { clientVars: clientVars, pad: pad }, function ( err, messages ) { if(ERR(err, callback)) return; - + _.each(messages, function(newVars) { //combine our old object with the new attributes from the hook for(var attr in newVars) { clientVars[attr] = newVars[attr]; } }); - + //Join the pad and start receiving updates client.join(padIds.padId); //Send the clientVars to the Client @@ -1126,9 +1126,9 @@ function handleClientReady(client, message) sessioninfos[client.id].rev = pad.getHeadRevisionNumber(); }); } - + sessioninfos[client.id].author = author; - + //prepare the notification for the other users on the pad, that this user joined var messageToTheOtherUsers = { "type": "COLLABROOM", @@ -1142,7 +1142,7 @@ function handleClientReady(client, message) } } }; - + //Add the authorname of this new User, if avaiable if(authorName != null) { @@ -1151,7 +1151,7 @@ function handleClientReady(client, message) // notify all existing users about new user client.broadcast.to(padIds.padId).json.send(messageToTheOtherUsers); - + //Run trough all sessions of this pad async.forEach(socketio.sockets.clients(padIds.padId), function(roomClient, callback) { @@ -1160,9 +1160,9 @@ function handleClientReady(client, message) //Jump over, if this session is the connection session if(roomClient.id == client.id) return callback(); - - - //Since sessioninfos might change while being enumerated, check if the + + + //Since sessioninfos might change while being enumerated, check if the //sessionID is still assigned to a valid session if(sessioninfos[roomClient.id] !== undefined) author = sessioninfos[roomClient.id].author; @@ -1178,7 +1178,7 @@ function handleClientReady(client, message) callback(null, historicalAuthorData[author]); else authorManager.getAuthor(author, callback); - }, + }, function (authorInfo, callback) { //Send the new User a Notification about this other user @@ -1207,7 +1207,7 @@ function handleClientReady(client, message) } /** - * Handles a request for a rough changeset, the timeslider client needs it + * Handles a request for a rough changeset, the timeslider client needs it */ function handleChangesetRequest(client, message) { @@ -1237,7 +1237,7 @@ function handleChangesetRequest(client, message) messageLogger.warn("Dropped message, changeset request has no requestID!"); return; } - + var granularity = message.data.granularity; var start = message.data.start; var end = start + (100 * granularity); @@ -1281,47 +1281,49 @@ function getChangesetInfo(padId, startNum, endNum, granularity, callback) var composedChangesets = {}; var revisionDate = []; var lines; - + var head_revision = 0; + async.series([ //get the pad from the database function(callback) { padManager.getPad(padId, function(err, _pad) - { + { if(ERR(err, callback)) return; pad = _pad; + head_revision = pad.getHeadRevisionNumber(); callback(); }); }, function(callback) - { + { //calculate the last full endnum var lastRev = pad.getHeadRevisionNumber(); if (endNum > lastRev+1) { endNum = lastRev+1; } endNum = Math.floor(endNum / granularity)*granularity; - + var compositesChangesetNeeded = []; var revTimesNeeded = []; - + //figure out which composite Changeset and revTimes we need, to load them in bulk var compositeStart = startNum; - while (compositeStart < endNum) + while (compositeStart < endNum) { var compositeEnd = compositeStart + granularity; - + //add the composite Changeset we needed compositesChangesetNeeded.push({start: compositeStart, end: compositeEnd}); - + //add the t1 time we need revTimesNeeded.push(compositeStart == 0 ? 0 : compositeStart - 1); //add the t2 time we need revTimesNeeded.push(compositeEnd - 1); - + compositeStart += granularity; } - + //get all needed db values parallel async.parallel([ function(callback) @@ -1358,58 +1360,57 @@ function getChangesetInfo(padId, startNum, endNum, granularity, callback) if(ERR(err, callback)) return; lines = _lines; callback(); - }); + }); } ], callback); }, //doesn't know what happens here excatly :/ function(callback) - { + { var compositeStart = startNum; - - while (compositeStart < endNum) + + while (compositeStart < endNum) { - if (compositeStart + granularity > endNum) + var compositeEnd = compositeStart + granularity; + if (compositeEnd > endNum || compositeEnd > head_revision) { break; } - - var compositeEnd = compositeStart + granularity; - + var forwards = composedChangesets[compositeStart + "/" + compositeEnd]; var backwards = Changeset.inverse(forwards, lines.textlines, lines.alines, pad.apool()); - + Changeset.mutateAttributionLines(forwards, lines.alines, pad.apool()); Changeset.mutateTextLines(forwards, lines.textlines); - + var forwards2 = Changeset.moveOpsToNewPool(forwards, pad.apool(), apool); var backwards2 = Changeset.moveOpsToNewPool(backwards, pad.apool(), apool); - + var t1, t2; - if (compositeStart == 0) + if (compositeStart == 0) { t1 = revisionDate[0]; } - else + else { t1 = revisionDate[compositeStart - 1]; } - + t2 = revisionDate[compositeEnd - 1]; - + timeDeltas.push(t2 - t1); forwardsChangesets.push(forwards2); backwardsChangesets.push(backwards2); - + compositeStart += granularity; } - + callback(); } ], function(err) { if(ERR(err, callback)) return; - + callback(null, {forwardsChangesets: forwardsChangesets, backwardsChangesets: backwardsChangesets, apool: apool.toJsonable(), @@ -1424,7 +1425,7 @@ function getChangesetInfo(padId, startNum, endNum, granularity, callback) * Tries to rebuild the getPadLines function of the original Etherpad * https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L263 */ -function getPadLines(padId, revNum, callback) +function getPadLines(padId, revNum, callback) { var atext; var result = {}; @@ -1435,7 +1436,7 @@ function getPadLines(padId, revNum, callback) function(callback) { padManager.getPad(padId, function(err, _pad) - { + { if(ERR(err, callback)) return; pad = _pad; callback(); @@ -1479,7 +1480,7 @@ function getPadLines(padId, revNum, callback) function composePadChangesets(padId, startNum, endNum, callback) { var pad; - var changesets = []; + var changesets = {}; var changeset; async.series([ @@ -1497,14 +1498,19 @@ function composePadChangesets(padId, startNum, endNum, callback) function(callback) { var changesetsNeeded=[]; - - //create a array for all changesets, we will + + var headNum = pad.getHeadRevisionNumber(); + if (endNum > headNum) + endNum = headNum; + if (startNum < 0) + startNum = 0; + //create a array for all changesets, we will //replace the values with the changeset later for(var r=startNum;r