Merge pull request #1454 from dmitryuv/feature/socketio-rooms

socket.io rooms instead of own session management
This commit is contained in:
John McLear 2013-02-05 16:09:25 -08:00
commit 5b31030958

View file

@ -35,11 +35,6 @@ var messageLogger = log4js.getLogger("message");
var _ = require('underscore'); var _ = require('underscore');
var hooks = require("ep_etherpad-lite/static/js/pluginfw/hooks.js"); var hooks = require("ep_etherpad-lite/static/js/pluginfw/hooks.js");
/**
* A associative array that saves which sessions belong to a pad
*/
var pad2sessions = {};
/** /**
* A associative array that saves informations about a session * A associative array that saves informations about a session
* key = sessionId * key = sessionId
@ -83,14 +78,11 @@ exports.handleConnect = function(client)
exports.kickSessionsFromPad = function(padID) exports.kickSessionsFromPad = function(padID)
{ {
//skip if there is nobody on this pad //skip if there is nobody on this pad
if(!pad2sessions[padID]) if(socketio.sockets.clients(padID).length == 0)
return; return;
//disconnect everyone from this pad //disconnect everyone from this pad
for(var i in pad2sessions[padID]) socketio.sockets.in(padID).json.send({disconnect:"deleted"});
{
socketio.sockets.sockets[pad2sessions[padID][i]].json.send({disconnect:"deleted"});
}
} }
/** /**
@ -100,15 +92,13 @@ exports.kickSessionsFromPad = function(padID)
exports.handleDisconnect = function(client) exports.handleDisconnect = function(client)
{ {
//save the padname of this session //save the padname of this session
var sessionPad=sessioninfos[client.id].padId; var session = sessioninfos[client.id];
//if this connection was already etablished with a handshake, send a disconnect message to the others //if this connection was already etablished with a handshake, send a disconnect message to the others
if(sessioninfos[client.id] && sessioninfos[client.id].author) if(session && session.author)
{ {
var author = sessioninfos[client.id].author;
//get the author color out of the db //get the author color out of the db
authorManager.getAuthorColorId(author, function(err, color) authorManager.getAuthorColorId(session.author, function(err, color)
{ {
ERR(err); ERR(err);
@ -121,33 +111,16 @@ exports.handleDisconnect = function(client)
"ip": "127.0.0.1", "ip": "127.0.0.1",
"colorId": color, "colorId": color,
"userAgent": "Anonymous", "userAgent": "Anonymous",
"userId": author "userId": session.author
} }
} }
}; };
//Go trough all user that are still on the pad, and send them the USER_LEAVE message //Go trough all user that are still on the pad, and send them the USER_LEAVE message
for(i in pad2sessions[sessionPad]) client.broadcast.to(session.padId).json.send(messageToTheOtherUsers);
{
var socket = socketio.sockets.sockets[pad2sessions[sessionPad][i]];
if(socket !== undefined){
socket.json.send(messageToTheOtherUsers);
}
}
}); });
} }
//Go trough all sessions of this pad, search and destroy the entry of this client
for(i in pad2sessions[sessionPad])
{
if(pad2sessions[sessionPad][i] == client.id)
{
pad2sessions[sessionPad].splice(i, 1);
break;
}
}
//Delete the sessioninfos entrys of this session //Delete the sessioninfos entrys of this session
delete sessioninfos[client.id]; delete sessioninfos[client.id];
} }
@ -228,11 +201,10 @@ exports.handleMessage = function(client, message)
function(callback) function(callback)
{ {
if(!message.padId){ // If the message has a padId we assume the client is already known to the server and needs no re-authorization
// If the message has a padId we assume the client is already known to the server and needs no re-authorization if(!message.padId)
callback(); return callback();
return;
}
// Note: message.sessionID is an entirely different kind of // Note: message.sessionID is an entirely different kind of
// session from the sessions we use here! Beware! FIXME: Call // session from the sessions we use here! Beware! FIXME: Call
// our "sessions" "connections". // our "sessions" "connections".
@ -292,9 +264,7 @@ exports.handleCustomMessage = function (padID, msg, cb) {
time: time time: time
} }
}; };
for (var i in pad2sessions[padID]) { socketio.sockets.in(padID).json.send(msg);
socketio.sockets.sockets[pad2sessions[padID][i]].json.send(msg);
}
cb(null, {}); cb(null, {});
} }
@ -352,10 +322,7 @@ function handleChatMessage(client, message)
}; };
//broadcast the chat message to everyone on the pad //broadcast the chat message to everyone on the pad
for(var i in pad2sessions[padId]) socketio.sockets.in(padId).json.send(msg);
{
socketio.sockets.sockets[pad2sessions[padId][i]].json.send(msg);
}
callback(); callback();
} }
@ -413,23 +380,16 @@ function handleGetChatMessages(client, message)
{ {
if(ERR(err, callback)) return; if(ERR(err, callback)) return;
var infoMsg = { var infoMsg = {
type: "COLLABROOM", type: "COLLABROOM",
data: { data: {
type: "CHAT_MESSAGES", type: "CHAT_MESSAGES",
messages: chatMessages messages: chatMessages
}
};
// send the messages back to the client
for(var i in pad2sessions[padId])
{
if(pad2sessions[padId][i] == client.id)
{
socketio.sockets.sockets[pad2sessions[padId][i]].json.send(infoMsg);
break;
} }
} };
// send the messages back to the client
client.json.send(infoMsg);
}); });
}]); }]);
} }
@ -453,14 +413,14 @@ function handleSuggestUserName(client, message)
return; return;
} }
var padId = sessioninfos[client.id].padId; var padId = sessioninfos[client.id].padId,
clients = socketio.sockets.clients(padId);
//search the author and send him this message //search the author and send him this message
for(var i in pad2sessions[padId]) for(var i = 0; i < clients.length; i++) {
{ var session = sessioninfos[clients[i].id];
if(sessioninfos[pad2sessions[padId][i]].author == message.data.payload.unnamedId) if(session && session.author == message.data.payload.unnamedId) {
{ clients[i].json.send(message);
socketio.sockets.sockets[pad2sessions[padId][i]].send(message);
break; break;
} }
} }
@ -501,7 +461,8 @@ function handleUserInfoUpdate(client, message)
type: "USER_NEWINFO", type: "USER_NEWINFO",
userInfo: { userInfo: {
userId: author, userId: author,
name: message.data.userInfo.name, //set a null name, when there is no name set. cause the client wants it null
name: message.data.userInfo.name || null,
colorId: message.data.userInfo.colorId, colorId: message.data.userInfo.colorId,
userAgent: "Anonymous", userAgent: "Anonymous",
ip: "127.0.0.1", ip: "127.0.0.1",
@ -509,20 +470,8 @@ function handleUserInfoUpdate(client, message)
} }
}; };
//set a null name, when there is no name set. cause the client wants it null
if(infoMsg.data.userInfo.name == null)
{
infoMsg.data.userInfo.name = null;
}
//Send the other clients on the pad the update message //Send the other clients on the pad the update message
for(var i in pad2sessions[padId]) client.broadcast.to(padId).json.send(infoMsg);
{
if(pad2sessions[padId][i] != client.id)
{
socketio.sockets.sockets[pad2sessions[padId][i]].json.send(infoMsg);
}
}
} }
/** /**
@ -682,90 +631,76 @@ function handleUserChanges(client, message)
exports.updatePadClients = function(pad, callback) exports.updatePadClients = function(pad, callback)
{ {
//skip this step if noone is on this pad //skip this step if noone is on this pad
if(!pad2sessions[pad.id]) var roomClients = socketio.sockets.clients(pad.id);
{ if(roomClients.length==0)
callback(); return callback();
return;
} // since all clients usually get the same set of changesets, store them in local cache
// to remove unnecessary roundtrip to the datalayer
// TODO: in REAL world, if we're working without datalayer cache, all requests to revisions will be fired
// BEFORE first result will be landed to our cache object. The solution is to replace parallel processing
// via async.forEach with sequential for() loop. There is no real benefits of running this in parallel,
// but benefit of reusing cached revision object is HUGE
var revCache = {};
//go trough all sessions on this pad //go trough all sessions on this pad
async.forEach(pad2sessions[pad.id], function(session, callback) async.forEach(roomClients, function(client, callback)
{ {
var sid = client.id;
//https://github.com/caolan/async#whilst //https://github.com/caolan/async#whilst
//send them all new changesets //send them all new changesets
async.whilst( async.whilst(
function (){ return sessioninfos[session] && sessioninfos[session].rev < pad.getHeadRevisionNumber()}, function (){ return sessioninfos[sid] && sessioninfos[sid].rev < pad.getHeadRevisionNumber()},
function(callback) function(callback)
{ {
var author, revChangeset, currentTime; var r = sessioninfos[sid].rev + 1;
var r = sessioninfos[session].rev + 1;
async.parallel([ async.waterfall([
function (callback) function(callback) {
{ if(revCache[r])
pad.getRevisionAuthor(r, function(err, value) callback(null, revCache[r]);
{ else
if(ERR(err, callback)) return; pad.getRevision(r, callback);
author = value;
callback();
});
}, },
function (callback) function(revision, callback)
{ {
pad.getRevisionChangeset(r, function(err, value) revCache[r] = revision;
var author = revision.meta.author,
revChangeset = revision.changeset,
currentTime = revision.meta.timestamp;
// next if session has not been deleted
if(sessioninfos[sid] == null)
return callback(null);
if(author == sessioninfos[sid].author)
{ {
if(ERR(err, callback)) return; client.json.send({"type":"COLLABROOM","data":{type:"ACCEPT_COMMIT", newRev:r}});
revChangeset = value; }
callback(); else
});
},
function (callback)
{
pad.getRevisionDate(r, function(err, date)
{ {
if(ERR(err, callback)) return; var forWire = Changeset.prepareForWire(revChangeset, pad.pool);
currentTime = date; var wireMsg = {"type":"COLLABROOM",
callback(); "data":{type:"NEW_CHANGES",
}); newRev:r,
} changeset: forWire.translated,
], function(err) apool: forWire.pool,
{ author: author,
if(ERR(err, callback)) return; currentTime: currentTime,
// next if session has not been deleted timeDelta: currentTime - sessioninfos[sid].time
if(sessioninfos[session] == null) }};
{
client.json.send(wireMsg);
}
sessioninfos[sid].time = currentTime;
sessioninfos[sid].rev = r;
callback(null); callback(null);
return;
} }
if(author == sessioninfos[session].author) ], callback);
{
socketio.sockets.sockets[session].json.send({"type":"COLLABROOM","data":{type:"ACCEPT_COMMIT", newRev:r}});
}
else
{
var forWire = Changeset.prepareForWire(revChangeset, pad.pool);
var wireMsg = {"type":"COLLABROOM",
"data":{type:"NEW_CHANGES",
newRev:r,
changeset: forWire.translated,
apool: forWire.pool,
author: author,
currentTime: currentTime,
timeDelta: currentTime - sessioninfos[session].time
}};
socketio.sockets.sockets[session].json.send(wireMsg);
}
if(sessioninfos[session] != null)
{
sessioninfos[session].time = currentTime;
sessioninfos[session].rev = r;
}
callback(null);
});
}, },
callback callback
); );
@ -895,23 +830,14 @@ function handleClientReady(client, message)
function(callback) function(callback)
{ {
async.parallel([ async.parallel([
//get colorId //get colorId and name
function(callback) function(callback)
{ {
authorManager.getAuthorColorId(author, function(err, value) authorManager.getAuthor(author, function(err, value)
{ {
if(ERR(err, callback)) return; if(ERR(err, callback)) return;
authorColorId = value; authorColorId = value.colorId;
callback(); authorName = value.name;
});
},
//get author name
function(callback)
{
authorManager.getAuthorName(author, function(err, value)
{
if(ERR(err, callback)) return;
authorName = value;
callback(); callback();
}); });
}, },
@ -965,21 +891,17 @@ function handleClientReady(client, message)
{ {
//Check that the client is still here. It might have disconnected between callbacks. //Check that the client is still here. It might have disconnected between callbacks.
if(sessioninfos[client.id] === undefined) if(sessioninfos[client.id] === undefined)
{ return callback();
callback();
return;
}
//Check if this author is already on the pad, if yes, kick the other sessions! //Check if this author is already on the pad, if yes, kick the other sessions!
if(pad2sessions[padIds.padId]) var roomClients = socketio.sockets.clients(padIds.padId);
{ for(var i = 0; i < roomClients.length; i++) {
for(var i in pad2sessions[padIds.padId]) var sinfo = sessioninfos[roomClients[i].id];
{ if(sinfo && sinfo.author == author) {
if(sessioninfos[pad2sessions[padIds.padId][i]] && sessioninfos[pad2sessions[padIds.padId][i]].author == author) // fix user's counter, works on page refresh or if user closes browser window and then rejoins
{ sessioninfos[roomClients[i].id] = {};
var socket = socketio.sockets.sockets[pad2sessions[padIds.padId][i]]; roomClients[i].leave(padIds.padId);
if(socket) socket.json.send({disconnect:"userdup"}); roomClients[i].json.send({disconnect:"userdup"});
}
} }
} }
@ -988,15 +910,6 @@ function handleClientReady(client, message)
sessioninfos[client.id].readOnlyPadId = padIds.readOnlyPadId; sessioninfos[client.id].readOnlyPadId = padIds.readOnlyPadId;
sessioninfos[client.id].readonly = padIds.readonly; sessioninfos[client.id].readonly = padIds.readonly;
//check if there is already a pad2sessions entry, if not, create one
if(!pad2sessions[padIds.padId])
{
pad2sessions[padIds.padId] = [];
}
//Saves in pad2sessions that this session belongs to this pad
pad2sessions[padIds.padId].push(client.id);
//If this is a reconnect, we don't have to send the client the ClientVars again //If this is a reconnect, we don't have to send the client the ClientVars again
if(message.reconnect == true) if(message.reconnect == true)
{ {
@ -1044,7 +957,7 @@ function handleClientReady(client, message)
// tell the client the number of the latest chat-message, which will be // tell the client the number of the latest chat-message, which will be
// used to request the latest 100 chat-messages later (GET_CHAT_MESSAGES) // used to request the latest 100 chat-messages later (GET_CHAT_MESSAGES)
"chatHead": pad.chatHead, "chatHead": pad.chatHead,
"numConnectedUsers": pad2sessions[padIds.padId].length, "numConnectedUsers": roomClients.length,
"isProPad": false, "isProPad": false,
"readOnlyId": padIds.readOnlyPadId, "readOnlyId": padIds.readOnlyPadId,
"readonly": padIds.readonly, "readonly": padIds.readonly,
@ -1080,6 +993,8 @@ function handleClientReady(client, message)
} }
}); });
//Join the pad and start receiving updates
client.join(padIds.padId);
//Send the clientVars to the Client //Send the clientVars to the Client
client.json.send({type: "CLIENT_VARS", data: clientVars}); client.json.send({type: "CLIENT_VARS", data: clientVars});
//Save the current revision in sessioninfos, should be the same as in clientVars //Save the current revision in sessioninfos, should be the same as in clientVars
@ -1109,71 +1024,53 @@ function handleClientReady(client, message)
messageToTheOtherUsers.data.userInfo.name = authorName; messageToTheOtherUsers.data.userInfo.name = authorName;
} }
// notify all existing users about new user
client.broadcast.to(padIds.padIds).json.send(messageToTheOtherUsers);
//Run trough all sessions of this pad //Run trough all sessions of this pad
async.forEach(pad2sessions[padIds.padId], function(sessionID, callback) async.forEach(socketio.sockets.clients(padIds.padId), function(roomClient, callback)
{ {
var author, socket, sessionAuthorName, sessionAuthorColorId; var author;
//Jump over, if this session is the connection session
if(roomClient.id == client.id)
return callback();
//Since sessioninfos might change while being enumerated, check if the //Since sessioninfos might change while being enumerated, check if the
//sessionID is still assigned to a valid session //sessionID is still assigned to a valid session
if(sessioninfos[sessionID] !== undefined && if(sessioninfos[roomClient.id] !== undefined)
socketio.sockets.sockets[sessionID] !== undefined){ author = sessioninfos[roomClient.id].author;
author = sessioninfos[sessionID].author; else // If the client id is not valid, callback();
socket = socketio.sockets.sockets[sessionID]; return callback();
}else {
// If the sessionID is not valid, callback(); async.waterfall([
callback();
return;
}
async.series([
//get the authorname & colorId //get the authorname & colorId
function(callback) function(callback)
{ {
async.parallel([ // reuse previously created cache of author's data
function(callback) if(historicalAuthorData[author])
{ callback(null, historicalAuthorData[author]);
authorManager.getAuthorColorId(author, function(err, value) else
{ authorManager.getAuthor(author, callback);
if(ERR(err, callback)) return;
sessionAuthorColorId = value;
callback();
})
},
function(callback)
{
authorManager.getAuthorName(author, function(err, value)
{
if(ERR(err, callback)) return;
sessionAuthorName = value;
callback();
})
}
],callback);
}, },
function (callback) function (authorInfo, callback)
{ {
//Jump over, if this session is the connection session //Send the new User a Notification about this other user
if(sessionID != client.id) var msg = {
{ "type": "COLLABROOM",
//Send this Session the Notification about the new user "data": {
socket.json.send(messageToTheOtherUsers); type: "USER_NEWINFO",
userInfo: {
//Send the new User a Notification about this other user "ip": "127.0.0.1",
var messageToNotifyTheClientAboutTheOthers = { "colorId": authorInfo.colorId,
"type": "COLLABROOM", "name": authorInfo.name,
"data": { "userAgent": "Anonymous",
type: "USER_NEWINFO", "userId": author
userInfo: {
"ip": "127.0.0.1",
"colorId": sessionAuthorColorId,
"name": sessionAuthorName,
"userAgent": "Anonymous",
"userId": author
}
} }
}; }
client.json.send(messageToNotifyTheClientAboutTheOthers); };
} client.json.send(msg);
} }
], callback); ], callback);
}, callback); }, callback);
@ -1521,33 +1418,30 @@ function composePadChangesets(padId, startNum, endNum, callback)
* Get the number of users in a pad * Get the number of users in a pad
*/ */
exports.padUsersCount = function (padID, callback) { exports.padUsersCount = function (padID, callback) {
if (!pad2sessions[padID] || typeof pad2sessions[padID] != typeof []) { callback(null, {
callback(null, {padUsersCount: 0}); padUsersCount: socketio.sockets.clients(padId).length
} else { });
callback(null, {padUsersCount: pad2sessions[padID].length});
}
} }
/** /**
* Get the list of users in a pad * Get the list of users in a pad
*/ */
exports.padUsers = function (padID, callback) { exports.padUsers = function (padID, callback) {
if (!pad2sessions[padID] || typeof pad2sessions[padID] != typeof []) { var result = [];
callback(null, {padUsers: []});
} else { async.forEach(socketio.sockets.clients(padId), function(roomClient, callback) {
var authors = []; var s = sessioninfos[roomClient.id];
for ( var ix in sessioninfos ) { if(s) {
if ( sessioninfos[ix].padId !== padID ) { authorManager.getAuthor(s.author, function(err, author) {
continue; if(ERR(err, callback)) return;
}
var aid = sessioninfos[ix].author; author.id = s.author;
authorManager.getAuthor( aid, function ( err, author ) { result.push(author);
author.id = aid; });
authors.push( author );
if ( authors.length === pad2sessions[padID].length ) {
callback(null, {padUsers: authors});
}
} );
} }
} }, function(err) {
if(ERR(err, callback)) return;
callback(null, {padUsers: result});
});
} }