[Node.js] Build a clean process manager

In this article, I’ll demonstrate how to build a simple and maintainable process manager for Node.js, leveraging its event loop.

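The main idea is to have a core processor that runs a series of tasks sequentially: each task starts only after the previous one has finished.
As you may know, JavaScript in Node.js is asynchronous by nature: I/O operations such as network requests return immediately and deliver their results later.
Let's take a simple example: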

// Deliberately broken example. The function below has no return statement, so
// `result` is undefined when it is logged. It illustrates that a function which
// starts asynchronous work cannot simply return that work's outcome.
function CrawlingAWebPageAndGetLinks(){
// do the stuff the method pretends to do (placeholder: no code here)
}
// Immediately-invoked "main" that tries to use the crawl result directly.
(function program(){
var result = CrawlingAWebPageAndGetLinks();
console.log(result);
})();

view raw
callbackDemo_01.js
hosted with ❤ by GitHub

Because the crawl is asynchronous, the function returns before the crawl has finished, so `result` does not hold the links yet when `console.log(result)` runs: the variable is used before it has a value.
The standard JavaScript solution to this issue is to pass a callback function:

// Hands the crawled value to `callback` instead of returning it, so the caller
// is notified when the work is finished rather than reading a return value.
// NOTE(review): Crawler is a placeholder that is not defined in this snippet. A real
// asynchronous crawler would itself take a callback instead of returning a value.
function CrawlingAWebPageAndGetLinks(callback){
  // do the stuff the method pretends to do
  var value = Crawler.Get("https://www.google.com/search?q=callback%20hell");
  // then hand the result to the caller
  callback(value);
}
(function program(){
  var callback = function(v){
    console.log(v);
  };
  // Nothing to capture here: the function returns nothing, and the result
  // arrives through the callback.
  CrawlingAWebPageAndGetLinks(callback);
})();

view raw
callbackDemo_02.js
hosted with ❤ by GitHub

Callbacks are really handy, but what happens if I have a process with several tasks that must run one after another?

// Illustration of "callback hell": four steps that must run in order, each one
// started from inside the previous step's callback.
// Each step stores its result here so that the last step can print all of them.
var results = {};
// Each step stores its result on `results`, then calls `callback` to start the
// next step. SomeComputationThatTakesTime is a placeholder that is not defined in
// this snippet. As written, it is called synchronously.
function checkInDatabase(callback){
results.checkresult = SomeComputationThatTakesTime();
callback();
}
function crawl(callback){
results.crawlingResult = SomeComputationThatTakesTime();
callback();
}
function extractData(callback){
results.data = SomeComputationThatTakesTime();
callback();
}
function saveData(callback){
results.dbAcknowledgementMessage = SomeComputationThatTakesTime();
callback();
}
// Final step: prints everything collected by the previous steps.
function displayProcessResult(){
console.log(results);
}
// Runs the steps in order by nesting each one in the previous step's callback.
// NOTE(review): a module-level function named `process` shadows Node's global
// `process` object inside this module, so a different name would be safer.
// The snippet only defines process(); it never calls it.
function process (){
checkInDatabase(function(){
crawl(function(){
extractData(function(){
saveData(function(){
displayProcessResult();
});
});
});
});
}

view raw
callbackDemo_03.js
hosted with ❤ by GitHub

Yes, we just fell into callback hell!
The main issue with nested callbacks is that the code lacks readability and flexibility. It’s hard to see what the code is meant to do, and if you have to modify the process (add, remove, or reorder a task), you have to rework the whole nesting.

We can do much better!

The Node.js processing model is based on an event loop, so an event-driven approach fits naturally.
We build a very simple “event bus”: first a list of event names, then an EventEmitter subclass that emits them:

// Names of the events exchanged on the process event bus.
// Frozen because this one object is shared, via require(), by every module that
// uses the bus. An accidental reassignment in one module would otherwise
// silently re-route events for all of them.
var processEvents = Object.freeze({
  taskDone: 'taskDone',
  taskFaulted: 'taskFaulted',
  processDone: 'processDone'
});
module.exports = processEvents;

view raw
processEvents.js
hosted with ❤ by GitHub

var processEvents = require('./processEvents.js');
var events = require('events'),
util = require('util');
/**
 * Event bus shared by the processor and its tasks.
 * It is an EventEmitter with one helper method per process event. Each helper
 * emits its payload under the matching name taken from processEvents.
 */
function ProcessEventBus() {
  events.EventEmitter.call(this);
}
util.inherits(ProcessEventBus, events.EventEmitter);

// Generate the taskDone / taskFaulted / processDone helpers from one list
// instead of spelling out three identical methods.
['taskDone', 'taskFaulted', 'processDone'].forEach(function(helperName) {
  ProcessEventBus.prototype[helperName] = function(payload) {
    this.emit(processEvents[helperName], payload);
  };
});
module.exports = ProcessEventBus;

view raw
processEventsBus.js
hosted with ❤ by GitHub

A task is a constructor whose `execute(context, eventBus)` method wraps a logical unit of work and notifies the event bus when that work is done:

var Merger = require("../../merger.js");
/**
 * Task that hands `context.crawledNewLinks` to a Merger.
 * When the Merger invokes its callback, the value it passes is stored in
 * `context.crawlerMergeResults` and the bus is told the task is done.
 * NOTE(review): there is no failure path here, so taskFaulted is never emitted.
 */
function mergeCrawlerResults() {
  this.execute = function(context, eventBus) {
    var merger = new Merger(function onMerged(mergedLinks) {
      context.crawlerMergeResults = mergedLinks;
      eventBus.taskDone();
    });
    merger.merge(context.crawledNewLinks);
  };
}
module.exports = mergeCrawlerResults;

view raw
taskExample.js
hosted with ❤ by GitHub

Note that I use a context object: a task can store data in it for the following tasks, and read data set by previous tasks.

Then we can build the processor. For maximum readability, we want our process to be a declarative list of tasks. So our processor executes the tasks one at a time, waiting for a `taskDone` event from the event bus before starting the next one:

var processEvents = require("./processEvents.js"),
ProcessEventBus = require("./processManager/processEventBus.js");
/**
 * Runs a list of tasks one after another.
 * Each task exposes `execute(context, eventBus)` and signals that it has finished
 * by calling `eventBus.taskDone()`. The next task starts only after that signal.
 *
 * @param {Array<{execute: Function}>} process - The tasks in execution order.
 *   It is copied on every execute() call, so the caller's array is never drained
 *   and the same Processor can be executed again.
 * @param {Function} [onCompletion] - Called as onCompletion(context) after the
 *   last task has signalled taskDone.
 * @param {Object} [options] - Read only; never modified.
 * @param {boolean} [options.logEvents=false] - Log processor activity to the console.
 * @param {Object} [options.context={}] - Object passed to every task so tasks can
 *   share data.
 * @param {Function} [options.onFault] - Called as onFault(data, context) when a
 *   task emits taskFaulted. The processor does not advance on a fault; the next
 *   task starts only if taskDone is also signalled.
 */
function Processor(process, onCompletion, options) {
  var settings = options || {};
  var hasOwn = Object.prototype.hasOwnProperty;
  var logEvents = hasOwn.call(settings, 'logEvents') ? settings.logEvents : false;
  var context = hasOwn.call(settings, 'context') ? settings.context : {};
  var onFault = typeof settings.onFault === 'function' ? settings.onFault : null;
  var tasks = [];
  var eventBus = new ProcessEventBus();

  function log(message) {
    if (logEvents) console.log(message);
  }

  // Starts the next pending task, or announces the end of the process when none are left.
  function executeNext() {
    log('Processor.executeNext, tasks left : ' + tasks.length);
    if (tasks.length > 0) {
      var nextTask = tasks.shift();
      nextTask.execute(context, eventBus);
    } else {
      eventBus.processDone();
    }
  }

  eventBus.on(processEvents.taskDone, function() {
    log('processEvents.taskDone');
    executeNext();
  });
  // Without a listener, a taskFaulted emission would be dropped silently.
  eventBus.on(processEvents.taskFaulted, function(data) {
    log('processEvents.taskFaulted');
    if (onFault) onFault(data, context);
  });
  eventBus.on(processEvents.processDone, function() {
    log('processEvents.processDone');
    // Hand over the shared context that the tasks have written into.
    if (typeof onCompletion === 'function') onCompletion(context);
  });

  this.execute = function() {
    // Work on a copy: shifting tasks off the caller's array would leave it empty.
    tasks = process ? process.slice() : [];
    log('Processor.execute, initial tasks count : ' + tasks.length);
    executeNext();
  };
}
module.exports = Processor;

view raw
processor.js
hosted with ❤ by GitHub

Then comes the definition of the process itself.
We first gather all the needed components, then simply build an array of the tasks in the required order:

var // infra & services
Store = require("../../dal.js"),
// process infrastructure
Processor = require("../processManager/processor.js"),
// tasks
GetStoredActiveLinks = require("./tasks/getStoredActiveLinks.js"),
GetAllLinks = require("./tasks/getAllLinks.js"),
Crawl = require("./tasks/crawl.js"),
MergeCrawlerResults = require("./tasks/mergeCrawlerResults.js"),
CheckForDelta = require("./tasks/checkForDelta.js"),
SaveNewLinks = require("./tasks/saveNewLinks.js"),
SaveLinksSummary = require("./tasks/saveLinksSummary.js");
/**
 * Link-crawling process: a declarative, ordered list of tasks that is run by a
 * Processor. The tasks that take a store all share one Store instance.
 */
function Process() {
  var store = new Store();
  // Array order is execution order. Named `tasks` rather than `process` so it
  // does not shadow Node's global `process` object.
  var tasks = [
    new GetStoredActiveLinks(store),
    new GetAllLinks(store),
    new Crawl(),
    new MergeCrawlerResults(),
    new CheckForDelta(),
    new SaveNewLinks(store),
    new SaveLinksSummary(store)
  ];

  /** Runs every task in order and logs when the whole process has completed. */
  this.execute = function() {
    console.log('new process');
    var onProcessComplete = function() {
      console.log("process completed");
    };
    // The Processor may consume (shift) the array it is given, so pass a fresh
    // copy. That keeps this process runnable more than once.
    var processor = new Processor(tasks.slice(), onProcessComplete);
    processor.execute();
  };
}
module.exports = Process;

view raw
Process.js
hosted with ❤ by GitHub

We are done! We now have a nicely decoupled task processor, and we can focus on writing units of work wrapped in tasks, which makes it easy to follow the Single Responsibility Principle.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s