Asynchronous programming
done right.

Without race conditions.

by Piotr Pelczar (Athlan)

About me

Piotr Pelczar

  • Freelancer for 8 years
  • PHP, Node.js, Java/Groovy
  • Zend Certified Engineer
  • IPIJ, Startups

Stay in touch

Asynchronous programming

Asynchronous actions are actions executed in a non-blocking scheme, allowing the main program flow to continue processing.

How does software live in hardware?

  • Operating systems are process-based
  • Each process is assigned a processor, registers, and memory

How does software live in hardware?

  • Process parallelism using threads (thread pools)
  • Switching the processor between processes/threads causes context switching

1. Context switching = wasted time

Sync programming

In the trivial, sequential approach

  • Each operation is executed sequentially:

    O(t) runs before O(t+1)
  • if O(t) gets stuck, O(t+1) waits...

Sync programming

This is nice: the software flow is predictable.
But not for high-throughput I/O.

I/O is costly because of the waiting time...
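The waiting cost is easy to see with Node's built-in fs module. A minimal sketch (the scratch file async-demo.txt in the OS temp directory is a hypothetical choice): readFileSync() stops the whole program until the data is there, while readFile() only starts the read and lets the flow continue.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');

// Hypothetical scratch file, created only for this demonstration.
var file = path.join(os.tmpdir(), 'async-demo.txt');
fs.writeFileSync(file, 'hello');

var order = [];

// Blocking: nothing else runs until the data has been read.
order.push('sync read: ' + fs.readFileSync(file, 'utf8'));

// Non-blocking: the read is started and the callback runs later,
// once the event loop sees that the data is ready.
fs.readFile(file, 'utf8', function (err, data) {
    if (err) throw err;
    order.push('async read: ' + data);
    console.log(order);
});

// Runs before the readFile() callback above.
order.push('after readFile() call');
```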

High-throughput I/O

High-throughput I/O doesn't mean:

  • Memory operations
  • Fast single-thread computing

High-throughput I/O

High-throughput I/O means:

  • HTTP requests
  • Database connections
  • Queue system dispatching
  • HDD operations

2. Avoid I/O blocking

Single-threaded, event loop model

Imagine a man who has a task:

  • Walk around
  • When a bucket is full of water, just pour it out
  • Go to the next bucket

In the OS, connections, files, etc. are registered as file descriptors, which can be checked to see whether their data buffer (in/out) is ready.
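A toy simulation of the man-and-buckets loop, assuming hypothetical descriptors that become ready at a given tick. It is not how Node.js or the OS actually poll descriptors; it only shows one thread visiting every source and handling those that are ready, so results come back in readiness order rather than registration order.

```javascript
// Toy, single-threaded event loop: one "man" walks over all buckets
// (hypothetical descriptors) and handles only those that are ready.
function runLoop(descriptors) {
    var handled = [];
    var pending = descriptors.slice();
    var tick = 0;

    while (pending.length > 0) {
        var stillPending = [];
        for (var i = 0; i < pending.length; i++) {
            var d = pending[i];
            if (d.readyAt <= tick) {
                handled.push(d.onReady(tick)); // bucket is full: empty it
            } else {
                stillPending.push(d);          // not ready: go to the next bucket
            }
        }
        pending = stillPending;
        tick++;
    }
    return handled;
}

// Registered as db, http, file; handled as http (tick 1), file (tick 2), db (tick 3).
var handled = runLoop([
    { readyAt: 3, onReady: function (t) { return 'db reply at tick ' + t; } },
    { readyAt: 1, onReady: function (t) { return 'http reply at tick ' + t; } },
    { readyAt: 2, onReady: function (t) { return 'file read at tick ' + t; } }
]);
console.log(handled);
```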

There is no sequence

In async programming, results do not appear in any predictable sequence


operation1(); // will output "operation1 finished."
operation2(); // will output "operation2 finished."
operation3(); // will output "operation3 finished."
					

There is no sequence

operation1() would be:


var amqp = require("amqp")

function operation1(callback) {
    var eventbus = amqp.createConnection();
    console.log("AMQP connecting...");

    eventbus.on("ready", function() {
        console.log("AMQP connected...");
        return callback();
    });
}
					

There is no sequence

operation2() would be:


var redis = require("redis")

function operation2(callback) {
    var conn = redis.createClient(port, host, options);
    console.log("Redis connecting...");

    conn.auth(pass, function(err) {
        if(err)
            console.log("Redis failed...");
        else
            console.log("Redis connected...");

        return callback();
    });
}
					

There is no sequence

operation3() would be:


var mongojs = require("mongojs");

function operation3(callback) {
    console.log("Mongo connecting...");
    var conn = mongojs.connect(connectionString); // blocking operation
    console.log("Mongo connected...");

    return callback();
}
					

There is no sequence

Expectations?


AMQP connecting...  // operation1()
AMQP connected...   // operation1()
Redis connecting... // operation2()
Redis failed...     // operation2()
Mongo connecting... // operation3(), blocking
Mongo connected...  // operation3()
					

There is no sequence

The result:


AMQP connecting...  // operation1()
Redis connecting... // operation2()
Mongo connecting... // operation3(), blocking
Mongo connected...  // operation3()
Redis failed...     // operation2()
AMQP connected...   // operation1()
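The same ordering can be reproduced without any real server. In this sketch, timers stand in for the AMQP and Redis round trips (the 100 ms and 50 ms delays are hypothetical) and a short busy-wait stands in for the blocking Mongo call.

```javascript
// Simulation only: no real AMQP, Redis or Mongo connections are made.
var output = [];

function operation1(callback) {
    output.push('AMQP connecting...');
    setTimeout(function () {            // "ready" arrives after 100 ms
        output.push('AMQP connected...');
        callback();
    }, 100);
}

function operation2(callback) {
    output.push('Redis connecting...');
    setTimeout(function () {            // auth fails after 50 ms
        output.push('Redis failed...');
        callback();
    }, 50);
}

function operation3(callback) {
    output.push('Mongo connecting...');
    var end = Date.now() + 20;
    while (Date.now() < end) {}         // blocking: nothing else can run here
    output.push('Mongo connected...');
    callback();
}

var finished = 0;
function done() {
    finished++;
    if (finished === 3) console.log(output.join('\n'));
}

operation1(done);
operation2(done);
operation3(done);
```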
					

There is no sequence

So... what do functions return?

A function can start tasks that finish in the future, so what will it return?

function my_function() {
    operation1();
    operation2();
    operation3();

    return "value123";
}
					

value123 will be returned
right after the blocking code finishes, without waiting for the non-blocking operations.

Assume: functions do NOT return values

The function body is executed immediately from top to bottom. You cannot rely on the return value to carry asynchronous results, because they are not available yet when the function returns.
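A small sketch of the same effect, assuming a hypothetical myFunction() whose non-blocking part is simulated with setTimeout(): the return value is available at once, while the asynchronous result only reaches the callback later.

```javascript
// Hypothetical function: starts a non-blocking task and returns right away.
function myFunction(done) {
    setTimeout(function () {
        done('async result');           // arrives later, via the callback
    }, 10);
    return 'value123';                  // returned before the task finishes
}

var trace = [];
var returned = myFunction(function (result) {
    trace.push('callback: ' + result);
    console.log(trace);
});
trace.push('returned: ' + returned);
```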

Callbacks

A callback is a reference to a function.


var callbackFunction = function(result) {
    console.log("Result: %s", result)
}
					

When the operation is done, the callback function is executed.


callbackFunction("test1") // "Result: test1" will be printed out
					

Callbacks

Since callbackFunction is a variable (its value is a reference),
it can be passed as a function argument.


var callbackFunction = function() { ... }
someOtherFunction(callbackFunction);
					

function someOtherFunction(callback) {
    callback(); // execute function from argument
}
					

Callbacks

Functions can be defined anonymously (closures)


function someOtherFunction(callback) {
    var arg1 = "test";
    callback(arg1); // execute function from argument
}
					

someOtherFunction(function(arg1) {
    console.log('done... %s', arg1);
})
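Later slides pass an error as the first callback argument (done(err), callback(null, result)). That is the usual Node.js error-first convention. A minimal sketch with a hypothetical divideLater() helper, where setImmediate() stands in for real asynchronous work:

```javascript
// Error-first convention: the first argument is an Error (or null),
// the following arguments carry the result.
function divideLater(a, b, callback) {
    setImmediate(function () {
        if (b === 0) {
            return callback(new Error('division by zero'));
        }
        return callback(null, a / b);
    });
}

divideLater(10, 2, function (err, result) {
    if (err) return console.error('failed:', err.message);
    console.log('result:', result);     // result: 5
});

divideLater(1, 0, function (err, result) {
    if (err) return console.error('failed:', err.message); // failed: division by zero
    console.log('result:', result);
});
```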
					

Callbacks can be nested

Nesting callbacks makes code unreadable:


var amqp = require('amqp');

var connection = amqp.createConnection();

connection.on('ready', function() {
    connection.exchange("ex1", function(exchange) {
        connection.queue('queue1', function(q) {
            q.bind(exchange, 'r1');

            q.subscribe(function(json, headers, info, m) {
                console.log("msg: " + JSON.stringify(json));
            });
        });
    });
});
					

Callbacks can be nested

Nesting callbacks makes code unreadable:


var amqp = require('amqp');

var connection = amqp.createConnection();

connection.on('ready', function() {
    connection.exchange("ex1", function(exchange) {
        connection.queue('queue1', function(q) {
            q.bind(exchange, 'r1');

            q.subscribe(function(json, headers, info, m) {
                console.log("msg: " + JSON.stringify(json));
                
                table.update(select, data, function() {
                    table.find(select, function(err, rows) {
                        // inserted rows...
                    })
                });
            });
        });
    });
});
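One common way to flatten such nesting, shown here as a sketch, is to give every level its own named function (the same idea as the "name your callbacks" advice later in this deck). connect(), openQueue() and subscribe() are hypothetical stand-ins simulated with setImmediate(), not the amqp library API:

```javascript
// Hypothetical async steps (setImmediate stands in for real I/O).
function connect(callback)          { setImmediate(function () { callback(null, 'conn'); }); }
function openQueue(conn, callback)  { setImmediate(function () { callback(null, conn + '/queue1'); }); }
function subscribe(queue, callback) { setImmediate(function () { callback(null, 'subscribed to ' + queue); }); }

// Each nesting level becomes a named function, so the flow reads top to bottom.
function start(done) {
    connect(onConnected);

    function onConnected(err, conn) {
        if (err) return done(err);
        openQueue(conn, onQueueOpened);
    }

    function onQueueOpened(err, queue) {
        if (err) return done(err);
        subscribe(queue, onSubscribed);
    }

    function onSubscribed(err, status) {
        if (err) return done(err);
        done(null, status);
    }
}

start(function (err, status) {
    if (err) return console.error(err);
    console.log(status);                // subscribed to conn/queue1
});
```

The nested functions are declarations, so they are hoisted and can be referenced before the lines that define them.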
					

Asynchronous control flows

  • Promise design pattern
  • Libraries that manage callback references

Promise design pattern

  1. The client calls a function that will deliver its result
    in the future, so it returns a promise
  2. The function returns the promise object immediately,
    before the non-blocking operations finish
  3. The client registers callbacks
  4. The callbacks are fired in the future, when the task is done

var resultPromise = loader.loadData(sourceFile)

resultPromise(function success(data) {
    // called when the operation succeeds
}, function error(err) {
    // on fail
})
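For comparison, a sketch of the same four steps with the native Promise built into modern JavaScript, instead of the deferred-style promise on this slide. loadData() is a hypothetical loader simulated with setTimeout(); with native promises the callbacks are registered through then() rather than by calling the promise itself:

```javascript
// Hypothetical loader: resolves with fake data, or rejects when no source is given.
function loadData(sourceFile) {
    return new Promise(function (resolve, reject) {
        setTimeout(function () {
            if (!sourceFile) return reject(new Error('no source file'));
            resolve('contents of ' + sourceFile);
        }, 10);
    });
}

var resultPromise = loadData('data.csv');   // returns immediately

// Register callbacks; they fire once the task is done.
resultPromise.then(function success(data) {
    console.log(data);                      // contents of data.csv
}, function error(err) {
    console.error('failed:', err.message);
});
```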
					

Promise design pattern

  1. Create deferred object
  2. Return def.promise
  3. Call resolve() or reject()

var spawn = require('child_process').spawn
  , deferred = require('deferred') // assumes the npm "deferred" package

var loadData = function(sourceFile) {
    var def = deferred()
      , proc = spawn('java', ['-jar', 'loadData.jar', sourceFile])
    
    var commandProcessBuff = ''
      , commandProcessBuffError = ''
    
    proc.stdout.on('data', function (data) { commandProcessBuff += data })
    proc.stderr.on('data', function (data) { commandProcessBuffError += data })

    // a non-zero exit code means failure; stderr alone may only contain warnings
    proc.on('close', function (code) {
        if(code !== 0)
            def.reject(new Error(commandProcessBuffError))
        else
            def.resolve(commandProcessBuff)
    })
    
    return def.promise
}
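The same three steps map onto the native Promise constructor. A minimal sketch with a hypothetical createDeferred() helper (not the deferred() function used above) and a hypothetical delayedValue() producer:

```javascript
// Hypothetical helper: a deferred object built on the native Promise,
// exposing resolve()/reject() next to the promise they control.
function createDeferred() {
    var def = {};
    def.promise = new Promise(function (resolve, reject) {
        def.resolve = resolve;          // the executor runs synchronously,
        def.reject = reject;            // so both are set before we return
    });
    return def;
}

// 1. create deferred, 2. return def.promise, 3. call resolve() or reject() later
function delayedValue(value, ms) {
    var def = createDeferred();
    setTimeout(function () { def.resolve(value); }, ms);
    return def.promise;
}

delayedValue(42, 10).then(function (v) {
    console.log('resolved with', v);    // resolved with 42
});
```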
					

Async Node.js library

Provides control flows like:

  • Sequences (series)
  • Waterfalls (sequences that pass results to the next step)
  • Parallel (with limit)
  • Some/every conditions
  • While/until
  • Queue

Async Node.js library

Series


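var async = require('async')

async.series([
    function(callback) {
        // operation1
        callback(null, 'result1')
    },
    function(callback) {
        // operation2
        callback(null, 'result2')
    },
    function(callback) {
        // operation3
        callback(null, 'result3')
    }
], function(err, results) {
    // results: ['result1', 'result2', 'result3']
    console.log('all operations done')
})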
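To see what series() does for you, here is a minimal sketch of such a helper in plain JavaScript. It is not the async library's implementation; it only follows the same contract: tasks run one after another, the first error stops the sequence, and results are collected in order.

```javascript
// Minimal series() sketch: run tasks one at a time, stop at the first error.
function series(tasks, done) {
    var results = [];
    function next(i) {
        if (i === tasks.length) return done(null, results);
        tasks[i](function (err, result) {
            if (err) return done(err);  // remaining tasks are never started
            results.push(result);
            next(i + 1);
        });
    }
    next(0);
}

series([
    function (cb) { setTimeout(function () { cb(null, 'op1'); }, 30); },
    function (cb) { setTimeout(function () { cb(null, 'op2'); }, 10); }
], function (err, results) {
    console.log(err, results);          // null [ 'op1', 'op2' ]
});
```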
					

Async Node.js library

Parallel


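async.parallel([
    function(callback) {
        // operation1
        callback(null, 'result1')
    },
    function(callback) {
        // operation2
        callback(null, 'result2')
    },
    function(callback) {
        // operation3
        callback(null, 'result3')
    }
], function(err, results) {
    // results keep the order of the tasks, not the order of completion
    console.log('all operations done')
})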
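The difference from series() is that all tasks start immediately. A minimal sketch of a parallel() helper (again not the library's code) that keeps results in task order and reports only the first error:

```javascript
// Minimal parallel() sketch: start every task at once, report once.
function parallel(tasks, done) {
    var results = new Array(tasks.length);
    var remaining = tasks.length;
    var finished = false;
    if (remaining === 0) return done(null, results);

    tasks.forEach(function (task, i) {
        task(function (err, result) {
            if (finished) return;       // an error was already reported
            if (err) {
                finished = true;
                return done(err);
            }
            results[i] = result;        // store by index, not by finish order
            remaining--;
            if (remaining === 0) {
                finished = true;
                done(null, results);
            }
        });
    });
}

parallel([
    function (cb) { setTimeout(function () { cb(null, 'slow'); }, 30); },
    function (cb) { setTimeout(function () { cb(null, 'fast'); }, 10); }
], function (err, results) {
    console.log(results);               // [ 'slow', 'fast' ]
});
```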
					

Async Node.js library

Parallel limit


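var tasks = [
    function(callback) {
        // operation1
        callback(null, 'result1')
    },
    function(callback) {
        // operation2
        callback(null, 'result2')
    },
    // ...
]

// at most 2 tasks run at the same time
async.parallelLimit(tasks, 2, function(err, results) {
    console.log('all operations done')
})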
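A sketch of how a concurrency limit can be enforced: keep a counter of running tasks and start the next one only when a slot frees up. The parallelLimit() helper and the makeTask() timer tasks below are hypothetical illustrations, not the library's code:

```javascript
// Minimal parallelLimit() sketch: at most `limit` tasks run at the same time,
// results stay in task order, and the first error is reported once.
function parallelLimit(tasks, limit, done) {
    var results = new Array(tasks.length);
    var next = 0, running = 0, completed = 0, failed = false;
    if (tasks.length === 0) return done(null, results);

    function launch() {
        while (running < limit && next < tasks.length) {
            runTask(next++);
        }
    }

    function runTask(i) {
        running++;
        tasks[i](function (err, result) {
            if (failed) return;
            running--;
            if (err) {
                failed = true;
                return done(err);
            }
            results[i] = result;
            completed++;
            if (completed === tasks.length) return done(null, results);
            launch();                   // a slot is free: start the next waiting task
        });
    }

    launch();
}

// Hypothetical timer-based tasks that record how many run concurrently.
var active = 0, maxActive = 0, finalResults = null;
function makeTask(name, ms) {
    return function (callback) {
        active++;
        maxActive = Math.max(maxActive, active);
        setTimeout(function () {
            active--;
            callback(null, name);
        }, ms);
    };
}

parallelLimit([makeTask('a', 30), makeTask('b', 10), makeTask('c', 20), makeTask('d', 10)], 2,
    function (err, results) {
        finalResults = results;
        console.log(results, 'max concurrent:', maxActive); // [ 'a', 'b', 'c', 'd' ] max concurrent: 2
    });
```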
					

Async Node.js library

Waterfall


async.waterfall([
    function(callback) {
        // operation1
        callback(null, 'arg1', 'arg2')
    },
    function(arg1, arg2, callback) {
        // operation2
        callback(null, 'foo', 'bar')
    },
    function(foo, bar, callback) {
        // operation3
        callback(null, 'done')
    }
], function(err, result) {
    // result === 'done'
    console.log('all operations done')
})
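A minimal waterfall() sketch (not the library's implementation): each task receives the previous task's results as arguments, followed by its own callback, and the final callback receives the last results.

```javascript
// Minimal waterfall() sketch: pass each task's results on to the next task.
function waterfall(tasks, done) {
    function next(i, args) {
        if (i === tasks.length) return done.apply(null, [null].concat(args));
        tasks[i].apply(null, args.concat(function (err) {
            if (err) return done(err);
            // everything after the error argument goes to the next task
            next(i + 1, Array.prototype.slice.call(arguments, 1));
        }));
    }
    next(0, []);
}

waterfall([
    function (callback) { callback(null, 2, 3); },
    function (a, b, callback) { callback(null, a * b); },
    function (product, callback) { callback(null, product + 1); }
], function (err, result) {
    console.log(err, result);           // null 7
});
```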
					

Async Node.js library

Whilst


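var pages = 0
  , limit = 10

async.doWhilst(
    function(done) {
        // operation1, e.g. fetch the next page
        pages++
        done(null)
    },
    function() {
        // synchronous test, as in async 2.x and earlier;
        // newer major versions expect an asynchronous test
        return pages < limit
    },
    function(err) {
        console.log('done')
    }
)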
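A minimal doWhilst() sketch with a synchronous test, matching the usage above (not the library's code). The page counter and the simulated fetch are hypothetical:

```javascript
// Minimal doWhilst() sketch: run the iteratee once, then keep
// repeating it while test() returns true.
function doWhilst(iteratee, test, done) {
    iteratee(function next(err) {
        if (err) return done(err);
        if (test()) return iteratee(next);
        return done(null);
    });
}

var pages = 0, limit = 3, fetched = [];

doWhilst(function (callback) {
    // hypothetical page fetch, simulated with a timer
    setTimeout(function () {
        pages++;
        fetched.push('page ' + pages);
        callback(null);
    }, 5);
}, function () {
    return pages < limit;
}, function (err) {
    console.log(fetched);               // [ 'page 1', 'page 2', 'page 3' ]
});
```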
					

Asynchronous programming traps

Dealing with callbacks may be tricky. Keep your code clean.

Unnamed callbacks

Keep your code clean: don't name every callback function "callback"


function doSomething(callback) {
    return callback();
}
					

Unnamed callbacks


function doSomething(callback) {
    
    doAnotherThing(function(callback2) {
    
        doYetAnotherThing(function(callback3) {
            
            return callback();
        })
    })
}
					

Unnamed callbacks

Instead of this, name your callbacks


function doSomething(done) {
    
    doAnotherThing(function(doneFetchingFromApi) {
    
        doYetAnotherThing(function(doneWritingToDatabase) {
            
            return done();
        })
    })
}
					

Double callbacks


function doSomething(done) {

    doAnotherThing(function (err) {
    
        if (err) done(err);
        done(null, result);
      
    });
  
}
					

When err is set, the callback is fired twice!

Double callbacks

Fix: always prefix the callback call with a return statement.


function doSomething(done) {

    doAnotherThing(function (err) {
    
        if (err)
            return done(err);
        
        return done(null, result);
      
    });
}
					

Normally, return ends function execution; why not keep that rule in async code as well?

Double callbacks

Double callbacks are very hard to debug.

You can write a callback wrapper that executes the callback only once.


setTimeout(function() {
        done('a')
}, 200)
setTimeout(function() {
        done('b')
}, 500)
					

Double callbacks


var CallbackOnce = function(callback) {
        this.isFired = false
        this.callback = callback
}
 
CallbackOnce.prototype.create = function() {
        var delegate = this
 
        return function() {
                if(delegate.isFired)
                        return
 
                delegate.isFired = true
                delegate.callback.apply(null, arguments)
        }
}
					

Double callbacks


var obj1 = new CallbackOnce(done)

// decorate callback
var safeDone = obj1.create() // safeDone() is a proxy function that passes arguments through
 
setTimeout(function() {
        safeDone('a') // safe now...
}, 200)
setTimeout(function() {
        safeDone('b') // safe now...
}, 500)
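The same guard can be written more compactly with a closure instead of a constructor. A sketch of a hypothetical once() helper that behaves like the CallbackOnce wrapper above:

```javascript
// once() returns a function that forwards only its first call.
function once(callback) {
    var fired = false;
    return function () {
        if (fired) return;              // every later call is ignored
        fired = true;
        return callback.apply(this, arguments);
    };
}

var calls = [];
var safeDone = once(function (value) { calls.push(value); });

setTimeout(function () { safeDone('a'); }, 20);
setTimeout(function () { safeDone('b'); }, 50);
setTimeout(function () { console.log(calls); }, 100);   // [ 'a' ]
```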
					

Unexpected callbacks

Never fire the callback until the task is done.


function doSomething(done) {

    doAnotherThing(function () {
        
        if (condition) {
            var result = null
            // prepare result...
            
            done(null, result);
        }

        return done(null);

    });
}


The final return done(null) still runs when the condition passes, so done() fires twice.

Unexpected callbacks

Never fire the callback until the task is done.


function doSomething(done) {

    doAnotherThing(function () {
        
        if (condition) {
            var result = null
            // prepare result...
            
            return done(null, result);
        }
        else {
            return done(null);
        }
        
    });
}
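Counting the calls makes the difference visible. A sketch with hypothetical buggy() and fixed() variants of the function above, where doAnotherThing() is simulated with setImmediate():

```javascript
// Hypothetical async step.
function doAnotherThing(cb) { setImmediate(cb); }

function buggy(condition, done) {
    doAnotherThing(function () {
        if (condition) {
            done(null, 'result');       // no return: execution falls through
        }
        return done(null);
    });
}

function fixed(condition, done) {
    doAnotherThing(function () {
        if (condition) {
            return done(null, 'result');
        }
        return done(null);
    });
}

var counts = { buggy: 0, fixed: 0 };
buggy(true, function () { counts.buggy++; });
fixed(true, function () { counts.fixed++; });
setTimeout(function () { console.log(counts); }, 50);   // { buggy: 2, fixed: 1 }
```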
					

Unexpected callbacks

Never call the callback inside a try block!


function (callback) {
    another_function(function (err, some_data) {
        if (err)
            return callback(err);
        
        try {
            callback(null, JSON.parse(some_data)); // error here
        } catch(err) {
            callback(new Error(some_data + ' is not a valid JSON'));
        }
    });
}
					

If the callback itself throws an exception, the catch block calls it a second time!

Unexpected callbacks

Never call the callback inside a try block!


function (callback) {
    another_function(function (err, some_data) {
        if (err)
            return callback(err);
        
        try {
            var parsed = JSON.parse(some_data)
        } catch(err) {
            return callback(new Error(some_data + ' is not a valid JSON'));
        }
        
        callback(null, parsed);
    });
}
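A runnable sketch of the safe pattern, with a hypothetical parseJson() helper: only JSON.parse() sits inside the try block, so a callback that throws is not caught there and therefore not called a second time.

```javascript
// Only the parsing is guarded; the callback is called outside the try block.
function parseJson(text, callback) {
    var parsed;
    try {
        parsed = JSON.parse(text);
    } catch (err) {
        return callback(new Error(text + ' is not a valid JSON'));
    }
    return callback(null, parsed);
}

var calls = 0;
try {
    parseJson('{"a": 1}', function (err, data) {
        calls++;
        throw new Error('bug inside the callback'); // simulate a faulty callback
    });
} catch (e) {
    console.log('callback threw:', e.message, '| calls:', calls); // callback threw: bug inside the callback | calls: 1
}
```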
					

Take care of events

Read docs carefully. Really.


var spawn = require('child_process').spawn

function doSomething(sourceFile, done) {

    var proc = spawn('java', ['-jar', 'loadData.jar', sourceFile])
    var procBuff = '';
    
    proc.stdout.on('data', function (data) {
        procBuff += data;
    });
    
    // WAT?! stderr can emit many chunks (even plain warnings),
    // so done() may fire several times, and once more on 'close'
    proc.stderr.on('data', function (data) {
        done(new Error("An error occurred: " + data))
    });
    
    proc.on('close', function (code) {
        done(null, procBuff);
    });
}
					

Take care of events

Read docs carefully. Really.


var spawn = require('child_process').spawn

function doSomething(sourceFile, done) {

    var proc = spawn('java', ['-jar', 'loadData.jar', sourceFile])
    var procBuff = '';
    var procBuffError = '';

    proc.stdout.on('data', function (data) {
        procBuff += data;
    });

    // only collect stderr; decide on success when the process exits
    proc.stderr.on('data', function (data) {
        procBuffError += data;
    });

    proc.on('close', function (code) {
        if(code !== 0) {
            return done(new Error("An error occurred: " + procBuffError));
        }
        else {
            return done(null, procBuff)
        }
    });

}
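A runnable sketch of the exit-code approach. Instead of java, it spawns the current Node.js binary (process.execPath) with a hypothetical one-liner that writes to stderr but still exits with code 0. It also guards done() against firing twice: the Node.js documentation warns that 'exit' may or may not fire after 'error', and the same caution is applied here to 'close'.

```javascript
var spawn = require('child_process').spawn;

// Success is decided by the exit code, not by the mere presence of stderr output.
function runChild(args, done) {
    var proc = spawn(process.execPath, args);   // current Node.js binary instead of java
    var out = '';
    var errOut = '';
    var finished = false;

    // Guard: report the outcome only once, even if several events fire.
    function finish(err, result) {
        if (finished) return;
        finished = true;
        done(err, result);
    }

    proc.stdout.on('data', function (data) { out += data; });
    proc.stderr.on('data', function (data) { errOut += data; });   // collect, don't fail yet

    proc.on('error', function (err) { finish(err); });             // e.g. binary not found
    proc.on('close', function (code) {
        if (code !== 0) {
            return finish(new Error('Exit code ' + code + ': ' + errOut));
        }
        return finish(null, out);
    });
}

// Hypothetical child: a warning on stderr, the real output on stdout, exit code 0.
runChild(['-e', "console.error('just a warning'); console.log('ok')"], function (err, out) {
    if (err) return console.error(err.message);
    console.log('stdout:', out.trim());   // stdout: ok
});
```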
					

Unreadable logs

  • Keep in mind that asynchronous logs will interleave
  • They are not sequenced
  • And many lines will contain the same log strings

Unreadable logs

Logs without context are useless...


function getResults(keyword, done) {
    http.request(url, function(response) {
        console.log('Fetching from API')
        
        response.on('error', function(err) {
            console.log('API error')
        })
    });
}
					

Unreadable logs


function getResults(keyword, done) {
    var logContext = { keyword: keyword }
    
    http.request(url, function(response) {
        console.log(logContext, 'Fetching from API')
        
        response.on('error', function(err) {
            console.log(logContext, 'API error')
        })
    });
}
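A sketch of attaching context to every line, with a hypothetical createLogger() helper. The two interleaving "requests" are simulated with timers, and log lines are captured into an array instead of printed, so they can be inspected:

```javascript
// Hypothetical helper: every line written through the returned function
// carries the same context object, serialized as JSON.
function createLogger(context, write) {
    write = write || console.log;
    return function log(message) {
        write(JSON.stringify(context) + ' ' + message);
    };
}

// Capture lines in an array (instead of printing) so the result can be inspected.
var lines = [];
function capture(line) { lines.push(line); }

var logCats = createLogger({ keyword: 'cats' }, capture);
var logDogs = createLogger({ keyword: 'dogs' }, capture);

// Two concurrent "requests" whose log lines interleave.
setTimeout(function () { logCats('Fetching from API'); }, 20);
setTimeout(function () { logDogs('Fetching from API'); }, 10);
setTimeout(function () { logDogs('API error'); }, 30);
setTimeout(function () { console.log(lines.join('\n')); }, 50);
```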
					

Unreadable logs

  • Centralize your logs - use Logstash
  • And make them searchable - Elasticsearch + Kibana

Too many open background tasks

When running tasks in parallel where the first result wins, the remaining tasks should be aborted

Too many open background tasks

Provide cancellation API:


var events = require('events')
  , http = require('http')
  , deferred = require('deferred') // assumes the npm "deferred" package

function getResults(keyword) {
    var def = deferred()
    var eventbus = new events.EventEmitter()
    // hypothetical search endpoint
    var url = 'http://example.com/search?q=' + encodeURIComponent(keyword)

    var req = http.request(url, function(response) {
        var content = ''

        response.on('data', function(chunk) {
            content += chunk
        })
        response.on('end', function() {
            def.resolve(content)
        })
    })

    req.on('error', function(err) {
        def.reject(err)
    })

    eventbus.on('abort', function() {
        req.abort()
    })

    req.end() // send the request

    return {
        result: def.promise,
        events: eventbus
    }
}
					

Too many open background tasks

Provide cancellation API:


var response = getResults('test')

response.result(function success() {
    // ...
}, function error() {
    // ...
})

// if we need
response.events.emit('abort')
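A runnable sketch of the first-result-wins case, without any network: startTask() is a hypothetical task that resolves after a delay and exposes cancel(). Promise.race() picks the first result, and every other task is cancelled right away:

```javascript
// Hypothetical task: resolves with its name after `ms` milliseconds;
// cancel() clears the timer so the task stops consuming resources.
function startTask(name, ms) {
    var timer;
    var result = new Promise(function (resolve) {
        timer = setTimeout(function () { resolve(name); }, ms);
    });
    return {
        result: result,
        cancel: function () { clearTimeout(timer); }
    };
}

var tasks = [
    startTask('mirror-a', 300),
    startTask('mirror-b', 50),
    startTask('mirror-c', 200)
];

// First result wins; all tasks are cancelled as soon as it arrives
// (cancelling the winner is harmless, its timer has already fired).
var winnerPromise = Promise.race(tasks.map(function (t) { return t.result; }))
    .then(function (winner) {
        tasks.forEach(function (t) { t.cancel(); });
        console.log('winner:', winner);   // winner: mirror-b
        return winner;
    });
```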
					

Everything runs in parallel except your code.

While your code is running (rather than waiting for I/O descriptors), the whole event loop is blocked.
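A short sketch of this effect: a timer due in 10 ms cannot fire while a hypothetical CPU-bound loop keeps the single thread busy for about 200 ms.

```javascript
var start = Date.now();
var delay = null;

// Due after 10 ms, but it has to wait until the busy loop below ends.
setTimeout(function () {
    delay = Date.now() - start;
    console.log('timer fired after', delay, 'ms');   // at least 200 ms, not 10
}, 10);

var busyUntil = start + 200;
while (Date.now() < busyUntil) {
    // busy-wait: simulates CPU-bound work that never yields to the event loop
}
```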

THE END

Q&A