In this article, I’ll demonstrate how to build a simple and maintainable process manager for Node.js, leveraging its event loop.
The main idea is to have a core processor that runs a series of tasks sequentially, each one starting only when the previous one has finished.
As you may know, JavaScript is asynchronous by nature: in Node.js, long-running operations such as I/O don't block, so the code after them keeps running before they complete.
Let's take a simple example:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Illustration of the problem: the body stands in for a crawl whose links
// would only be available later. As written, the function returns undefined,
// so `result` below holds nothing useful when it is logged.
function CrawlingAWebPageAndGetLinks(){
    // do the work the function's name claims to do (crawl a page, collect its links)
}
(function program(){
    var result = CrawlingAWebPageAndGetLinks();
    console.log(result);
})();
Because the execution flow is asynchronous, the crawling finishes only after `console.log` has already run: the `result` variable is used before it has been given a value.
The standard way to handle this in JavaScript is to pass a callback function:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Crawls a page and hands the crawled value to `callback` instead of returning it.
 * `Crawler` is a placeholder crawler API; it is not defined in this snippet.
 *
 * @param {function(*)} callback - receives the value returned by Crawler.Get.
 */
function CrawlingAWebPageAndGetLinks(callback){
    // do the work the function's name claims to do
    var value = Crawler.Get("https://www.google.com/search?q=callback%20hell");
    // then hand the value over to the caller-supplied callback
    callback(value);
}
(function program(){
    var callback = function(v){
        console.log(v);
    };
    // Nothing is returned: the crawled value is delivered through the callback.
    CrawlingAWebPageAndGetLinks(callback);
})();
Callbacks are really handy, but what happens if I have a process with several tasks in it?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Shared accumulator: each step stores its output here for the final display.
var results = {};

// Each step below does its (simulated) work, stores the outcome in `results`,
// then calls `callback` so the caller can chain the next step.
// `SomeComputationThatTakesTime` is a placeholder, not defined in this snippet.
function checkInDatabase(callback){
    results.checkresult = SomeComputationThatTakesTime();
    callback();
}
function crawl(callback){
    results.crawlingResult = SomeComputationThatTakesTime();
    callback();
}
function extractData(callback){
    results.data = SomeComputationThatTakesTime();
    callback();
}
function saveData(callback){
    results.dbAcknowledgementMessage = SomeComputationThatTakesTime();
    callback();
}
function displayProcessResult(){
    console.log(results);
}

// Runs the four steps in order by nesting each one inside the previous step's
// callback. The "pyramid" shape is deliberate: it is the callback hell the
// rest of the article sets out to remove.
// NOTE: in a real Node.js module, naming this function `process` shadows
// Node's global `process` object for the whole module.
function process (){
    checkInDatabase(function(){
        crawl(function(){
            extractData(function(){
                saveData(function(){
                    displayProcessResult();
                });
            });
        });
    });
}
Yes, we just fell into callback hell!
The main issue with nested callbacks is that the code lacks readability and flexibility. It’s quite difficult to understand what it is meant to do, and if you have to modify the process (for example, to insert or reorder a step), you have to rework the whole nesting.
We can do much better!
The Node.js processing model is based on an event loop, so an event-driven approach fits naturally.
Let's build a very simple “event bus”: first the names of the events it carries, then the bus itself:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Names of the events exchanged on the process event bus.
 * Frozen so that no consumer can rename an event at runtime, which would
 * silently disconnect emitters from their listeners.
 */
var processEvents = Object.freeze({
    taskDone: 'taskDone',
    taskFaulted: 'taskFaulted',
    processDone: 'processDone'
});
module.exports = processEvents;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var processEvents = require('./processEvents.js');
var events = require('events');
var util = require('util');

/**
 * Event bus shared by a Processor and the tasks it runs.
 * It is an EventEmitter with one helper method per event name in
 * processEvents; each helper emits that event with the single argument it receives.
 */
function ProcessEventBus() {
    events.EventEmitter.call(this);
}
util.inherits(ProcessEventBus, events.EventEmitter);

// Returns a method that emits the event named processEvents[key],
// looking the name up each time the method is called.
function makeEmitter(key) {
    return function(data) {
        this.emit(processEvents[key], data);
    };
}

ProcessEventBus.prototype.taskDone = makeEmitter('taskDone');
ProcessEventBus.prototype.taskFaulted = makeEmitter('taskFaulted');
ProcessEventBus.prototype.processDone = makeEmitter('processDone');

module.exports = ProcessEventBus;
A task is a constructor whose `execute(context, eventBus)` method wraps a logical unit of work and notifies the event bus once that work is complete:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Merger = require("../../merger.js");

/**
 * Task that merges the links found by the crawl step.
 * execute() reads context.crawledNewLinks, hands them to a Merger, and when the
 * Merger invokes its callback, stores that result in context.crawlerMergeResults
 * and signals taskDone on the event bus.
 */
function mergeCrawlerResults() {
    this.execute = function(context, eventBus) {
        var merger = new Merger(function onMerged(mergedData) {
            context.crawlerMergeResults = mergedData;
            eventBus.taskDone();
        });
        merger.merge(context.crawledNewLinks);
    };
}

