diff --git a/lib/storage/metadata/oplog/BucketdOplogInterface.js b/lib/storage/metadata/oplog/BucketdOplogInterface.js
new file mode 100644
index 000000000..7550a1f2f
--- /dev/null
+++ b/lib/storage/metadata/oplog/BucketdOplogInterface.js
@@ -0,0 +1,253 @@
+/*
+ * Main interface for bucketd oplog management
+ */
+const async = require('async');
+const { RESTClient: BucketClient } = require('bucketclient');
+const { jsutil, errors } = require('arsenal');
+const LogConsumer = require('arsenal/lib/storage/metadata/bucketclient/LogConsumer');
+const { isMasterKey } = require('arsenal/lib/versioning/Version');
+const OplogInterface = require('./OplogInterface');
+
+class BucketdOplogInterface extends OplogInterface {
+
+    constructor(params) {
+        super(params);
+        this.backendRetryTimes = 3;
+        this.backendRetryInterval = 300;
+        this.bucketdOplogQuerySize = 20;
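+        // `stopAt` halts the oplog loop once the consumed cseq passes this
+        // value (the unit test relies on it); `bootstrap` lists bucketd endpoints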
+        this.stopAt = params?.stopAt ?? -1;
+        const bkBootstrap = params?.bootstrap ?? ['localhost:9000'];
+        this.bkClient = new BucketClient(bkBootstrap);
+    }
+
+    start(filter, cb) {
+        if (!(filter.filterType === 'bucket' ||
+              filter.filterType === 'raftSession')) {
+            return cb(errors.NotImplemented);
+        }
+        const filterName = filter.filterName;
+        async.waterfall([
+            /*
+             * In this step we get the raftId for filterName
+             */
+            next => {
+                if (filter.filterType === 'raftSession') {
+                    return next(null, filter.raftSession.raftId);
+                }
+                this.logger.info('obtaining raftId',
+                    { filterName });
+                async.retry(
+                    {
+                        times: this.backendRetryTimes,
+                        interval: this.backendRetryInterval,
+                    },
+                    done => {
+                        this.bkClient.getBucketInformation(
+                            filter.bucket.bucketName,
+                            null,
+                            (err, info) => {
+                                if (err) {
+                                    this.logger.info('retrying getBucketInformation', { err, filterName });
+                                    return done(err);
+                                }
+                                return done(null, JSON.parse(info));
+                            });
+                    },
+                    (err, res) => {
+                        if (err) {
+                            this.logger.error('getBucketInformation too many failures', { err, filterName });
+                            return next(err);
+                        }
+                        return next(null, res.raftSessionId);
+                    });
+                return undefined;
+            },
+            /*
+             * In this step we get the stored offset if we have it
+             */
+            (raftId, next) => {
+                let cseq = undefined;
+                this.persist.load(filterName, this.persistData, (err, offset) => {
+                    if (err) {
+                        return next(err);
+                    }
+                    cseq = offset;
+                    return next(null, raftId, cseq);
+                });
+            },
+            /*
+             * In this step we acquire the offset if we don't already have it
+             */
+            (raftId, cseq, next) => {
+                if (cseq !== undefined) {
+                    this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
+                        { filterName });
+                    return next(null, raftId, cseq, true);
+                }
+                this.logger.info('cseq acquisition',
+                    { filterName });
+                async.retry(
+                    {
+                        times: this.backendRetryTimes,
+                        interval: this.backendRetryInterval,
+                    },
+                    done => {
+                        this.bkClient.getRaftLog(
+                            raftId,
+                            1,
+                            1,
+                            true,
+                            null,
+                            (err, stream) => {
+                                if (err) {
+                                    this.logger.info('retrying getRaftLog', { err, filterName });
+                                    return done(err);
+                                }
+                                const chunks = [];
+                                stream.on('data', chunk => {
+                                    chunks.push(chunk);
+                                });
+                                stream.on('end', () => {
+                                    const info = JSON.parse(Buffer.concat(chunks));
+                                    return done(null, info);
+                                });
+                                return undefined;
+                            });
+                    },
+                    (err, res) => {
+                        if (err) {
+                            this.logger.error('getRaftLog too many failures', { err, filterName });
+                            return next(err);
+                        }
+                        return next(null, raftId, res.info.cseq, false);
+                    });
+                return undefined;
+            },
+            /*
+             * In this step we init the state (e.g. scan)
+             */
+            (raftId, cseq, skipInit, next) => {
+                if (skipInit) {
+                    this.logger.info(`skipping state initialization cseq=${cseq}`,
+                        { filterName });
+                    return next(null, raftId, cseq);
+                }
+                this.logger.info(`initializing state cseq=${cseq}`,
+                    { filterName });
+                this.persistData.initState(err => {
+                    if (err) {
+                        return next(err);
+                    }
+                    this.persist.save(
+                        filterName, this.persistData, cseq, err => {
+                            if (err) {
+                                return next(err);
+                            }
+                            return next(null, raftId, cseq);
+                        });
+                    return undefined;
+                });
+                return undefined;
+            },
+            /*
+             * In this step we loop over the oplog
+             */
+            (raftId, cseq, next) => {
+                this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
+                    { filterName });
+                // only way to get out of the loop in all cases
+                const nextOnce = jsutil.once(next);
+                let doStop = false;
+                // resume reading the oplog from cseq. changes are idempotent
+                const logConsumer = new LogConsumer({
+                    bucketClient: this.bkClient,
+                    raftSession: raftId,
+                });
+                let _cseq = cseq;
+                async.until(
+                    () => doStop,
+                    _next => {
+                        logConsumer.readRecords({
+                            startSeq: _cseq,
+                            limit: this.bucketdOplogQuerySize,
+                        }, (err, record) => {
+                            if (err) {
+                                this.logger.error('readRecords error', { err, filterName });
+                                return setTimeout(() => _next(), 5000);
+                            }
+                            if (!record.log) {
+                                // nothing to read
+                                return setTimeout(() => _next(), 5000);
+                            }
+                            const seqs = [];
+                            record.log.on('data', chunk => {
+                                seqs.push(chunk);
+                            });
+                            record.log.on('end', () => {
+                                const addQueue = [];
+                                const delQueue = [];
+                                for (let i = 0; i < seqs.length; i++) {
+                                    if (filter.filterType === 'raftSession' ||
+                                        (filter.filterType === 'bucket' &&
+                                         seqs[i].db === filter.bucket.bucketName)) {
+                                        for (let j = 0; j < seqs[i].entries.length; j++) {
+                                            const _item = {};
+                                            _item.bucketName = seqs[i].db;
+                                            _item.key = seqs[i].entries[j].key;
+                                            if (seqs[i].entries[j].type !== undefined &&
+                                                seqs[i].entries[j].type === 'del') {
+                                                if (!isMasterKey(_item.key)) {
+                                                    // ignore for now: skip this entry only,
+                                                    // a bare `return` here would abort the
+                                                    // whole batch and stall the loop
+                                                    continue;
+                                                }
+                                                delQueue.push(_item);
+                                            } else {
+                                                _item.value = Object.assign({}, seqs[i].entries[j].value);
+                                                addQueue.push(_item);
+                                            }
+                                        }
+                                    }
+                                }
+                                this.persistData.updateState(
+                                    addQueue, delQueue, err => {
+                                        if (err) {
+                                            return _next(err);
+                                        }
+                                        _cseq += seqs.length;
+                                        this.persist.save(
+                                            filterName, this.persistData, _cseq, err => {
+                                                if (err) {
+                                                    return _next(err);
+                                                }
+                                                if (_cseq > this.stopAt) {
+                                                    doStop = true;
+                                                }
+                                                return _next();
+                                            });
+                                        return undefined;
+                                    });
+                            });
+                            return undefined;
+                        });
+                    }, err => {
+                        if (err) {
+                            return nextOnce(err);
+                        }
+                        return nextOnce();
+                    });
+            },
+        ], err => {
+            if (err) {
+                return cb(err);
+            }
+            this.logger.info('returning',
+                { filterName });
+            return cb();
+        });
+        return undefined;
+    }
+}
+
+module.exports = BucketdOplogInterface;
diff --git a/lib/storage/metadata/oplog/MongoOplogInterface.js b/lib/storage/metadata/oplog/MongoOplogInterface.js
new file mode 100644
index 000000000..a5369b4fd
--- /dev/null
+++ b/lib/storage/metadata/oplog/MongoOplogInterface.js
@@ -0,0 +1,167 @@
+/*
+ * Main interface for Mongo oplog management
+ */
+const MongoClient = require('mongodb').MongoClient;
+const bson = require('bson');
+const { jsutil, errors } = require('arsenal');
+const async = require('async');
+const { isMasterKey } = require('arsenal/lib/versioning/Version');
+const OplogInterface = require('./OplogInterface');
+
+class MongoOplogInterface extends OplogInterface {
+
+    constructor(params) {
+        super(params);
+        this.mongoDbUri = 'mongodb://localhost:27017';
+        if (params && params.mongoDbUri !== undefined) {
+            this.mongoDbUri = params.mongoDbUri;
+        }
+        this.databaseName = 'metadata';
+        if (params && params.databaseName !== undefined) {
+            this.databaseName = params.databaseName;
+        }
+    }
+
+    start(filter, cb) {
+        if (filter.filterType !== 'bucket') {
+            return cb(errors.NotImplemented);
+        }
+        const filterName = filter.filterName;
+        const bucketName = filter.bucket.bucketName;
+        let db;
+        let collection;
+        async.waterfall([
+            /*
+             * In this step we connect to MongoDB
+             */
+            next => {
+                MongoClient.connect(
+                    this.mongoDbUri,
+                    (err, client) => {
+                        if (err) {
+                            this.logger.error('error connecting to mongodb', { err, filterName });
+                            return next(err);
+                        }
+                        db = client.db(this.databaseName, {
+                            ignoreUndefined: true,
+                        });
+                        collection = db.collection(bucketName);
+                        return next();
+                    });
+            },
+            /*
+             * In this step we get the stored offset if we have it
+             */
+            next => {
+                let resumeToken = undefined;
+                this.persist.load(filterName, this.persistData, (err, offset) => {
+                    if (err) {
+                        return next(err);
+                    }
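+                    // the resume token is persisted as { _data: <base64> };
+                    // rebuild the BSON Binary that the driver expects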
+                    if (offset && offset._data) {
+                        resumeToken = {};
+                        resumeToken._data = new bson.Binary(Buffer.from(offset._data, 'base64'));
+                    }
+                    return next(null, resumeToken);
+                });
+            },
+            /*
+             * In this step we acquire the offset if we don't already have it
+             */
+            (resumeToken, next) => {
+                if (resumeToken !== undefined) {
+                    this.logger.info(
+                        `skipping resumeToken acquisition (resumeToken=${resumeToken})`,
+                        { filterName });
+                    return next(null, resumeToken, true);
+                }
+                this.logger.info('resumeToken acquisition',
+                    { filterName });
+                const changeStream = collection.watch();
+                // big hack to extract resumeToken
+                changeStream.once('change', () => next(null, changeStream.resumeToken, false));
+                return undefined;
+            },
+            /*
+             * In this step we init the state (e.g. scan)
+             */
+            (resumeToken, skipInit, next) => {
+                if (skipInit) {
+                    this.logger.info(`skipping state initialization resumeToken=${resumeToken}`,
+                        { filterName });
+                    return next(null, resumeToken);
+                }
+                this.logger.info(`initializing state resumeToken=${resumeToken}`,
+                    { filterName });
+                this.persistData.initState(
+                    err => {
+                        if (err) {
+                            // propagate the error instead of exiting the process
+                            this.logger.error('initState error', { err, filterName });
+                            return next(err);
+                        }
+                        this.persist.save(
+                            filterName, this.persistData, resumeToken, err => {
+                                if (err) {
+                                    return next(err);
+                                }
+                                return next(null, resumeToken);
+                            });
+                        return undefined;
+                    });
+                return undefined;
+            },
+            /*
+             * In this step we loop over the oplog
+             */
+            (resumeToken, next) => {
+                this.logger.info(`reading oplog resumeToken=${resumeToken}`,
+                    { filterName });
+                // only way to get out of the loop in all cases
+                const nextOnce = jsutil.once(next);
+                // read the change stream
+                const changeStream = collection.watch({ resumeAfter: resumeToken });
+                // start bufferization
+                this.filterName = filterName;
+                this.startFlusher();
+                changeStream.on(
+                    'change', item => {
+                        if (item.ns.db === this.databaseName) {
+                            const _item = {};
+                            _item.bucketName = bucketName;
+                            _item.key = item.documentKey._id;
+                            if (item.operationType === 'insert' ||
+                                item.operationType === 'replace') {
+                                _item.value = Object.assign({}, item.fullDocument.value);
+                                this.addEvent(_item, changeStream.resumeToken);
+                            } else if (item.operationType === 'delete') {
+                                if (!isMasterKey(_item.key)) {
+                                    // ignore for now
+                                    return;
+                                }
+                                this.delEvent(_item, changeStream.resumeToken);
+                            } else if (item.operationType === 'invalidate') {
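+                                // the stream is no longer usable; complete
+                                // start() so the caller can decide to restart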
+                                nextOnce();
+                                return;
+                            } else {
+                                return;
+                            }
+                        }
+                    });
+            }], err => {
+                if (err) {
+                    return cb(err);
+                }
+                this.logger.info('returning',
+                    { filterName });
+                return cb();
+            });
+        return undefined;
+    }
+}
+
+module.exports = MongoOplogInterface;
diff --git a/lib/storage/metadata/oplog/OplogInterface.js b/lib/storage/metadata/oplog/OplogInterface.js
new file mode 100644
index 000000000..81b28d638
--- /dev/null
+++ b/lib/storage/metadata/oplog/OplogInterface.js
@@ -0,0 +1,141 @@
+/*
+ * Interface for oplog management
+ *
+ * filter is an object with the following structure: {
+ *   filterName: string,
+ *   filterType: bucket|bucketList|raftSession,
+ *   bucket: {
+ *     bucketName: string,
+ *   },
+ *   bucketList: {
+ *     bucketList: [string, ...]
+ *   },
+ *   raftSession: {
+ *     raftId: number,
+ *   },
+ * }
+ *
+ * persist is an interface with the following methods:
+ * - constructor(params)
+ * - load(filterName, persistData, cb(err, offset))
+ * - save(filterName, persistData, offset, cb(err))
+ *
+ * persistData is an interface with the following methods:
+ * - constructor(params)
+ * - initState(cb(err)): initialize the structure, e.g. initial bucket scan
+ * - loadState(stream, cb(err)): load the state
+ * - saveState(stream, cb(err)): save the state
+ * - updateState(addQueue, delQueue, cb(err)): update the state
+ *   item: { filterName, key, value }
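+ *
+ * offset is backend-specific and opaque to this layer: a raft cseq
+ * (number) for bucketd, a change stream resume token for MongoDB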
+ */
+const werelogs = require('werelogs');
+
+werelogs.configure({
+    level: 'info',
+    dump: 'error',
+});
+
+class OplogInterface {
+
+    constructor(params) {
+        this.persist = params?.persist;
+        this.persistData = params?.persistData;
+        this.logger = new werelogs.Logger('OplogInterface');
+        /* for backends requiring bufferization only */
+        this.bufferTimeoutMs = params?.bufferTimeoutMs ?? 500;
+        this.addQueue = [];
+        this.delQueue = [];
+        this.pending = false;
+        this.prevOffset = null;
+        this.offset = null;
+    }
+
+    addEvent(item, offset) {
+        this.addQueue.push(item);
+        this.offset = offset;
+    }
+
+    delEvent(item, offset) {
+        this.delQueue.push(item);
+        this.offset = offset;
+    }
+
+    /*
+     * Optional buffer management for backends that don't bufferize.
+     * It avoids persisting the state at each event
+     */
+    flushQueue(cb) {
+        if (this.offset === null ||
+            this.prevOffset === this.offset) {
+            if (cb) {
+                return process.nextTick(cb);
+            }
+            return undefined;
+        }
+        if (this.pending) {
+            if (cb) {
+                return process.nextTick(cb);
+            }
+            return undefined;
+        }
+        this.pending = true;
+        const addQueue = this.addQueue;
+        this.addQueue = [];
+        const delQueue = this.delQueue;
+        this.delQueue = [];
+        const offset = this.offset;
+        this.prevOffset = this.offset;
+        this.persistData.updateState(
+            addQueue,
+            delQueue,
+            err => {
+                if (err) {
+                    // release the flusher and skip the save on error
+                    this.pending = false;
+                    return cb ? cb(err) : undefined;
+                }
+                this.persist.save(
+                    this.filterName,
+                    this.persistData,
+                    offset,
+                    err => {
+                        this.pending = false;
+                        if (err) {
+                            if (cb) {
+                                return cb(err);
+                            }
+                            return undefined;
+                        }
+                        if (cb) {
+                            return cb();
+                        }
+                        return undefined;
+                    });
+                return undefined;
+            });
+        return undefined;
+    }
+
+    doFlush() {
+        this.flushQueue(err => {
+            if (err) {
+                this.logger.error('flushing buffer', { err });
+            }
+        });
+        this.startFlusher();
+    }
+
+    startFlusher() {
+        setTimeout(this.doFlush.bind(this), this.bufferTimeoutMs);
+    }
+
+    // method to be overridden
+    start() {
+        throw new Error('not implemented');
+    }
+}
+
+module.exports = OplogInterface;
diff --git a/lib/storage/metadata/oplog/PersistFileInterface.js b/lib/storage/metadata/oplog/PersistFileInterface.js
new file mode 100644
index 000000000..4a5ca2f59
--- /dev/null
+++ b/lib/storage/metadata/oplog/PersistFileInterface.js
@@ -0,0 +1,93 @@
+const fs = require('fs');
+const werelogs = require('werelogs');
+
+werelogs.configure({
+    level: 'info',
+    dump: 'error',
+});
+
+class PersistFileInterface {
+
+    constructor() {
+        this.folder = '/tmp';
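+        // state is stored in <folder>/<filterName>.json, its offset in
+        // <folder>/<filterName>.offset.json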
+        this.logger = new werelogs.Logger('PersistFileInterface');
+        fs.access(this.folder, err => {
+            if (err) {
+                fs.mkdirSync(this.folder, { recursive: true });
+            }
+        });
+    }
+
+    getFileName(filterName) {
+        return `${this.folder}/${filterName}.json`;
+    }
+
+    getOffsetFileName(filterName) {
+        return `${this.folder}/${filterName}.offset.json`;
+    }
+
+    load(filterName, persistData, cb) {
+        const fileName = this.getFileName(filterName);
+        const offsetFileName = this.getOffsetFileName(filterName);
+        let obj = {};
+        fs.readFile(
+            offsetFileName,
+            'utf-8', (err, data) => {
+                if (err) {
+                    if (err.code === 'ENOENT') {
+                        this.logger.info(`${offsetFileName} non-existent`);
+                    } else {
+                        this.logger.error('error loading', { err });
+                        return cb(err);
+                    }
+                } else {
+                    obj = JSON.parse(data);
+                }
+                if (fs.existsSync(fileName)) {
+                    const file = fs.createReadStream(fileName);
+                    persistData.loadState(file, err => {
+                        if (err) {
+                            return cb(err);
+                        }
+                        this.logger.info(`${fileName} loaded: offset ${obj.offset}`);
+                        return cb(null, obj.offset);
+                    });
+                } else {
+                    this.logger.info(`${fileName} non-existent`);
+                    return cb(null, obj.offset);
+                }
+                return undefined;
+            });
+    }
+
+    save(filterName, persistData, offset, cb) {
+        const fileName = this.getFileName(filterName);
+        const offsetFileName = this.getOffsetFileName(filterName);
+        const file = fs.createWriteStream(fileName);
+        persistData.saveState(file, err => {
+            if (err) {
+                return cb(err);
+            }
+            const obj = {
+                offset,
+            };
+            fs.writeFile(
+                offsetFileName, JSON.stringify(obj),
+                'utf-8',
+                err => {
+                    if (err) {
+                        this.logger.error('error saving', { err });
+                        return cb(err);
+                    }
+                    this.logger.info(`${fileName} saved: offset ${offset}`);
+                    return cb();
+                });
+            return undefined;
+        });
+        return undefined;
+    }
+}
+
+module.exports = PersistFileInterface;
diff --git a/lib/storage/metadata/oplog/PersistMemInterface.js b/lib/storage/metadata/oplog/PersistMemInterface.js
new file mode 100644
index 000000000..747f16e6a
--- /dev/null
+++ b/lib/storage/metadata/oplog/PersistMemInterface.js
@@ -0,0 +1,73 @@
+// fake backend for unit tests
+const assert = require('assert');
+const MemoryStream = require('memorystream');
+const werelogs = require('werelogs');
+
+werelogs.configure({
+    level: 'info',
+    dump: 'error',
+});
+
+class PersistMemInterface {
+
+    constructor() {
+        this.memoryStreams = {};
+        this.offsets = {};
+        this.logger = new werelogs.Logger('PersistMemInterface');
+    }
+
+    getOffset(filterName) {
+        if (!this.offsets[filterName]) {
+            return {};
+        }
+        return this.offsets[filterName];
+    }
+
+    setOffset(filterName, offset) {
+        if (!this.memoryStreams[filterName]) {
+            this.memoryStreams[filterName] = new MemoryStream();
+        }
+        if (!this.offsets[filterName]) {
+            this.offsets[filterName] = {};
+        }
+        Object.assign(
+            this.offsets[filterName],
+            offset);
+    }
+
+    load(filterName, persistData, cb) {
+        this.logger.info(`loading ${filterName}`);
+        const stream = this.memoryStreams[filterName];
+        const offset = this.offsets[filterName];
+        if (stream === undefined) {
+            this.logger.info(`${filterName} non-existent`);
+            return cb(null, undefined);
+        }
+        assert(offset !== undefined);
+        persistData.loadState(stream, err => {
+            if (err) {
+                return cb(err);
+            }
+            this.logger.info(`${filterName} loaded: offset ${offset}`);
+            return cb(null, offset);
+        });
+        return undefined;
+    }
+
+    save(filterName, persistData, offset, cb) {
+        this.logger.info(`saving ${filterName} offset ${JSON.stringify(offset)}`);
+        const stream = new MemoryStream();
+        this.memoryStreams[filterName] = stream;
+        persistData.saveState(stream, err => {
+            if (err) {
+                return cb(err);
+            }
+            this.offsets[filterName] = offset;
+            this.logger.info(`${filterName} saved: offset ${offset}`);
+            return cb();
+        });
+    }
+}
+
+module.exports = PersistMemInterface;
+
diff --git a/lib/storage/metadata/oplog/PersistRingInterface.js b/lib/storage/metadata/oplog/PersistRingInterface.js
new file mode 100644
index 000000000..8e1961108
--- /dev/null
+++ b/lib/storage/metadata/oplog/PersistRingInterface.js
@@ -0,0 +1,288 @@
+// Ring backend that persists on Sproxyd and offsets on ZK
+const async = require('async');
+const { pipeline } = require('stream');
+const MemoryStream = require('memorystream');
+const zlib = require('zlib');
+const zookeeper = require('node-zookeeper-client');
+const Sproxy = require('sproxydclient');
+const werelogs = require('werelogs');
+
+werelogs.configure({
+    level: 'info',
+    dump: 'error',
+});
+
+class PersistRingInterface {
+
+    constructor(params) {
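+        // accepted params (values shown are the defaults):
+        // { zkConnectionString: 'localhost:2181',
+        //   zkPath: '/persist-ring-interface',
+        //   spPath: '/proxy/DC1/',
+        //   spBootstrap: ['localhost:8181'] }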
+        let zkConnectionString = 'localhost:2181';
+        if (params && params.zkConnectionString !== undefined) {
+            zkConnectionString = params.zkConnectionString;
+        }
+        this.zkPath = '/persist-ring-interface';
+        if (params && params.zkPath !== undefined) {
+            this.zkPath = params.zkPath;
+        }
+        let spPath = '/proxy/DC1/'; // do not forget "/" at the end !!!
+        if (params && params.spPath !== undefined) {
+            spPath = params.spPath;
+        }
+        let spBootstrap = ['localhost:8181'];
+        if (params && params.spBootstrap !== undefined) {
+            spBootstrap = params.spBootstrap;
+        }
+        this.reqUid = 'persist-ring-interface-req-uid';
+        this.logger = new werelogs.Logger('PersistRingInterface');
+        this.zkClient = zookeeper.createClient(zkConnectionString);
+        this.zkClient.connect();
+        this.zkClient.on('error', err => {
+            this.logger.error('error connecting', { err });
+        });
+        this.zkClient.once('connected', () => {
+            this.logger.info('connected');
+        });
+        this.spClient = new Sproxy({
+            bootstrap: spBootstrap,
+            path: spPath,
+        });
+    }
+
+    getZKPath(filterName) {
+        return `${this.zkPath}/${filterName}`;
+    }
+
+    load(filterName, persistData, cb) {
+        this.logger.info(`loading ${filterName}`);
+        async.waterfall([
+            /*
+             * Check if we have an existing Zookeeper node
+             */
+            next => {
+                this.zkClient.getData(
+                    this.getZKPath(filterName),
+                    (err, data) => {
+                        if (err) {
+                            if (err.name === 'NO_NODE') {
+                                this.logger.info(`${filterName} non-existent`);
+                            } else {
+                                this.logger.error(`getData ${filterName} error`, { err });
+                            }
+                            return next(err);
+                        }
+                        return next(null, data);
+                    });
+            },
+            /*
+             * Extract the Sproxyd key from the Zookeeper node.
+             * Read the key from Sproxyd.
+             */
+            (data, next) => {
+                const _data = JSON.parse(data.toString());
+                this.spClient.get(
+                    _data.key,
+                    undefined,
+                    this.reqUid,
+                    (err, stream) => {
+                        if (err) {
+                            this.logger.error(`sproxyd ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        return next(null, _data, stream);
+                    });
+            },
+            /*
+             * Uncompress the stream in memory
+             */
+            (_data, stream, next) => {
+                const ostream = new MemoryStream();
+                pipeline(
+                    stream,
+                    zlib.createGunzip(),
+                    ostream,
+                    err => {
+                        if (err) {
+                            this.logger.error(`pipeline ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        return next(null, _data, ostream);
+                    });
+            },
+            /*
+             * Load the state from uncompressed stream
+             */
+            (_data, stream, next) => {
+                persistData.loadState(stream, err => {
+                    if (err) {
+                        this.logger.error(`load ${filterName} error`, { err });
+                        return next(err);
+                    }
+                    this.logger.info(`${filterName} loaded: offset ${_data.offset}`);
+                    return next(null, _data);
+                });
+            }], (err, _data) => {
+                if (err) {
+                    if (err.name === 'NO_NODE') {
+                        return cb(null, undefined);
+                    }
+                    this.logger.error(`load ${filterName} error`, { err });
+                    return cb(err);
+                }
+                return cb(null, _data.offset);
+            });
+    }
+
+    save(filterName, persistData, offset, cb) {
+        this.logger.info(`saving ${filterName} offset ${offset}`);
+        async.waterfall([
+            /*
+             * Save the state in a memory stream
+             */
+            next => {
+                const stream = new MemoryStream();
+                persistData.saveState(
+                    stream, err => {
+                        if (err) {
+                            this.logger.error(`save ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        return next(null, stream);
+                    });
+            },
+            /*
+             * Compress the state in memory
+             */
+            (stream, next) => {
+                const ostream = new MemoryStream();
+                pipeline(
+                    stream,
+                    zlib.createGzip(),
+                    ostream,
+                    err => {
+                        if (err) {
+                            this.logger.error(`pipeline ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        return next(null, ostream);
+                    });
+            },
+            /*
+             * Store the state in Sproxyd
+             */
+            (stream, next) => {
+                const parameters = {
+                    filterName,
+                    namespace: 'persist-ring-interface',
+                    owner: 'persist-ring-interface',
+                };
+                const size = stream._readableState.length;
+                this.spClient.put(
+                    stream,
+                    size,
+                    parameters,
+                    this.reqUid,
+                    (err, key) => {
+                        if (err) {
+                            this.logger.error(`sproxyd put ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        const newData = {};
+                        newData.offset = offset;
+                        newData.key = key;
+                        return next(null, newData);
+                    });
+            },
+            /*
+             * Check if the Zookeeper node exists
+             */
+            (newData, next) => {
+                this.zkClient.exists(
+                    this.getZKPath(filterName),
+                    (err, stat) => {
+                        if (err) {
+                            this.logger.error(`exists ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        let doesExist = false;
+                        if (stat) {
+                            doesExist = true;
+                        }
+                        return next(null, newData, doesExist);
+                    });
+            },
+            /*
+             * If the Zookeeper node exists read it.
+             * Else create it.
+             */
+            (newData, doesExist, next) => {
+                if (doesExist) {
+                    this.zkClient.getData(
+                        this.getZKPath(filterName),
+                        (err, _oldData) => {
+                            if (err) {
+                                this.logger.error(`getData ${filterName} error`, { err });
+                                return next(err);
+                            }
+                            const oldData = JSON.parse(_oldData);
+                            return next(null, newData, oldData);
+                        });
+                } else {
+                    this.zkClient.mkdirp(
+                        this.getZKPath(filterName),
+                        null,
+                        err => {
+                            if (err) {
+                                this.logger.error(`mkdirp ${filterName} error`, { err });
+                                return next(err);
+                            }
+                            return next(null, newData, null);
+                        });
+                }
+            },
+            /*
+             * Store the context in the Zookeeper node and delete the old sproxyd key.
+             */
+            (newData, oldData, next) => {
+                const _newData = JSON.stringify(newData);
+                this.zkClient.setData(
+                    this.getZKPath(filterName),
+                    Buffer.from(_newData),
+                    err => {
+                        if (err) {
+                            this.logger.error(`setData ${filterName} error`, { err });
+                            return next(err);
+                        }
+                        this.logger.info(`${filterName} saved: new key ${newData.key} offset ${offset}`);
+                        if (oldData) {
+                            this.spClient.delete(
+                                oldData.key,
+                                this.reqUid,
+                                err => {
+                                    if (err) {
+                                        this.logger.error(
+                                            `sproxyd del ${filterName} old key ${oldData.key} error`,
+                                            { err });
+                                        return next(err);
+                                    }
+                                    return next();
+                                });
+                        } else {
+                            return next();
+                        }
+                        return undefined;
+                    });
+            }], err => {
+                if (err) {
+                    this.logger.error(`save ${filterName} error`, { err });
+                    return cb(err);
+                }
+                return cb();
+            });
+    }
+}
+
+module.exports = PersistRingInterface;
diff --git a/package.json b/package.json
index da91d5e7a..36b66c05f 100644
--- a/package.json
+++ b/package.json
@@ -3,7 +3,7 @@
   "engines": {
     "node": ">=16"
   },
-  "version": "7.10.13",
+  "version": "7.10.14",
   "description": "Common utilities for the S3 project components",
   "main": "build/index.js",
   "repository": {
@@ -36,8 +36,10 @@
     "ipaddr.js": "1.9.1",
     "level": "~5.0.1",
     "level-sublevel": "~6.6.5",
+    "memorystream": "^0.3.1",
     "mongodb": "^3.0.1",
     "node-forge": "^0.7.1",
+    "node-zookeeper-client": "^0.2.2",
     "prom-client": "10.2.3",
     "simple-glob": "^0.2",
     "socket.io": "~2.3.0",
@@ -57,6 +59,7 @@
     "@sinonjs/fake-timers": "^6.0.1",
     "@types/jest": "^27.4.1",
     "@types/node": "^17.0.21",
+    "argparse": "^2.0.1",
     "eslint": "2.13.1",
     "eslint-config-airbnb": "6.2.0",
     "eslint-config-scality": "scality/Guidelines#7.10.2",
@@ -64,6 +67,7 @@
     "jest": "^27.5.1",
     "mocha": "8.0.1",
     "mongodb-memory-server": "^6.0.2",
+    "prando": "^6.0.1",
     "sinon": "^9.0.2",
     "temp": "0.9.1",
     "ts-jest": "^27.1.3",
diff --git a/tests/unit/storage/metadata/FakeBucketInfo.json b/tests/unit/storage/metadata/FakeBucketInfo.json
new file mode 100644
index 000000000..8f738f2cf
--- /dev/null
+++ b/tests/unit/storage/metadata/FakeBucketInfo.json
@@ -0,0 +1,26 @@
+{
+    "acl": {
+        "Canned": "private",
+        "FULL_CONTROL": [],
+        "WRITE": [],
+        "WRITE_ACP": [],
+        "READ": [],
+        "READ_ACP": []
+    },
+    "name": "BucketName",
+    "owner": "9d8fe19a78974c56dceb2ea4a8f01ed0f5fecb9d29f80e9e3b84104e4a3ea520",
+    "ownerDisplayName": "anonymousCoward",
+    "creationDate": "2018-06-04T17:45:42.592Z",
+    "mdBucketModelVersion": 8,
+    "transient": false,
+    "deleted": false,
+    "serverSideEncryption": null,
+    "versioningConfiguration": null,
+    "websiteConfiguration": null,
+    "locationConstraint": "us-east-1",
+    "readLocationConstraint": "us-east-1",
+    "cors": null,
+    "replicationConfiguration": null,
+    "lifecycleConfiguration": null,
+    "uid": "fea97818-6a9a-11e8-9777-e311618cc5d4"
+}
diff --git a/tests/unit/storage/metadata/Injector.js b/tests/unit/storage/metadata/Injector.js
new file mode 100644
index 000000000..a827bfd7c
--- /dev/null
+++ b/tests/unit/storage/metadata/Injector.js
@@ -0,0 +1,270 @@
+const async = require('async');
+const assert = require('assert');
+const Prando = require('prando');
+const werelogs = require('werelogs');
+const MetadataWrapper = require('../../../../lib/storage/metadata/MetadataWrapper');
+const fakeBucketInfo = require('./FakeBucketInfo.json');
+
+werelogs.configure({
+    level: 'info',
+    dump: 'error',
+});
+
+class Injector {
+
+    /**
+     * @constructor
+     *
+     * @param {Object} backend - Metadata backend to use for injection
+     * @param {Object} logger - Logger to use
+     */
+    constructor(backend, logger) {
+        this.backend = backend;
+        this.rnd = new Prando(0);
+        this.logger = logger;
+    }
+
+    get opPut() {
+        return 1;
+    }
+
+    get opDelete() {
+        return 0;
+    }
+
+    genKey(len) {
+        const characters =
+              'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
+        return this.rnd.nextString(len, characters);
+    }
+
+    genBase16(len) {
+        const characters = 'abcdef0123456789';
+        return this.rnd.nextString(len, characters);
+    }
+
+    _zeroPad(s, n, width) {
+        return s + n.toString().padStart(width, '0');
+    }
+
+    /**
+     * Deterministic injection of puts or deletes according to parameters
+     *
+     * @param {String} bucketName - bucket name to inject to
+     * @param {Object} params - parameters for injection
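+     *   (op: opPut|opDelete, numKeys, prefix, suffix, randomKey,
+     *   maxKeyLen, randomSeq, maxSeq)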
"creationDate": "2018-06-04T17:45:42.592Z", + "mdBucketModelVersion": 8, + "transient": false, + "deleted": false, + "serverSideEncryption": null, + "versioningConfiguration": null, + "websiteConfiguration": null, + "locationConstraint": "us-east-1", + "readLocationConstraint": "us-east-1", + "cors": null, + "replicationConfiguration": null, + "lifecycleConfiguration": null, + "uid": "fea97818-6a9a-11e8-9777-e311618cc5d4" +} diff --git a/tests/unit/storage/metadata/Injector.js b/tests/unit/storage/metadata/Injector.js new file mode 100644 index 000000000..a827bfd7c --- /dev/null +++ b/tests/unit/storage/metadata/Injector.js @@ -0,0 +1,268 @@ +const async = require('async'); +const assert = require('assert'); +const Prando = require('prando'); +const werelogs = require('werelogs'); +const MetadataWrapper = require('../../../../lib/storage/metadata/MetadataWrapper'); +const fakeBucketInfo = require('./FakeBucketInfo.json'); + +werelogs.configure({ + level: 'info', + dump: 'error', +}); + +class Injector { + + /** + * @constructor + * + * @param {Object} backend - Metadata backend to use for injection + * @param {Object} logger - Logger to use + */ + constructor(backend, logger) { + this.backend = backend; + this.rnd = new Prando(0); + this.logger = logger; + } + + get opPut() { + return 1; + } + + get opDelete() { + return 0; + } + + genKey(len) { + const characters = + 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; + return this.rnd.nextString(len, characters); + } + + genBase16(len) { + const characters = 'abcdef0123456789'; + return this.rnd.nextString(len, characters); + } + + _zeroPad(s, n, width) { + return s + n.toString().padStart(width, '0'); + } + + /** + * Deterministic injection of puts or deletes according to parameters + * + * @param {String} bucketName - bucket name to inject to + * @param {Object} params - parameters for injection + * @param {Array} inputKeys - optional keys to use as input + * @param {Array} outputKeys - optional generated keys as output + * @param {Array} outputValues - optional generated values as output + * @param {function} cb - callback when done + * + * @return {undefined} + */ + inject(bucketName, params, inputKeys, outputKeys, outputValues, cb) { + let maxKeyLen = 1024; + if (params.maxKeyLen !== undefined) { + maxKeyLen = params.maxKeyLen; + } + async.timesLimit( + params.numKeys, + 10, + (n, next) => { + let key; + if (inputKeys) { + const idx = this.rnd.nextInt(0, inputKeys.length - 1); + key = inputKeys[idx]; + inputKeys.splice(idx, 1); + } else { + if (params.randomKey) { + const len = this.rnd.nextInt(1, maxKeyLen); + key = this.genKey(len); + } else { + let x; + if (params.randomSeq) { + x = this.rnd.nextInt(0, params.maxSeq - 1); + } else { + x = n; + } + key = this._zeroPad(params.prefix, x, 10) + + params.suffix; + } + } + if (outputKeys) { + outputKeys.push(key); + } + // eslint-disable-next-line + const value = { + versionId: this.genBase16(32), + 'content-length': this.rnd.nextInt(0, 5000000), + 'content-md5': this.genBase16(32), + }; + if (outputValues) { + outputValues.push(value); + } + if (params.op === this.opPut) { + this.backend.putObjectMD( + bucketName, + key, + value, + {}, + this.logger, + next); + return undefined; + } else if (params.op === this.opDelete) { + this.backend.deleteObjectMD( + bucketName, + key, + {}, + this.logger, + err => { + if (err) { + if (err.code !== 404) { + return next(err); + } + } + return next(); + }); + return undefined; + } + return next(new Error('unknow op')); + }, + err => { 
+                if (err) {
+                    // eslint-disable-next-line
+                    console.error('inject error', err);
+                    process.exit(1);
+                }
+                if (cb) {
+                    return cb();
+                }
+                return undefined;
+            });
+    }
+}
+
+module.exports = Injector;
+
+describe('Injector', () => {
+    const fakeBucket = 'fake';
+    const logger = new werelogs.Logger('Injector');
+    const memBackend = new MetadataWrapper(
+        'mem', {}, null, logger);
+
+    before(done => {
+        memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, done);
+    });
+
+    after(done => {
+        memBackend.deleteBucket(fakeBucket, logger, done);
+    });
+
+    it('zeropad', () => {
+        const injector = new Injector(memBackend, logger);
+        assert(injector._zeroPad('foo', 42, 10) === 'foo0000000042');
+    });
+
+    it('inject inputKeys', done => {
+        const injector = new Injector(memBackend, logger);
+        const inputKeys = ['foo1', 'foo2', 'foo3'];
+        const outputKeys = [];
+        injector.inject(
+            fakeBucket,
+            {
+                op: injector.opPut,
+                numKeys: 3,
+            },
+            inputKeys,
+            outputKeys,
+            null,
+            err => {
+                if (err) {
+                    return done(err);
+                }
+                assert.deepEqual(outputKeys, ['foo2', 'foo1', 'foo3']);
+                return done();
+            });
+    });
+
+    it('inject sequence', done => {
+        const injector = new Injector(memBackend, logger);
+        const outputKeys = [];
+        injector.inject(
+            fakeBucket,
+            {
+                prefix: 'foo',
+                suffix: 'x',
+                randomSeq: true,
+                maxSeq: 10,
+                op: injector.opPut,
+                numKeys: 3,
+            },
+            null,
+            outputKeys,
+            null,
+            err => {
+                if (err) {
+                    return done(err);
+                }
+                assert.deepEqual(
+                    outputKeys,
+                    [
+                        'foo0000000005x',
+                        'foo0000000001x',
+                        'foo0000000000x',
+                    ]);
+                return done();
+            });
+    });
+
+    it('inject random', done => {
+        const injector = new Injector(memBackend, logger);
+        const outputKeys = [];
+        const outputValues = [];
+        injector.inject(
+            fakeBucket,
+            {
+                prefix: 'foo',
+                suffix: 'x',
+                randomKey: true,
+                maxKeyLen: 10,
+                op: injector.opPut,
+                numKeys: 3,
+            },
+            null,
+            outputKeys,
+            outputValues,
+            err => {
+                if (err) {
+                    return done(err);
+                }
+                assert.deepEqual(
+                    outputKeys,
+                    [
+                        'f5TJ6X',
+                        '7T',
+                        'LStNJxHS8',
+                    ]);
+                assert.deepEqual(
+                    outputValues,
+                    [
+                        {
+                            'content-length': 3009012,
+                            'content-md5': '60f7abfdc5855e00a6e6dc2918bda7a8',
+                            'versionId': 'f13938435117885f7f88d7636a3b238e',
+                        },
+                        {
+                            'content-length': 3984521,
+                            'content-md5': '7cfc7e8b82826b83e302d18fa0e24b12',
+                            'versionId': 'f7f5c4973bc353ad8a9d5084fc1f0dc3',
+                        },
+                        {
+                            'content-length': 4112702,
+                            'content-md5': '6d4fe78b11cfa4ff7b2efaba5c5965fe',
+                            'versionId': '5540954e05a7910abeb72b8393c93afe',
+                        },
+                    ]);
+                return done();
+            });
+    });
+});
diff --git a/tests/unit/storage/metadata/oplog/BucketdOplogInterface.js b/tests/unit/storage/metadata/oplog/BucketdOplogInterface.js
new file mode 100644
index 000000000..0ef3e0a6f
--- /dev/null
+++ b/tests/unit/storage/metadata/oplog/BucketdOplogInterface.js
@@ -0,0 +1,218 @@
+const async = require('async');
+const fakeBucketInfo = require('../FakeBucketInfo.json');
+const MetadataWrapper = require('../../../../../lib/storage/metadata/MetadataWrapper');
+const BucketdOplogInterface = require('../../../../../lib/storage/metadata/oplog/BucketdOplogInterface');
+const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
+const Injector = require('../Injector');
+const http = require('http');
+const url = require('url');
+const werelogs = require('werelogs');
+
+werelogs.configure({
+    level: 'info',
+    dump: 'error',
+});
+
+class PersistDataInterface {
+
+    constructor() {
+        this.data = null;
+    }
+
+    initState(cb) {
+        this.data = {};
+        return process.nextTick(cb);
+    }
+
+    loadState(stream, cb) {
+        const chunks = [];
+        stream.on('data', chunk => {
+            chunks.push(chunk);
+        });
+        stream.on('end', () => {
+            this.data = JSON.parse(Buffer.concat(chunks));
+            return process.nextTick(cb);
+        });
+    }
+
+    saveState(stream, cb) {
+        stream.write(JSON.stringify(this.data));
+        stream.end();
+        return process.nextTick(cb);
+    }
+
+    updateState(addQueue, deleteQueue, cb) {
+        return process.nextTick(cb);
+    }
+}
+
+describe('BucketdOplogInterface', () => {
+    const logger = new werelogs.Logger('BucketOplogInterface');
+
+    const fakePort = 9090;
+    const fakeBucket = 'fake';
+    const fakeRaftId = 2;
+    const numObjs = 20000;
+    const fakeCseq = 20001;
+    let oplogInjected = false;
+    const numOplogSeqs = 100;
+    const oplogBatchSize = 2;
+    const endCseq = fakeCseq + numOplogSeqs;
+    const maxLimit = 2;
+    const oplogKeys = [];
+    const oplogValues = [];
+    let oplogKeysIdx = 0;
+
+    const memBackend = new MetadataWrapper(
+        'mem', {}, null, logger);
+    const injector = new Injector(memBackend, logger);
+
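+    // fake bucketd: serves bucket info, raft log pages built from the
+    // injected keys, and bucket listings backed by the mem backend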
+    const requestListener = (req, res) => {
+        const _url = url.parse(req.url, true);
+        if (_url.pathname === `/_/buckets/${fakeBucket}`) {
+            res.writeHead(200);
+            res.end(JSON.stringify(
+                {
+                    raftSessionId: fakeRaftId,
+                    creating: false,
+                    deleting: false,
+                    version: 0,
+                }));
+        } else if (_url.pathname === `/_/raft_sessions/${fakeRaftId}/log`) {
+            const begin = _url.query.begin;
+            const limit = _url.query.limit;
+            if (begin === '1' && limit === '1') {
+                res.writeHead(200);
+                res.end(JSON.stringify(
+                    {
+                        info: {
+                            start: 1,
+                            cseq: fakeCseq,
+                            prune: 1,
+                        },
+                    }));
+            } else {
+                const realLimit = Math.min(limit, maxLimit);
+                async.until(
+                    () => oplogInjected,
+                    next => {
+                        // inject similar but different random objects
+                        injector.inject(
+                            fakeBucket,
+                            {
+                                numKeys: numOplogSeqs * oplogBatchSize,
+                                maxSeq: numObjs,
+                                op: injector.opPut,
+                                randomSeq: true,
+                                prefix: 'obj_',
+                                suffix: '_bis',
+                            },
+                            null,
+                            oplogKeys,
+                            oplogValues,
+                            err => {
+                                if (err) {
+                                    return next(err);
+                                }
+                                oplogInjected = true;
+                                return next();
+                            });
+                    }, err => {
+                        if (err) {
+                            res.writeHead(404);
+                            res.end('error');
+                            return undefined;
+                        }
+                        if (begin < endCseq) {
+                            res.writeHead(200);
+                            const resp = {};
+                            resp.info = {
+                                start: begin,
+                                cseq: endCseq,
+                                prune: 1,
+                            };
+                            resp.log = [];
+                            for (let i = 0; i < realLimit; i++) {
+                                resp.log[i] = {};
+                                resp.log[i].db = fakeBucket;
+                                resp.log[i].method = 8;
+                                resp.log[i].entries = [];
+                                for (let j = 0; j < oplogBatchSize; j++) {
+                                    resp.log[i].entries[j] = {};
+                                    resp.log[i].entries[j].key = oplogKeys[oplogKeysIdx];
+                                    resp.log[i].entries[j].value = oplogValues[oplogKeysIdx];
+                                    oplogKeysIdx++;
+                                }
+                            }
+                            res.end(JSON.stringify(resp));
+                        }
+                        return undefined;
+                    });
+            }
+        } else if (_url.pathname === `/default/bucket/${fakeBucket}`) {
+            const marker = _url.query.marker === '' ?
+                  null : _url.query.marker;
+            const maxKeys = parseInt(_url.query.maxKeys, 10);
+            memBackend.listObjects(fakeBucket, {
+                listingType: 'Delimiter',
+                marker,
+                maxKeys,
+            }, (err, result) => {
+                if (err) {
+                    res.writeHead(404);
+                    res.end('error');
+                    return undefined;
+                }
+                res.writeHead(200);
+                res.end(JSON.stringify(result));
+                return undefined;
+            });
+        }
+    };
+
+    before(done => {
+        const server = http.createServer(requestListener);
+        server.listen(fakePort);
+        async.waterfall([
+            next => memBackend.createBucket(fakeBucket, fakeBucketInfo, logger, next),
+            next => injector.inject(
+                fakeBucket,
+                {
+                    numKeys: numObjs,
+                    maxSeq: numObjs,
+                    op: injector.opPut,
+                    randomSeq: false,
+                    prefix: 'obj_',
+                    suffix: '',
+                },
+                null,
+                null,
+                null,
+                next),
+        ], done);
+    });
+
+    after(done => {
+        memBackend.deleteBucket(fakeBucket, logger, done);
+    });
+
+    it('simulation', done => {
+        const params = {
+            bootstrap: [`localhost:${fakePort}`],
+            persist: new PersistMemInterface(),
+            persistData: new PersistDataInterface(),
+            stopAt: numObjs + numOplogSeqs,
+        };
+        const bucketdOplog = new BucketdOplogInterface(params);
+        bucketdOplog.start(
+            {
+                filterName: fakeBucket,
+                filterType: 'bucket',
+                bucket: {
+                    bucketName: fakeBucket,
+                },
+            },
+            done);
+    });
+});
diff --git a/tests/unit/storage/metadata/oplog/PersistMem.js b/tests/unit/storage/metadata/oplog/PersistMem.js
new file mode 100644
index 000000000..078b386c2
--- /dev/null
+++ b/tests/unit/storage/metadata/oplog/PersistMem.js
@@ -0,0 +1,55 @@
+const assert = require('assert');
+const PersistMemInterface = require('../../../../../lib/storage/metadata/oplog/PersistMemInterface');
+
+class PersistDataInterface {
+
+    constructor(obj) {
+        this.obj = obj;
+    }
+
+    loadState(stream, cb) {
+        const chunks = [];
+        stream.on('data', chunk => {
+            chunks.push(chunk);
+        });
+        stream.on('end', () => {
+            this.obj = JSON.parse(Buffer.concat(chunks));
+            return cb();
+        });
+    }
+
+    saveState(stream, cb) {
+        stream.write(JSON.stringify(this.obj));
+        stream.end();
+        return cb();
+    }
+}
+
+describe('Persist Mem', () => {
+    const persist = new PersistMemInterface();
+    const filterName = 'foo';
+
+    it('basic operations', done => {
+        const pd1 = new PersistDataInterface({
+            foo: 'bar',
+            bar: {
+                qux: 42,
+                quuux: false,
+            },
+        });
+        const pd2 = new PersistDataInterface();
+        persist.save(filterName, pd1, 42, err => {
+            if (err) {
+                return done(err);
+            }
+            persist.load(filterName, pd2, err => {
+                if (err) {
+                    return done(err);
+                }
+                assert.deepEqual(pd1.obj, pd2.obj);
+                return done();
+            });
+            return undefined;
+        });
+    });
+});
diff --git a/tools/oplog-cli.js b/tools/oplog-cli.js
new file mode 100644
index 000000000..6a5aafea1
--- /dev/null
+++ b/tools/oplog-cli.js
@@ -0,0 +1,125 @@
+/* eslint-disable no-console */
+const fs = require('fs');
+const { ArgumentParser } = require('argparse');
+const BucketdOplogInterface = require('../lib/storage/metadata/oplog/BucketdOplogInterface');
+const MongoOplogInterface = require('../lib/storage/metadata/oplog/MongoOplogInterface');
+const PersistMemInterface = require('../lib/storage/metadata/oplog/PersistMemInterface');
+const PersistFileInterface = require('../lib/storage/metadata/oplog/PersistFileInterface');
+const PersistRingInterface = require('../lib/storage/metadata/oplog/PersistRingInterface');
+
+const parser = new ArgumentParser({
+    description: 'Oplog CLI tool',
+});
+
+parser.add_argument('-v', '--verbose', { action: 'store_true' });
+parser.add_argument('-c', '--config-file', { help: 'config file' });
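+// illustrative config file layout (every section is optional; the values
+// shown are the built-in defaults):
+// {
+//   "persistRing": { "zkConnectionString": "localhost:2181",
+//                    "spBootstrap": ["localhost:8181"] },
+//   "bucketdOplog": { "bootstrap": ["localhost:9000"] },
+//   "mongoOplog": { "mongoDbUri": "mongodb://localhost:27017",
+//                   "databaseName": "metadata" }
+// }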
+parser.add_argument('--oplog-interface', { help: 'bucketd|mongo' });
+parser.add_argument('--persist', { help: 'mem|file|ring' });
+parser.add_argument('--bucket', { help: 'bucket' });
+parser.add_argument('--start', { action: 'store_true' });
+
+const args = parser.parse_args();
+
+class PersistDataInterface {
+
+    constructor() {
+        this.data = null;
+    }
+
+    initState(cb) {
+        this.data = {};
+        return process.nextTick(cb);
+    }
+
+    loadState(stream, cb) {
+        const chunks = [];
+        stream.on('data', chunk => {
+            chunks.push(chunk);
+        });
+        stream.on('end', () => {
+            this.data = JSON.parse(Buffer.concat(chunks));
+            return process.nextTick(cb);
+        });
+    }
+
+    saveState(stream, cb) {
+        stream.write(JSON.stringify(this.data));
+        stream.end();
+        return process.nextTick(cb);
+    }
+
+    updateState(addQueue, deleteQueue, cb) {
+        console.log('addQueue', addQueue, 'deleteQueue', deleteQueue);
+        return process.nextTick(cb);
+    }
+}
+
+let config = {};
+
+if (args.config_file !== undefined) {
+    config = JSON.parse(fs.readFileSync(args.config_file, 'utf8'));
+}
+
+let persist;
+if (args.persist === 'mem') {
+    persist = new PersistMemInterface(config.persistMem);
+} else if (args.persist === 'file') {
+    persist = new PersistFileInterface(config.persistFile);
+} else if (args.persist === 'ring') {
+    persist = new PersistRingInterface(config.persistRing);
+} else {
+    console.error(`invalid persist ${args.persist}`);
+    process.exit(1);
+}
+
+let params = {
+    persist,
+    persistData: new PersistDataInterface(),
+};
+
+let oplogInterface;
+if (args.oplog_interface === 'bucketd') {
+    params = Object.assign(params, config.bucketdOplog);
+    oplogInterface = new BucketdOplogInterface(params);
+} else if (args.oplog_interface === 'mongo') {
+    params = Object.assign(params, config.mongoOplog);
+    oplogInterface = new MongoOplogInterface(params);
+} else {
+    console.error(`invalid oplog-interface ${args.oplog_interface}`);
+    process.exit(1);
+}
+
+if (args.start) {
+    if (args.bucket === undefined) {
+        console.error('please provide bucket');
+        process.exit(1);
+    }
+    oplogInterface.start(
+        {
+            filterName: args.bucket,
+            filterType: 'bucket',
+            bucket: {
+                bucketName: args.bucket,
+            },
+        },
+        err => {
+            if (err) {
+                console.error(err);
+                process.exit(1);
+            }
+            console.log('exiting...');
+            return;
+        });
+} else {
+    console.error('please provide an option');
+    process.exit(1);
+}
diff --git a/yarn.lock b/yarn.lock
index 3bb78c102..870595c5a 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1682,6 +1682,11 @@ async@^3.2.0:
   resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9"
   integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g==
 
+async@~0.2.7:
+  version "0.2.10"
+  resolved "https://registry.yarnpkg.com/async/-/async-0.2.10.tgz#b6bbe0b0674b9d719708ca38de8c237cb526c3d1"
+  integrity sha1-trvgsGdLnXGXCMo43owjfLUmw9E=
+
 async@~2.1.5:
   version "2.1.5"
   resolved "https://registry.yarnpkg.com/async/-/async-2.1.5.tgz#e587c68580994ac67fc56ff86d3ac56bdbe810bc"
@@ -4895,6 +4900,11 @@ memory-pager@^1.0.2:
   resolved "https://registry.yarnpkg.com/memory-pager/-/memory-pager-1.5.0.tgz#d8751655d22d384682741c972f2c3d6dfa3e66b5"
   integrity sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==
 
+memorystream@^0.3.1:
+  version "0.3.1"
+  resolved "https://registry.yarnpkg.com/memorystream/-/memorystream-0.3.1.tgz#86d7090b30ce455d63fbae12dda51a47ddcaf9b2"
+  integrity sha1-htcJCzDORV1j+64S3aUaR93K+bI=
+
 merge-stream@^2.0.0:
version "2.0.0" resolved "https://registry.yarnpkg.com/merge-stream/-/merge-stream-2.0.0.tgz#52823629a14dd00c9770fb6ad47dc6310f2c1f60" @@ -5203,6 +5213,14 @@ node-releases@^2.0.2: resolved "https://registry.yarnpkg.com/node-releases/-/node-releases-2.0.2.tgz#7139fe71e2f4f11b47d4d2986aaf8c48699e0c01" integrity sha512-XxYDdcQ6eKqp/YjI+tb2C5WM2LgjnZrfYg4vgQt49EK268b6gYCHsBLrK2qvJo4FmCtqmKezb0WZFK4fkrZNsg== +node-zookeeper-client@^0.2.2: + version "0.2.3" + resolved "https://registry.yarnpkg.com/node-zookeeper-client/-/node-zookeeper-client-0.2.3.tgz#48c79129c56b8e898df9bd3bdad9e27dcad63851" + integrity sha512-V4gVHxzQ42iwhkANpPryzfjmqi3Ql3xeO9E/px7W5Yi774WplU3YtqUpnvcL/eJit4UqcfuLOgZLkpf0BPhHmg== + dependencies: + async "~0.2.7" + underscore "~1.4.4" + nopt@^5.0.0: version "5.0.0" resolved "https://registry.yarnpkg.com/nopt/-/nopt-5.0.0.tgz#530942bb58a512fccafe53fe210f13a25355dc88" @@ -5508,6 +5526,11 @@ pluralize@^1.2.1: resolved "https://registry.yarnpkg.com/pluralize/-/pluralize-1.2.1.tgz#d1a21483fd22bb41e58a12fa3421823140897c45" integrity sha1-0aIUg/0iu0HlihL6NCGCMUCJfEU= +prando@^6.0.1: + version "6.0.1" + resolved "https://registry.yarnpkg.com/prando/-/prando-6.0.1.tgz#ffa8de84c2adc4975dd9df37ae4ada0458face53" + integrity sha512-ghUWxQ1T9IJmPu6eshc3VU0OwveUtXQ33ZLXYUcz1Oc5ppKLDXKp0TBDj6b0epwhEctzcQSNGR2iHyvQSn4W5A== + prelude-ls@~1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.1.2.tgz#21932a549f5e52ffd9a827f570e04be62a97da54" @@ -6726,6 +6749,11 @@ unbzip2-stream@^1.0.9: buffer "^5.2.1" through "^2.3.8" +underscore@~1.4.4: + version "1.4.4" + resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.4.4.tgz#61a6a32010622afa07963bf325203cf12239d604" + integrity sha1-YaajIBBiKvoHljvzJSA88SI51gQ= + underscore@~1.8.3: version "1.8.3" resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.8.3.tgz#4f3fb53b106e6097fcf9cb4109f2a5e9bdfa5022"