Piotr Pelczar
Asynchronous actions are actions executed in a non-blocking scheme, allowing the main program flow to continue processing.
1. context switching = wasting time
In trivial, sequential approach
This is cool, software flow is predictable
But not in high throughput I/O
I/O costs because of waiting time...
High throughput I/O doesn't mean:
High throughput I/O means:
2. Avoid I/O blocking
2. Avoid I/O blocking
Imagine a man, who has a task:
Connections, files, etc. are registered in the OS as file descriptors, which can be checked to see whether their data buffer (in/out) is ready.
In async programming, results do not appear in sequence
// The three operations are started one after another, in this order.
operation1(); // will output "operation1 finished."
operation2(); // will output "operation2 finished."
operation3(); // will output "operation3 finished."
operation1()
would be
// Body of operation1: open an AMQP connection and signal completion once it is ready.
var amqp = require("amqp");
var eventbus = amqp.createConnection();

console.log("AMQP connecting...");

// Runs when the connection emits "ready".
eventbus.on("ready", function onAmqpReady() {
  console.log("AMQP connected...");
  callback();
});
operation2()
would be
// Body of operation2: create a Redis client, authenticate, then signal completion.
var redis = require("redis");
var conn = redis.createClient(port, host, options);

console.log("Redis connecting...");

conn.auth(pass, function onRedisAuth(err) {
  // Log the outcome of the authentication; completion is signalled either way.
  console.log(err ? "Redis failed..." : "Redis connected...");
  callback();
});
operation3()
would be
// Body of operation3: connect to MongoDB and signal completion.
var mongojs = require("mongojs");
console.log("Mongo connecting...");
var conn = mongojs.connect(connectionString); // blocking operation
// Both log lines and the callback run back to back, with no handler in between.
console.log("Mongo connected...");
callback();
return;
Expectations?
AMQP connecting... // operation1()
AMQP connected... // operation1()
Redis connecting... // operation2()
Redis failed... // operation2()
Mongo connecting... // operation3(), blocking
Mongo connected... // operation3()
Expectations?
The result:
AMQP connecting... // operation1()
Redis connecting... // operation2()
Mongo connecting... // operation3(), blocking
Mongo connected... // operation3()
Redis failed... // operation2()
AMQP connected... // operation1()
/**
 * Starts operation1, operation2 and operation3 in order and then returns.
 *
 * @returns {string} Always "value123"; nothing produced by the operations
 *   is part of the return value.
 */
function my_function() {
  var returnValue = "value123";

  operation1();
  operation2();
  operation3();

  return returnValue;
}
value123
will be returned,
just after blocking code, without waiting for non-blocking.
The function block is executed immediately from top to bottom. You cannot rely on the return value, because it is useless.
Callback is the reference to function.
/**
 * Example callback: prints the result it receives.
 *
 * @param {*} result - Value handed over by the finished operation.
 */
var callbackFunction = function (result) {
  console.log("Result: %s", result);
};
When operation is done, the callback function is executed.
// Invoke the callback directly with a sample result.
callbackFunction("test1") // "Result: test1" will be printed out
Since callbackFunction
is a variable (value = reference),
it can be passed via a function argument.
// A function stored in a variable is handed to another function as an argument.
// ("{ ... }" stands for an arbitrary function body.)
var callbackFunction = function() { ... }
someOtherFunction(callbackFunction);
// Receives a function reference and invokes it without arguments.
function someOtherFunction(callback) {
callback(); // execute function from argument
}
Functions can be defined as anonymous (closures)
/**
 * Invokes the given function with the string "test".
 *
 * @param {Function} callback - Function to execute; receives "test" as its only argument.
 */
function someOtherFunction(callback) {
  callback("test"); // execute function from argument
}

