File Transfer with Socket.io and RethinkDB

RethinkDB lets you store binary objects directly in the database. This works well for small files, but storing large files or reading many small files simultaneously can put pressure on system memory: RethinkDB cannot yet stream binary data directly, so the entire file content has to be loaded into memory. The ReGrid module, inspired by MongoDB's GridFS, tackles this issue. I needed something simpler, though: no ‘revisioning’ or ‘buckets’, and easy to integrate with socket.io without a separate socket.io streaming library.

The full demo is available at https://github.com/hassansin/rethinkdb-fileupload

Schema:

The schema is very similar to that of GridFS. Two tables are used:

  • messages: Stores file meta information: file size, name, type, chunkSize, etc. chunkSize is the maximum size of a file chunk in bytes; this example uses a chunk size of 255KB. A stream() method is defined on the model, which streams the file chunks in sequence so that we don’t have to load the entire file in memory. We can then pipe this stream to an express.js response object to download the file.
  • fs_chunks: Stores the binary data. Each chunk has an index value that represents its position in the stored file.
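
To make the chunk layout concrete, here is a minimal sketch of splitting a file into indexed chunks and putting it back together. It is not taken from the demo: splitIntoChunks and joinChunks are hypothetical helpers, and I am assuming that "255KB" means 255 * 1024 bytes.

```javascript
// Assumption: "255KB" is read as 255 * 1024 bytes.
const CHUNK_SIZE = 255 * 1024;

// Split a Buffer into fs_chunks-style records (hypothetical helper).
function splitIntoChunks(fileId, buffer, chunkSize = CHUNK_SIZE) {
  const chunks = [];
  for (let offset = 0, index = 0; offset < buffer.length; offset += chunkSize, index++) {
    // subarray clamps the end offset, so the last chunk may be shorter.
    const data = buffer.subarray(offset, offset + chunkSize);
    chunks.push({ fileId, index, size: data.length, data });
  }
  return chunks;
}

// Rebuild the file: order by index, then concatenate the data fields.
function joinChunks(chunks) {
  const ordered = [...chunks].sort((a, b) => a.index - b.index);
  return Buffer.concat(ordered.map((chunk) => chunk.data));
}

const file = Buffer.alloc(600 * 1024, 7); // a 600KB dummy file
const chunks = splitIntoChunks('demo-file-id', file);
console.log(chunks.map((c) => c.size)); // [ 261120, 261120, 92160 ]
```

Only the last chunk can be smaller than chunkSize, which is why each chunk record carries its own size.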

The schemas below are defined using thinky.io, a light ORM for Node.js:

models/messages.js

"use strict";
import thinky from '../db';
import FsChunks from './fs-chunks';
import {Transform} from 'stream';

const type = thinky.type;

const Message = thinky.createModel("messages", {
  id: type.string(),
  type: type.string(),
  name: type.string(),
  size: type.number(),
  chunkSize: type.number(),
  createdAt: type.date().default(thinky.r.now())
});

// Stream file chunks
Message.define("stream", function() {
  const transform = new Transform({
    transform: function(chunk, encoding, next) {
      this.push(chunk.data);
      next();
    },
    objectMode: true
  });
  return FsChunks.getAll(this.id, {index: 'fileId'})
    .orderBy('index')
    .pluck('data')
    .toStream()
    ._query
    .pipe(transform);
});

// Wrapper to generate uuid
Message.defineStatic("uuid", function(){
  return thinky.r.uuid().run();
});

Message.hasMany(FsChunks, "chunks", "id", "fileId");
export default Message;
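
As a rough illustration of piping such a stream into a download response, here is a sketch under a few assumptions. sendFile is a hypothetical helper, not part of the demo; it accepts any object with the fields and the stream() method of the model above, and any writable that has setHeader(). The fake message and response exist only so the sketch runs without a database or an HTTP server.

```javascript
const { Readable, PassThrough } = require('stream');

// Hypothetical helper: `message` is any object with the fields and the
// stream() method described above; `res` is any writable with setHeader().
function sendFile(message, res) {
  res.setHeader('Content-Type', message.type || 'application/octet-stream');
  res.setHeader('Content-Length', String(message.size));
  // Simplified filename encoding; a real app may need stricter escaping.
  res.setHeader(
    'Content-Disposition',
    `attachment; filename*=UTF-8''${encodeURIComponent(message.name)}`
  );
  const stream = message.stream();
  // Tear the response down if reading chunks fails midway.
  stream.on('error', (err) => res.destroy(err));
  return stream.pipe(res);
}

// Stand-ins so the sketch runs without a database or an HTTP server.
const fakeMessage = {
  name: 'hello.txt',
  type: 'text/plain',
  size: 11,
  stream: () => Readable.from([Buffer.from('hello '), Buffer.from('world')]),
};
const headers = {};
const fakeRes = new PassThrough();
fakeRes.setHeader = (key, value) => { headers[key] = value; };

sendFile(fakeMessage, fakeRes);
fakeRes.on('data', (part) => process.stdout.write(part));
```

In an express.js route this would presumably be called with a document loaded through the model, for example from the promise returned by Message.get(req.params.id).run(), with the route's res as the second argument.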

models/fs-chunks.js

import thinky from '../db';
const type = thinky.type;

const FsChunks = thinky.createModel("fs_chunks", {
  id: type.string(),
  fileId: type.string(),
  data: type.buffer(),
  index: type.number(),
  size: type.number()
});

// Compound index on [fileId, index]; adding a string to a number would fail in ReQL
FsChunks.ensureIndex("fileIdIndex", function(doc) {
    return [doc("fileId"), doc("index")];
});

export default FsChunks;

Socket.io events:

Four events are defined: upload.start, upload.data, upload.finish and upload.delete, for starting a file upload, transferring file chunks, finishing an upload and cancelling an upload, respectively:

  • upload.start: The client initiates a transfer by emitting this event. The server returns a uuid in response, and the client uses this id for the rest of the transfer.
  • upload.data: After receiving the uuid from the server, the client transfers the file chunks in sequence along with this unique id. The server stores each chunk in the fs_chunks table.
  • upload.finish: The client informs the server about EOF and sends some meta information. The server creates a new file record in the messages table with the uuid and the meta data.
  • upload.delete: The client can abort any ongoing upload or delete any existing file by sending this event with the uuid of the upload/file.
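
The flow of the four events can be sketched as follows. This is an in-memory illustration only, not the demo's route code: the handler bodies, the payload shapes and the acknowledgement callbacks are assumptions, plain arrays and maps stand in for the two tables, and Node's crypto.randomUUID() stands in for the uuid that the server generates.

```javascript
const crypto = require('crypto');

// In-memory stand-ins for the fs_chunks and messages tables (assumption:
// the real handlers write to RethinkDB instead).
const fsChunks = [];
const messages = new Map();

// Hypothetical handlers; each takes a payload and an acknowledgement callback.
function startUpload(_payload, ack) {
  ack({ id: crypto.randomUUID() });
}

function chunkUpload({ id, index, data }, ack) {
  fsChunks.push({ fileId: id, index, size: data.length, data });
  ack({ ok: true }); // the client sends the next chunk only after this
}

function finishUpload({ id, name, type, size, chunkSize }, ack) {
  messages.set(id, { id, name, type, size, chunkSize, createdAt: new Date() });
  ack({ ok: true });
}

function deleteUpload({ id }, ack) {
  // Drop the file record (if any) and every chunk that belongs to it.
  messages.delete(id);
  for (let i = fsChunks.length - 1; i >= 0; i--) {
    if (fsChunks[i].fileId === id) fsChunks.splice(i, 1);
  }
  ack({ ok: true });
}

// Walk through one upload the way a client would.
let uploadId;
startUpload({}, (reply) => { uploadId = reply.id; });
const parts = [Buffer.from('hello '), Buffer.from('world')];
parts.forEach((data, index) => chunkUpload({ id: uploadId, index, data }, () => {}));
finishUpload({ id: uploadId, name: 'hello.txt', type: 'text/plain', size: 11, chunkSize: 6 }, () => {});
console.log(messages.get(uploadId).name, fsChunks.length); // hello.txt 2
```

Because the file record is written only on upload.finish, an upload that never finishes leaves chunks behind without a matching record in messages.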

A changefeed is attached to the messages table, where the file meta information is stored. This feed emits a socket.io event whenever a new document is inserted.

import debug from 'debug';
import * as routes from './routes/socket';
import Message from './models/messages';

const errorlog = debug('app:socket:error');
const log = debug('app:socket:log');

export default function(app) {
  // use the socket.io server exposed as app.io (the same object used for emit below)
  app.io
    .on('connection', function(socket){

      /*FILE UPLOAD EVENTS*/
      socket.on('upload.start', routes.startUpload);
      socket.on('upload.data', routes.chunkUpload);
      socket.on('upload.finish', routes.finishUpload);
      socket.on('upload.delete', routes.deleteUpload);

      /*Error handling*/
      socket.on('error', e => errorlog(e.stack));
    });

  // Message changefeed
  Message.changes()
    .then((feed) => {
      feed.each( (e, doc) => {
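        // check for a feed error first: doc is not available in that case
        if(e){
          errorlog(e);
        }
        // new message: an inserted document has no old value
        else if(doc.getOldValue() === null){
          app.io.emit('message.new', doc);
        }
        else{
          log('unhandled feed');
        }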
      });
    })
    .catch( e => errorlog(e));
}
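
For reference, a raw RethinkDB changefeed delivers objects of the form { old_val, new_val }, and an insert is the case where old_val is null. The sketch below classifies such objects; changeKind is a hypothetical helper, not part of the demo, which relies on thinky's getOldValue() instead.

```javascript
// Classify a raw RethinkDB change object of the form { old_val, new_val }.
function changeKind(change) {
  if (change.old_val === null && change.new_val !== null) return 'insert';
  if (change.old_val !== null && change.new_val === null) return 'delete';
  return 'update';
}

console.log(changeKind({ old_val: null, new_val: { id: 'a' } })); // insert
console.log(changeKind({ old_val: { id: 'a' }, new_val: null })); // delete
```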

Limitations

Some limitations that I would like to address soon:

  • File chunks are transmitted sequentially from the browser using the FileReader API. The next chunk is sent only after the previous chunk has been transferred and stored in the DB. I need to explore the possibility of sending multiple chunks in parallel.
  • Any interrupted upload results in orphan chunks in the fs_chunks table.
  • No checksum is calculated yet. An MD5 checksum would make it possible to detect a corrupted file.
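
One possible shape for the checksum idea, as a sketch only: hash the chunks in index order on the server and compare the result with a checksum computed on the client. md5OfChunks is a hypothetical helper built on Node's crypto module; nothing like it exists in the demo yet.

```javascript
const crypto = require('crypto');

// Hypothetical helper: hash chunks in index order without joining them first.
function md5OfChunks(chunks) {
  const hash = crypto.createHash('md5');
  for (const chunk of chunks) hash.update(chunk);
  return hash.digest('hex');
}

const uploaded = [Buffer.from('hello '), Buffer.from('world')];
const clientChecksum = crypto.createHash('md5').update('hello world').digest('hex');
console.log(md5OfChunks(uploaded) === clientChecksum); // true
```

Feeding the hash chunk by chunk keeps memory use flat, which fits the goal of never holding a whole file in memory.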