Introduction to asynchronous programming with Python and Twisted

Created: 2018-02-25 dom 20:54

1 What is Twisted?

  • Event-driven networking engine.
  • It's an asynchronous framework.
  • Twisted supports numerous protocols: web, chat, mail, DNS, and more.
  • It's made up of many sub-projects.

2 An example

from pymongo import MongoClient
from requests import get

mongo_client = MongoClient()

def download_urls(url_list):
    for url in url_list:
        response = get(url)
        # parse() turns the HTML into a dict; definition omitted.
        d = parse(response.text)
        mongo_client.test.pages.insert_one(d)

urls = [
    'https://en.wikipedia.org/wiki/Saguaro_National_Park',
    'https://en.wikipedia.org/wiki/Saguaro',
    'https://en.wikipedia.org/wiki/The_Power_of_Sympathy',
    'http://cienciaconelpueblo.org'
]

download_urls(urls)

2.1 Profiling (sorted by cumulative time)

profile.png
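
A profile like this can be produced with cProfile (a minimal sketch; crawler is a hypothetical module name for the script above):

import cProfile

# Make the crawler's names available in __main__, where
# cProfile.run() evaluates its command string.
from crawler import download_urls, urls

cProfile.run('download_urls(urls)', sort='cumulative')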

2.2 Let's time it

time_crawler.png
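
The timing can be taken with something like the following (a minimal sketch, not the exact harness behind the screenshot):

import time

start = time.perf_counter()
download_urls(urls)
print('Took %.2f seconds' % (time.perf_counter() - start))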

2.3 We can see that…

  • Most of the time is spent in requests' get.
  • Only one download (and store) happens at a time. It's a sequential function.
  • The Mongo query is fast because the server is local and the query is simple.
  • get is blocking! The process sleeps until the response arrives.

3 What is blocking?

Blocking
A process (or thread) is waiting for some condition to be satisfied before it can continue execution. A blocked thread is doing NO USEFUL WORK.
Running
A process (or thread) is busy with some computationally intensive work. It's consuming CPU cycles doing USEFUL WORK.
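
What a blocked call looks like at the socket level (a minimal sketch; this is essentially what get does under the hood):

import socket

sock = socket.create_connection(('en.wikipedia.org', 80))
sock.sendall(b'GET / HTTP/1.0\r\nHost: en.wikipedia.org\r\n\r\n')

# The process sleeps here until the server answers; it does no
# useful work in the meantime.
data = sock.recv(4096)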

3.1 A running example

def show_primes(lower, upper):
    primes = []
    for num in range(lower, upper + 1):
        if num > 1:
            # Trial division: num is prime if no i divides it.
            for i in range(2, num):
                if num % i == 0:
                    break
            else:
                primes.append(num)
    print(primes)

3.2 Comparison (crawler vs. primes)

  • Timing crawler: time_crawler.png
  • Timing primes: time_primes.png

4 Going async with Twisted

4.1 Straightforward implementation

from txmongo import MongoConnection
from treq import get
from twisted.internet import defer, reactor

@defer.inlineCallbacks
def download_urls(url_list):
    mongo_client = yield MongoConnection()

    for url in url_list:
        response = yield get(url)
        d = parse((yield response.text()))

        yield mongo_client.test.pages.insert(d)

download_urls(urls).addCallback(lambda ign: reactor.stop())
reactor.run()

4.2 async requests (treq) and async mongo driver (txmongo)

  • We have to use the async counterparts of requests and pymongo: treq and txmongo.

    from txmongo import MongoConnection
    from treq import get
    
    mongo_client = yield MongoConnection()
    response = yield get(url)
    d = parse((yield response.text()))
    

4.3 inlineCallbacks

  • The function has to be declared with the inlineCallbacks decorator.
  • An inlineCallbacks function is the equivalent of a "coroutine" in asyncio or Tornado.

    @defer.inlineCallbacks
    def download_urls(url_list):
    

4.4 Yield

  • There are yields everywhere.
  • A yield marks a point where the coroutine suspends until the result is ready; the reactor keeps serving other work in the meantime (see the standalone sketch below).

    @defer.inlineCallbacks
    def download_urls(url_list):
        mongo_client = yield MongoConnection()

        for url in url_list:
            response = yield get(url)
            d = parse((yield response.text()))

            yield mongo_client.test.pages.insert(d)
    
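To see the suspension in isolation, here is a minimal sketch that yields a Deferred firing one second later (task.deferLater is part of Twisted; wait_and_print is a made-up name):

    from twisted.internet import defer, reactor, task

    @defer.inlineCallbacks
    def wait_and_print():
        # The coroutine suspends here; the reactor is free to run
        # other work until the Deferred fires one second later.
        value = yield task.deferLater(reactor, 1.0, lambda: 42)
        print(value)

    wait_and_print().addCallback(lambda ign: reactor.stop())
    reactor.run()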

4.5 But…

This code uses async constructs, but it's still sequential.

It downloads only one URL at a time!

4.6 A better async approach

from txmongo import MongoConnection
from treq import get
from twisted.internet import defer, reactor

@defer.inlineCallbacks
def download_url(url, mongo_client):
    """Download and store a single url."""
    response = yield get(url)
    d = parse((yield response.text()))
    yield mongo_client.test.pages.insert(d)

@defer.inlineCallbacks
def download_urls(url_list):
    mongo_client = yield MongoConnection()
    # Here we create one coroutine per url, so all the downloads
    # are in flight at once, and gatherResults waits for all of
    # them to finish.
    yield defer.gatherResults(
        [download_url(url, mongo_client) for url in url_list])

download_urls(urls).addCallback(lambda ign: reactor.stop())
reactor.run()

4.7 A non-rigorous comparison

  • Synchronous crawler: time_crawler.png
  • Async but sequential crawler: time_twisted1.png
  • Async concurrent crawler: time_twisted2.png

5 Asyncio

5.1 Straightforward example

import asyncio
from aiohttp import ClientSession
from motor.motor_asyncio import AsyncIOMotorClient

async def download_urls(url_list):
    client = ClientSession()

    mongo_client = AsyncIOMotorClient()

    for url in url_list:
        response = await client.get(url)
        d = parse(await response.text())
        await mongo_client.test.pages.insert_one(d)

    await client.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(download_urls(urls))

5.2 Async versions of mongo and requests

  • Async versions must be used for database drivers and networking.

    from aiohttp import ClientSession
    from motor.motor_asyncio import AsyncIOMotorClient

5.3 async def, await

  • In Python 3.5 we have await (instead of yield and yield from), and async def (instead of the @coroutine decorator).
  • We can use async for and async with too (see the sketch after this block).

    async def download_urls(url_list):
        client = ClientSession()

        mongo_client = AsyncIOMotorClient()

        for url in url_list:
            response = await client.get(url)
            d = parse(await response.text())
            await mongo_client.test.pages.insert_one(d)

        await client.close()
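
A minimal sketch of both constructs (assuming a running local MongoDB with the pages collection from the examples above; session_and_cursor is a made-up name):

    import asyncio
    from aiohttp import ClientSession
    from motor.motor_asyncio import AsyncIOMotorClient

    async def session_and_cursor(url):
        # async with closes the session when the block ends.
        async with ClientSession() as client:
            response = await client.get(url)
            print(await response.text())

        # async for iterates the Motor cursor, awaiting each batch.
        mongo_client = AsyncIOMotorClient()
        async for page in mongo_client.test.pages.find():
            print(page['_id'])

    loop = asyncio.get_event_loop()
    loop.run_until_complete(session_and_cursor('https://en.wikipedia.org/wiki/Saguaro'))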

5.4 A better approach

import asyncio
from aiohttp import ClientSession
from motor.motor_asyncio import AsyncIOMotorClient

async def download_url(url, client, mongo_client):
    response = await client.get(url)
    d = parse(await response.text())
    await mongo_client.test.pages.insert_one(d)


async def download_urls(url_list):
    async with ClientSession() as client:
        mongo_client = AsyncIOMotorClient()

        # One coroutine per url, all running at once; gather takes
        # them as positional arguments, hence the unpacking.
        await asyncio.gather(
            *(download_url(url, client, mongo_client) for url in url_list))

loop = asyncio.get_event_loop()
loop.run_until_complete(download_urls(urls))

6 Coroutines

corutinas.png
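
In short: a coroutine is a function that can suspend itself and be resumed later with a value. A minimal sketch with a plain generator, the mechanism that inlineCallbacks and async def build on:

def coroutine():
    print('start')
    # Suspend here; execution resumes when someone calls send().
    value = yield
    print('got', value)

c = coroutine()
next(c)         # runs up to the first yield: prints 'start'
try:
    c.send(42)  # resumes the coroutine: prints 'got 42'
except StopIteration:
    pass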

7 Example of web programming in Twisted

import treq
from klein import Klein

app = Klein()

@app.route('/')
async def google(request):
    response = await treq.get(b'https://www.google.com')
    content = await treq.content(response)
    return content

app.run("localhost", 8080)

7.1 Explanation

  • Klein is a microframework a la Flask.
  • Flask-style routing.
  • No proxy objects: every view receives the request (a lot better!).
  • And it's compatible with async def and await.

8 Asyncio for Web (with Sanic)

from sanic import Sanic
from sanic.response import html
from aiohttp import ClientSession

app = Sanic()

@app.route("/")
async def test(request):
    client = ClientSession()
    response = await client.get('https://www.google.com')
    content = await response.text()
    return html(content)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

8.1 Explanation

  • Sanic is a microframework a la Flask for asyncio.
  • The Sanic example and the Klein example are almost identical.

9 Why not threads or processes?

  • The infamous GIL.
  • Operating system limits (max processes and threads).
  • Thread and process creation overhead.
  • Debugging threads is hard.

10 Mixing threads with twisted/asyncio

import time
from twisted.internet import reactor, threads, defer

# To be executed in a thread, not in a coroutine.
def do_long_calculation():
    time.sleep(3)
    return 3

@defer.inlineCallbacks
def print_result():
    # Await thread termination.
    x = yield threads.deferToThread(do_long_calculation)
    print(x)

print_result().addCallback(lambda ign: reactor.stop())
reactor.run()

10.1 Explanation

  • You can send long-running code to a thread and await (or yield) its result.
  • The function is executed in a thread pool.
  • This way you can mix threads (and processes too) with Twisted code.
  • In asyncio you can use run_in_executor to achieve the same result, as sketched below.
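
A minimal asyncio counterpart of the Twisted example above (run_in_executor takes an executor as its first argument; None selects the default thread pool):

import asyncio
import time

def do_long_calculation():
    time.sleep(3)
    return 3

async def print_result():
    loop = asyncio.get_event_loop()
    # Run the blocking function in the default executor (a thread
    # pool) and await its result without blocking the event loop.
    x = await loop.run_in_executor(None, do_long_calculation)
    print(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_result())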

11 Questions?