// The callback is defined inline, right where it is passed.
someOtherFunction(function (arg1) {
  console.log('done... %s', arg1);
});
Nesting callbacks makes code unreadable:
// Subscribes to 'queue1' after binding it to exchange "ex1" with 'r1'
// (presumably the routing key). Each step runs inside the callback of the
// previous one, which produces four levels of nesting.
var amqp = require('amqp');
var connection = amqp.createConnection();
connection.on('ready', function() {
connection.exchange("ex1", function(exchange) {
connection.queue('queue1', function(q) {
q.bind(exchange, 'r1');
// Every received message is logged as JSON.
q.subscribe(function(json, headers, info, m) {
console.log("msg: " + JSON.stringify(json));
});
});
});
});
Nesting callbacks makes code unreadable:
// Subscribes to 'queue1' and, for every message, updates a table and then
// queries it again. The nesting is intentional: it illustrates how deep
// callback pyramids grow (six levels here).
var amqp = require('amqp');
var connection = amqp.createConnection();
connection.on('ready', function() {
  connection.exchange("ex1", function(exchange) {
    connection.queue('queue1', function(q) {
      q.bind(exchange, 'r1');
      q.subscribe(function(json, headers, info, m) {
        console.log("msg: " + JSON.stringify(json));
        table.update(select, data, function() {
          table.find(select, function(err, rows) {
            // inserted rows...
          }); // closes the table.find(...) call; the ")" was missing before
        });
      });
    });
  });
});
// loader.loadData returns a promise; invoking the promise with two functions
// registers the success handler and the error handler.
var resultPromise = loader.loadData(sourceFile)
resultPromise(function success(data) {
// called when the operation succeeds
}, function error(err) {
// called when the operation fails
})
The deferred object exposes def.promise and is settled with
resolve()
or reject()
/**
 * Runs `java -jar loadData.jar <sourceFile>` in a child process and reports
 * the outcome through a deferred promise.
 *
 * The promise is settled when the process emits 'close':
 *  - rejected with the collected stderr text if anything was written to stderr,
 *  - otherwise resolved with the collected stdout text (null if there was none).
 *
 * NOTE(review): `process.spawn` is used as written in the original; Node's
 * global `process` has no `spawn`, so `process` is presumably
 * require('child_process') here — confirm.
 *
 * @param {string} sourceFile - Passed to the jar as its only argument.
 * @returns {*} The promise of the deferred created for this run.
 */
var loadData = function(sourceFile) {
  var def = deferred();
  var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]);

  // Chunks are collected in arrays. The previous version appended to a
  // variable initialised with null, which produced "null..."-prefixed output.
  var stdoutChunks = [];
  var stderrChunks = [];

  proc.stdout.on('data', function (data) { stdoutChunks.push(data); });
  proc.stderr.on('data', function (data) { stderrChunks.push(data); });

  proc.on('close', function (code) {
    if (stderrChunks.length > 0) {
      def.reject(stderrChunks.join(''));
    } else {
      // Keep resolving with null when the process printed nothing.
      def.resolve(stdoutChunks.length > 0 ? stdoutChunks.join('') : null);
    }
  });

  return def.promise;
};
Provides control flows like:
Series
Series
// Runs the three tasks through async.series; the last function is the final callback.
async.series([
  function firstTask(callback) {
    // operation1
  },
  function secondTask(callback) {
    // operation2
  },
  function thirdTask(callback) {
    // operation3
  }
], function onSeriesDone() {
  console.log('all operations done');
});
Parallel
// Hands the three tasks to async.parallel; the last function is the final callback.
async.parallel([
  function firstTask(callback) {
    // operation1
  },
  function secondTask(callback) {
    // operation2
  },
  function thirdTask(callback) {
    // operation3
  }
], function onParallelDone() {
  console.log('all operations done');
});
Parallel limit
Parallel limit
// Task list handed to async.parallelLimit together with the limit 2.
var tasks = [
  function firstTask(callback) {
    // operation1
  },
  function secondTask(callback) {
    // operation2
  },
  // ...
];