module.exports = mergeCrawlerResults;
Note that I use a context object, where a task can store data for the following tasks and read data set by the previous ones.
Then we can build the processor. As we want maximum readability, we want our process to be a declarative list of tasks. So our processor executes the tasks sequentially, waiting for a `taskDone` event from the event bus before executing the next task:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var processEvents = require("./processEvents.js"),
    // NOTE(review): this file is loaded elsewhere as "../processManager/processor.js",
    // and processEventBus.js loads "./processEvents.js" exactly as this file does, which
    // suggests both live in processManager/ and this path should be "./processEventBus.js".
    // Confirm against the repository layout.
    ProcessEventBus = require("./processManager/processEventBus.js");

/**
 * Runs a list of tasks strictly one after another. Each task's
 * execute(context, eventBus) is called only after the previous task has
 * emitted taskDone on the shared event bus.
 *
 * @param {Array} tasks - tasks to run, in order; each must expose execute(context, eventBus).
 *   The list is copied, so the caller's array is left intact and can be reused.
 * @param {function(Object)} onCompletion - called with the shared context after the last task.
 * @param {Object} [options]
 * @param {boolean} [options.logEvents=false] - log processor activity with console.log.
 * @param {Object} [options.context={}] - object shared by every task to pass data along.
 * @param {function(*, Object)} [options.onTaskFaulted] - called with (faultData, context)
 *   when a task emits taskFaulted. The processor only advances on taskDone, so a task
 *   that reports a fault without also emitting taskDone ends the run there.
 */
function Processor(tasks, onCompletion, options) {
    // Read options without mutating the caller's object.
    var settings = options || {};
    var logEvents = settings.hasOwnProperty('logEvents') ? settings.logEvents : false;
    var context = settings.hasOwnProperty('context') ? settings.context : {};
    var onTaskFaulted = settings.onTaskFaulted;
    // shift() below consumes this queue; copying keeps the caller's list reusable.
    var queue = tasks.slice();
    var eventBus = new ProcessEventBus();

    function log(message) {
        if (logEvents) console.log(message);
    }

    // Starts the next queued task, or announces completion when none are left.
    function executeNext() {
        log('Processor.executeNext, tasks left : ' + queue.length);
        if (queue.length > 0) {
            var nextTask = queue.shift();
            nextTask.execute(context, eventBus);
        } else {
            eventBus.processDone();
        }
    }

    eventBus.on(processEvents.taskDone, function() {
        log('processEvents.taskDone');
        executeNext();
    });

    // Without a listener here a fault would be silently ignored and the
    // completion callback would simply never fire.
    eventBus.on(processEvents.taskFaulted, function(data) {
        log('processEvents.taskFaulted');
        if (typeof onTaskFaulted === 'function') onTaskFaulted(data, context);
    });

    eventBus.on(processEvents.processDone, function() {
        log('processEvents.processDone');
        // Hand the shared context (including whatever the tasks stored in it) to the caller.
        onCompletion(context);
    });

    this.execute = function() {
        log('Processor.execute, initial tasks count : ' + queue.length);
        executeNext();
    };
}
module.exports = Processor;
Then comes the definition of the process itself.
We first gather all the needed components, then simply build an array of the tasks in the required order:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var // infra & services
    Store = require("../../dal.js"),
    // process infrastructure
    Processor = require("../processManager/processor.js"),
    // tasks
    GetStoredActiveLinks = require("./tasks/getStoredActiveLinks.js"),
    GetAllLinks = require("./tasks/getAllLinks.js"),
    Crawl = require("./tasks/crawl.js"),
    MergeCrawlerResults = require("./tasks/mergeCrawlerResults.js"),
    CheckForDelta = require("./tasks/checkForDelta.js"),
    SaveNewLinks = require("./tasks/saveNewLinks.js"),
    SaveLinksSummary = require("./tasks/saveLinksSummary.js");

/**
 * Declarative definition of the link-processing workflow: an ordered list of
 * task instances that a Processor runs one after another.
 */
function Process() {
    var store = new Store();
    // Tasks run in exactly this order; several of them share the single `store` above.
    // (Named `tasks` rather than `process` to avoid shadowing Node's global `process`.)
    var tasks = [
        new GetStoredActiveLinks(store),
        new GetAllLinks(store),
        new Crawl(),
        new MergeCrawlerResults(),
        new CheckForDelta(),
        new SaveNewLinks(store),
        new SaveLinksSummary(store)
    ];

    this.execute = function() {
        console.log('new process');
        var onProcessComplete = function() { console.log("process completed"); };
        // Hand the Processor a copy: it shift()s tasks off the list it is given,
        // and this list must survive so execute() can be called more than once.
        var processor = new Processor(tasks.slice(), onProcessComplete);
        processor.execute();
    };
}
module.exports = Process;
We are done! We now have a nicely decoupled task processor, and we can focus on writing units of work that are wrapped in tasks, which makes it easy to follow the Single Responsibility Principle.