[Node.js] Build a clean process manager

In this article, I’ll demonstrate how to buid a simple and maintainable process manager for Node.js, leveraging its Event Loop.

The main idea is have a core processor that will be able to run a serie of tasks, in a synchronous way.
As you may know, Javascript is by essence an asynchronous language.
Let take simple example :


function CrawlingAWebPageAndGetLinks(){
// do the stuff the method pretend;
}
(function program(){
var result = CrawlingAWebPageAndGetLinks();
console.log(result);
})();

As the execution flow is asynchroneous, the result variable will not be valued before the crawling method ends, and the usage of the variable will occurs before its valuation
The standard solution in javascript to handle this issue is to use a callback method :


function CrawlingAWebPageAndGetLinks(callback){
// do the stuff the method pretend;
var value = Crawler.Get("https://www.google.com/search?q=callback%20hell");
// then use
callback(value);
}
(function program(){
var callback = function(v){
console.log(v);
};
var result = CrawlingAWebPageAndGetLinks(callback);
})();

Callbacks are really handy, but what happen if if I have a process with several tasks in it?


var results = {};
function checkInDatabase(callback){
results.checkresult = SomeComputationThatTakesTime();
callback();
}
function crawl(callback){
results.crawlingResult = SomeComputationThatTakesTime();
callback();
}
function extractData(callback){
results.data = SomeComputationThatTakesTime();
callback();
}
function saveData(callback){
results.dbAcknowledgementMessage = SomeComputationThatTakesTime();
callback();
}
function displayProcessResult(){
console.log(results);
}
function process (){
checkInDatabase(function(){
crawl(function(){
extractData(function(){
saveData(function(){
displayProcessResult();
});
});
});
});
}

Yes, we just felt into the Callback Hell!
The main issue with the nested callback is that your code has a lack of readability and flexibility. It’s quite difficult to understand what the code is meant to do. And if you have to modify the process, you’ll have to rethink the whole process.

We can do much better!

NodeJs processing model is based on a event loop, so we can easily use an event-driven approach.
We build a very simple “event bus” :


var processEvents = {
taskDone: 'taskDone',
taskFaulted: 'taskFaulted',
processDone: 'processDone',
};
module.exports = processEvents;


var processEvents = require('./processEvents.js');
var events = require('events'),
util = require('util');
function ProcessEventBus() {
events.EventEmitter.call(this);
}
util.inherits(ProcessEventBus, events.EventEmitter);
ProcessEventBus.prototype.taskDone = function(data) {
this.emit(processEvents.taskDone, data);
};
ProcessEventBus.prototype.taskFaulted = function(data) {
this.emit(processEvents.taskFaulted, data);
};
ProcessEventBus.prototype.processDone = function(data) {
this.emit(processEvents.processDone, data);
};
module.exports = ProcessEventBus;

A task is a function that wrap a logical unit of work and a callback to the event bus:


var Merger = require("../../merger.js");
function mergeCrawlerResults() {
this.execute = function(context, eventBus) {
var taskCompleteCallback = function(data) {
context.crawlerMergeResults = data;
eventBus.taskDone();
}
var merger = new Merger(taskCompleteCallback);
merger.merge(context.crawledNewLinks);
}
}
module.exports = mergeCrawlerResults;

view raw

taskExample.js

hosted with ❤ by GitHub

Note that i use a context object, where task can store some data that following tasks can use, and use data previously setted.

Then we can build the processor. As we want the maximun readability, we want our process to be a declarative list of tasks. So our processor execute tasks in a synchronous way, waiting an event from the event bus before executing the next task :


var processEvents = require("./processEvents.js"),
ProcessEventBus = require("./processManager/processEventBus.js");
function Processor(process, onCompletion, options) {
options = options || {};
options.logEvents = options.hasOwnProperty('logEvents') ? options.logEvents : false;
options.context = options.hasOwnProperty('context') ? options.context : {};
var eventBus = new ProcessEventBus();
function executeNext() {
if(options.logEvents) console.log('Processor.executeNext, tasks left : ' + process.length);
if (process.length > 0) {
var nextTask = process.shift();
nextTask.execute(options.context, eventBus);
}
else
eventBus.processDone();
}
eventBus.on(processEvents.taskDone, function(data) {
if(options.logEvents) console.log('processEvents.taskDone');
executeNext();
});
eventBus.on(processEvents.processDone, function(data) {
if(options.logEvents) console.log('processEvents.processDone');
onCompletion(context);
});
this.execute = function() {
if(options.logEvents) console.log('Processor.execute, initial tasks count : ' + process.length);
executeNext();
};
}
module.exports = Processor;

view raw

processor.js

hosted with ❤ by GitHub

Then, the definition of the process itself :
We first gather all the needed components, then simply build an array of the task in the needed order:


var // infra & services
Store = require("../../dal.js"),
// process infrastructure
Processor = require("../processManager/processor.js"),
// tasks
GetStoredActiveLinks = require("./tasks/getStoredActiveLinks.js"),
GetAllLinks = require("./tasks/getAllLinks.js"),
Crawl = require("./tasks/crawl.js"),
MergeCrawlerResults = require("./tasks/mergeCrawlerResults.js"),
CheckForDelta = require("./tasks/checkForDelta.js"),
SaveNewLinks = require("./tasks/saveNewLinks.js"),
SaveLinksSummary = require("./tasks/saveLinksSummary.js");
function Process() {
var store = new Store();
var process = [
new GetStoredActiveLinks(store)
, new GetAllLinks(store)
, new Crawl()
, new MergeCrawlerResults()
, new CheckForDelta()
, new SaveNewLinks(store)
, new SaveLinksSummary(store)
];
this.execute = function() {
console.log('new process');
var onProcessComplete = function(result){ console.log("process completed")};
var processor = new Processor(process, onProcessComplete);
processor.execute();
}
};
module.exports = Process;

view raw

Process.js

hosted with ❤ by GitHub

We are done! We now have a nicely decoupled task processor, and we now focus on writing unit of work that will be wrapped in tasks, so we are now will be able to follow the Single Responsibility Principle.

Advertisement