async.parallelLimit(tasks, 2, function onLimitedParallelDone() {
  console.log('all operations done');
});
Waterfall
// Each task passes values to its callback after a leading null; the next task
// declares parameters with the same names ahead of its own callback.
async.waterfall([
  function firstTask(callback) {
    // operation1
    callback(null, arg1, arg2);
  },
  function secondTask(arg1, arg2, callback) {
    // operation2
    callback(null, foo, bar);
  },
  function thirdTask(foo, bar, callback) {
    // operation3
  }
], function onWaterfallDone() {
  console.log('all operations done');
});
Whilst
// async.doWhilst takes three separate arguments (iteratee, test, final callback).
// The original closed a non-existent array with "]", which is a syntax error.
// NOTE(review): async v3 documents the test function as asynchronous
// (it receives a callback); the synchronous form below is kept from the
// original — confirm against the library version in use.
async.doWhilst(
  function(done) {
    // operation1
    done(null, arg1, arg2)
  },
  function() {
    // keep iterating while this returns true
    return pages < limit
  },
  function() {
    console.log('done')
  }
)
Dealing with callbacks may be tricky. Keep your code clean.
Keep your code clean: don't name the callback function "callback"
// Naming example: the parameter is called just "callback"; the function
// returns it without invoking it.
function doSomething(callback) {
return callback;
}
// Naming anti-pattern: with parameters called callback, callback2 and
// callback3 it is hard to see that the innermost call targets doSomething's
// own `callback` parameter.
function doSomething(callback) {
doAnotherThing(function(callback2) {
doYetAnotherThing(function(callback3) {
return callback();
})
})
}
Instead of this, name your callbacks
// Same structure with descriptive parameter names; the innermost function
// calls doSomething's own `done`.
function doSomething(done) {
doAnotherThing(function(doneFetchingFromApi) {
doYetAnotherThing(function(doneWritingToDatabase) {
return done();
})
})
}
// Anti-pattern: there is no `return` in front of done(err), so execution
// falls through and done is called a second time with (null, result).
function doSomething(done) {
doAnotherThing(function (err) {
if (err) done(err);
done(null, result);
});
}
Callback is fired twice!
Fix: Always prepend callback execution with return
statement.
// Every call to done is prefixed with `return`, so the handler stops right
// after the first call and exactly one of the two calls runs.
function doSomething(done) {
doAnotherThing(function (err) {
if (err)
return done(err);
return done(null, result);
});
}
Normally, return ends function execution; why not keep this rule in async code?
Double callbacks are very hard to debug.
A callback wrapper can be written that executes the callback only once.
// Two independent timers both call done: once with 'a' after 200 ms and
// again with 'b' after 500 ms.
setTimeout(function() {
done('a')
}, 200)
setTimeout(function() {
done('b')
}, 500)
/**
 * Guards a callback so that it is executed at most once.
 *
 * @constructor
 * @param {Function} callback - The function to protect.
 */
var CallbackOnce = function (callback) {
  this.callback = callback;
  this.isFired = false;
};

/**
 * Builds a proxy for the guarded callback.
 *
 * The first call through any proxy created from this instance forwards its
 * arguments to the callback (with `this` set to null); later calls do nothing.
 *
 * @returns {Function} Proxy function; it always returns undefined.
 */
CallbackOnce.prototype.create = function () {
  var guard = this;

  return function () {
    if (!guard.isFired) {
      // Mark as fired before calling, so a re-entrant call is ignored too.
      guard.isFired = true;
      guard.callback.apply(null, arguments);
    }
  };
};
// Wrap `done` so that only the first of the two timers reaches it.
// Both variables are declared with `var`: the original assigned to undeclared
// names, which creates implicit globals and throws in strict mode.
var obj1 = new CallbackOnce(done);
// decorate callback
var safeDone = obj1.create(); // safeDone() is a proxy function that passes arguments

setTimeout(function() {
  safeDone('a'); // safe now... (first call, forwarded to done)
}, 200);

setTimeout(function() {
  safeDone('b'); // safe now... (second call, ignored)
}, 500);
Never fire callback until task is done.
// Calls done(result) when `condition` holds, otherwise done(null).
// The `return` inside the if-block prevents falling through to the last line.
function doSomething(done) {
doAnotherThing(function () {
if (condition) {
var result = null
// prepare result...
return done(result);
}
return done(null);
});
}
Without the return in front of done(result), the ending done(null) would be fired even if
the condition passes.
Never fire callback until task is done.
// Explicit if/else: exactly one branch runs, so done is called once per
// doAnotherThing callback.
function doSomething(done) {
doAnotherThing(function () {
if (condition) {
var result = null
// prepare result...
return done(result);
}
else {
return done(null);
}
});
}
Never use callback in try
clause!
// Anti-pattern: callback is invoked inside the try block. If callback itself
// throws, the exception is caught below and callback is invoked a second
// time, now with a misleading "not a valid JSON" error.
function (callback) {
another_function(function (err, some_data) {
if (err)
return callback(err);
try {
callback(null, JSON.parse(some_data)); // error here
} catch(err) {
callback(new Error(some_data + ' is not a valid JSON'));
}
});
}
If callback throws an exception, then it is executed exactly twice!
Never use callback in try
clause!
// Only JSON.parse is guarded by try/catch. The success callback runs outside
// the try block, so an exception thrown by callback is not caught here.
function (callback) {
another_function(function (err, some_data) {
if (err)
return callback(err);
try {
var parsed = JSON.parse(some_data)
} catch(err) {
// Parsing failed: report it and stop.
return callback(new Error(some_data + ' is not a valid JSON'));
}
callback(null, parsed);
});
}
Never use callback in try
clause!
Read docs carefully. Really.
/**
 * Flawed example, kept on purpose: runs loadData.jar and reports through `done`.
 *
 * Every chunk written to stderr calls done with an Error, and the 'close'
 * handler calls done again with the collected stdout, so done can fire
 * several times. Only the syntax of the original was repaired (the
 * proc.on('close', ...) call was never closed).
 *
 * @param {Function} done - Called as done(err) or done(null, stdoutText).
 */
