File Transfer with Socket.io and RethinkDB
RethinkDB lets you store binary objects directly in the database. While this is perfect for small files, storing large files or accessing many small files simultaneously would impact system memory, because you cannot yet stream binary data directly out of RethinkDB and have to load the entire file content into memory. The ReGrid module, inspired by MongoDB's GridFS, tackles this issue, but I needed a simpler version without any 'revisioning' or 'buckets' that also integrates easily with socket.io, without using any socket.io streaming library.
The full demo is available at https://github.com/hassansin/rethinkdb-fileupload
Schema:
The schema is very similar to that of GridFS. Two tables are used:
- messages: Stores file meta information such as the file name, type, size and chunkSize. chunkSize is the maximum size of a file chunk in bytes; this example uses a chunk size of 255KB. A stream() method is defined on the model which streams the file chunks in sequence, so we don't have to load the entire file in memory. We can then easily pipe this stream into an express.js response object to download the file (a download-route sketch follows below).
- fs_chunks: Stores the binary data. Each chunk has an index value that represents its position in the stored file.
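For instance, once the models below are in place, a download endpoint only needs to look up the meta document and pipe its chunk stream into the response. The following is a rough sketch rather than part of the demo; the /files/:id route and the response headers are assumptions:
routes/download.js (hypothetical)
import express from 'express';
import Message from '../models/messages';
const router = express.Router();
// GET /files/:id - stream a stored file to the client without buffering it in memory
router.get('/files/:id', (req, res, next) => {
  Message.get(req.params.id).run()
    .then(message => {
      res.set({
        'Content-Type': message.type,
        'Content-Length': String(message.size),
        'Content-Disposition': `attachment; filename="${message.name}"`
      });
      // chunks flow straight from RethinkDB through the Transform stream to the response
      message.stream().pipe(res);
    })
    .catch(next);
});
export default router;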
Following are the schemas, defined using thinky - a light Node.js ORM for RethinkDB:
models/messages.js
"use strict";
import thinky from '../db';
import FsChunks from './fs-chunks';
import {Transform} from 'stream';
const type = thinky.type;
const Message = thinky.createModel("messages", {
  id: type.string(),
  type: type.string(),
  name: type.string(),
  size: type.number(),
  chunkSize: type.number(),
  createdAt: type.date().default(thinky.r.now())
});
// Stream file chunks
Message.define("stream", function() {
const transform = new Transform({
transform: function(chunk, encoding, next) {
this.push(chunk.data);
next();
},
objectMode: true
});
return FsChunks.getAll(this.id, {index: 'fileId'})
.orderBy('index')
.pluck('data')
.toStream()
._query
.pipe(transform);
});
// Wrapper to generate uuid
Message.defineStatic("uuid", function(){
return thinky.r.uuid().run();
});
Message.hasMany(FsChunks, "chunks", "id", "fileId");
export default Message;
models/fs-chunks.js
import thinky from '../db';
const type = thinky.type;
const FsChunks = thinky.createModel("fs_chunks", {
  id: type.string(),
  fileId: type.string(),
  data: type.buffer(),
  index: type.number(),
  size: type.number()
});
FsChunks.ensureIndex("fileIdIndex", function(doc) {
  return doc("fileId").add(doc("index"));
});
export default FsChunks;
Socket.io events:
Four events are defined: upload.start, upload.data, upload.finish and upload.delete, used respectively for starting a file upload, transferring file chunks, finishing the upload and cancelling an upload or deleting a file.
- upload.start: The client initiates a transfer by emitting this event. The server returns a uuid in response, which the client uses for the rest of the transfer.
- upload.data: After receiving the uuid from the server, the client transfers file chunks in sequence along with this unique id. The server stores each chunk in the fs_chunks table (see the sketches after this list).
- upload.finish: The client informs the server about EOF and sends some meta information. The server creates a new file record in the messages table with the uuid and the meta data.
- upload.delete: The client can abort an ongoing upload or delete an existing file by sending this event with the uuid of the upload/file.
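To make the flow concrete, here is a rough client-side sketch of the chunking loop. This is not the demo's actual client code; the payload field names and the use of socket.io acknowledgement callbacks are assumptions:
client.js (hypothetical)
const CHUNK_SIZE = 255 * 1024; // must match the server-side chunkSize
function uploadFile(socket, file) {
  // upload.start: ask the server for a uuid to tag every chunk with
  socket.emit('upload.start', {name: file.name, type: file.type, size: file.size}, fileId => {
    let index = 0;
    const sendNextChunk = () => {
      const start = index * CHUNK_SIZE;
      if (start >= file.size) {
        // upload.finish: EOF - the server now creates the messages record
        socket.emit('upload.finish', {fileId, name: file.name, type: file.type, size: file.size, chunkSize: CHUNK_SIZE});
        return;
      }
      const reader = new FileReader();
      reader.onload = () => {
        // upload.data: send one chunk, then wait for the ack before reading the next
        socket.emit('upload.data', {fileId, index, data: reader.result}, () => {
          index += 1;
          sendNextChunk();
        });
      };
      reader.readAsArrayBuffer(file.slice(start, start + CHUNK_SIZE));
    };
    sendNextChunk();
  });
}
Each chunk waits for the previous chunk's acknowledgement, which is the sequential behaviour noted under limitations below. On the server side, the handlers live in routes/socket.js and are not reproduced in this post; a minimal sketch of what they could look like with the models above, assuming the same hypothetical payloads:
routes/socket.js (sketch)
import thinky from '../db';
import Message from '../models/messages';
import FsChunks from '../models/fs-chunks';
// upload.start: hand the client a uuid to tag its chunks with
export function startUpload(meta, ack) {
  Message.uuid().then(id => ack(id));
}
// upload.data: persist a single chunk, then acknowledge it
export function chunkUpload(chunk, ack) {
  const data = Buffer.from(chunk.data);
  new FsChunks({
    fileId: chunk.fileId,
    index: chunk.index,
    data: data,
    size: data.length
  }).save().then(() => ack && ack());
}
// upload.finish: create the meta document, which also triggers the changefeed below
export function finishUpload(meta) {
  new Message({
    id: meta.fileId,
    name: meta.name,
    type: meta.type,
    size: meta.size,
    chunkSize: meta.chunkSize
  }).save();
}
// upload.delete: remove any stored chunks and the meta document if it exists
export function deleteUpload(fileId) {
  thinky.r.table('fs_chunks')
    .getAll(fileId, {index: 'fileId'})
    .delete()
    .run()
    .then(() => Message.get(fileId).run())
    .then(doc => doc.delete())
    .catch(() => {}); // no meta document exists yet for an aborted upload
}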
A changefeed is attached to the messages table where the file meta is stored. This feed emits a socket.io event whenever a new document is inserted.
import debug from 'debug';
import * as routes from './routes/socket';
import Message from './models/messages';
const errorlog = debug('app:socket:error');
const log = debug('app:socket:log');
export default function(app) {
  app.io
    .on('connection', function(socket){
      /* FILE UPLOAD EVENTS */
      socket.on('upload.start', routes.startUpload);
      socket.on('upload.data', routes.chunkUpload);
      socket.on('upload.finish', routes.finishUpload);
      socket.on('upload.delete', routes.deleteUpload);
      /* Error handling */
      socket.on('error', e => errorlog(e.stack));
    });
  // Message changefeed
  Message.changes()
    .then((feed) => {
      feed.each((e, doc) => {
        if (e) {
          errorlog(e);
        }
        // new message inserted (no previous value)
        else if (doc.getOldValue() === null) {
          app.io.emit('message.new', doc);
        }
        else {
          log('unhandled feed');
        }
      });
    })
    .catch(e => errorlog(e));
}
Limitations
Some limitations that I would like to address soon:
- File chunks are transmitted sequentially from the browser using the FileReader API; the next chunk is sent only after the previous chunk has been transferred and stored in the DB. I need to explore the possibility of sending multiple chunks in parallel.
- Any interrupted upload would result in orphan chunks in the fs_chunks table.
- Calculate an MD5 checksum to detect corrupted files (a rough hashing sketch follows this list).
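For the checksum item, one possible approach (not implemented in the demo) would be to hash the chunks incrementally with Node's crypto module while streaming them, so the whole file never has to be held in memory. The md5 field on the message is hypothetical:
import crypto from 'crypto';
// Recompute the MD5 of a stored file chunk by chunk and compare it
// with a checksum recorded at upload time (hypothetical md5 field).
function verifyChecksum(message) {
  return new Promise((resolve, reject) => {
    const hash = crypto.createHash('md5');
    message.stream()
      .on('data', chunk => hash.update(chunk))
      .on('end', () => resolve(hash.digest('hex') === message.md5))
      .on('error', reject);
  });
}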