PadMessageHandler.js: further conversion

This commit is contained in:
Ray Bellis 2019-01-30 13:55:49 +00:00
parent d543d5ae6a
commit 9246a1de26

View file

@ -1274,7 +1274,7 @@ async function handleClientReady(client, message)
/**
* Handles a request for a rough changeset, the timeslider client needs it
*/
function handleChangesetRequest(client, message)
async function handleChangesetRequest(client, message)
{
// check if all ok
if (message.data == null) {
@ -1308,309 +1308,182 @@ function handleChangesetRequest(client, message)
return;
}
var granularity = message.data.granularity;
var start = message.data.start;
var end = start + (100 * granularity);
var padIds;
let granularity = message.data.granularity;
let start = message.data.start;
let end = start + (100 * granularity);
async.series([
function(callback) {
readOnlyManager.getIds(message.padId, function(err, value) {
if (ERR(err, callback)) return;
let padIds = await readOnlyManager.getIds(message.padId);
padIds = value;
callback();
});
},
function(callback) {
// build the requested rough changesets and send them back
getChangesetInfo(padIds.padId, start, end, granularity, function(err, changesetInfo) {
if (err) return console.error('Error while handling a changeset request for ' + padIds.padId, err, message.data);
var data = changesetInfo;
data.requestID = message.data.requestID;
client.json.send({ type: "CHANGESET_REQ", data: data });
});
}
]);
// build the requested rough changesets and send them back
try {
let data = await getChangesetInfo(padIds.padId, start, end, granularity);
data.requestID = message.data.requestID;
client.json.send({ type: "CHANGESET_REQ", data });
} catch (err) {
console.error('Error while handling a changeset request for ' + padIds.padId, err, message.data);
}
}
/**
* Tries to rebuild the getChangestInfo function of the original Etherpad
* https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L144
*/
let getChangesetInfo = thenify(function getChangesetInfo(padId, startNum, endNum, granularity, callback)
async function getChangesetInfo(padId, startNum, endNum, granularity)
{
var forwardsChangesets = [];
var backwardsChangesets = [];
var timeDeltas = [];
var apool = new AttributePool();
var pad;
var composedChangesets = {};
var revisionDate = [];
var lines;
var head_revision = 0;
let pad = await padManager.getPad(padId);
let head_revision = pad.getHeadRevisionNumber();
async.series([
// get the pad from the database
function(callback) {
padManager.getPad(padId, function(err, _pad) {
if (ERR(err, callback)) return;
// calculate the last full endnum
if (endNum > head_revision + 1) {
endNum = head_revision + 1;
}
endNum = Math.floor(endNum / granularity) * granularity;
pad = _pad;
head_revision = pad.getHeadRevisionNumber();
callback();
});
},
let compositesChangesetNeeded = [];
let revTimesNeeded = [];
function(callback) {
// calculate the last full endnum
var lastRev = pad.getHeadRevisionNumber();
if (endNum > lastRev + 1) {
endNum = lastRev + 1;
}
// figure out which composite Changeset and revTimes we need, to load them in bulk
for (let start = startNum; start < endNum; start += granularity) {
let end = start + granularity;
endNum = Math.floor(endNum / granularity) * granularity;
// add the composite Changeset we needed
compositesChangesetNeeded.push({ start, end });
var compositesChangesetNeeded = [];
var revTimesNeeded = [];
// add the t1 time we need
revTimesNeeded.push(start == 0 ? 0 : start - 1);
// figure out which composite Changeset and revTimes we need, to load them in bulk
var compositeStart = startNum;
while (compositeStart < endNum) {
var compositeEnd = compositeStart + granularity;
// add the t2 time we need
revTimesNeeded.push(end - 1);
}
// add the composite Changeset we needed
compositesChangesetNeeded.push({ start: compositeStart, end: compositeEnd });
// get all needed db values parallel - no await here since
// it would make all the lookups run in series
// add the t1 time we need
revTimesNeeded.push(compositeStart == 0 ? 0 : compositeStart - 1);
// get all needed composite Changesets
let composedChangesets = {};
let p1 = Promise.all(compositesChangesetNeeded.map(item => {
return composePadChangesets(padId, item.start, item.end).then(changeset => {
composedChangesets[item.start + "/" + item.end] = changeset;
});
}));
// add the t2 time we need
revTimesNeeded.push(compositeEnd - 1);
// get all needed revision Dates
let revisionDate = [];
let p2 = Promise.all(revTimesNeeded.map(revNum => {
return pad.getRevisionDate(revNum).then(revDate => {
revisionDate[revNum] = Math.floor(revDate / 1000);
});
}));
compositeStart += granularity;
}
// get all needed db values parallel
async.parallel([
function(callback) {
// get all needed composite Changesets
async.forEach(compositesChangesetNeeded, function(item, callback) {
composePadChangesets(padId, item.start, item.end, function(err, changeset) {
if (ERR(err, callback)) return;
composedChangesets[item.start + "/" + item.end] = changeset;
callback();
});
}, callback);
},
function(callback) {
// get all needed revision Dates
async.forEach(revTimesNeeded, function(revNum, callback) {
pad.getRevisionDate(revNum, function(err, revDate) {
if (ERR(err, callback)) return;
revisionDate[revNum] = Math.floor(revDate/1000);
callback();
});
}, callback);
},
// get the lines
function(callback) {
getPadLines(padId, startNum-1, function(err, _lines) {
if (ERR(err, callback)) return;
lines = _lines;
callback();
});
}
], callback);
},
// don't know what happens here exactly :/
function(callback) {
var compositeStart = startNum;
while (compositeStart < endNum) {
var compositeEnd = compositeStart + granularity;
if (compositeEnd > endNum || compositeEnd > head_revision+1) {
break;
}
var forwards = composedChangesets[compositeStart + "/" + compositeEnd];
var backwards = Changeset.inverse(forwards, lines.textlines, lines.alines, pad.apool());
Changeset.mutateAttributionLines(forwards, lines.alines, pad.apool());
Changeset.mutateTextLines(forwards, lines.textlines);
var forwards2 = Changeset.moveOpsToNewPool(forwards, pad.apool(), apool);
var backwards2 = Changeset.moveOpsToNewPool(backwards, pad.apool(), apool);
var t1, t2;
if (compositeStart == 0) {
t1 = revisionDate[0];
} else {
t1 = revisionDate[compositeStart - 1];
}
t2 = revisionDate[compositeEnd - 1];
timeDeltas.push(t2 - t1);
forwardsChangesets.push(forwards2);
backwardsChangesets.push(backwards2);
compositeStart += granularity;
}
callback();
}
],
function(err) {
if (ERR(err, callback)) return;
callback(null, {forwardsChangesets: forwardsChangesets,
backwardsChangesets: backwardsChangesets,
apool: apool.toJsonable(),
actualEndNum: endNum,
timeDeltas: timeDeltas,
start: startNum,
granularity: granularity });
// get the lines
let lines;
let p3 = getPadLines(padId, startNum - 1).then(_lines => {
lines = _lines;
});
});
// wait for all of the above to complete
await Promise.all([p1, p2, p3]);
// doesn't know what happens here exactly :/
let timeDeltas = [];
let forwardsChangesets = [];
let backwardsChangesets = [];
let apool = new AttributePool();
for (let compositeStart = startNum; compositeStart < endNum; compositeStart += granularity) {
let compositeEnd = compositeStart + granularity;
if (compositeEnd > endNum || compositeEnd > head_revision + 1) {
break;
}
let forwards = composedChangesets[compositeStart + "/" + compositeEnd];
let backwards = Changeset.inverse(forwards, lines.textlines, lines.alines, pad.apool());
Changeset.mutateAttributionLines(forwards, lines.alines, pad.apool());
Changeset.mutateTextLines(forwards, lines.textlines);
let forwards2 = Changeset.moveOpsToNewPool(forwards, pad.apool(), apool);
let backwards2 = Changeset.moveOpsToNewPool(backwards, pad.apool(), apool);
let t1 = (compositeStart == 0) ? revisionDate[0] : revisionDate[compositeStart - 1];
let t2 = revisionDate[compositeEnd - 1];
timeDeltas.push(t2 - t1);
forwardsChangesets.push(forwards2);
backwardsChangesets.push(backwards2);
}
return { forwardsChangesets, backwardsChangesets,
apool: apool.toJsonable(), actualEndNum: endNum,
timeDeltas, start: startNum, granularity };
}
/**
* Tries to rebuild the getPadLines function of the original Etherpad
* https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L263
*/
let getPadLines = thenify(function getPadLines(padId, revNum, callback)
async function getPadLines(padId, revNum)
{
var atext;
var result = {};
var pad;
let pad = padManager.getPad(padId);
async.series([
// get the pad from the database
function(callback) {
padManager.getPad(padId, function(err, _pad) {
if (ERR(err, callback)) return;
// get the atext
let atext;
pad = _pad;
callback();
});
},
if (revNum >= 0) {
atext = await pad.getInternalRevisionAText(revNum);
} else {
atext = Changeset.makeAText("\n");
}
// get the atext
function(callback) {
if (revNum >= 0) {
pad.getInternalRevisionAText(revNum, function(err, _atext) {
if (ERR(err, callback)) return;
atext = _atext;
callback();
});
} else {
atext = Changeset.makeAText("\n");
callback(null);
}
},
function(callback) {
result.textlines = Changeset.splitTextLines(atext.text);
result.alines = Changeset.splitAttributionLines(atext.attribs, atext.text);
callback(null);
}
],
function(err) {
if (ERR(err, callback)) return;
callback(null, result);
});
});
return {
textlines: Changeset.splitTextLines(atext.text),
alines: Changeset.splitAttributionLines(atext.attribs, atext.text)
};
}
/**
* Tries to rebuild the composePadChangeset function of the original Etherpad
* https://github.com/ether/pad/blob/master/etherpad/src/etherpad/control/pad/pad_changeset_control.js#L241
*/
let composePadChangesets = thenify(function(padId, startNum, endNum, callback)
async function composePadChangesets (padId, startNum, endNum)
{
var pad;
var changesets = {};
var changeset;
let pad = await padManager.getPad(padId);
async.series([
// get the pad from the database
function(callback) {
padManager.getPad(padId, function(err, _pad) {
if (ERR(err, callback)) return;
// fetch all changesets we need
let headNum = pad.getHeadRevisionNumber();
endNum = Math.min(endNum, headNum + 1);
startNum = Math.max(startNum, 0);
pad = _pad;
callback();
});
},
// create an array for all changesets, we will
// replace the values with the changeset later
let changesetsNeeded = [];
for (let r = startNum ; r < endNum; r++) {
changesetsNeeded.push(r);
}
// fetch all changesets we need
function(callback) {
var changesetsNeeded=[];
// get all changesets
let changesets = {};
await Promise.all(changesetsNeeded.map(revNum => {
return pad.getRevisionChangeset(revNum).then(changeset => changesets[revNum] = changeset);
}));
var headNum = pad.getHeadRevisionNumber();
if (endNum > headNum + 1) {
endNum = headNum + 1;
}
// compose Changesets
try {
let changeset = changesets[startNum];
let pool = pad.apool();
if (startNum < 0) {
startNum = 0;
}
// create an array for all changesets, we will
// replace the values with the changeset later
for (var r = startNum; r < endNum; r++) {
changesetsNeeded.push(r);
}
// get all changesets
async.forEach(changesetsNeeded, function(revNum,callback) {
pad.getRevisionChangeset(revNum, function(err, value) {
if (ERR(err, callback)) return;
changesets[revNum] = value;
callback();
});
},callback);
},
// compose Changesets
function(callback) {
changeset = changesets[startNum];
var pool = pad.apool();
try {
for (var r = startNum + 1; r < endNum; r++) {
var cs = changesets[r];
changeset = Changeset.compose(changeset, cs, pool);
}
} catch(e) {
// r-1 indicates the rev that was build starting with startNum, applying startNum+1, +2, +3
console.warn("failed to compose cs in pad:", padId, " startrev:", startNum, " current rev:", r);
return callback(e);
}
callback(null);
for (let r = startNum + 1; r < endNum; r++) {
let cs = changesets[r];
changeset = Changeset.compose(changeset, cs, pool);
}
],
return changeset;
// return err and changeset
function(err) {
if (ERR(err, callback)) return;
callback(null, changeset);
});
});
} catch (e) {
// r-1 indicates the rev that was build starting with startNum, applying startNum+1, +2, +3
console.warn("failed to compose cs in pad:", padId, " startrev:", startNum," current rev:", r);
throw e;
}
}
function _getRoomClients(padID) {
var roomClients = [];