function doSomething(done) {
  var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]);
  var procBuff = '';

  proc.stdout.on('data', function (data) {
    procBuff += data;
  });

  // WAT?!
  proc.stderr.on('data', function (data) {
    done(new Error("An error occured: " + data));
  });

  proc.on('close', function (code) {
    done(null, procBuff);
  }); // the ")" was missing here
}
Read docs carefully. Really.
/**
 * Runs loadData.jar and reports the outcome through `done` exactly once,
 * when the process emits 'close'.
 *
 * Success is decided by the exit code, not by the presence of stderr output:
 *  - code !== 0 -> done(Error) whose message contains the collected stderr text,
 *  - code === 0 -> done(null, collected stdout text).
 *
 * @param {Function} done - Node-style callback (err, stdoutText).
 */
function doSomething(done) {
  var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]);
  var procBuff = '';
  var procBuffError = '';

  proc.stdout.on('data', function (data) {
    procBuff += data;
  });

  proc.stderr.on('data', function (data) {
    // Collect stderr in its own buffer. The original appended to `proc`,
    // which overwrote the process handle and left procBuffError empty.
    procBuffError += data;
  });

  proc.on('close', function (code) {
    if (code !== 0) {
      return done(new Error("An error occured: " + procBuffError));
    }
    return done(null, procBuff);
  }); // the ")" was missing here
}
Asynchronous logs will interweave
Logs without context are useless...
// Logging without context: both messages are fixed strings, so the output
// does not show which keyword a log line belongs to.
function getResults(keyword, done) {
http.request(url, function(response) {
console.log('Fetching from API')
response.on('error', function(err) {
console.log('API error')
})
});
}
/**
 * Requests `url` and logs progress together with a context object, so every
 * log line shows which keyword it belongs to.
 *
 * @param {string} keyword - Stored in the log context.
 * @param {Function} done - Not invoked by this example.
 */
function getResults(keyword, done) {
  var logContext = { keyword: keyword };

  function onResponseError(err) {
    console.log(logContext, 'API error');
  }

  function onResponse(response) {
    console.log(logContext, 'Fetching from API');
    response.on('error', onResponseError);
  }

  http.request(url, onResponse);
}
When running tasks in parallel to take the first good-enough result, the others should be aborted
Provide cancellation API:
var events = require('events')
/**
 * Starts an HTTP request to `url` and returns both its eventual result and an
 * event emitter through which the request can be cancelled.
 *
 * When the response emits 'close', the promise is rejected with the response
 * error if one was emitted, otherwise resolved with the concatenated body
 * (null if no data arrived).
 *
 * Fixes over the original: the 'data' listener is attached to `response`
 * (`res` was undefined), the body no longer starts with "null", the response
 * error is actually recorded (the handler parameter used to shadow the outer
 * variable), and the request is finished with req.end() so it is sent.
 *
 * NOTE(review): no 'error' listener is attached to `req` itself; aborting or
 * a connection failure may emit one — confirm how that should be reported.
 *
 * @param {string} keyword - Not used by the body shown here.
 * @returns {{result: *, events: EventEmitter}} `result` is the deferred's
 *   promise; emit 'abort' on `events` to abort the request.
 */
function getResults(keyword) {
  var def = deferred();
  var eventbus = new events.EventEmitter();

  var req = http.request(url, function(response) {
    var responseError = null;
    var chunks = [];

    response.on('data', function(chunk) {
      chunks.push(chunk);
    });

    response.on('error', function(err) {
      responseError = err;
    });

    response.on('close', function() {
      if (responseError) {
        return def.reject(responseError);
      }
      return def.resolve(chunks.length > 0 ? chunks.join('') : null);
    });
  });

  eventbus.on('abort', function() {
    req.abort();
  });

  // http.request does not send anything until the request is ended.
  req.end();

  return {
    result: def.promise,
    events: eventbus
  };
}
Provide cancellation API:
// getResults returns an object with `result` (invoked here with a success and
// an error handler) and `events` (used to request cancellation).
var response = getResults('test')
response.result(function success() {
// ...
}, function error() {
// ...
})
// if we need
// Emitting 'abort' asks getResults to abort the pending request.
response.events.emit('abort')
Everything runs in parallel except your code.
While your code is running (not waiting for I/O descriptors), the whole event loop is blocked.