Learn Flask By Example

With Flask, Peewee and Pytest

Hello, welcome to this book teaching the basics of Flask web development. This book aims to help anyone who is looking to get started with Flask but doesn't know where or how to begin.

The accompanying GitHub repo for this project is available here. There you will find an EPUB version if you would prefer to read on an e-reader.

This book is licensed under Creative Commons Attribution 4.0. Please share it with anyone who you think will find it useful!


Chapter 0

Introduction

Welcome to this e-book teaching how to create web applications using primarily Flask, Peewee, and Pytest.

Throughout this book we will be setting up a digital shop site which will have a distinct public front-end and a gated back-end. Your shop will have a persistent database, a comprehensive suite of automated tests, and the ability to process data separately from the main web server.

Each chapter will introduce the relevant library in a hands-on approach, rather than dumping a heap of documentation upon the reader. If you’re someone who would prefer to broaden your knowledge of a library before getting stuck-in with it, I have included appendices at the end which go over the syntax of each library in greater detail. As I am not the author of any of these libraries, there’s always a chance that the information becomes outdated, so pay attention to the version numbers at the top of each appendix.

Coverage and Assumptions

We will be covering Python and the titular libraries specifically. The front-end of the site will be written using plain JavaScript and Flask’s Jinja templates - no fancy frameworks or webpack here.

As I use Linux on my computers, the commands will be written assuming a unix-like operating system with a bash shell. If you are on Windows, I would recommend using the Windows Subsystem for Linux (WSL) to follow along. Setting this up is outside the scope of this book, so search online for something like “WSL Install Windows 10” to get started. Familiarity with using a terminal is required, as we will need to run various elements of the web server via the command line.

I will also not be teaching the Python language itself - you will need to be familiar with the language to follow along properly. For the best experience, a reader should have a grasp of functions, classes, and importing modules. Decorators are also used a lot in Flask, so a brief understanding of those is also recommended.
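As a quick refresher on decorators, here is a minimal sketch of a function which registers other functions under a URL, loosely mimicking the shape of the route decorator we will meet in Chapter 1. The ROUTES dictionary and the route function are hypothetical names for illustration only; this is not how Flask is implemented internally.

```python
# A toy registry; ROUTES and route() are illustrative names, not Flask APIs.
ROUTES = {}


def route(url):
    """Return a decorator which stores the decorated function under `url`."""

    def decorator(func):
        ROUTES[url] = func
        return func  # hand the function back unchanged

    return decorator


@route("/")
def index():
    return "Hello, decorators!"


# Look up the function registered for "/" and call it.
print(ROUTES["/"]())
```

If the three levels (a function returning a decorator, which in turn receives the decorated function) make sense to you, you know enough about decorators to follow along.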

With that out of the way, let’s begin with some basics.

Installing Pyenv and Poetry

While there are many ways to install Python itself, and manage its dependencies, we will be using a tool called pyenv to install a specific version of the Python language, and a tool called poetry (along with in-built virtualenvs) to install dependencies.

If you already know what you are doing, already have a python installation, or you prefer to use other tools to manage installations, feel free to skip the rest of this section.

For this book, we will be using Python 3.10.

Installing Python via Pyenv

The pyenv tool is available over on Github, and comes with a very easy install script.

Please note, I am not the author of this tool, and so have no control over the contents nor the hosting. This means the links below may disappear at some point.

Before installation, follow the instructions for getting the necessary dependencies for your operating system from this URL:

https://github.com/pyenv/pyenv/wiki#suggested-build-environment

Once you have the dependencies taken care of, installing pyenv is as simple as cloning the repo and running a script.

git clone https://github.com/pyenv/pyenv-installer

cd pyenv-installer

bin/pyenv-installer

The installer will use git to grab the necessary repos for you, and set up a .pyenv folder in your home directory to contain both pyenv itself and any installed python versions.

With pyenv taken care of, it’s time to install python 3.10:

~/.pyenv/bin/pyenv install 3.10.1

This command will take some time to run. If you missed any of the pre-requisite dependencies you may see some errors here. Refer back to the wiki to sort those out.

Once your install is successful, we’re done with pyenv!

Installing dependencies with poetry

Now that we have the correct version of Python installed, it’s time to grab the titular libraries. We will be using a tool called poetry to achieve this, which is easily installed using Python’s built-in pip module.

Setting up a project directory

We’ll need to make a folder to store our project. This can be anywhere on your system, and can be moved at any time, so there’s no need to worry too much.

When you’ve chosen where to store your project, create a new directory called flask-peewee-pytest and cd into it.

Creating a Virtualenv

The first step is to create a virtual environment using the pyenv-provided version of Python.

~/.pyenv/versions/3.10.1/bin/python3 -m venv env

source env/bin/activate

Now that we have the virtual environment, we can install poetry into it.

pip install --upgrade pip

pip install poetry

Before we can begin installing with poetry, we need a pyproject.toml file. Don’t worry if you don’t know what this is, because poetry can generate one for us!

poetry init

Simply follow the prompts, choosing n to skip setting dependencies interactively - we will do this via the command line shortly.

Once this command finishes you will have a nice new pyproject.toml file.

Installing dependencies

To add dependencies via poetry, simply use poetry add. To specify development-only dependencies, it’s poetry add --dev.

Let’s install the titular dependencies to our virtual environment. Since our unit tests will not be running in production, we add pytest as a development dependency:

poetry add flask peewee

poetry add --dev pytest

That’s it, we’ve now got our three main dependencies installed, and are ready to begin writing our web-app. Get your editor / IDE of choice ready for the next chapter!

Chapter 1

Flask Beginnings

With our development environment taken care of, let’s dive right in to Flask. In this chapter we’ll go over the basics of getting a Flask server up-and-running, then look into how to structure the web side of our project in a maintainable way.

Hello Flask

In keeping with programming tradition, let’s start with a “Hello Flask” implementation.

In your project folder, create a file called web_server.py and add the following:

from flask import Flask

app = Flask(__name__)
app.secret_key = "very secret"

@app.route("/")
def hello_flask():
    return "Hello, Flask!"
    
if __name__ == "__main__":
    app.run(debug=True)

Save this file and run the webserver with python3 web_server.py. You should see a message telling you that the server is available on http://127.0.0.1:5000/. Visit this URL in a browser and you’ll be presented with “Hello, Flask!”.

Note: Mac users may find that port 5000 is already in use. To get around this, add the port=8080 argument to your app.run call, and visit http://localhost:8080/.

Not too much code went into making that, so let’s go over it.

First we import Flask itself, and create an instance named “app”. Passing the __name__ argument is convention.

Once we have our Flask instance, we need to set a secret key. Flask uses this to cryptographically sign session data so that it cannot be tampered with, and so in a production environment it should be a lot more secure than this simple string. We will come back to this later.
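One way to produce a less guessable key is Python’s built-in secrets module. The sketch below only generates and prints a key; how you store and load it (for example via an environment variable) is left open, and the variable name generated_key is purely illustrative.

```python
import secrets

# 32 random bytes, rendered as a 64-character hexadecimal string.
generated_key = secrets.token_hex(32)

print(generated_key)
```

The printed value could then be assigned to app.secret_key in place of the placeholder string.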

With our sessions secure, we now need to add some routes to our server. This is achieved by decorating a function with the route decorator, passing the URL as the argument, so “/” in this case. Inside this function we must return something which can be recognised by Flask as an HTTP response (typically a string, a dictionary, or a rendered template, which we will see shortly).

That’s it, we now have a server with a URL set up. The if __name__ == "__main__" block allows us to run this file as a script to launch the server, while also allowing other files to import from it without also causing the server to run. We use the run function to start our server, and pass debug=True to enable the debugging interface.

To have a look at the debugging screen, add the line 1 / 0 above the return statement in your hello_flask function, then reload the page. You should see a ZeroDivisionError screen with a nice stack trace showing the problematic line.

Rendering HTML

We know how to return plain text, but real websites use HTML, so how do we do that? Flask comes with a popular templating library called Jinja supported by default, as well as a helper function to both compile the template contents and return it as an HTTP response. Let’s turn our homepage into something a bit more realistic.

Create a templates folder in the root of your project, and then create an index.html file inside.

<h1>Welcome to my shop!</h1>
<hr>
<table>
    <tr>
        <td>Toothpaste</td>
        <td>£2</td>
    </tr>
    <tr>
        <td>Toothbrush</td>
        <td>£1.50</td>
    </tr>
    <tr>
        <td>Floss</td>
        <td>£0.99</td>
    </tr>
</table>

(Note that this is not “fully-formed” HTML, but a browser should render it anyway. We will deal with this later).

Template created! Now we need to render it. Hop back over to your web_server.py file and edit your hello_flask function as follows:

@app.route("/")
def index():
    return render_template("index.html")

We’ll start by renaming the function to index, as it better describes the page we’ll be rendering.

render_template is a helper function in Flask which will perform Jinja rendering and return your HTML as an HTTP response. We will need to import it though, so go up to the top of your file and add it to the import from flask:

from flask import Flask, render_template

Run your web server once more and reload the index page. You should see a beautiful table of prices.

Obviously it would be rather daft to have to edit HTML directly when your shop’s stock changes - this information should be stored in a database and pulled out on the server side. We’ll get to the database itself next chapter, but for now we can pretend, and learn about Jinja rendering along the way.

Using variables in Jinja

In your web_server.py file, change the index function to add a dictionary mapping products to prices like so:

@app.route("/")
def index():
    products = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}
    
    return render_template("index.html", products=products)

When using render_template, any arguments passed after the filename of the HTML file will be passed to the template to be used by Jinja. This is known as the “template context”.

Change the content of your templates/index.html file to the following:

<h1>Welcome to my shop!</h1>
<hr>
<table>
    {% for product_name, price in products.items() %}
    <tr>
        <td>{{product_name}}</td>
        <td>{{ "£%.2f"|format(price) }}</td>
    </tr>
    {% endfor %}
</table>

To use our variables in Jinja, we need to utilise two different sets of special syntax: the double-brace {{ }} and the percent-brace {% %}. Percent-brace tags are used to insert logic, such as for loops and if statements. The double-brace is used to insert python variables as text inside the template.

In the above example, we use the percent-brace syntax to initiate a for loop, iterating over our products variable and unpacking the keys and values into two python variables - product_name and price. We then place the product names as text into our HTML by using the double-brace syntax. We do the same for the price, but use a pipe into a format string to get the correct currency display.

The pipe character is Jinja’s own syntax (rather than python’s) which passes the variable into a function exposed by Jinja to the templates. You can think of these as reversing the order of a normal function from my_func(variable) to variable|my_func.
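Jinja’s format filter applies printf-style formatting, which corresponds to the % operator on strings in plain Python. As a rough illustration outside of any template, using the same sample prices as above:

```python
prices = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}

# "£%.2f" % price mirrors what {{ "£%.2f"|format(price) }} produces in the template.
formatted = {name: "£%.2f" % price for name, price in prices.items()}

print(formatted)
```

Each price comes out with exactly two decimal places, so 2.0 is shown as £2.00 rather than £2.0.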

We now have a single page listing all of our products, so let’s add individual pages for each one. Again, it’s not manageable to create a specific route for each product, so we’ll need to make use of the database’s information in Flask.

Using variables in Flask routes

Underneath your index function, add the following:

@app.route("/<string:product_name>")
def view_product(product_name):
    products = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}

    price = None
    if product_name in products:
        price = products[product_name]
    else:
        abort(404)

    return render_template(
        "products/view_product.html", price=price, product_name=product_name
    )

We now have a route which can take a variable from the URL, as you can see from the route decorator. This product_name variable is then passed into our view_product function automatically. We have annotated this variable as a string, so that we can guarantee the type of the python variable passed.

The rest of this function searches our products dictionary for the provided string, and if found sets the value of price. If the product is not in the dictionary, we use a flask helper to return an HTTP 404 response.

Adjust your import statement at the top of the file to include this function:

from flask import Flask, render_template, abort
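To see the 404 behaviour without opening a browser, here is a minimal, self-contained sketch using Flask’s built-in test client (which Chapter 3 covers properly). It assumes a stripped-down app which returns plain text rather than rendering a template, so the names demo_app and PRODUCTS are illustrative and not part of the project.

```python
from flask import Flask, abort

demo_app = Flask(__name__)
PRODUCTS = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}


@demo_app.route("/<string:product_name>")
def view_product(product_name):
    if product_name not in PRODUCTS:
        abort(404)  # raises an exception which Flask turns into a 404 response

    return "%s costs £%.2f" % (product_name, PRODUCTS[product_name])


client = demo_app.test_client()

found = client.get("/toothpaste")
missing = client.get("/water")

print(found.status_code, found.get_data(as_text=True))
print(missing.status_code)
```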

To make our server run we’ll need to make that new template. Go ahead and create a folder inside your templates folder called products, then create an HTML file inside called view_product.html containing the following:

<h1>Welcome to my shop</h1>
<hr>

<h2>{{product_name}}</h2>

<p>This product costs £{{"%.2f"|format(price)}}</p>

<p>Out of Stock</p>

After you save this file, your server should be able to run again, so start it up with python3 web_server.py and head on over to localhost:5000. You should see your index page.

Add the name of a product to the end of your URL, for example /toothpaste, and you should see your new template render. You can also try using a product which doesn’t exist, such as /water, to see the 404 error page.

Now we have two templates which both share the same header at the top of the page. It would be a hassle to keep copy-pasting that into each new file, and we haven’t got the proper HTML declarations in our templates either. We’ll sort that out now by looking at template inheritance.

Template Inheritance

Jinja allows templates to inherit from others using two main tags: extends and block.

Let’s see these in action by creating a base template. Add a file called base.html in your templates folder:

<!DOCTYPE html>
<html>
    <head>
        <title>My Shop</title>
    </head>

    <body>
        <h1>Welcome to my shop!</h1>
        <hr>
        {% block content %}
        {% endblock %}
    </body>

</html> 

We use a block tag to assign a name and location to a piece of inheritable content. Since we have created a block named content inside our body tags, any template inheriting our base can include a {% block content %} tag to insert its contents into the page body.

Let’s alter our index.html and view_product.html templates to inherit from the base.

index.html:

{% extends "base.html" %}
{% block content %}
<table>
    {% for product_name, price in products.items() %}
    <tr>
        <td><a href="/{{product_name}}">{{product_name}}</a></td>
        <td>{{ "£%.2f"|format(price) }}</td>
    </tr>
    {% endfor %}
</table>
{% endblock %}

view_product.html:

{% extends "base.html" %}
{% block content %}
<h2>{{product_name}}</h2>

<p>This product costs £{{"%.2f"|format(price)}}</p>

<p>Out of Stock</p>
{% endblock %}

Save and re-run your webserver and visit both the homepage and a product page (you may notice we added <a> tags so that you can visit a product page by clicking on the product’s name). They should both look just like before, but now have fully-formed HTML and are sharing the heading. Use the “Inspect Element” or “View Source” capabilities of your web browser to have a look at the generated HTML and see this for yourself.

The extends tag takes the name of the base template in quotes, then exposes all blocks defined in that template to the current file (although you do not need to add content to all blocks defined in the base). The content which is wrapped in {% block content %} in our child templates is then inserted into the base template where its own {% block content %} tags lie.
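Here is a small sketch of the same mechanism outside Flask, using the jinja2 package directly (it is installed alongside Flask). Loading templates from a dictionary via DictLoader is an assumption made to keep the example in one file; Flask itself loads them from the templates folder.

```python
from jinja2 import DictLoader, Environment

# Templates held in a dictionary instead of files, purely for demonstration.
templates = {
    "base.html": "<body><h1>Welcome to my shop!</h1>{% block content %}{% endblock %}</body>",
    "child.html": '{% extends "base.html" %}{% block content %}<p>Out of Stock</p>{% endblock %}',
}

env = Environment(loader=DictLoader(templates))

# Rendering the child pulls in the base and fills its content block.
html = env.get_template("child.html").render()

print(html)
```

The rendered output contains the heading from the base template followed by the paragraph from the child, even though the child never mentions the heading itself.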

This covers the basics of Flask. We now have a working web server, which we can add routes to using the @app.route decorator. We know how to pass variables to these routes using /<string:variable>, and how to get Python variables into Jinja templates.

To continue with our website, we will now look at replacing those hard-coded products dictionaries with data from an actual database, as we dive into the Peewee library.

Chapter 2

Using Peewee with a Database

Peewee is a library often referred to as an Object Relational Mapper (ORM). It abstracts connections to a database into regular Python objects to make querying integrate seamlessly with the rest of your backend logic.

If you’d like a detailed look into Peewee before diving in, check out Appendix 1.

Making a database

We’re going to be using SQLite in this book, because it doesn’t require any installation or set-up. As Peewee is an abstraction layer, code from this book should work fine with a more dedicated solution, such as MySQL or Postgres.

Let’s begin by creating a folder in the root directory called models, and inside add two files - __init__.py and product.py.

__init__.py:

from peewee import Model
from playhouse.sqlite_ext import SqliteExtDatabase


class BaseModel(Model):
    class Meta:
        database = SqliteExtDatabase("database.db")

product.py:

from models import BaseModel
from playhouse.sqlite_ext import *


class Product(BaseModel):
    id = AutoField()
    name = TextField()
    price = TextField()

    class Meta:
        table_name = "product"

In our init file we import Peewee’s own Model class and its class for connecting to a SQLite database. Using these we can define a BaseModel class which will act as a parent class for each of our models. Inside this class, we include another named Meta which is used by Peewee to point the models to their database. As we are using SQLite for this book, we can simply point this to a file named database.db.

Our Product model imports our new BaseModel class and inherits from it. The column datatypes from Peewee’s SQLite extension are also brought in.

Models are linked to their database columns by providing class attributes which are instances of Peewee’s field types. Here we have an AutoField - used to denote an auto-incrementing primary key - and two TextFields to store each product’s name and price.

Another inner Meta class is used on our Product model to indicate its table name in the SQLite database.

Speaking of which, let’s get this prepared now.

Database Migrations

Create a new folder in the root of your project called migrations. Create a file in here called V1__create_product_table.sql.

create table product (id integer primary key autoincrement, name text, price text);

insert into product values (1, 'Toothpaste', '2.0'), (2, 'Toothbrush', '1.50'), (3, 'Floss', '0.99');

Great, now we have our first database migration! You may be wondering about that file name, so I shall explain. This naming schema is used by a product named FlywayDB, an open source generic database migration tool. By naming our migrations as such, we have the option to use this tool to handle our database if we want. Since Flyway uses plain SQL files to handle its migrations, it is possible to run our migrations manually if you would prefer not to set this tool up.

Setting up Flyway

Search the web for FlywayDB’s download page and grab the relevant version for your operating system. Make sure to extract the folder inside your project’s root directory. Rename the extracted folder, which should be flyway-x.y.z to just flyway for simplicity.

Inside the flyway folder you should have another named conf, containing a file flyway.conf. Open up this file and remove the default contents, then enter the following:

flyway.url=jdbc:sqlite:database.db
flyway.user=
flyway.password=
flyway.locations=migrations

Just one more thing before we can start using flyway - the first migration it finds is expected to be the “baseline” of the table, which means its initial state before any flyway migrations. If we run flyway now, it will assume this is V1 and not create our table, so we need to create an empty file inside the migrations folder called V0__baseline.sql.

We are now ready! Make sure you are in the project root and run the following command:

flyway/flyway migrate

If successful, you should see some confirmation output like the following:

Flyway is up to date
Flyway Community Edition 8.5.2 by Redgate
See what's new here: https://flywaydb.org/documentation/learnmore/releaseNotes#8.5.2

Database: jdbc:sqlite:database.db (SQLite 3.34)
Successfully validated 2 migrations (execution time 00:00.006s)
Creating Schema History table "main"."flyway_schema_history" ...
Current version of schema "main": << Empty Schema >>
Migrating schema "main" to version "0 - baseline"
Migrating schema "main" to version "1 - create product table"
Successfully applied 2 migrations to schema "main", now at version v1 (execution time 00:00.014s)

Congratulations, you now have a database!

Skipping Flyway and Running Migrations Manually

If you’d prefer to skip setting up Flyway, you can run the sqlite migrations from the command line directly.

First execute the sqlite3 database.db command to drop into a sqlite shell for your database. Then run each file like so:

.read migrations/Vx__name_of_file.sql

If you receive an error about the sqlite3 command not being found, you may need to install it on your computer first.
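If you would rather stay in Python, the standard library’s sqlite3 module can also execute a migration’s SQL. The sketch below runs statements equivalent to the V1 migration against an in-memory database so that it does not touch database.db; in practice you would read the SQL from the migration file and connect to your real database instead.

```python
import sqlite3

# SQL equivalent to the V1 migration, inlined to keep the sketch self-contained.
migration_sql = """
create table product (id integer primary key autoincrement, name text, price text);
insert into product values (1, 'Toothpaste', '2.0'), (2, 'Toothbrush', '1.50'), (3, 'Floss', '0.99');
"""

connection = sqlite3.connect(":memory:")  # swap for "database.db" to use the real file
connection.executescript(migration_sql)

rows = connection.execute("select name, price from product order by id").fetchall()
print(rows)

connection.close()
```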

Using our Database

Now that our database exists, and has contents, we can begin using our Peewee models in our Flask endpoints.

Open up web_server.py and change your two routes to the following:

@app.route("/")
def index():
    products = Product.select()
    return render_template("index.html", products=products)


@app.route("/<string:product_name>")
def view_product(product_name):
    product = Product.get_or_none(Product.name == product_name)

    if not product:
        return abort(404)

    return render_template(
        "products/view_product.html", price=product.price, product_name=product_name
    )

We’ll also need to import our Product model at the top:

from models.product import Product

If you are familiar with SQL syntax, Peewee’s methods should feel familiar. We use Product.select() to grab all product records from our database - the equivalent of SELECT * FROM product. This returns an iterable of objects representing our database rows, with each class attribute we defined in models/product.py containing the data from our row.

In our view_product endpoint we need to fetch a specific row, so we use Peewee’s get_or_none function. This function needs to be supplied with any conditions on which to select the correct product. Since we have the product’s name as our product_name variable, we pass this as a condition to Peewee with Product.name == product_name. This tells Peewee’s query to find a row with a name record matching what’s in our product_name variable. This is the equivalent of SELECT * from product WHERE name = product_name (and “product_name” is actually the contents of the variable).

Since the get_or_none method returns None if no matching record is found, we can use this to return our abort(404) if a non-existing product name is passed.
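To make the comparison with SQL concrete, here is a rough sketch of the same lookup written with Python’s built-in sqlite3 module instead of Peewee. The find_product helper is a hypothetical name; it is not part of Peewee or of this project, and it only imitates the return-None-when-missing behaviour described above.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute(
    "create table product (id integer primary key autoincrement, name text, price text)"
)
connection.execute("insert into product (name, price) values (?, ?)", ("Floss", "0.99"))


def find_product(product_name):
    """Return the matching (id, name, price) row, or None if there is no match."""
    cursor = connection.execute(
        "select id, name, price from product where name = ?", (product_name,)
    )
    return cursor.fetchone()  # fetchone() gives None when no row matches


print(find_product("Floss"))
print(find_product("Water"))
```

The ? placeholder plays the same role as the Python variable in the Peewee expression: the contents of product_name are substituted in when the query runs.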

The arguments to render_template are updated to pass the product’s price attribute as the price context variable.

Our templates won’t quite render yet, since they are expecting dicts for the product information. Let’s start with index.html. Change the contents of your table to the following:

<table>
    {% for product in products %}
    <tr>
        <td><a href="/{{product.name}}">{{product.name}}</a></td>
        <td>{{ "£%.2f"|format(product.price|float) }}</td>
    </tr>
    {% endfor %}
</table>

We are now iterating over python objects representing our database rows, rather than dicts, so we no longer need .items(), and can use the dot syntax to access attributes.

Since price is a String in our database, it must be passed to the float filter before it can be formatted via the format filter.

And now in view_product.html:

<h2>{{product_name}}</h2>

<p>This product costs £{{"%.2f"|format(price|float)}}</p>

<p>Out of Stock</p>

Once again, the only change in this file is converting the price to a float.

With all of those changes in place, you can now run your webserver with python3 web_server.py and visit your shop once again. It should work exactly as before.

That’s all it takes to get a database into our site! Before we start adding functionality, we should first go over a way to ensure our site will always work as intended. This leads us nicely to our last titular module - Pytest.

Chapter 3

Testing with Pytest

Automated tests allow us to ensure that our site’s expected functionality will stay the same as we continue adding features. This prevents us from having to manually use each aspect of our site every time we make changes.

If you’d like a detailed look into Pytest before diving in, check out Appendix 2.

To get started with Pytest, create a folder in the root directory of your project named tests and inside create a file named helpers.py.

from models.product import Product

__all__ = ["create_test_product"]


def create_test_product(name: str, price: str):
    p = Product()
    p.name = name
    p.price = price
    p.save()

    return p

All helper functions for our tests will be stored in this file. This avoids having to either re-define them in multiple tests, or elongate each test function by creating models manually.

We start off with just one function - create_test_product - which will create a record in our Product database table with the supplied name and price. Creating a record in Peewee can be done by creating an instance of a model class. You can then assign values to each of the defined class attributes representing the data to be stored for each column. Once the necessary fields are populated, the save method will write this new row to the database.

We also define an __all__ variable, which is a special python variable to allow a cleaner use of import * statements. When a file uses from tests.helpers import * it will normally import all definitions contained in that file. This means it would import the Product class in this file’s current state. However, when we define __all__ only the definitions inside this list will be imported. This means, in the file’s current state, any test using from tests.helpers import * will only be importing our create_test_product function, and not the Product class.
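The effect of __all__ can be observed in a small, self-contained experiment. The sketch below builds a throwaway module in memory (fake_helpers is a made-up name, standing in for tests/helpers.py) which imports json and defines one function, then performs a star-import and lists which names arrived.

```python
import sys
import types

# Build a throwaway module in memory; "fake_helpers" is a made-up name.
fake_helpers = types.ModuleType("fake_helpers")
exec(
    "import json\n"
    "__all__ = ['create_test_product']\n"
    "def create_test_product():\n"
    "    return 'created'\n",
    fake_helpers.__dict__,
)
sys.modules["fake_helpers"] = fake_helpers

# Star-import into a fresh namespace and see which names arrived.
namespace = {}
exec("from fake_helpers import *", namespace)

imported_names = sorted(name for name in namespace if not name.startswith("__"))
print(imported_names)
```

Only the function listed in __all__ is imported; json, which the module imported for its own use, is left behind.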

With this database helper function in place, we now need the ability to switch our Flask server into test mode. Create another file in the tests folder, this time named __init__.py:

from web_server import app

app.config.update(
    {
        "TESTING": True,
    }
)

testing_app = app.test_client()

This file will create a new variable called testing_app, which is a test client for our Flask server running in testing mode. We can now use methods such as get and post to issue requests to our server inside tests.

Let’s now create our first test. We will load up the index page of the site and check each product is displayed.

Create a file named test_products.py in your tests folder:

from tests import testing_app
from tests.helpers import *


def test_index_displays_product_names():
    create_test_product("Toothbrush", "2.00")
    create_test_product("Toothpaste", "1.00")

    res = testing_app.get("/")

    assert res.status_code == 200
    
    res_data = res.get_data().decode("utf-8")
    
    assert "Toothbrush" in res_data
    assert "Toothpaste" in res_data

We grab our testing_app server and use a star-import to pull in our create_test_product helper.

The test_index_displays_product_names function begins by creating two products in our database, then uses testing_app to issue a request to the index page of our site. We then use an assert statement to ensure that the HTTP status code of the response was 200 OK.

Once we know the request was successful, we then get the HTML returned by the endpoint using res.get_data() and decode it into a UTF-8 string. We can then test that this string contains the names of our two test products.

We are now ready to run this test - but wait! Our database models are still pointing to our main database, which could be our production database. Obviously we need an ephemeral copy which can be populated and checked-against for each test, and then cleared away ready for the next. This both prevents data from ending up in our real database, and allows unit tests to run without needing to keep track of which previous tests may have added or removed records.

There are multiple ways to solve this problem, but the way I find easiest is to use a decorator above each test which passes in only the tables required for this particular test to run.

Head back to tests/helpers.py and add in the following:

from functools import wraps
from playhouse.sqlite_ext import SqliteExtDatabase
...

__all__ = ["create_test_product", "with_test_db"]

def with_test_db(dbs: tuple):
    def decorator(func):
        @wraps(func)
        def test_db_closure(*args, **kwargs):
            test_db = SqliteExtDatabase(":memory:")
            with test_db.bind_ctx(dbs):
                test_db.create_tables(dbs)
                try:
                    func(*args, **kwargs)
                finally:
                    test_db.drop_tables(dbs)
                    test_db.close()

        return test_db_closure

    return decorator
    
...

This function takes a tuple of Peewee models, creates a new, in-memory Sqlite database and binds it to the provided models. It then runs the decorated function, drops all tables in the database, and closes the connection.

Head back to test_products.py and wrap our test with the new decorator:

from models.product import Product
...

@with_test_db((Product,))
def test_index_displays_product_names():
    ...

Note the trailing comma inside our tuple. Without this, Python will treat the parentheses as plain grouping and pass our Product model on its own, rather than as part of a one-item tuple.
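The difference is easy to check in isolation. In this sketch an empty Product class stands in for the Peewee model so that the example does not need a database:

```python
class Product:  # stand-in for the Peewee model, to keep the sketch self-contained
    pass


with_comma = (Product,)
without_comma = (Product)

print(type(with_comma).__name__)  # a one-item tuple
print(without_comma is Product)  # just the class itself
```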

Now we are finally ready to run our test. Open up a command line at the root of your project and run the pytest command. Hopefully you should see a success message saying 1 passed in green.

If you see an error about packages not being found, you may need to adjust the PYTHONPATH environment variable. On Linux or macOS running bash or zsh, this can be done like so:

export PYTHONPATH=.

pytest

Congratulations, you have now written your first test for this project! Let’s not get ahead of ourselves though, there’s still another page to test. Let’s add more tests to test_products:

@with_test_db((Product,))
def test_view_product_shows_product_name_and_price():
    create_test_product("Toothbrush", "2.00")

    res = testing_app.get("/Toothbrush")

    assert res.status_code == 200

    res_data = res.get_data().decode("utf-8")
    
    assert "Toothbrush" in res_data
    assert "£2.00" in res_data


@with_test_db((Product,))
def test_view_product_shows_404_if_not_found():
    create_test_product("Toothbrush", "2.00")

    res = testing_app.get("/Bananas")

    assert res.status_code == 404

These new tests will both load the view_product route. The first will load a valid product and check the name and price are displayed on the page. The second loads a non-existent product and asserts that the HTTP response is a 404.

Head back to your command line and run your tests again with pytest. You should now see 3 passed in green.

Great, now we can be sure that our site’s basic functionality is in place just by running the pytest command at any time. Let’s now extend the abilities of our site by adding a separate section for administrators, where they can enter details of new products without writing SQL migrations.

Chapter 4

Adding Admin Controls

Database migrations are typically used to control the structure of your database tables, rather than their content. Our initial migration added three products only to get us back to the same point we were at before, when our data was in dictionaries. If we want to continue adding products, we should build some way to do it within the application itself.

In order to achieve this, we will now create a separate section of our shop which will serve as an admin panel. We will gate this section behind a login system, as here users will be able to add / remove products from our shop, and change their prices.

We will start by creating two new pages on our site - /admin for the admin section, and /admin/login for an admin user to log in.

Following the current conventions of our code, you may think that these routes would go in the web_server.py file, since that’s where our other pages are held. However, as our application grows, this file will become much too big. As well as this, our admin URLs will always want to begin with /admin/ to denote that they are separate from the main site, and will all want to be kept behind a login system. It would be rather easy to forget either of these things, and risk exposing admin capabilities to the public.

Luckily, Flask provides a solution to both of these matters - Blueprints. A Flask Blueprint is a way of grouping together sections of a website under a common name. These sections can all contain a common URL prefix, and can have guards implemented before a request completes. We can use these two features to ensure that all of our admin functionality lives at /admin/* and is behind a login system.

Let’s begin by creating a folder in the root of our project called web. This will serve as the store for all things related to the web interface for our shop. Inside this folder create two more - views and blueprints. Inside blueprints create an __init__.py file:

from flask import Blueprint

site_blueprint = Blueprint("site", __name__)
admin_blueprint = Blueprint("admin", __name__, url_prefix="/admin")

We import the Blueprint class from flask and create two instances, site_blueprint and admin_blueprint. The first argument will be the name of the blueprint, which you will see used later when we use Flask’s url_for helper. The second argument is __name__ by convention. Our admin blueprint has a third argument, url_prefix. This is used to ensure that all URLs for this blueprint begin with /admin/ automatically.

Let’s create some placeholder routes for our admin section. We’ll start with an index and a login page. Inside your web/views folder, create two more - site and admin. This is where our URL routes will lie (Flask calls them “views”, but personally I think the term “routes” is clearer. I will stick with Flask’s terminology for our project, however).

Inside both site and admin create a file named __init__.py. This is where our guards will live. The public site needs no guards, but the admin will need to make sure a user is logged in before displaying a page.

Inside your new admin folder create a file named admin.py containing the following:

from web.blueprints import admin_blueprint


@admin_blueprint.route("/")
def admin_index():
    return "Welcome to the admin"


@admin_blueprint.route("/login", methods=["GET", "POST"])
def admin_login():
    return "please log in"

Here are our two placeholder views. As you can see, blueprints have routes added to them using the same decorator syntax as we have in our web_server.py file. We create a function named admin_index which serves as the entry-point to our admin section. The route "/" will combine with the url_prefix we specified on the Blueprint so that this page rests on /admin. Similarly, our admin_login route will be /admin/login.
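
If you’d like to see how the prefix and the route combine, here is a small stand-alone sketch. The demo blueprint and its views are made up for illustration and are not part of our shop. It also shows the endpoint names Flask generates, which take the form of the blueprint’s name, a dot, then the function’s name.

```python
from flask import Blueprint, Flask

# A throwaway blueprint and app, separate from our shop's code.
demo_blueprint = Blueprint("demo", __name__, url_prefix="/demo")


@demo_blueprint.route("/")
def demo_index():
    return "index"


@demo_blueprint.route("/login")
def demo_login():
    return "login"


demo_app = Flask(__name__)
demo_app.register_blueprint(demo_blueprint)

# Map each endpoint name to its URL rule, skipping Flask's built-in static route.
rules = {
    rule.endpoint: rule.rule
    for rule in demo_app.url_map.iter_rules()
    if rule.endpoint != "static"
}

print(rules["demo.demo_index"])  # /demo/
print(rules["demo.demo_login"])  # /demo/login
```

Notice that the index rule is /demo/ with a trailing slash. Flask will redirect a request for /demo to it.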

Our login route has the methods argument passed to its decorator. This allows this route to be accessed by both GET and POST HTTP methods. When this argument is not provided, Flask assumes the route is meant only for GET requests, and any POST requests will be responded to with a 405 Method Not Allowed.
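
The following sketch shows this behaviour using Flask’s test client. The two routes are hypothetical and exist only for this demonstration.

```python
from flask import Flask

demo_app = Flask(__name__)


@demo_app.route("/get-only")
def get_only():
    return "GET is fine"


@demo_app.route("/both", methods=["GET", "POST"])
def both():
    return "GET and POST are fine"


client = demo_app.test_client()

# POSTing to a route without the methods argument is rejected.
get_only_status = client.post("/get-only").status_code
# POSTing to a route which lists POST is accepted.
both_status = client.post("/both").status_code

print(get_only_status)  # 405
print(both_status)      # 200
```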

Our admin views are now in place, but before we flesh them out, let’s also pull our site routes out of web_server.py and into our new site folder.

Create a file at web/views/site/site.py and move over your views from web_server.py. Import site_blueprint at the top, and change the decorators from @app to @site_blueprint:

from flask import abort, render_template

from models.product import Product
from web.blueprints import site_blueprint


@site_blueprint.route("/")
def index():
    products = Product.select()
    return render_template("index.html", products=products)


@site_blueprint.route("/<string:product_name>")
def view_product(product_name):
    product = Product.get_or_none(Product.name == product_name)

    if not product:
        return abort(404)

    return render_template(
        "products/view_product.html", price=product.price, product_name=product_name
    )

Great, now all of our views are in place. To avoid long chains of imports, let’s consolidate our modules. Open up web/views/site/__init__.py and add this:

from . import site

Now do the same in web/views/admin/__init__.py:

from . import admin

Finally, create web/views/__init__.py and add this:

from . import admin, site

Now our views are ready to be imported in web_server.py. Open that up and change it to the following:

from flask import Flask

app = Flask(__name__)
app.secret_key = "very secret"

from web.views import admin, site
from web.blueprints import site_blueprint, admin_blueprint

app.register_blueprint(site_blueprint)
app.register_blueprint(admin_blueprint)


if __name__ == "__main__":
    print(app.url_map)
    app.run(debug=True)

Now, after creating our app we pull in all of our views from both the admin and site sections. This causes our Blueprints to “fill up” with the required routes. Now we pull in said Blueprints and use app.register_blueprint to tell our main Flask app that they exist.

That’s it, this file is now much cleaner, and our code has become modularised. We can now keep a nice separation between the public shop and the private admin section.

Speaking of which, let’s go and add that guard to our admin section.

Route Guards in Flask

As well as route, Flask Blueprints have access to other decorators which can modify the lifecycle of a web request. One such example is before_request, which is used to perform logic before a web request is passed to our view. We can use this to check if a user is logged in when they try to access /admin, and redirect them to our login page if not.

Before we change our code, open up a browser and go to localhost:5000/admin. You should see our “Welcome to the admin” placeholder message. This means you have successfully navigated to our admin section without authenticating.

Now open up web/views/admin/__init__.py and add the following:

from flask import request, session, redirect, url_for

from web.blueprints import admin_blueprint

from . import admin


@admin_blueprint.before_request
def admin_before_request():
    if request.endpoint != "admin.admin_login":
        if "logged_in" not in session:
            return redirect(url_for("admin.admin_login"))

We pull in a couple of helpers from Flask, as well as our admin_blueprint, then define a function under @admin_blueprint.before_request. This function checks Flask’s request object to see if the endpoint attribute is anything other than admin.admin_login. If it is, and there is not a flag called "logged_in" in Flask’s session, we force a redirect to the admin.admin_login endpoint.

Don’t worry about the endpoint naming, or Flask’s session, just yet. We will cover them shortly.

Now save this file and try once again to get to localhost:5000/admin. You should hopefully be redirected to /admin/login, and see a different placeholder message. This means our admin section is protected against unauthenticated users!
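
You can also check a guard like this without opening a browser. Below is a minimal, stand-alone sketch of the same idea using Flask’s test client. The guarded blueprint, its views, and the secret key are all invented for this example rather than taken from our shop.

```python
from flask import Blueprint, Flask, redirect, request, session, url_for

guarded = Blueprint("guarded", __name__, url_prefix="/guarded")


@guarded.before_request
def require_login():
    # Returning a response here stops the request before it reaches the view.
    if request.endpoint != "guarded.login" and "logged_in" not in session:
        return redirect(url_for("guarded.login"))


@guarded.route("/")
def index():
    return "secret page"


@guarded.route("/login")
def login():
    return "please log in"


demo_app = Flask(__name__)
demo_app.secret_key = "only for this demo"
demo_app.register_blueprint(guarded)

client = demo_app.test_client()

# Without the session flag we are redirected to the login page.
response = client.get("/guarded/")
print(response.status_code)  # 302

# Set the flag directly in the session, then try again.
with client.session_transaction() as sess:
    sess["logged_in"] = True

logged_in_response = client.get("/guarded/")
print(logged_in_response.status_code)  # 200
```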

Adding an Admin

Now that our page is guarded, we need to get ourselves a user so that we can unlock it. To achieve this we’ll need a new database table - admin_user. Create a new file in the migrations folder called V2__add_admin_user_table.sql:

CREATE TABLE admin_user (id integer primary key autoincrement, username text, password text);

Run the flyway/flyway migrate command in your project root folder and you will now have a second table. Time for our Peewee model. Create models/admin_user.py:

from models import BaseModel
from playhouse.sqlite_ext import *


class AdminUser(BaseModel):
    id = AutoField()
    username = TextField()
    password = TextField()

    class Meta:
        table_name = "admin_user"

Much like our Product class, the AdminUser contains an id field and two text fields.

Now we need an admin user, but as previously mentioned, it’s best to just use migrations for table structure, and not data. So, let’s use the Python interactive shell to make us our first admin user. Open a terminal in the root of your project, ensure your virtualenv is sourced, and run python3.

First we’ll create an AdminUser and assign them a username. I will use “admin”, but you can use whatever name you’d like.

Python 3.10.0 (default, Oct  4 2021, 00:00:00) [GCC 11.2.1 20210728 (Red Hat 11.2.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from models.admin_user import AdminUser
>>> user = AdminUser()
>>> user.username = "admin"

Now our user needs a password. Since it’s bad security practice to store passwords in plain text, we will store our password as a hash. Flask itself is built upon a library known as werkzeug which provides functions for securely hashing and checking passwords. For example, to hash the password “admin”:

>>> from werkzeug.security import generate_password_hash
>>> generate_password_hash('admin')
'pbkdf2:sha256:260000$aOwDEpVMd5aO0oie$2916af44ac8b1e79d8a1ac74882f1602da2bae33fc690491c5014fe98f186ed4'

Pick a nice password for your admin user, and set the hash of this password as the AdminUser’s password attribute:

>>> user.password = generate_password_hash("admin")

Now we can write this row to our database with save():

>>> user.save()
1

And we can confirm the record is now in our database:

>>> u = AdminUser.get()
>>> u.username
'admin'
>>> u.password
'pbkdf2:sha256:260000$O0eSD3qnpfXpqVYn$4ca214264ba24b66ffc01b5ec801d50314439f24e947df03d4f763d4a58e5704'
>>> 
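
You may have spotted that the hash stored on our user differs from the one printed earlier, even though both came from the password “admin”. That is expected, as a random salt is mixed in each time. The short sketch below illustrates this, along with check_password_hash, which we will use when logging in. The exact format of the hash string may vary between werkzeug versions.

```python
from werkzeug.security import check_password_hash, generate_password_hash

first_hash = generate_password_hash("admin")
second_hash = generate_password_hash("admin")

# A random salt is used each time, so the two hashes differ.
print(first_hash == second_hash)  # False

# Both hashes still verify against the original password.
print(check_password_hash(first_hash, "admin"))   # True
print(check_password_hash(second_hash, "admin"))  # True

# A wrong password does not verify.
print(check_password_hash(first_hash, "wrong"))   # False
```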

Logging in

With an admin user created, we can now create a simple login screen and update our view accordingly. Create a folder named admin in your templates folder, then create two files inside of it - login.html and index.html.

Let’s start with our login form, so open up login.html:

<h1>Log in</h1>

{% for msg in get_flashed_messages() %}
    <h2 style="color:red">{{msg}}</h2>
{% endfor %}

<hr />

<form name="login_form" method="POST">
    <label for="username">Username</label>
    <input type="text" name="username" id="username">

    <br />
    <br />

    <label for="password">Password</label>
    <input type="password" name="password" id="password">

    <br />
    <br />

    <input type="submit" value="Log In">
</form>

This is mostly a standard HTML form. At the top you will notice a call to get_flashed_messages. A flash message is simply a short message which should be viewed only once by the user. They are typically used to convey the success or failure of a single action. We use a style attribute to colour ours red, since it will only show when the login attempt is unsuccessful.
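
To illustrate the “viewed only once” behaviour, here is a small stand-alone sketch. The /fail and /show routes are hypothetical, and the template is an inline string rather than a file.

```python
from flask import Flask, flash, render_template_string

demo_app = Flask(__name__)
demo_app.secret_key = "only for this demo"  # flashing relies on the session


@demo_app.route("/fail")
def fail():
    flash("Please try again!")
    return "flashed"


@demo_app.route("/show")
def show():
    return render_template_string(
        "{% for msg in get_flashed_messages() %}{{ msg }}{% endfor %}"
    )


client = demo_app.test_client()
client.get("/fail")

# The message appears on the first page that displays it, then is gone.
first_view = client.get("/show").get_data(as_text=True)
second_view = client.get("/show").get_data(as_text=True)

print(repr(first_view))   # 'Please try again!'
print(repr(second_view))  # ''
```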

Now we have the template, let’s get our view rendering it. Open up web/views/admin/admin.py:

from flask import request, session, redirect, url_for, render_template, flash
from werkzeug.security import check_password_hash

from models.admin_user import AdminUser
from web.blueprints import admin_blueprint


@admin_blueprint.route("/")
def admin_index():
    return render_template("admin/index.html")


@admin_blueprint.route("/login", methods=["GET", "POST"])
def admin_login():
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")

        user = AdminUser.get_or_none(AdminUser.username == username)
        if user:
            password_correct = check_password_hash(user.password, password)

            if password_correct:
                session["logged_in"] = True
                session["admin_user_id"] = user.id

                return redirect(url_for("admin.admin_index"))
            else:
                flash("Please try again!")
        else:
            flash("Please try again!")

    return render_template("admin/login.html")

We add a couple of new imports from flask, our old friend render_template and the new flash function to display the aforementioned flash messages.

The check_password_hash function is brought in from werkzeug so that we can test the password hash from our new AdminUser instance.

Our admin_index function is changed to render a template rather than returning a string.

The admin_login route is now fully fleshed-out. We begin by checking the method of our request object so that we can tell whether we are on the initial page-load (a GET request) or if we have the data from our form submitted (a POST request). If we have a POST request, we use the form attribute of our request object to extract the POSTed data. The get method acts like it does for a normal dict: it returns the value if present, or None if not.

To validate the credentials, we try and grab an AdminUser from our table by matching on the username. If we have a user with this username, we then use check_password_hash, passing the AdminUser’s stored password and the password supplied by the form. If the passwords match then the submitted details are both correct, and we can log the user in. We achieve this by setting the "logged_in" session variable, and also store the AdminUser’s ID, then redirect the user off to the index.

If the username was not found, or the passwords did not match, the flash function is used to display an error message, and the login.html template will be rendered once again.

Before we can test this, let’s put a heading into admin/index.html so that we can see when we’re logged in successfully:

<h1>Welcome to the admin!</h1>

<p>You are logged in</p>

Re-run your web_server.py file and head over to http://localhost:5000/admin. You should get redirected to the login form.

First try entering incorrect details to have a look at the flash message. Then put in your correct login details and see yourself sent back to the index page. Hooray!

Editing Data

Now that our admin page is guarded by the login system, we can start adding the functionality to create, read, update, and delete our database records. This is often referred to by the acronym CRUD.

Let’s start with “read” by showing a list of Products on the admin index. Import Product from models.product at the top of admin.py, then change your view to the following:

...

@admin_blueprint.route("/")
def admin_index():
    all_products = Product.select().order_by(Product.name.asc())

    return render_template("admin/index.html", all_products=all_products)

...

We are passing a query for all products, ordered by their name attribute, to our template. Peewee only runs the query when the template loops over it. Now let’s display them on admin/index.html:

<h1>Welcome to the admin!</h1>
<hr />
<h2>All Products</h2>
<br />

<table>
    <thead>
        <tr>
            <th>Name</th>
            <th>Price</th>
            <th>Actions</th>
        </tr>
    </thead>
    <tbody>
    {% for product in all_products %}
        <tr>
            <td>
                <a href="/admin/{{product.id}}">
                    {{product.name|title}}
                </a>
            </td>
            <td>
                £{{"%.2f"|format(product.price|float)}}
            </td>
            <td>
                <button onclick="deleteProduct({{product.id}})">Delete</button>
            </td>
        </tr>
    {% endfor %}
    </tbody>
</table>

<script>
    function deleteProduct(productId) {
        console.log("deleting product", productId);
    }
</script>

After our welcome message we display a table of all available products, showing their name, price, and some actions. Clicking a product’s name will take you to its update page (which we will create next), and each product has a “Delete” button which we can use later to remove a product from our database.

For the moment, this button will just log the clicked product’s ID to the JavaScript console.

Reload your admin index and you should see your table with your three products listed.

Let’s start by building the edit page, so we can click on those product names. Create a new template file templates/admin/products/edit_product.html:

<h1>Edit {{product.name}}</h1>

<hr>

<form name="product_form" action="{{url_for('admin.save_product')}}" method="POST">
    <input type="hidden" name="product_id" value="{{product.id}}">

    <label for="name">Name:</label>
    <input type="text" name="name" id="name" value="{{product.name}}">

    <br/>
    <br/>

    <label for="price">Price:</label>
    <input type="text" name="price" id="price" value="{{product.price}}">

    <br/>
    <br/>

    <input type="submit" value="Save">
</form>

A basic HTML form with two text inputs, one for the product’s name and one for its price. We also have a hidden input which will send this product’s ID.

We can’t use this page yet, as the admin.save_product view doesn’t exist, so hop back over to web/views/admin/admin.py:


...

@admin_blueprint.route("/<int:product_id>")
def edit_product(product_id: int):
    product = Product.get_or_none(Product.id == product_id)
    if not product:
        flash("Product not found")

        return redirect(url_for("admin.admin_index"))

    return render_template("admin/products/edit_product.html", product=product)


@admin_blueprint.route("/save-product", methods=["POST"])
def save_product():
    product = Product()

    product_id = request.form.get("product_id")
    if product_id:
        product = Product.get_or_none(Product.id == product_id)
        if not product:
            return redirect(url_for("admin.admin_index"))

    name = request.form.get("name")
    price = request.form.get("price")

    product.name = name
    product.price = price
    product.save()

    return redirect(url_for("admin.edit_product", product_id=product.id))

First our edit_product view. This view takes a product ID from the URL and makes sure it is an integer. We then use Peewee to try and fetch a product with this ID from the database. If it’s not found, we send the user back to the admin index, since we can’t show its form. If we find it, we then render the edit_product.html template, passing the product as context.

The save_product view begins by creating a new Product instance. This will allow us to re-use this view later when we build the “create” functionality. If a product_id is passed in our POST data, then the blank product is replaced by the result of a get_or_none on the provided ID. If no product is found, we return the user to the admin index.

Once we have our Product instance, we can get the POSTed data from the request.form object and assign the values to the instance’s name and price attributes. We then call save() to write the changes to our database, before reloading the edit_product page.

Your site should be in working condition now, so go ahead and restart web_server.py and view http://localhost:5000/admin in your browser. After logging in, click a product’s name and you’ll see its edit form. Try changing the name and price and pressing Save. You should see your changes persist after saving, and back on the admin index page.

Groovy! Now let’s add the “create” functionality. Put this very simple new view into web/views/admin/admin.py:

@admin_blueprint.route("/create-product")
def create_product():
    return render_template("admin/products/create_product.html")

And now create yourself a template at templates/admin/products/create_product.html:

<h1>Create a Product</h1>

<hr>

<form name="product_form" action="{{url_for('admin.save_product')}}" method="POST">
    <label for="name">Name:</label>
    <input type="text" name="name" id="name">

    <br/>
    <br/>

    <label for="price">Price:</label>
    <input type="text" name="price" id="price">

    <br/>
    <br/>

    <input type="submit" value="Save">
</form>

This should look familiar, since it’s mostly just the edit_product form again, but without the hidden input or value attributes.

Now we need a link to this on our index, so open up templates/admin/index.html and add this below your table:

...

<hr />

<a href="{{url_for('admin.create_product')}}">Create new Product</a>

Reload your admin index and try clicking on this link. You should be greeted with a blank form. Fill it in and create yourself a new product. After saving, your product should appear on the admin index amongst the others. Neat!

Just one step left now - “delete”. Still in your admin index template, alter the deleteProduct JavaScript function at the bottom to the following:

    async function deleteProduct(productId) {
        var fd = new FormData();
        fd.set('product_id', productId);

        var response = await fetch(
            '{{url_for("admin.delete_product")}}',
            {
                method: 'POST',
                body: fd,
            } 
        );

        var res_json = await response.json();
        alert(res_json.message);

        location.reload();
    }

This function uses the fetch API to send a POST request to the admin.delete_product view containing form data with the selected product’s ID. That view doesn’t exist yet, so open up web/views/admin/admin.py one final time and add the following:

@admin_blueprint.route("/delete-product", methods=["POST"])
def delete_product():
    product_id = request.form.get("product_id")
    product = Product.get_or_none(Product.id == product_id)
    if not product:
        return {"success": False, "message": "Product not found"}, 400

    Product.delete().where(Product.id == product_id).execute()

    return {"success": True, "message": "Product Deleted"}

This view grabs the product_id from the POST data and uses it to fetch a matching Product instance. If not successful, an error response is returned (the 400 after the dict is the HTTP status code of the response; Flask converts the dict itself to JSON). If the product was found, Peewee’s delete().where().execute() functionality is used to delete the product from our database.
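
Here is a small stand-alone sketch of how Flask treats these return values. The /found and /missing routes are hypothetical and simply return the same dicts as our view.

```python
from flask import Flask

demo_app = Flask(__name__)


@demo_app.route("/found")
def found():
    # A dict on its own becomes a JSON response with a 200 status.
    return {"success": True, "message": "Product Deleted"}


@demo_app.route("/missing")
def missing():
    # A (dict, status) pair becomes a JSON response with that status.
    return {"success": False, "message": "Product not found"}, 400


client = demo_app.test_client()
found_response = client.get("/found")
missing_response = client.get("/missing")

print(found_response.status_code, found_response.get_json())
print(missing_response.status_code, missing_response.get_json())
```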

Reload your web_server and site then give one of those delete buttons a whack. You should see a popup saying “Product Deleted” and the page should update with the product no longer in your table.

Great - that’s all parts of the CRUD functionality completed! With this out of the way, we can now go on to the most important part of any shop - the ability to make sales.

Chapter 5

Adding Purchase Abilities

Now that we have the ability to create products to list on our shop, let’s add the ability for a customer to purchase them. We’ll create a Cart which the visitor can use to store items, then a Checkout page where they can enter an email address to complete their order.

Creating a Cart

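We can use Flask’s session capabilities to give each user a Cart which will follow them around the site. The session is essentially just a regular Python dict which will be stored as a cryptographically signed cookie in the user’s browser, and thus will be available in every request they make. The cookie is signed with our app’s secret_key, so a visitor can read its contents but cannot alter them without Flask noticing; avoid storing anything secret in it. This allows us to store data semi-permanently against each individual user.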

To use Flask’s session we simply import it and start assigning values. We have seen this in action when creating our login system for the admin.
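
As a quick illustration of values surviving between requests, here is a stand-alone sketch using Flask’s test client, which keeps cookies between requests much like a browser does. The /visit route and the "visits" key are invented for this example.

```python
from flask import Flask, session

demo_app = Flask(__name__)
demo_app.secret_key = "only for this demo"  # needed to sign the session cookie


@demo_app.route("/visit")
def visit():
    # Read the previous count from the session, defaulting to zero.
    session["visits"] = session.get("visits", 0) + 1
    return str(session["visits"])


client = demo_app.test_client()
first = client.get("/visit").get_data(as_text=True)
second = client.get("/visit").get_data(as_text=True)

print(first, second)  # 1 2
```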

Let’s now display our cart on the site. Open up templates/base.html and change the contents of the <body> tag to the following:

    <body>
        <h1>Welcome to my shop!</h1>
        <hr>
        Items in Cart: <span id="cart-items">{{cart|length}}</span>

        <a href="/checkout">Checkout</a>
        <hr>

        {% block content %}
        {% endblock %}
    </body>

Now the number of items in our cart will display underneath our header. Let’s create and pass a Cart through in our index page.

from flask import ..., session

@site_blueprint.route("/")
def index():
    products = Product.select()
    if "cart" not in session:
        session["cart"] = "1"
        
    return render_template("index.html", products=products, cart=session["cart"])

Reload your site’s index page and you should see “Items in Cart: 1” at the top, as well as a link to a checkout page. Now try changing the "1" to "123" and you should see “Items in Cart: 3”.

Great, now we can keep track of the number of items in our Cart. However, if you load another page, the count will not display correctly, because that page’s view does not pass cart to its template. To avoid having to remember to pass this variable in every single view, let’s create a global template context for our site views, so we will always have our cart.

Global Jinja Context

Open up web/views/site/__init__.py and change it to this:

from . import site
from web.blueprints import site_blueprint

@site_blueprint.context_processor
def inject_variables():
    from flask import session

    if "cart" not in session:
        session["cart"] = []

    return {"cart": session["cart"]}

Here we use the context_processor decorator to inject variables into all templates under the site_blueprint. This function grabs the session from Flask and adds an empty list named "cart" if it doesn’t already exist. Then it returns a dictionary, which will be the default context for any templates using Flask’s render_template function within this blueprint.
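
The sketch below shows the same mechanism in isolation. The shop blueprint and inline template are made up for the example. Note that the view never passes cart itself.

```python
from flask import Blueprint, Flask, render_template_string, session

shop = Blueprint("shop", __name__)


@shop.context_processor
def inject_cart():
    # Whatever this dict contains is available in every template
    # rendered by a view belonging to this blueprint.
    return {"cart": session.get("cart", [])}


@shop.route("/")
def index():
    # cart is not passed here; the context processor supplies it.
    return render_template_string("Items in Cart: {{ cart|length }}")


demo_app = Flask(__name__)
demo_app.secret_key = "only for this demo"
demo_app.register_blueprint(shop)

client = demo_app.test_client()

# Pre-fill the session with three product IDs.
with client.session_transaction() as sess:
    sess["cart"] = [3, 3, 7]

page = client.get("/").get_data(as_text=True)
print(page)  # Items in Cart: 3
```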

Save this and open a product page on your site. You should now still see your Cart items in the header. Now we can remove the Cart-related code from our index view again.

@site_blueprint.route("/")
def index():
    products = Product.select()
    return render_template("index.html", products=products)

Adding To Cart

Now we have a Cart, let’s get items added. Firstly, we’ll need to clear the string-version of our session variable. If you know how, clear the cookies for localhost on your browser. If you don’t, you can add the line session["cart"] = [] to your index view, load the index page, then remove the line.

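With that taken care of, open up templates/products/view_product.html and change your content block to the following. This version reads from a product variable, so also update your view_product view to pass product=product to render_template: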

<h2>{{product.name}}</h2>

<p>This product costs £{{"%.2f"|format(product.price|float)}}</p>

<button onclick="addToCart({{product.id}})">Add to Cart</button>

<script>
    async function addToCart(productId) {
        var fd = new FormData();
        fd.set('product_id', productId);

        var response = await fetch(
            '{{url_for("site.add_product_to_cart")}}', 
            {
                method: 'POST', 
                body: fd,
            }
        );

        var res_json = await response.json();
        var cartItems = res_json.cart_items;
        document.getElementById("cart-items").innerText = cartItems;
    }
</script>

We’re adding a button underneath our product’s information which will add this product to our Cart. This is achieved using an onclick handler called addToCart which will run some JavaScript to send a request to our backend.

The addToCart function creates a FormData object to wrap up our chosen Product’s ID and then POSTs it to a new endpoint called add_product_to_cart using fetch. The server’s response is then parsed and we update the count of our Cart items using the value returned as cart_items.

Let’s create this new endpoint now. Open up web/views/site/site.py, add request and session to your flask imports, and add the following:

@site_blueprint.route("/add-product-to-cart", methods=["POST"])
def add_product_to_cart():
    product_id = request.form.get("product_id", type=int)
    if not product_id:
        return {"success": False, "cart_items": len(session["cart"])}

    current_cart = session["cart"]
    current_cart.append(product_id)
    session["cart"] = current_cart

    return {"success": True, "cart_items": len(session["cart"])}

In this endpoint we check request.form for the product_id sent by our JavaScript’s FormData object and parse it as an integer using type=int. If it is missing or is not a valid integer, we return an unsuccessful response which does not change the items in the Cart.

If an ID was supplied we can then add it to our Cart. We do this by fetching the cart list from our session into a current_cart variable, appending the Product’s ID to it, then assigning the list back to the session’s "cart" key. The assignment matters: Flask only notices that the session has changed when a key is set, not when a list stored inside it is modified in place.

We then return a successful response which includes the cart_items key which the Javascript can use to update the count in our header.
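
The reason we assign the list back to session["cart"], rather than just appending to it, can be seen in this small sketch. It uses the session’s modified attribute, which Flask consults when deciding whether to send an updated cookie. The demo app here is a throwaway, and resetting modified by hand is only done to imitate a freshly loaded session.

```python
from flask import Flask, session

demo_app = Flask(__name__)
demo_app.secret_key = "only for this demo"

with demo_app.test_request_context():
    session["cart"] = []
    session.modified = False  # pretend the session was just loaded

    # Changing the nested list in place goes unnoticed by the session.
    session["cart"].append(1)
    changed_in_place = session.modified

    # Assigning to the key marks the session as modified.
    session["cart"] = session["cart"]
    changed_by_assignment = session.modified

print(changed_in_place)       # False
print(changed_by_assignment)  # True
```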

With this new endpoint in place, go ahead and load a product page in your browser. Click the “Add to Cart” button a few times and see your “Items in Cart” heading increase. Navigate to another Product page and see your Cart’s status follow you around.

Great! Now we have a working Cart, so let’s add a way for a user to Checkout.

Checking Out

When checking out, we will display all of the items in our user’s Cart and sum up the total price. There will then be a place for the user to enter their email address and complete the purchase. In a real online-shop this would be where the user makes an account and enters their payment information, but we’ve already learned about account creation, and taking payments is out of scope for this book.

We’ll need a new database table for storing any completed orders, so let’s go ahead and create that now. Make a file in your migrations folder called V3__add_order_table.sql containing the following:

create table "order" (id integer primary key autoincrement, timestamp_created text, email text, products text);

Here "order" is in double-quotes because it is a reserved word in SQL.
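
If you are curious what happens without the quotes, this sketch uses Python’s built-in sqlite3 module with a temporary in-memory database, so it will not touch our shop’s data. The single-column table is a simplification for the example.

```python
import sqlite3

connection = sqlite3.connect(":memory:")

# Without quotes, SQLite reads "order" as the start of an ORDER BY clause.
try:
    connection.execute("create table order (id integer primary key)")
    unquoted_worked = True
except sqlite3.OperationalError:
    unquoted_worked = False

# With double-quotes the name is treated as an identifier.
connection.execute('create table "order" (id integer primary key)')

tables = [
    row[0]
    for row in connection.execute(
        "select name from sqlite_master where type = 'table'"
    )
]

print(unquoted_worked)  # False
print(tables)           # ['order']
```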

Run flyway/flyway migrate to create yourself a new table. Now we’ll need the Peewee model, too. Make a file in your models folder called order.py and add this new class:

from models import BaseModel
from playhouse.sqlite_ext import *


class Order(BaseModel):
    id = AutoField()
    timestamp_created = DateTimeField()
    email = TextField()
    products = JSONField()

We haven’t seen DateTimeField or JSONField yet. These are used to tell Peewee that the string values in the database can be interpreted as datetime objects or json dicts, respectively. The data in SQLite will remain a regular string, however.

Now that we can store a record of our orders, we’re in position to add the functionality to our shop. Let’s create a new view in our web/views/site/site.py file for checking out:

@site_blueprint.route("/checkout", methods=["GET", "POST"])
def checkout():
    cart_items = []
    cart_products = {}
    total_price = 0
    for product_id in session.get("cart", []):
        p = Product.get_or_none(Product.id == product_id)
        if p is None:
            # The product may have been deleted since it was added to the Cart.
            continue
        if p.name not in cart_products:
            cart_products[p.name] = {"total": float(p.price), "quantity": 1}
        else:
            cart_products[p.name]["total"] += float(p.price)
            cart_products[p.name]["quantity"] += 1

    for name, price_info in cart_products.items():
        cart_items.append(
            {
                "name": name,
                "quantity": price_info["quantity"],
                "total_price": price_info["total"],
            }
        )
        total_price += price_info["total"]

    return render_template(
        "checkout.html", cart_items=cart_items, total_price=total_price
    )

There’s quite a lot of logic here, but it’s mostly just processing the Cart items into a user-readable format.

We create three variables called cart_items, cart_products and total_price. Next we loop through all of the Product IDs in our Cart and fetch the Product instance for each.

If we haven’t come across this Product before we create a dict inside of cart_products which pairs the Product’s name to another dict of its price and a quantity of 1. If we already have this product in our cart_products we simply increase the total price by its unit cost and increase the quantity by one.

Once we’ve built up this mapping of Product names to price info, we can loop through our cart_products and append a dict displaying the information to our cart_items list, as well as summing up the cost of each Product in our total_price variable.
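If you would like to try this grouping logic on its own, the sketch below pulls it out of the view into a plain function. The summarise_cart name and the catalogue dict (standing in for the database lookup) are assumptions for the sake of the example and are not part of the project.

```python
def summarise_cart(product_ids, catalogue):
    """Group a list of product IDs into per-product totals.

    `catalogue` is a hypothetical stand-in for the database: it maps a
    product ID to a (name, price) tuple.
    """
    cart_products = {}
    for product_id in product_ids:
        name, price = catalogue[product_id]
        if name not in cart_products:
            # First time we see this product: start its running total.
            cart_products[name] = {"total": float(price), "quantity": 1}
        else:
            cart_products[name]["total"] += float(price)
            cart_products[name]["quantity"] += 1

    cart_items = []
    total_price = 0
    for name, price_info in cart_products.items():
        cart_items.append(
            {
                "name": name,
                "quantity": price_info["quantity"],
                "total_price": price_info["total"],
            }
        )
        total_price += price_info["total"]

    return cart_items, total_price


catalogue = {1: ("Floss", "1.50"), 2: ("Toothbrush", "2.99")}
cart_items, total_price = summarise_cart([1, 1, 2], catalogue)
```

Here cart_items holds one entry for Floss with a quantity of 2 and one for Toothbrush with a quantity of 1. Because the prices are floats, the grand total should be compared with a small tolerance rather than with exact equality.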

With the data processed, we can then pass our cart_items and total_price variables to a new template. Speaking of which, let’s create templates/checkout.html now:

{% extends "base.html" %}

{% block content %}
    <h1>Checkout</h1>
    <hr>

    <table>
        <tr>
            <th>Product</th>
            <th>Quantity</th>
            <th>Total</th>
        </tr>
        {% for item in cart_items %}
            <tr>
                <td>{{item.name}}</td>
                <td>{{item.quantity}}</td>
                <td>£{{"%.2f"|format(item.total_price)}}</td>
            </tr>
        {% endfor %}
        <tr>
            <td>-</td>
            <td>-</td>
            <td>-</td>
        </tr>
        <tr>
            <td><b>Total</b></td>
            <td></td>
            <td><b>£{{"%.2f"|format(total_price)}}</b></td>
        </tr>
    </table>
{% endblock %}

In this template we can loop through our cart_items and display them neatly in a table. At the bottom of the table, we add a row full of dashes as a separator, then the total price in bold.
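The "%.2f"|format(...) expressions in the template apply Python's printf-style formatting to the price. As a quick sketch of what that produces, here is the same pattern in plain Python. The format_price helper is a made-up name for this example only.

```python
def format_price(value):
    # Same printf-style pattern as the template: two digits after the point.
    return "£" + "%.2f" % value


prices = [format_price(3.0), format_price(2.99), format_price(5.5)]
```

This is why a total of 3.0 is displayed as £3.00 rather than £3.0.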

If you want to make the table look a bit nicer, you can add the following styles above your h1 heading:

    <style>
        table {
            border-collapse: collapse;
        }
        table tr td {
            border: 1px solid black;
            text-align: center;
            padding: 8px;
        }
    </style>

Load up your shop and add some items to your Cart, then click the “Checkout” link in the header. You should see a nice breakdown of all items in your Cart, and a total price at the bottom.

Now we need a way for the purchase to complete, so add the following underneath your table:

    <br>
    <hr>

    <h2>Complete your Purchase</h2>

    <form name="checkout-form" method="POST">
        <label for="email">Email Address</label>
        <input type="email" name="email" id="email">

        <br>
        <br>

        <input type="submit" value="Checkout">
    </form>

This will add a form which takes the user’s email address. We’ll now need to process this on the server, so head back to views/site/site.py and add the following lines just above your return statement:

    if request.method == "POST":
        user_email = request.form.get("email")
        order = Order()
        order.email = user_email
        order.products = cart_products
        order.timestamp_created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        order.save()

        session["recent_order_id"] = order.id

        return redirect(url_for("site.complete"))

Then add an import at the top of the file:

from datetime import datetime

On a POST request, generated by a user submitting the form, we grab the provided email from request.form and create an instance of our Order class. This order is populated with the email address, our cart_products dict, and the current date and time.

The ID assigned to the order is then saved to the session and we redirect to the site.complete endpoint, which we can write now:

@site_blueprint.route("/complete")
def complete():
    if "recent_order_id" not in session:
        return redirect(url_for("site.index"))

    order = Order.get_or_none(Order.id == session["recent_order_id"])

    session.pop("recent_order_id")
    session["cart"] = []

    return render_template("complete.html", order=order)

If there’s no recent order we just redirect the user back to the index, as there’s no use for this page. Otherwise, we fetch the order details from our database using the ID we saved in the session, then remove the recent_order_id using pop and clear out the user’s Cart.

Our templates/complete.html template will be as follows:

{% extends "base.html" %}

{% block content %}
    <h1>Order Complete!</h1>
    <hr>
    <h2>Thanks for your order!</h2>
    <br>
    <h3>Confirmation will be sent to {{order.email}}</h3>
    <hr>

    <table>
        <tr>
            <th>Product</th>
            <th>Quantity</th>
            <th>Price</th>
        </tr>
        {% for name, price_info in order.products.items() %}
            <tr>
                <td>{{name}}</td>
                <td>{{price_info["quantity"]}}</td>
                <td>£{{"%.2f"|format(price_info["total"])}}</td>
            </tr>
        {% endfor %}
    </table>

    <hr>

    <a href="/">Continue Shopping</a>
{% endblock %}

The complete.html template simply thanks the user for their order, notifies them about incoming email confirmation, and renders the same table of their purchased products as the Checkout page.

Load up your shop in your browser and visit the Checkout page again. Enter something into the email input at the bottom and press the “Checkout” button. You should see the confirmation of your purchase.

That’s it for our online shop! We’ve now got a Cart which follows the user around between page-loads, and a checkout system which collects and stores orders along with the customer’s email address.

The next chapter will focus on writing some more automated tests for this new functionality to ensure it always works as expected.

Chapter 6

More Testing

Now that our admin and site are fully-functional, we should add some automated tests to ensure that our site’s core functionality will still work as we add features in the future.

We’ll start with our admin section, since we need to ensure it’s only available when a user is logged in.

Testing the Admin

As we have two distinct parts to our site, we should split our test suite up to match. Create two folders in your tests folder - site and admin. Then move your test_index.py file into site and rename it to test_site.py. Now create a test_admin.py file inside your tests/admin folder and add the following:

from models.admin_user import AdminUser
from models.product import Product
from tests.helpers import *
from tests import testing_app

def test_index_redirects_to_login_if_not_logged_in():
    res = testing_app.get("/admin/")

    assert res.status_code == 302
    assert b"/login" in res.get_data()

This first test tries to load the /admin page and asserts that the response is a 302 (redirect). We also see the URL we are being sent to, and we assert that it is the /login page. You will notice the b in front of our "/login" string - this is because HTTP responses are bytes. You can check a response’s content with either a bytes string (like above) or by adding .decode("utf-8") to res.get_data() (as we did in chapter 3). We’ll be using bytes strings in this chapter, as it’s shorter to write.
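Here is a small sketch of the two options side by side, using a hard-coded bytes value as a stand-in for what res.get_data() returns. The HTML in it is invented for the example.

```python
# Stand-in for what res.get_data() returns: a bytes object, not a str.
response_body = b'<a href="/admin/login">/admin/login</a>'

# Option 1: compare bytes with bytes.
found_as_bytes = b"/login" in response_body

# Option 2: decode first, then compare str with str.
found_as_text = "/login" in response_body.decode("utf-8")

# Mixing the two types raises an error rather than quietly returning False.
try:
    "/login" in response_body
    mixed_types_allowed = True
except TypeError:
    mixed_types_allowed = False
```

Both options find the text, so which one you use is mostly a matter of taste. Just make sure both sides of the in check are the same type.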

def test_index_does_not_redirect_if_logged_in():
    with testing_app.session_transaction() as s:
        s.clear()
        s["logged_in"] = True
        s["admin_user_id"] = 1

    res = testing_app.get("/admin/")

    assert res.status_code == 200

This test uses the session_transaction method to let us modify the Flask session before our request. We use this to set the "logged_in" and "admin_user_id" values so that we are now logged in as an admin. We once again load the /admin route and assert that it is successful (rather than a redirect).

Logging in with the session_transaction method will be needed for a lot of our tests, so let’s create a helper at the top of this file to perform this.

def admin_login():
    with testing_app.session_transaction() as s:
        s.clear()
        s["logged_in"] = True
        s["admin_user_id"] = 1

Before we use this, however, we should ensure our login functionality works as intended:

from models.admin_user import AdminUser
...

@with_test_db((AdminUser,))
def test_login_logs_user_in_with_correct_details():
    with testing_app.session_transaction() as s:
        s.clear()

    create_test_admin_user("admin", "admin")

    form_data = {"username": "admin", "password": "admin"}

    res = testing_app.post("/admin/login", data=form_data, follow_redirects=True)

    assert res.status_code == 200
    assert b"Welcome to the admin" in res.get_data()


@with_test_db((AdminUser,))
def test_login_shows_error_with_incorrect_details():
    with testing_app.session_transaction() as s:
        s.clear()

    create_test_admin_user("admin", "admin")

    form_data = {"username": "admin", "password": "password"}

    res = testing_app.post("/admin/login", data=form_data, follow_redirects=True)

    assert res.status_code == 200
    assert b"Please try again!" in res.get_data()

Before testing our login system we need to ensure there is no session, so we use session_transaction to ensure the session has been cleared. We then need to make an AdminUser which can be authenticated against. In the first test, we create our form data with the correct login details and use the testing_app.post method to send this to the login URL. The data argument is used to send our form_data as POST data, and the follow_redirects argument will make the res response contain the page we are redirected to, rather than a 302 status code. We then check the page we are taken to and assert that it contains the text from our admin header.

On the second test, since we send the wrong details, we check that the response contains our flashed error message.

Before we can run these first few tests we’ll need that helper function to create an AdminUser. Open up tests/helpers.py and we’ll add helpers for our other models:

from datetime import datetime
from models.admin_user import AdminUser
from models.order import Order
...

def create_test_admin_user(username: str, password: str):
    from werkzeug.security import generate_password_hash

    u = AdminUser()
    u.username = username
    u.password = generate_password_hash(password)
    u.save()

    return u


def create_test_order(email: str, products: dict):
    o = Order()
    o.email = email
    o.timestamp_created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    o.products = products
    o.save()

    return o

Now we have the ability to make AdminUser and Order instances easily. Back to tests/admin/test_admin.py, let’s write tests for the CRUD functionality for Products.

@with_test_db((Product,))
def test_create_product_shows_form():
    admin_login()

    res = testing_app.get("/admin/create-product")
    res_data = res.get_data()

    assert b'<form name="product_form"' in res_data


@with_test_db((Product,))
def test_edit_product_shows_populated_form():
    admin_login()

    p = create_test_product("Floss", "1.50")

    res = testing_app.get(f"/admin/{p.id}")
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"1.50" in res_data
    assert b'<form name="product_form"' in res_data

To check creating we use our admin_login helper to set up our session, then load the /create-product URL and check that our form is there.

For editing we first need a product, so we create one using the helper. We then visit the admin URL for that product and check the name and price are shown, as well as the form.

@with_test_db((Product,))
def test_save_product_as_create():
    admin_login()

    assert Product.select().count() == 0

    form_data = {"name": "Floss", "price": "1.50"}

    testing_app.post("/admin/save-product", data=form_data)

    assert Product.select().count() == 1

    p = Product.get()
    assert p.name == "Floss"
    assert p.price == "1.50"


@with_test_db((Product,))
def test_save_product_as_edit():
    admin_login()

    p = create_test_product("Toothbrush", "2.99")

    assert Product.select().count() == 1

    form_data = {"product_id": p.id, "name": "Floss", "price": "1.50"}

    testing_app.post("/admin/save-product", data=form_data)

    assert Product.select().count() == 1

    p = Product.get()
    assert p.name == "Floss"
    assert p.price == "1.50"

We test saving in the context of both a create and an edit. For create we first check there are no instances in our database, using Peewee’s select().count(), then POST some form data to our /save-product URL. We now check that the database has an entry, and that the data in that entry is the same as from our form data.

Updating requires an existing product, so we create one and assert that it’s the only record in our database. Then we post form data with different values and check that a new instance was not created. We finish by asserting that the form data has replaced the original data for this record.

@with_test_db((Product,))
def test_delete_product():
    admin_login()

    p = create_test_product("Floss", "1.50")

    assert Product.select().count() == 1

    form_data = {"product_id": p.id}

    testing_app.post("/admin/delete-product", data=form_data)

    assert Product.select().count() == 0

Finally, deleting a product is tested by creating an instance then POSTing its ID to our /delete-product URL. We then check that the row has been removed from our database.

Great, that’s all of our admin section tested! We can now be sure that adding new functionality won’t change the expected behaviour of any of these URLs.

Testing the Shop

Since we last wrote tests for the shop we’ve added a Cart and Checkout functionality. Let’s get tests for these features going. Open up tests/site/test_site.py:

@with_test_db((Product,))
def test_add_product_to_cart():
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    with testing_app as app_with_session:
        app_with_session.get("/")

        from flask import session

        assert "cart" in session
        assert session["cart"] == []

        res = app_with_session.post("/add-product-to-cart", data={"product_id": p.id})

        assert session["cart"] == [p.id]
        assert res.get_json()["cart_items"] == 1

        res = app_with_session.post("/add-product-to-cart", data={"product_id": p2.id})

        assert session["cart"] == [p.id, p2.id]
        assert res.get_json()["cart_items"] == 2

In order to test our Cart we need some products to add to it, so we create two using our helper.

The Flask session is not usually available to be imported in tests, but we can use a with block around our testing_app which will allow us to import and check the contents of the session whilst inside.

Using this, we fire a request to the index page in order to create an empty cart in our session. We can then import this session and check for its presence.

A couple of requests to /add-product-to-cart are fired off and the session’s cart is checked for their IDs.

@with_test_db((Order, Product))
def test_checkout_get():
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    with testing_app.session_transaction() as s:
        s.clear()
        s["cart"] = [p.id, p.id, p2.id]

    res = testing_app.get("/checkout")
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"Toothbrush" in res_data
    assert b"3.00" in res_data
    assert b"2.99" in res_data

To test our Checkout page we create two products and use a session_transaction to put them into our Cart. We then load the /checkout page and get its contents. The names and prices of our products are checked for inside (we look for "3.00" since we have 2 instances of our Floss at 1.50).

@with_test_db((Order, Product))
def test_checkout_post():
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    assert Order.select().count() == 0

    with testing_app.session_transaction() as s:
        s.clear()
        s["cart"] = [p.id, p.id, p2.id]

    res = testing_app.post(
        "/checkout", data={"email": "a@a.com"}, follow_redirects=True
    )
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"Toothbrush" in res_data
    assert b"3.00" in res_data
    assert b"2.99" in res_data
    assert b"sent to a@a.com" in res_data

    assert Order.select().count() == 1

    o = Order.get()
    assert o.email == "a@a.com"
    assert o.products == {
        "Floss": {"total": 3.0, "quantity": 2},
        "Toothbrush": {"total": 2.99, "quantity": 1},
    }

Placing an order is done by a POST request to /checkout with products in our Cart and an email address sent via the form. We begin by creating our products again and adding them to our Cart. We also check the Order table to ensure it’s empty.

After our POST request containing our email address, we check again for the same product information, as well as the email confirmation message.

We finish by ensuring the Order has been created, and check it holds the correct data.

With that, our site is fully tested! We have the ability to quickly and easily check that the core behaviours of our site and admin remain intact as we add or remove features.

To finish off our project, we will have a brief look at how we can run code which makes use of our site’s data but outside of the context of a web request, using a library called Celery.

Chapter 7

Background Tasks with Celery

Sometimes we may have tasks which do not necessarily need to run before we provide feedback to the user. For example, a lot of online shops will not make customers sit and wait while their system sends the confirmation email. This allows developers to do potentially-slow data processing tasks without leaving the user staring at a “loading…” message. One way we achieve this with a Flask website is to make use of a python library named Celery.

Celery allows us to take advantage of separate “workers” running tasks independently of the web application but still using its data. These workers run in separate processes, meaning heavy processing tasks will not affect any web requests.
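To get a feel for the idea before bringing in Celery, here is a rough analogy using only the standard library. It hands a slow function to a background thread, so the caller is not held up while it runs. This is not how Celery works internally (Celery uses separate worker processes and a message queue), and the slow_task function is invented for this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor

completed = []


def slow_task(email):
    time.sleep(0.2)  # emulate slow work
    completed.append(email)
    return f"sent to {email}"


with ThreadPoolExecutor(max_workers=1) as worker:
    started = time.monotonic()
    future = worker.submit(slow_task, "test@example.com")
    # How long the caller was blocked while handing the work over.
    handoff_seconds = time.monotonic() - started

    # The caller is free to carry on here; we only wait so the example can finish.
    result = future.result()
```

Handing the task over takes a tiny fraction of the time the task itself needs, which is the behaviour we want from our web requests.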

Heavy Processing

Let’s emulate a heavy processing task by adding a confirmation email to our Checkout endpoint. Create a folder in the root of your project called tasks and inside make a python file called send_email.py:

import time

def send_confirmation_email(email: str):
    time.sleep(5)  # emulate processing
    print(f"sending email to {email}")

This function sleeps for 5 seconds to emulate a complex database query, then prints a message about emailing the provided email. Sending an email is complex, and therefore out of scope for the purposes of this book, so we’ll make do with this print statement.

Now open up web/views/site/site.py and add a call to this function in your complete view:

from tasks.send_email import send_confirmation_email
...

def complete():
    ...
    send_confirmation_email(order.email)

    return render_template("complete.html", order=order)

Open up your shop’s website and add some items to your Cart, then Checkout. You will notice your site hangs for a while before showing you the confirmation page. This is likely to be frustrating to users, who may leave the site, or refresh the page thinking something is wrong and accidentally place two orders.

Let’s now add Celery to our project so that we can remedy this potential issue.

Adding Celery

In order to run background tasks Celery itself needs a way to manage a queue. For this example we will be using Redis since it is very easy to set up and use.

In a new terminal window, source your virtualenv with source env/bin/activate then run poetry add celery celery[redis] to install Celery and its dependencies.

While this is installing we can bring in Redis. Specific instructions for doing this depend on the Operating System you are using, so search the web for “Redis install my_operating_system” and find the official docs. On Linux, Redis is most likely available in your distro’s repositories. MacOS can use homebrew, and Windows users can use the WSL terminal they are using to follow along with this book.

With Redis installed, running the server is as easy as executing the command redis-server. You can leave this running while we set up Celery in our code.

Stop your web server if you haven’t already and open up web_server.py in your editor. Change the beginning of the file to look like so:

from celery import Celery
from flask import Flask

app = Flask(__name__)
app.secret_key = "very secret"

celery = Celery("My Shop", broker="redis://localhost:6379/0")
from web.views import admin, site
...

Now open your tasks/send_email.py file and we can turn the send_confirmation_email function into a Celery task:

import time

from web_server import celery


@celery.task
def send_confirmation_email(email: str):
    time.sleep(5)
    print(f"sending email to {email}")

Turning a synchronous function into an async one is as easy as importing celery and decorating the function with @celery.task. Now our Celery worker can see this task and will be able to process it independently of our web server.

To run a Celery worker we’ll need an entry point to our application. Create a file called run.py in the root of the project:

from web_server import celery

And now we are set up to run a Celery worker! Open up a new terminal and source your virtualenv. Now run the following command:

celery -A run worker -l info -E

This points Celery to our new run.py file, tells it to run a worker, sets the loglevel to info so that we can see what it’s doing, and finally uses -E to make the worker send events about each task, which monitoring tools can pick up.

One last thing now - we need to tell our complete view to call this function asynchronously using the Celery worker. We do this by calling the delay function which has now been added by the @celery.task decorator. Open up web/views/site/site.py and change the line in complete:

    send_confirmation_email.delay(order.email)

Now our slow function will be executed by the Celery worker, meaning the customer will see the Complete page without waiting for their confirmation email to be sent.

Start up your web server with python3 web_server.py and buy some products on your website. You should notice the Complete page does not hang any more. Open the terminal running your celery command and you should see a message similar to this:

[2022-05-02 07:31:27,561: WARNING/ForkPoolWorker-8] sending email to test@example.com

Congratulations, you have successfully run a background task from your website!

Adjusting the Tests

Since we should not have to run Redis in order to test our application, we’ll need to stop our server from trying to connect during tests. We can detect if we are testing using this line:

if "pytest" in sys.modules:

Open up web_server.py and change the creation of the celery instance to the following:

import sys
...

if "pytest" in sys.modules:
    celery = Celery("My Shop", broker="memory://")
else:
    celery = Celery("My Shop", broker="redis://localhost:6379/0")

This will tell Celery to use an in-memory queue during unit tests. This does not work when actually trying to use Celery in practice, which is why we had to install Redis.
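If you want to check this switch without starting Celery at all, the decision can be sketched as a small function. The pick_broker_url name and its loaded_modules parameter are hypothetical and exist only to make the logic easy to try out.

```python
import sys


def pick_broker_url(loaded_modules=None):
    """Return an in-memory broker URL under pytest, otherwise the Redis one."""
    if loaded_modules is None:
        # Default to the modules Python has imported so far.
        loaded_modules = sys.modules
    if "pytest" in loaded_modules:
        return "memory://"
    return "redis://localhost:6379/0"


during_tests = pick_broker_url({"pytest": None})
during_normal_run = pick_broker_url({})
```

Passing in a dict lets us pretend pytest is or isn't loaded. When called with no arguments, the function looks at the real sys.modules, just like the snippet above.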

With Celery taken care of, our tests should now run successfully. However, we can actually now check that the confirmation email task is called when an order completes.

Mocking Celery’s Delay

In a new terminal window, source your virtualenv and run poetry add --dev pytest-mock. This will allow us to use Mocking to test if a function has been called during the execution of another.

First, we’ll create a mock of the delay function in our tests/helpers.py file:

...
__all__ = [
    ...
    "mock_send_confirmation_email_delay",
    "with_test_db",
]
...

def mock_send_confirmation_email_delay(mocker):
    from tasks.send_email import send_confirmation_email

    mocker.patch.object(send_confirmation_email, "delay", autospec=True)

We use the mocker fixture (which will be passed to this function) to patch the delay function with a mock. This will then allow us to inspect whether the function was called during the test.

Now open up tests/site/test_site.py and change your test_checkout_post function to the following:

@with_test_db((Order, Product))
def test_checkout_post(mocker):
    mock_send_confirmation_email_delay(mocker)
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    assert Order.select().count() == 0

    with testing_app.session_transaction() as s:
        s.clear()
        s["cart"] = [p.id, p.id, p2.id]

    res = testing_app.post(
        "/checkout", data={"email": "a@a.com"}, follow_redirects=True
    )
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"Toothbrush" in res_data
    assert b"3.00" in res_data
    assert b"2.99" in res_data
    assert b"sent to a@a.com" in res_data

    assert Order.select().count() == 1

    o = Order.get()
    assert o.email == "a@a.com"
    assert o.products == {
        "Floss": {"total": 3.0, "quantity": 2},
        "Toothbrush": {"total": 2.99, "quantity": 1},
    }

    from tasks.send_email import send_confirmation_email

    send_confirmation_email.delay.assert_called_once()
    assert send_confirmation_email.delay.call_args[0][0] == "a@a.com"

The test now takes an argument called mocker, which will be injected by the pytest-mock library we installed. This is passed to the mock_send_confirmation_email_delay helper function to carry out the mocking.

Once we have asserted that the contents of the page are as expected, we have two new asserts at the end of our test. We first use assert_called_once to check that our send_confirmation_email function was sent to the Celery worker exactly once. Then we can use its call_args attribute to check that the argument passed to this function was the email address we entered. call_args holds the arguments of the most recent call as a pair of positional and keyword arguments, so the first [0] selects the positional arguments and the second [0] selects the first of them. Since there is only one call, and only one argument, this is all we need to check.
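The layout of call_args is easier to see in isolation. The sketch below uses MagicMock from the standard library's unittest.mock, which is what pytest-mock builds on. The retries keyword is made up purely to show where keyword arguments end up.

```python
from unittest.mock import MagicMock

delay = MagicMock()
delay("a@a.com", retries=3)  # `retries` is a made-up keyword for illustration

delay.assert_called_once()

# call_args describes the most recent call as (positional args, keyword args).
positional, keyword = delay.call_args

# Index 0 picks the positional arguments, the next 0 picks the first of them.
first_argument = delay.call_args[0][0]
```

Here positional is a one-item tuple containing the email address, and keyword is a dict containing retries.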

Fantastic, we’ve now got a way of running background tasks outside of a web request to avoid slowing down the user’s page load, and we have an automated test to ensure they are called with the correct arguments!

With that, our project is now complete! We have ourselves a website split into two sections - an admin protected by a login, and a public-facing shop. We can process and store customer orders in a database, and work with that data outside of a web request context using Celery. Our whole application has a suite of automated tests which ensure that the critical functionality is behaving as it should.

Appendix 1 - Peewee

(Back to Chapter 2)

In these appendices I will be covering some more complex parts of the mentioned libraries which I couldn’t fit into our simple example, but still deem important to know. These can be read either before following the book’s examples, or after finishing the project.

As always, the official Peewee docs are going to be more accurate than what is written here.

These examples are correct for Peewee version 3.14.8.

Basics

Selecting

Call the .select method on your model class:

Book.select()

This is the equivalent of SELECT * FROM book;.

This function returns an iterable of instances of your Model class (Book in this example). To make use of them, you will then need to consume the iterable with either a for loop or list comprehension:

books = Book.select()

for book in books:
    # `book` is now an instance of the `Book` model class. 
    print(book.title)
    
# OR:

books = Book.select()
all_books = [b for b in books]

Selecting specific fields

All examples in this book’s project select all fields from our tables, as they are small. This can be a performance hit as tables grow larger.

To select only specific columns, pass the model name, a dot, and the attribute as arguments to your .select call. For example:

Book.select(Book.author, Book.title)

This query selects only the author and title columns.

Conditions

To filter what you are selecting, pass regular python conditional statements to a .where call:

Book.select().where(Book.title == "Flask by Example")

Book.select().where(Book.price >= 5.00)

To pass multiple conditions, wrap each in brackets and separate them with an ampersand & for AND, or a pipe | for OR.

For example:

Book.select().where( (Book.price >= 5.00) & (Book.published == True) )

Is the equivalent of:

SELECT * FROM book WHERE book.price >= 5.00 AND book.published = true;
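The brackets matter because of Python's operator precedence: & binds more tightly than comparisons such as >= and ==. The sketch below shows the difference using plain numbers and booleans as stand-ins for Peewee fields, so it only demonstrates the Python rule, not anything Peewee-specific.

```python
price = 4
published = True

# Bracketed: each comparison is evaluated first, then combined with &.
bracketed = (price >= 5) & (published == True)

# Unbracketed: & is applied first, so Python reads this as the chained
# comparison  price >= (5 & published) == True.
unbracketed = price >= 5 & published == True
```

The bracketed version is False, as a price of 4 is below 5. The unbracketed version ends up True, which is not what the condition was meant to say.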

For an OR query:

Book.select().where( (Book.title == "Flask By Example") | (Book.title == "Tkinter By Example") )

Is the equivalent of:

SELECT * FROM book WHERE book.title = 'Flask By Example' OR book.title = 'Tkinter By Example';

You can combine both AND and OR using brackets, just like in regular SQL:

Book.select().where(
    (
        (Book.date_published > "2020-01-01")
    ) | (
        (Book.date_published < "2020-01-01") & (Book.revenue >= 100) 
    )
)

The above selects all books which are either newer than Jan 1st 2020, or both older than Jan 1st 2020 and have a revenue above 100. In SQL this would be:

SELECT * FROM book WHERE (book.date_published > '2020-01-01') OR (book.date_published < '2020-01-01' AND book.revenue >= 100);

Getting a Single Instance

If there should only be a single instance of a row matching your condition, you can use the .get method to grab it directly. Arguments passed to this method are the same conditionals as would be passed to a .where call.

For example:

flask_book = Book.get(Book.title == "Flask By Example")

Here your flask_book variable will be an instance of your Book model.

This will throw a DoesNotExist exception (available as Book.DoesNotExist in this example) if the criteria do not match and no record could be found.

If you aren’t certain that there is a matching row in your database, you can instead use .get_or_none, which will return a Model instance if a match is found, or None if not.

maybe_book = Book.get_or_none(Book.title == "Flask By Example")

if not maybe_book:
    print("No book with that name")

Advanced

Aliases

You can append .alias to an attribute inside a call to .select to alias the field. This is useful in situations where multiple tables have a column with the same name. For example:

Book.select(Book.author, Book.title.alias("book_title"))

This selects a Book’s author, and aliases its title to book_title.

Joins

Join two tables by appending a call to .join after a .select. The call to join takes two arguments - the Model class to join with, and a condition on which to perform the join, named on. To select fields from a joined table, simply pass the Model’s name and attribute to select as normal.

For example:

Book.select(
    Book.title, Book.author, Author.date_of_birth
).join(
    Author, on=(Author.name == Book.author)
)

This example performs the query:

SELECT book.title, book.author, author.date_of_birth
FROM book
JOIN author ON author.name = book.author;

Join types

Join types are available as constants which can be imported from Peewee like so:

from peewee import JOIN

These are then passed as the second argument to your .join call:

from peewee import JOIN

Book.select(
    Book.title, Book.author, Author.date_of_birth
).join(
    Author, JOIN.INNER, on=(Author.name == Book.author)
)

See the Peewee documentation for an up-to-date list of available joins.

Functions

Functions are accessed through a helper object named fn. If you know the name of the function you would like to use in regular SQL, it is probably called the same thing on fn.

For example, to use a MAX function:

from peewee import fn

most_copies_sold = Book.select(fn.MAX(Book.copies_sold)).scalar()

You can also alias these function calls:

from peewee import fn

books = Book.select(fn.SUM(Book.revenue).alias("total_revenue"), Book.title).group_by(Book.title)

Group By, Order By, Limit, Offset

These functionalities are all offered by functions of the same name. For example:

books = Book.select(Book.title, fn.SUM(Book.revenue)).group_by(Book.title).order_by(Book.title.asc()).limit(5).offset(5)

As above, ordering is done by specifying the Model class and attribute, then appending .asc() or .desc() for ascending or descending. To alter where NULLs appear, pass nulls="LAST" or nulls="FIRST" to .asc or .desc.
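As with the earlier sections, it can help to see the SQL this kind of query corresponds to. The sketch below runs my approximation of that SQL directly with the standard library's sqlite3 module. The book table and its rows are invented, and the exact SQL Peewee generates may differ. A smaller limit and offset are used so that the result is easy to follow.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE book (title TEXT, revenue REAL)")
conn.executemany(
    "INSERT INTO book VALUES (?, ?)",
    [("A", 10), ("A", 5), ("B", 7), ("C", 1), ("C", 2), ("D", 4)],
)

# Group rows by title, sort the groups, skip the first one, keep the next two.
rows = conn.execute(
    "SELECT title, SUM(revenue) FROM book "
    "GROUP BY title ORDER BY title ASC LIMIT ? OFFSET ?",
    (2, 1),
).fetchall()
```

The groups are A, B, C and D. Skipping one and keeping two leaves B with a total of 7 and C with a total of 3.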

Appendix 2 - Pytest

(Back to Chapter 3)

In these appendices I will be covering some more complex parts of the mentioned libraries which I couldn’t fit into our simple example, but still deem important to know. These can be read either before following the book’s examples, or after finishing the project.

As always, the official Pytest docs are going to be more accurate than what is written here.

These examples are correct for Pytest version 7.0.0.

Basics

By convention, tests live in a folder called tests. Within that folder, Pytest collects any Python files whose names begin with test_ (or end with _test.py). Similarly, any functions inside those files whose names begin with test will be detected and executed as tests.

Running tests is achieved by executing the command pytest in the directory which contains your tests folder. If you pass a file path as an argument, e.g. pytest tests/models/test_models.py, only that file will be run.
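As a rough illustration of these naming rules, here is what a small test file might look like. The file name and the apply_discount function are made up for this sketch; in a real project the code under test would be imported from your application.

```python
# tests/test_prices.py (hypothetical file name; the test_ prefix is what gets it collected)


def apply_discount(price, percent):
    """Stand-in for application code which would normally be imported."""
    return round(price * (1 - percent / 100), 2)


def helper_prices():
    # Not run as a test: the name does not begin with "test"
    return [10.00, 20.00]


def test_apply_discount_reduces_price():
    # Collected and run: the name begins with "test_"
    assert apply_discount(10.00, 25) == 7.50


def test_zero_discount_leaves_price_unchanged():
    for price in helper_prices():
        assert apply_discount(price, 0) == price
```

Running pytest tests/test_prices.py would execute the two test_ functions and ignore helper_prices. To run a single test, append its name after two colons, e.g. pytest tests/test_prices.py::test_apply_discount_reduces_price.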

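Some of the flags you may want to pass to the command are:

-s: show output from print calls instead of capturing it.
-x: stop after the first failing test.
-v: verbose output, listing each test by name.

You can pass multiple flags in one go, for example pytest -sxv.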

Advanced

Fixtures

Fixtures are a rather deep topic, which the Pytest docs cover in more detail than I could ever write. However, for the purpose of understanding this book, you can think of a fixture as an object which Pytest injects into any test that asks for it by naming it as a parameter.

The two fixtures I find myself using the most are mocker for Mocking and monkeypatch for Patching.

Mocking

Mocking is replacing a piece of code with a dummy object which keeps track of how many times it was called and with what arguments. This dummy function will not execute the original code.

Mocking in Pytest is made very easy with the pytest-mock library. With this installed, simply add a parameter named mocker to your test functions and the mocker fixture, which creates Mock objects for you, will be injected.
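The pytest-mock library builds on unittest.mock from the standard library, so you can get a feel for what a mock records without installing anything. In this sketch the send_email name and its arguments are made up for illustration.

```python
from unittest.mock import Mock

# A stand-in for a real send_email function; calling it runs no real code
send_email = Mock(return_value=True)

send_email("reader@example.com", subject="Welcome")

# The mock remembers how often it was called and with what arguments
send_email.assert_called_once()
send_email.assert_called_once_with("reader@example.com", subject="Welcome")

call_count = send_email.call_count
args, kwargs = send_email.call_args

print(call_count, args, kwargs)
```

If an assertion such as assert_called_once fails, it raises an AssertionError, which Pytest reports as a failed test.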

Mocking a Function

To mock a function, pass a string representing the import path to the function to mocker.patch, like so:

mocker.patch("helpers.emails.send_email")

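The patch replaces the name at the import path you give. If the code under test does from helpers.emails import send_email, it holds its own reference to the original, so patch the name in the module which imported it instead.

Now in your test, import this function and use one of the mock's assert methods to check whether it was called:

def test_send_email_called(mocker):
    # Replace send_email with a mock for the duration of this test
    mocker.patch("helpers.emails.send_email")

    function_which_should_send_email()

    # Imported after patching, so this name refers to the mock
    from helpers.emails import send_email

    send_email.assert_called_once()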
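mocker.patch accepts the same arguments as unittest.mock.patch from the standard library, so a mock can also be given a canned return value. The sketch below uses unittest.mock.patch directly so that it runs without Pytest. Here os.getcwd merely stands in for a function such as send_email, and describe_working_directory is a made-up function under test.

```python
import os
from unittest.mock import patch


def describe_working_directory():
    # Looks the function up through its module, so the patch below is seen
    return f"Working in {os.getcwd()}"


# Inside the block, os.getcwd is a mock which returns a fixed value
with patch("os.getcwd", return_value="/fake/dir") as fake_getcwd:
    message = describe_working_directory()

# The mock still remembers its calls after the patch is undone
fake_getcwd.assert_called_once_with()

print(message)  # Working in /fake/dir
```

In a Pytest test the equivalent would be mocker.patch("os.getcwd", return_value="/fake/dir"), with the patch undone automatically when the test finishes.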

Mocking an Object

To mock a method of an object, rather than a standalone function, use mocker.patch.object. Pass the object as the first argument, and a string of the method to mock as the second.

mocker.patch.object(Book, "get_total_revenue")

You can now check the method was called in much the same way as a function:

def test_get_total_revenue(mocker):
    from models.book import Book
    
    mocker.patch.object(Book, "get_total_revenue")
    
    get_all_book_revenues()
        
    Book.get_total_revenue.assert_called_once()

Spying

Spying is like Mocking, except the original functionality of the spied-on function will still execute. This is useful during integration tests, where subsequent function calls rely on the results of the one you wish to track.

Spying is also handled by the pytest-mock library, and is available through the mocker fixture.

def test_get_total_revenue(mocker):
    from models.book import Book
    
    book_spy = mocker.spy(Book, "get_total_revenue")
    
    get_all_book_revenues()
        
    book_spy.assert_called_once()
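To show the difference from a plain mock, here is a rough standard-library approximation of a spy, using the wraps argument of unittest.mock. This is not how pytest-mock implements mocker.spy, only a sketch of the same idea, and the Book class and its revenue figure are made up.

```python
from unittest.mock import patch


class Book:
    @staticmethod
    def get_total_revenue():
        return 120.0


def get_all_book_revenues():
    return [Book.get_total_revenue()]


# wraps= makes the mock pass every call through to the real function
with patch.object(
    Book, "get_total_revenue", wraps=Book.get_total_revenue
) as book_spy:
    revenues = get_all_book_revenues()

# The call was recorded...
book_spy.assert_called_once()

# ...and the original code still ran, so the real value came back
print(revenues)  # [120.0]
```

Had this been a plain mock, revenues would contain a Mock object instead of the real figure.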

Patching

Patching is changing the behaviour of a function or method temporarily for the duration of a single test. This is useful for integration tests which rely on functions gathering data which is not static, such as the current date or a request to an external API.

Patching is achieved using the monkeypatch fixture built in to Pytest.

Patching a function

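Patching a function is a bit more involved than mocking, but the syntax is generally similar. The monkeypatch fixture patches via its setattr method, which takes the thing to patch as the first argument (here, a string containing its import path) and the replacement as the second.

Passing the path to the function itself only rebinds the name inside that module, so any code which has already imported the function directly will keep calling the original. To change the behaviour everywhere, you can instead replace the function's __code__ attribute, in which case the replacement should accept the same arguments as the original: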

def patched_send_email():
    print("sending")
    
monkeypatch.setattr("helpers.emails.send_email.__code__", patched_send_email.__code__)
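The reason swapping __code__ is useful can be shown in plain Python, without Pytest. In this sketch imported_reference plays the role of another module which has already done from helpers.emails import send_email. Both functions are made-up stand-ins.

```python
def send_email():
    return "real email sent"


def patched_send_email():
    return "patched"


# Simulates a name another module obtained via "from helpers.emails import send_email"
imported_reference = send_email

# Swap the code inside the existing function object, remembering the original
original_code = send_email.__code__
send_email.__code__ = patched_send_email.__code__
try:
    # The older reference sees the new behaviour, as it is the same function object
    result = imported_reference()
finally:
    # monkeypatch performs this restore step for you when the test ends
    send_email.__code__ = original_code

print(result)  # patched
print(imported_reference())  # real email sent
```

Simply rebinding the name send_email to a new function would have left imported_reference pointing at the original, which is the problem the __code__ swap avoids.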

Patching an Object

Patching an object does not require the use of __code__, and is also achieved via setattr. The first argument is the object, the second is a string of the method to patch, and the third is the replacement function:

from models.book import Book

def patched_get_total_rev(*args, **kwargs):
    # Accept any arguments (including self) so this also works for instance methods
    return 50.00

monkeypatch.setattr(Book, "get_total_revenue", patched_get_total_rev)
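Put together as a complete test, patching a method might look like the sketch below. The Book class and describe_revenue function are made up for illustration. Note that because the replacement is set on the class, it is called like any other instance method and so receives self.

```python
class Book:
    def __init__(self, title, copies_sold, price):
        self.title = title
        self.copies_sold = copies_sold
        self.price = price

    def get_total_revenue(self):
        return self.copies_sold * self.price


def describe_revenue(book):
    return f"{book.title} has earned {book.get_total_revenue():.2f}"


def test_describe_revenue(monkeypatch):
    # The replacement takes self, just like the method it stands in for
    def patched_get_total_rev(self):
        return 50.00

    # Undone automatically by Pytest when this test finishes
    monkeypatch.setattr(Book, "get_total_revenue", patched_get_total_rev)

    book = Book("Example", 3, 9.99)

    assert describe_revenue(book) == "Example has earned 50.00"
```

Pytest supplies the monkeypatch argument itself, so the file needs no extra imports for it.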

Useful Links