Learn Flask By Example
With Flask, Peewee and Pytest
Hello, welcome to this book teaching the basics of Flask web development. This book aims to help anyone who is looking to get started with Flask but doesn't know where or how to begin.
The accompanying GitHub repo for this project is available here. There you will find an EPUB version if you would prefer to read on an e-reader.
This book is licensed under Creative Commons Attribution 4.0. Please share it with anyone who you think will find it useful!
Chapter 0
Introduction
Welcome to this e-book teaching how to create web applications using primarily Flask, Peewee, and Pytest.
Throughout this book we will be setting up a digital shop site which will have a distinct public front-end and a gated back-end. Your shop will have a persistent database, a comprehensive suite of automated tests, and the ability to process data separately from the main web server.
Each chapter will introduce the relevant library in a hands-on approach, rather than dumping a heap of documentation upon the reader. If you’re someone who would prefer to broaden your knowledge of a library before getting stuck-in with it, I have included appendices at the end which go over the syntax of each library in greater detail. As I am not the author of any of these libraries, there’s always a chance that the information becomes outdated, so pay attention to the version numbers at the top of each appendix.
Coverage and Assumptions
We will be covering Python and the titular libraries specifically. The front-end of the site will be written using plain JavaScript and Flask’s Jinja templates - no fancy frameworks or webpack here.
As I use Linux on my computers, the commands will be written assuming a Unix-like operating system with a bash shell. If you are on Windows, I would recommend using the Windows Subsystem for Linux (WSL) to follow along. Setting this up is outside the scope of this book, so search online for something like “WSL Install Windows 10” to get started. Familiarity with using a terminal is required, as we will need to run various elements of the web server via the command line.
I will also not be teaching the Python language itself - you will need to be familiar with the language to follow along properly. For the best experience, a reader should have a grasp of functions, classes, and importing modules. Decorators are also used a lot in Flask, so a brief understanding of those is also recommended.
With that out of the way, let’s begin with some basics.
Installing Pyenv and Poetry
While there are many ways to install Python itself, and manage its dependencies, we will be using a tool called pyenv to install a specific version of the Python language, and a tool called poetry (along with built-in virtualenvs) to install dependencies.
If you already know what you are doing, already have a python installation, or you prefer to use other tools to manage installations, feel free to skip the rest of this section.
For this book, we will be using Python 3.10.
Installing Python via Pyenv
The pyenv tool is available over on GitHub, and comes with a very easy install script.
Please note, I am not the author of this tool, and so have no control over the contents nor the hosting. This means the links below may disappear at some point.
Before installation, follow the instructions for getting the necessary dependencies for your operating system from this URL:
https://github.com/pyenv/pyenv/wiki#suggested-build-environment
Once you have the dependencies taken care of, installing pyenv is as simple as cloning the repo and running a script.
git clone https://github.com/pyenv/pyenv-installer
cd pyenv-installer
bin/pyenv-installer
The installer will use git to grab the necessary repos for you, and set up a .pyenv folder in your home directory to contain both pyenv itself and any installed Python versions.
With pyenv taken care of, it’s time to install Python 3.10:
~/.pyenv/bin/pyenv install 3.10.1
This command will take some time to run. If you missed any of the pre-requisite dependencies you may see some errors here. Refer back to the wiki to sort those out.
Once your install is successful, we’re done with pyenv!
Installing dependencies with poetry
Now that we have the correct version of Python installed, it’s time to grab the titular libraries. We will be using a tool called poetry to achieve this, which is easily installed using Python’s built-in pip module.
Setting up a project directory
We’ll need to make a folder to store our project. This can be anywhere on your system, and can be moved at any time, so there’s no need to worry too much.
When you’ve chosen where to store your project, create a new directory called flask-peewee-pytest and cd into it.
Creating a Virtualenv
The first step is to create a virtual environment using the pyenv-provided version of Python.
~/.pyenv/versions/3.10.1/bin/python3 -m venv env
source env/bin/activate
Now that we have the virtual environment, we can install poetry into it.
pip install --upgrade pip
pip install poetry
Before we can begin installing with poetry, we need a pyproject.toml file. Don’t worry if you don’t know what this is, because poetry can generate one for us!
poetry init
Simply follow the prompts, choosing n to skip setting dependencies interactively - we will do this via the command line shortly.
Once this command finishes you will have a nice new pyproject.toml file.
Installing dependencies
To add dependencies via poetry, simply use poetry add. To specify development-only dependencies, it’s poetry add --dev (from Poetry 1.2 onwards the equivalent is poetry add --group dev).
Let’s install the titular dependencies to our virtual environment. Since our unit tests will not be running in production, we add pytest as a development dependency:
poetry add flask peewee
poetry add --dev pytest
That’s it, we’ve now got our three main dependencies installed, and are ready to begin writing our web-app. Get your editor / IDE of choice ready for the next chapter!
Chapter 1
Flask Beginnings
With our development environment taken care of, let’s dive right in to Flask. In this chapter we’ll go over the basics of getting a Flask server up-and-running, then look into how to structure the web side of our project in a maintainable way.
Hello Flask
In keeping with programming tradition, let’s start with a “Hello Flask” implementation.
In your project folder, create a file called web_server.py and add the following:
from flask import Flask

app = Flask(__name__)
app.secret_key = "very secret"


@app.route("/")
def hello_flask():
    return "Hello, Flask!"


if __name__ == "__main__":
    app.run(debug=True)
Save this file and run the webserver with python3 web_server.py. You should see a message telling you that the server is available on http://127.0.0.1:5000/. Visit this URL in a browser and you’ll be presented with “Hello, Flask!”.
Note: Mac users may find that port 5000 is already in use. To get around this, add the port=8080 argument to your app.run call, and visit http://localhost:8080/.
Not too much code went into making that, so let’s go over it.
First we import Flask itself, and create an instance named “app”. Passing the __name__ argument is convention.
Once we have our Flask instance, we need to set a secret key. This is used to cryptographically sign session data, and so in a production environment should be a lot more secure than this simple string. We will come back to this later.
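As a minimal sketch of what a stronger key could look like, Python’s standard secrets module can generate a random value. The 32-byte length here is an assumption rather than a Flask requirement, and in practice you would generate the key once and store it outside your source code instead of regenerating it on every start.

```python
import secrets

# Generate 32 random bytes, rendered as a 64-character hexadecimal string.
# The length is an illustrative choice, not something Flask mandates.
secret_key = secrets.token_hex(32)

print(len(secret_key))
```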
With our sessions secure, we now need to add some routes to our server. This is achieved by decorating a function with the route decorator, passing the URL as the argument, so “/” in this case. Inside this function we must return something which can be recognised by Flask as an HTTP response (typically a string, a dictionary, or a rendered template, which we will see shortly).
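To make the decorator mechanics less mysterious, here is a toy sketch of how a route decorator can register functions against URLs. TinyApp and its dispatch method are hypothetical names invented for illustration; this is not how Flask is implemented internally, only the same general pattern.

```python
class TinyApp:
    """Toy stand-in for a web app; not Flask's actual implementation."""

    def __init__(self):
        # Maps a URL string to the function that handles it.
        self.routes = {}

    def route(self, url):
        def decorator(func):
            # Remember the function, then hand it back unchanged.
            self.routes[url] = func
            return func

        return decorator

    def dispatch(self, url):
        handler = self.routes.get(url)
        if handler is None:
            return "404 Not Found"
        return handler()


app = TinyApp()


@app.route("/")
def hello():
    return "Hello, Flask!"


print(app.dispatch("/"))
print(app.dispatch("/missing"))
```

The decorator runs once, when the module is loaded, which is why simply defining the function is enough to make the URL available.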
That’s it, we now have a server with a URL set up. The if __name__ == "__main__" block allows us to run this file as a script to launch the server, while also allowing other files to import from it without also causing the server to run. We use the run function to start our server, and pass debug=True to enable the debugging interface.
To have a look at the debugging screen, add the line 1 / 0 above the return statement in your hello_flask function, then reload the page. You should see a ZeroDivisionError screen with a nice stack trace showing the problematic line.
Rendering HTML
We know how to return plain text, but real websites use HTML, so how do we do that? Flask comes with a popular templating library called Jinja supported by default, as well as a helper function to both compile the template contents and return it as an HTTP response. Let’s turn our homepage into something a bit more realistic.
Create a templates folder in the root of your project, and then create an index.html file inside:
<h1>Welcome to my shop!</h1>
<hr>
<table>
<tr>
<td>Toothpaste</td>
<td>£2</td>
</tr>
<tr>
<td>Toothbrush</td>
<td>£1.50</td>
</tr>
<tr>
<td>Floss</td>
<td>£0.99</td>
</tr>
</table>
(Note that this is not “fully-formed” HTML, but a browser should render it anyway. We will deal with this later).
Template created! Now we need to render it. Hop back over to your web_server.py file and edit your hello_flask function as follows:
@app.route("/")
def index():
    return render_template("index.html")
We’ll start by renaming the function to index, as it better describes the page we’ll be rendering.
render_template is a helper function in Flask which will perform Jinja rendering and return your HTML as an HTTP response. We will need to import this though, so go up to the top of your file and update the import from flask:
from flask import Flask, render_template
Run your web server once more and reload the index page. You should see a beautiful table of prices.
Obviously it would be rather daft to have to edit HTML directly when your shop’s stock changes - this information should be stored in a database and pulled out on the server side. We’ll get to the database itself next chapter, but for now we can pretend, and learn about Jinja rendering along the way.
Using variables in Jinja
In your web_server.py file, change the index function and add a dictionary mapping products to prices like so:
@app.route("/")
def index():
    products = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}

    return render_template("index.html", products=products)
When using render_template, any keyword arguments passed after the filename of the HTML file will be passed to the template to be used by Jinja. This is known as the “template context”.
Change the content of your templates/index.html file to the following:
<h1>Welcome to my shop!</h1>
<hr>
<table>
{% for product_name, price in products.items() %}<tr>
<td>{{product_name}}</td>
<td>{{ "£%.2f"|format(price) }}</td>
</tr>
{% endfor %}</table>
To use our variables in Jinja, we need to utilise two different sets of special syntax: the double-brace {{ }} and the percent-brace {% %}. Percent-brace tags are used to insert logic, such as for loops and if statements. The double-brace is used to insert Python variables as text inside the template.
In the above example, we use the percent-brace syntax to initiate a for loop, iterating over our products variable and unpacking the keys and values into two variables - product_name and price. We then place the product names as text into our HTML by using the double-brace syntax. We do the same for the price, but use a pipe into a format string to get the correct currency display.
The pipe character is Jinja’s own syntax (rather than Python’s) which passes the variable into a function exposed by Jinja to the templates, known as a filter. You can think of these as reversing the order of a normal function from my_func(variable) to variable|my_func.
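Jinja’s format filter applies Python’s printf-style % formatting to the string on the left of the pipe. A rough plain-Python equivalent of the template loop above, assuming the same products dictionary, might look like this:

```python
products = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}

rows = []
for product_name, price in products.items():
    # "£%.2f"|format(price) in Jinja corresponds to "£%.2f" % price here.
    rows.append((product_name, "£%.2f" % price))

for product_name, formatted_price in rows:
    print(product_name, formatted_price)
```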
We now have a single page listing all of our products, so let’s add individual pages for each one. Again, it’s not manageable to create a specific route for each product, so we’ll need to make use of the database’s information in Flask.
Using variables in Flask routes
Underneath your index function, add the following:
@app.route("/<string:product_name>")
def view_product(product_name):
    products = {"toothpaste": 2.00, "toothbrush": 1.50, "floss": 0.99}

    price = None
    if product_name in products:
        price = products[product_name]
    else:
        abort(404)

    return render_template(
        "products/view_product.html", price=price, product_name=product_name
    )
We now have a route which can take a variable from the URL, as you can see from the route decorator. This product_name variable is then passed into our view_product function automatically. We have annotated this variable as a string, so that we can guarantee the type of the Python variable passed.
The rest of this function searches our products dictionary for the provided string, and if found sets the value of price. If the product is not in the dictionary, we use a Flask helper, abort, to return an HTTP 404 response.
Adjust your import statement at the top of the file to include this function:
from flask import Flask, render_template, abort
To make our server run we’ll need to make that new template. Go ahead and create a folder inside your templates folder called products, then create an HTML file inside called view_product.html containing the following:
<h1>Welcome to my shop</h1>
<hr>
<h2>{{product_name}}</h2>
<p>This product costs £{{"%.2f"|format(price)}}</p>
<p>Out of Stock</p>
After you save this file, your server should be able to run again, so start it up with python3 web_server.py and head on over to localhost:5000. You should see your index page.
Add the name of a product to the end of your URL, for example /toothpaste, and you should see your new template render. You can also try using a product which doesn’t exist, such as /water, to see the 404 error page.
Now we have two templates which both share the same header at the top of the page. It would be a hassle to keep copy-pasting that into each new file, and we haven’t got the proper HTML declarations in our templates either. We’ll sort that out now by looking at template inheritance.
Template Inheritance
Jinja allows templates to inherit from others using two main tags:
{% extends %}
{% block %}
Let’s see these in action by creating a base template. Add a file called base.html in your templates folder:
<!DOCTYPE html>
<html>
<head>
<title>My Shop</title>
</head>
<body>
<h1>Welcome to my shop!</h1>
<hr>
{% block content %}
{% endblock %}</body>
</html>
We use a block tag to assign a name and location to a piece of inheritable content. Since we have created a block named content inside our body tags, any template inheriting our base can include a {% block content %} tag to insert its contents into the page body.
Let’s alter our index.html and view_product.html templates to inherit from the base.
index.html:
{% extends "base.html" %}
{% block content %}<table>
{% for product_name, price in products.items() %}<tr>
<td><a href="/{{product_name}}">{{product_name}}</a></td>
<td>{{ "£%.2f"|format(price) }}</td>
</tr>
{% endfor %}</table>
{% endblock %}
view_product.html:
{% extends "base.html" %}
{% block content %}<h2>{{product_name}}</h2>
<p>This product costs £{{"%.2f"|format(price)}}</p>
<p>Out of Stock</p>
{% endblock %}
Save and re-run your webserver and visit both the homepage and a product page (you may notice we added <a> tags so that you can visit a product page by clicking on the product’s name). They should both look just like before, but now have fully-formed HTML and are sharing the heading. Use the “Inspect Element” or “View Source” capabilities of your web browser to have a look at the generated HTML and see this for yourself.
The extends tag takes the name of the base template in quotes, then exposes all blocks defined in that template to the current file (although you do not need to add content to all blocks defined in the base). The content which is wrapped in {% block content %} in our child templates is then inserted into the base template where its own {% block content %} tags lie.
This covers the basics of Flask. We now have a working web server, which we can add routes to using the @app.route decorator. We know how to pass variables to these routes using /<string:variable>, and how to get Python variables into Jinja templates.
To continue with our website, we will now look at replacing those hard-coded products dictionaries with data from an actual database, as we dive into the Peewee library.
Chapter 2
Using Peewee with a Database
Peewee is a library often referred to as an Object Relational Mapper (ORM). It abstracts connections to a database into regular Python objects to make querying integrate seamlessly with the rest of your backend logic.
If you’d like a detailed look into Peewee before diving in, check out Appendix 1.
Making a database
We’re going to be using SQLite in this book, because it doesn’t require any installation or set-up. As Peewee is an abstraction layer, code from this book should work fine with a more dedicated solution, such as MySQL or Postgres.
Let’s begin by creating a folder in the root directory called models, and inside add two files - __init__.py and product.py.
__init__.py:
from peewee import Model
from playhouse.sqlite_ext import SqliteExtDatabase


class BaseModel(Model):
    class Meta:
        database = SqliteExtDatabase("database.db")
product.py:
from models import BaseModel
from playhouse.sqlite_ext import *


class Product(BaseModel):
    id = AutoField()
    name = TextField()
    price = TextField()

    class Meta:
        table_name = "product"
In our init file we import Peewee’s own Model class and its class for connecting to a SQLite database. Using these we can define a BaseModel class which will act as a parent class for each of our models. Inside this class, we include another named Meta which is used by Peewee to point the models to their database. As we are using SQLite for this book, we can simply point this to a file named database.db.
Our Product model imports our new BaseModel class and inherits from it. The column datatypes from Peewee’s SQLite extension are also brought in.
Models are linked to their database columns by providing class attributes which are instances of Peewee’s column types. Here we have an AutoField - used to denote a primary key - and two TextFields to store each product’s name and price.
Another nested Meta class is used on our Product model to indicate its table name in the SQLite database.
Speaking of which, let’s get this prepared now.
Database Migrations
Create a new folder in the root of your project called migrations. Create a file in here called V1__create_product_table.sql:
create table product (id integer primary key autoincrement, name text, price text);
insert into product values (1, 'Toothpaste', '2.0'), (2, 'Toothbrush', '1.50'), (3, 'Floss', '0.99');
Great, now we have our first database migration! You may be wondering about that file name, so I shall explain. This naming schema is used by a product named FlywayDB, an open source generic database migration tool. By naming our migrations as such, we have the option to use this tool to handle our database if we want. Since Flyway uses plain SQL files to handle its migrations, it is possible to run our migrations manually if you would prefer not to set this tool up.
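The file name packs a version and a description into one string: a V prefix, the version number, a double underscore, then the description with underscores standing in for spaces. The following sketch pulls those parts out with a regular expression. parse_migration_name is a hypothetical helper for illustration and is not part of Flyway, and it only handles simple whole-number versions.

```python
import re

# V<version>__<description>.sql, e.g. V1__create_product_table.sql
MIGRATION_PATTERN = re.compile(r"^V(?P<version>\d+)__(?P<description>\w+)\.sql$")


def parse_migration_name(filename):
    """Split a versioned migration file name into (version, description)."""
    match = MIGRATION_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"Not a versioned migration file name: {filename}")
    # Underscores in the description are shown as spaces.
    description = match.group("description").replace("_", " ")
    return int(match.group("version")), description


print(parse_migration_name("V1__create_product_table.sql"))
```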
Setting up Flyway
Search the web for FlywayDB’s download page and grab the relevant version for your operating system. Make sure to extract the folder inside your project’s root directory. Rename the extracted folder, which should be flyway-x.y.z, to just flyway for simplicity.
Inside the flyway folder you should have another named conf, containing a file flyway.conf. Open up this file and remove the default contents, then enter the following:
flyway.url=jdbc:sqlite:database.db
flyway.user=
flyway.password=
flyway.locations=migrations
Just one more thing before we can start using flyway - the first migration it finds is expected to be the “baseline” of the table, which means its initial state before any flyway migrations. If we run flyway now, it will assume this is V1 and not create our table, so we need to create an empty file inside the migrations folder called V0__baseline.sql.
We are now ready! Make sure you are in the project root and run the following command:
flyway/flyway migrate
If successful, you should see some confirmation output like the following:
Flyway is up to date
Flyway Community Edition 8.5.2 by Redgate
See what's new here: https://flywaydb.org/documentation/learnmore/releaseNotes#8.5.2
Database: jdbc:sqlite:database.db (SQLite 3.34)
Successfully validated 2 migrations (execution time 00:00.006s)
Creating Schema History table "main"."flyway_schema_history" ...
Current version of schema "main": << Empty Schema >>
Migrating schema "main" to version "0 - baseline"
Migrating schema "main" to version "1 - create product table"
Successfully applied 2 migrations to schema "main", now at version v1 (execution time 00:00.014s)
Congratulations, you now have a database!
Skipping Flyway and Running Migrations Manually
If you’d prefer to skip setting up Flyway, you can run the sqlite migrations from the command line directly.
First execute the sqlite3 database.db command to drop into a sqlite shell for your database. Then run each file like so:
.read migrations/Vx__name_of_file.sql
If you receive an error about the sqlite3 command not being found, you may need to install it on your computer first.
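If installing the sqlite3 command-line tool is inconvenient, Python’s built-in sqlite3 module can execute the same SQL. The sketch below runs the migration’s statements against an in-memory database so it is safe to experiment with. To apply it for real you would read the SQL from the file in migrations and connect to database.db instead.

```python
import sqlite3

# The contents of the V1 migration, inlined so the sketch is self-contained.
migration_sql = """
create table product (id integer primary key autoincrement, name text, price text);
insert into product values (1, 'Toothpaste', '2.0'), (2, 'Toothbrush', '1.50'), (3, 'Floss', '0.99');
"""

# ":memory:" creates a throwaway database; use "database.db" for the real one.
connection = sqlite3.connect(":memory:")
connection.executescript(migration_sql)

product_names = [
    row[0] for row in connection.execute("select name from product order by id")
]
print(product_names)

connection.close()
```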
Using our Database
Now that our database exists, and has contents, we can begin using our Peewee models in our Flask endpoints.
Open up web_server.py and change your two routes to the following:
@app.route("/")
def index():
    products = Product.select()
    return render_template("index.html", products=products)


@app.route("/<string:product_name>")
def view_product(product_name):
    product = Product.get_or_none(Product.name == product_name)

    if not product:
        return abort(404)

    return render_template(
        "products/view_product.html", price=product.price, product_name=product_name
    )
We’ll also need to import our Product model at the top:
from models.product import Product
If you are familiar with SQL syntax, Peewee’s methods should feel familiar. We use Product.select() to grab all product records from our database - the equivalent of SELECT * FROM product. This returns an iterable of objects representing our database rows, with each class attribute we defined in models/product.py containing the data from our row.
In our view_product endpoint we need to fetch a specific row, so we use Peewee’s get_or_none function. This function needs to be supplied with any conditions on which to select the correct product. Since we have the product’s name as our product_name variable, we pass this as a condition to Peewee with Product.name == product_name. This tells Peewee’s query to find a row with a name value matching what’s in our product_name variable. This is the equivalent of SELECT * FROM product WHERE name = product_name (where “product_name” is actually the contents of the variable).
Since the get_or_none method returns None if no matching record is found, we can use this to return our abort(404) if a non-existent product name is passed.
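To make the SQL comparison concrete, here is roughly the same lookup written with Python’s built-in sqlite3 module instead of Peewee. The find_product helper and the in-memory table are assumptions for illustration; the ? placeholder is where the contents of product_name are substituted.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute(
    "create table product (id integer primary key autoincrement, name text, price text)"
)
connection.execute("insert into product (name, price) values ('Toothpaste', '2.0')")


def find_product(product_name):
    """Return the matching (id, name, price) row, or None if there is no match."""
    cursor = connection.execute(
        "SELECT * FROM product WHERE name = ?", (product_name,)
    )
    return cursor.fetchone()


found = find_product("Toothpaste")
missing = find_product("Water")
print(found, missing)

connection.close()
```

Like get_or_none, fetchone gives back None when nothing matches, which is the case the 404 branch handles.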
The arguments to render_template are updated to pass the product variable’s price attribute as the price context variable.
Our templates won’t quite render yet, since they are expecting dicts for the product information. Let’s start with index.html. Change the contents of your table to the following:
<table>
{% for product in products %}<tr>
<td><a href="/{{product.name}}">{{product.name}}</a></td>
<td>{{ "£%.2f"|format(product.price|float) }}</td>
</tr>
{% endfor %}</table>
We are now iterating over Python objects representing our database rows, rather than dicts, so we no longer need .items(), and can use the dot syntax to access attributes.
Since price is a string in our database, it must be passed to the float filter before it can be formatted via the format filter.
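The reason for the extra filter is that the %.2f format code only accepts numbers. The same behaviour can be seen in plain Python, which is a reasonable stand-in for what happens when the format filter receives text:

```python
price = "1.50"  # prices come out of the database as text

try:
    "£%.2f" % price
except TypeError as error:
    # Formatting a str with %.2f is rejected.
    print("Cannot format text:", type(error).__name__)

# Converting to a float first, much like the float filter, makes it work.
formatted = "£%.2f" % float(price)
print(formatted)
```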
And now in view_product.html:
<h2>{{product_name}}</h2>
<p>This product costs £{{"%.2f"|format(price|float)}}</p>
<p>Out of Stock</p>
Just the price needs to be converted to a float once again in this file.
With all of those changes in place, you can now run your webserver with python3 web_server.py and visit your shop once again. It should work exactly as before.
That’s all it takes to get a database into our site! Before we start adding functionality, we should first go over a way to ensure our site will always work as intended. This leads us nicely to our last titular module - Pytest.
Chapter 3
Testing with Pytest
Automated tests allow us to ensure that our site’s expected functionality will stay the same as we continue adding features. This prevents us from having to manually use each aspect of our site every time we make changes.
If you’d like a detailed look into Pytest before diving in, check out Appendix 2.
To get started with Pytest, create a folder in the root directory of your project named tests and inside create a file named helpers.py:
from models.product import Product

__all__ = ["create_test_product"]


def create_test_product(name: str, price: str):
    p = Product()
    p.name = name
    p.price = price
    p.save()

    return p
All helper functions for our tests will be stored in this file. This avoids having to either re-define them in multiple tests, or elongate each test function by creating models manually.
We start off with just one function - create_test_product - which will create a record in our Product database table with the supplied name and price. Creating a record in Peewee can be done by creating an instance of a model class. You can then assign values to each of the defined class attributes representing the data to be stored for each column. Once the necessary fields are populated, the save method will write this new row to the database.
We also define an __all__ variable, which is a special Python variable to allow a cleaner use of import * statements. When a file uses from tests.helpers import * it will normally import all public names defined in that file. This means it would import the Product class in this file’s current state. However, when we define __all__ only the names inside this list will be imported. This means, in the file’s current state, any test using from tests.helpers import * will only be importing our create_test_product function, and not the Product class.
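The effect of __all__ can be demonstrated without touching the project. The sketch below builds a throwaway module in memory (demo_helpers is a made-up name, and sqrt stands in for the imported Product class), then star-imports from it to show which names come through.

```python
import sys
import types

module_source = '''
from math import sqrt

__all__ = ["hypotenuse"]


def hypotenuse(a, b):
    return sqrt(a * a + b * b)
'''

# Build the module by hand and register it so that "import" can find it.
demo_module = types.ModuleType("demo_helpers")
exec(module_source, demo_module.__dict__)
sys.modules["demo_helpers"] = demo_module

# Run a star-import in a fresh namespace and inspect what arrived.
namespace = {}
exec("from demo_helpers import *", namespace)

imported_names = sorted(name for name in namespace if not name.startswith("__"))
print(imported_names)
```

Only hypotenuse arrives in the importing namespace; sqrt stays behind because it is not listed in __all__.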
With this database helper function in place, we now need the ability to switch our Flask server into test mode. Create another file in the tests folder, this time named __init__.py:
from web_server import app

app.config.update(
    {
        "TESTING": True,
    }
)

testing_app = app.test_client()
This file will create a new variable called testing_app which is a test client for our Flask server in testing mode. We can now use methods such as get and post to issue requests to our server inside tests.
Let’s now create our first test. We will load up the index page of the site and check each product is displayed.
Create a file named test_products.py in your tests folder:
from tests import testing_app
from tests.helpers import *


def test_index_displays_product_names():
    create_test_product("Toothbrush", "2.00")
    create_test_product("Toothpaste", "1.00")

    res = testing_app.get("/")

    assert res.status_code == 200

    res_data = res.get_data().decode("utf-8")

    assert "Toothbrush" in res_data
    assert "Toothpaste" in res_data
We grab our testing_app server and use a star-import to pull in our create_test_product helper.
The test_index_displays_product_names function begins by creating two products in our database, then uses testing_app to issue a request to the index page of our site. We then use an assert statement to ensure that the HTTP status code of the response was 200 OK.
Once we know the request was successful, we then get the HTML returned by the endpoint using res.get_data() and decode it from UTF-8 into a string. We can then test that this string contains the names of our two test products.
We are now ready to run this test - but wait! Our database models are still pointing to our main database, which could be our production database. Obviously we need an ephemeral copy which can be populated and checked-against for each test, and then cleared away ready for the next. This both prevents data from ending up in our real database, and allows unit tests to run without needing to keep track of which previous tests may have added or removed records.
There are multiple ways to solve this problem, but the way I find easiest is to use a decorator above each test which passes in only the tables required for this particular test to run.
Head back to tests/helpers.py and add in the following:
from functools import wraps
from playhouse.sqlite_ext import SqliteExtDatabase
...
__all__ = ["create_test_product", "with_test_db"]


def with_test_db(dbs: tuple):
    def decorator(func):
        @wraps(func)
        def test_db_closure(*args, **kwargs):
            test_db = SqliteExtDatabase(":memory:")
            with test_db.bind_ctx(dbs):
                test_db.create_tables(dbs)
                try:
                    func(*args, **kwargs)
                finally:
                    test_db.drop_tables(dbs)
                    test_db.close()

        return test_db_closure

    return decorator
...
This function takes a tuple of Peewee models, creates a new, in-memory SQLite database and binds it to the provided models. It then runs the decorated function, drops all tables in the database, and closes the connection.
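The same set-up and tear-down pattern can be sketched with only the standard library. The with_temp_table decorator below is a hypothetical stand-in for with_test_db that uses the sqlite3 module rather than Peewee, and it passes the connection into the wrapped function as an argument, which the real helper does not do. It shows that every decorated call starts from an empty in-memory database and that clean-up sits in a finally block so it runs even when the wrapped function raises.

```python
import sqlite3
from functools import wraps


def with_temp_table(create_sql):
    def decorator(func):
        @wraps(func)
        def closure(*args, **kwargs):
            # Each call gets its own private in-memory database.
            connection = sqlite3.connect(":memory:")
            connection.execute(create_sql)
            try:
                return func(connection, *args, **kwargs)
            finally:
                # Runs whether the wrapped function returned or raised.
                connection.close()

        return closure

    return decorator


@with_temp_table("create table product (name text, price text)")
def add_and_count(connection, name):
    connection.execute("insert into product values (?, ?)", (name, "1.00"))
    return connection.execute("select count(*) from product").fetchone()[0]


first_count = add_and_count("Toothbrush")
second_count = add_and_count("Toothpaste")
print(first_count, second_count)
```

Both calls count a single row, because the row inserted by the first call disappeared along with its database.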
Head back to test_products.py and wrap our test with the new decorator:
from models.product import Product
...
@with_test_db((Product,))
def test_index_displays_product_names():
    ...
Note the trailing comma inside our tuple. Without this, Python will treat the parentheses as plain grouping and pass our Product model on its own, rather than as part of a one-item tuple.
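A quick way to convince yourself of this is to compare the two spellings directly. The empty Product class here is only a placeholder for the real Peewee model.

```python
class Product:
    """Placeholder standing in for the Peewee model."""


one_item_tuple = (Product,)
just_the_class = (Product)

print(type(one_item_tuple).__name__)
print(just_the_class is Product)
```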
Now we are finally ready to run our test. Open up a command line at the root of your project and run the pytest command. Hopefully you should see a success message saying 1 passed in green.
If you see an error about packages not being found, you may need to adjust the PYTHONPATH environment variable. In Linux or MacOS running bash or zsh, this should be done like so:
export PYTHONPATH=.
pytest
Congratulations, you have now written your first test for this project! Let’s not get ahead of ourselves though, there’s still another page to test. Let’s add more tests to test_products.py:
@with_test_db((Product,))
def test_view_product_shows_product_name_and_price():
    create_test_product("Toothbrush", "2.00")

    res = testing_app.get("/Toothbrush")

    assert res.status_code == 200

    res_data = res.get_data().decode("utf-8")

    assert "Toothbrush" in res_data
    assert "£2.00" in res_data


@with_test_db((Product,))
def test_view_product_shows_404_if_not_found():
    create_test_product("Toothbrush", "2.00")

    res = testing_app.get("/Bananas")

    assert res.status_code == 404
These new tests will both load the view_product route. The first will load a valid product and check the name and price are displayed on the page. The second loads a non-existent product and asserts that the HTTP response is a 404.
Head back to your command line and run your tests again with pytest. You should now see 3 passed in green.
Great, now we can ensure that our site’s basic functionality is in place just by running the pytest command at any time. Let’s now progress the abilities of our site by adding a separate section for administrators, where they can enter details of new products without writing SQL migrations.
Chapter 4
Adding Admin Controls
Database migrations are typically used to control the structure of your database tables, rather than their content. Our initial migration added three products only to get us back to the same point we were at before, when our data was in dictionaries. If we want to continue adding products, we should build some way to do it within the application itself.
In order to achieve this, we will now create a separate section of our shop which will serve as an admin panel. We will gate this section behind a login system, as here users will be able to add / remove products from our shop, and change their prices.
We will start by creating two new pages on our site - /admin for the admin section, and /admin/login for an admin user to log in.
Following the current conventions of our code, you may think that these routes would go in the web_server.py file, since that’s where our other pages are held. However, as our application grows, this file will become much too big. As well as this, our admin URLs will always want to begin with /admin/ to denote that they are separate from the main site, and will all want to be kept behind a login system. It would be rather easy to forget either of these things, and risk exposing admin capabilities to the public.
Luckily, Flask provides a solution to both of these matters - Blueprints. A Flask Blueprint is a way of grouping together sections of a website under a common name. These sections can all contain a common URL prefix, and can have guards implemented before a request completes. We can use these two features to ensure that all of our admin functionality lives at /admin/* and is behind a login system.
Let’s begin by creating a folder in the root of our project called web
. This will serve as the store for all things related to the web interface for our shop. Inside this folder create two more - views
and blueprints
. Inside blueprints
create an __init__.py
file:
from flask import Blueprint

site_blueprint = Blueprint("site", __name__)
admin_blueprint = Blueprint("admin", __name__, url_prefix="/admin")
We import the Blueprint
class from flask
and create two instances, site_blueprint
and admin_blueprint
. The first argument will be the name of the blueprint, which you will see used later when we use Flask’s url_for
helper. The second argument is __name__
by convention. Our admin blueprint has a third argument, url_prefix
. This is used to ensure that all URLs for this blueprint begin with /admin/
automatically.
Let’s create some placeholder routes for our admin section. We’ll start with an index and a login page. Inside your web/views
folder, create two more - site
and admin
. This is where our URL routes will lie (Flask calls them “views”, but personally I think the term “routes” is clearer. I will stick with Flask’s terminology for our project, however).
Inside both site
and admin
create a file named __init__.py
. This is where our guards will live. The public site needs no guards, but the admin will need to make sure a user is logged in before displaying a page.
Inside your new admin
folder create a file named admin.py
containing the following:
from web.blueprints import admin_blueprint


@admin_blueprint.route("/")
def admin_index():
    return "Welcome to the admin"


@admin_blueprint.route("/login", methods=["GET", "POST"])
def admin_login():
    return "please log in"
Here are our two placeholder views. As you can see, blueprints have routes added to them using the same decorator syntax as we have in our web_server.py
file. We create a function named admin_index
which serves as the entry-point to our admin section. The route "/"
will combine with the url_prefix
we specified on the Blueprint so that this page rests on /admin
. Similarly, our admin_login
route will be /admin/login
.
Our login route has the methods
argument passed to its decorator. This allows this route to be accessed by both GET and POST HTTP methods. When this argument is not provided, Flask assumes the route is meant only for GET requests, and any POST requests will be responded to with a 405 Method Not Allowed.
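If you would like to see both behaviours without opening a browser, the following is a minimal, self-contained sketch. It rebuilds the two placeholder views in a throwaway app (so it does not import anything from our project) and uses Flask's built-in test client to inspect the generated URLs and the response to a POST.

```python
from flask import Blueprint, Flask

# Stand-in for the blueprint from web/blueprints/__init__.py
admin_blueprint = Blueprint("admin", __name__, url_prefix="/admin")


@admin_blueprint.route("/")
def admin_index():
    return "Welcome to the admin"


@admin_blueprint.route("/login", methods=["GET", "POST"])
def admin_login():
    return "please log in"


app = Flask(__name__)
app.register_blueprint(admin_blueprint)

# Map each endpoint name to the URL rule Flask built for it
rules = {rule.endpoint: rule.rule for rule in app.url_map.iter_rules()}
print(rules["admin.admin_index"])
print(rules["admin.admin_login"])

client = app.test_client()
# admin_index has no methods argument, so it only accepts GET
index_post_status = client.post("/admin/").status_code
# admin_login explicitly allows POST
login_post_status = client.post("/admin/login").status_code
print(index_post_status, login_post_status)
```

Notice that the endpoint names are the blueprint's name and the function's name joined by a dot. These are the names we will pass to url_for later on.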
Our admin views are now in place, but before we flesh them out, let’s also pull our site routes out of web_server.py
and into our new site
folder.
Create a file at web/views/site/site.py
and move over your views from web_server.py
. Import site_blueprint
at the top, and change the decorators from @app
to @site_blueprint
:
from flask import abort, render_template

from models.product import Product
from web.blueprints import site_blueprint


@site_blueprint.route("/")
def index():
    products = Product.select()
    return render_template("index.html", products=products)


@site_blueprint.route("/<string:product_name>")
def view_product(product_name):
    product = Product.get_or_none(Product.name == product_name)

    if not product:
        return abort(404)

    return render_template(
        "products/view_product.html", price=product.price, product_name=product_name
    )
Great, now all of our views are in place. To avoid long chains of imports, let’s consolidate our modules. Open up web/views/site/__init__.py
and add this:
from . import site
Now do the same in web/views/admin/__init__.py
:
from . import admin
Finally, create web/views/__init__.py
and add this:
from . import admin, site
Now our views are ready to be imported in web_server.py
. Open that up and change it to the following:
from flask import Flask

app = Flask(__name__)
app.secret_key = "very secret"

from web.views import admin, site
from web.blueprints import site_blueprint, admin_blueprint

app.register_blueprint(site_blueprint)
app.register_blueprint(admin_blueprint)

if __name__ == "__main__":
    print(app.url_map)
    app.run(debug=True)
Now, after creating our app
we pull in all of our views from both the admin
and site
sections. This causes our Blueprints to “fill up” with the required routes. Now we pull in said Blueprints and use app.register_blueprint
to tell our main Flask app that they exist.
That’s it, this file is now much cleaner, and our code has become modularised. We can now keep a nice separation between the public shop and the private admin section.
Speaking of which, let’s go and add that guard to our admin section.
Route Guards in Flask
As well as route
, Flask Blueprints have access to other decorators which can modify the lifecycle of a web request. One such example is before_request
, which is used to perform logic before a web request is passed to our view. We can use this to check if a user is logged in when they try to access /admin
, and redirect them to our login page if not.
Before we change our code, open up a browser and go to localhost:5000/admin
. You should see our “Welcome to the admin” placeholder message. This means you have successfully navigated to our admin section without authenticating.
Now open up web/views/admin/__init__.py
and add the following:
from flask import request, session, redirect, url_for

from web.blueprints import admin_blueprint
from . import admin


@admin_blueprint.before_request
def admin_before_request():
    if request.endpoint != "admin.admin_login":
        if "logged_in" not in session:
            return redirect(url_for("admin.admin_login"))
We pull in a couple of helpers from Flask, as well as our admin_blueprint
, then define a function under @admin_blueprint.before_request
. This function checks Flask’s request
object to see if the endpoint
attribute is anything other than admin.admin_login
. If it is, and there is not a flag called "logged_in"
in Flask’s session
, we force a redirect to the admin.admin_login
endpoint.
Don’t worry about the endpoint naming, or Flask’s session
, just yet. We will cover them shortly.
Now save this file and try once again to get to localhost:5000/admin
. You should hopefully be redirected to /admin/login
, and see a different placeholder message. This means our admin section is protected against unauthenticated users!
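The same check can be made from code rather than a browser. Below is a small sketch which rebuilds the guard in a throwaway app and uses the test client's session_transaction helper to pretend a user has logged in. The secret key and the placeholder views are only there to make the example runnable on its own.

```python
from flask import Blueprint, Flask, redirect, request, session, url_for

admin_blueprint = Blueprint("admin", __name__, url_prefix="/admin")


@admin_blueprint.before_request
def admin_before_request():
    # Returning a response here stops the request before it reaches the view
    if request.endpoint != "admin.admin_login" and "logged_in" not in session:
        return redirect(url_for("admin.admin_login"))


@admin_blueprint.route("/")
def admin_index():
    return "Welcome to the admin"


@admin_blueprint.route("/login")
def admin_login():
    return "please log in"


app = Flask(__name__)
app.secret_key = "only for this sketch"
app.register_blueprint(admin_blueprint)

client = app.test_client()

# Without the session flag we are bounced to the login page
anonymous = client.get("/admin/")

# Set the flag directly in the session, then try again
with client.session_transaction() as sess:
    sess["logged_in"] = True
authenticated = client.get("/admin/")

print(anonymous.status_code, anonymous.headers["Location"])
print(authenticated.status_code, authenticated.get_data(as_text=True))
```

When before_request returns nothing, Flask carries on to the view as normal, which is why the guard only needs to handle the "not logged in" case.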
Adding an Admin
Now that our page is guarded, we need to get ourselves a user so that we can unlock it. To achieve this we’ll need a new database table - admin_user
. Create a new file in the migrations
folder called V2__add_admin_user_table.sql
:
CREATE TABLE admin_user (id integer primary key autoincrement, username text, password text);
Run the flyway/flyway migrate
command in your project root folder and you will now have a second table. Time for our Peewee model. Create models/admin_user.py
:
from models import BaseModel
from playhouse.sqlite_ext import *


class AdminUser(BaseModel):
    id = AutoField()
    username = TextField()
    password = TextField()

    class Meta:
        table_name = "admin_user"
Much like our Product
class, the AdminUser
contains an id
field and two text
fields.
Now we need an admin user, but as previously mentioned, it’s best to just use migrations for table structure, and not data. So, let’s use the Python interactive shell to make us our first admin user. Open a terminal in the root of your project, ensure your virtualenv is sourced, and run python3
.
First we’ll create an AdminUser
and assign them a username. I will use “admin”, but you can use whatever name you’d like.
Python 3.10.0 (default, Oct 4 2021, 00:00:00) [GCC 11.2.1 20210728 (Red Hat 11.2.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from models.admin_user import AdminUser
>>> user = AdminUser()
>>> user.username = "admin"
Now our user needs a password. Since it’s bad security practice to store passwords in plain text, we will store our password as a hash. Flask itself is built upon a library known as werkzeug
which provides functions for securely hashing and checking passwords. For example, to hash the password “admin”:
>>> from werkzeug.security import generate_password_hash
>>> generate_password_hash('admin')
'pbkdf2:sha256:260000$aOwDEpVMd5aO0oie$2916af44ac8b1e79d8a1ac74882f1602da2bae33fc690491c5014fe98f186ed4'
Pick a nice password for your admin user, and set the hash of this password as the AdminUser
’s password
attribute:
>>> user.password = generate_password_hash("admin")
Now we can write this row to our database with save()
:
>>> user.save()
1
And we can confirm the record is now in our database:
>>> u = AdminUser.get()
>>> u.username
'admin'
>>> u.password
'pbkdf2:sha256:260000$O0eSD3qnpfXpqVYn$4ca214264ba24b66ffc01b5ec801d50314439f24e947df03d4f763d4a58e5704'
>>>
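You may have spotted that the two hashes shown above are different, even though both were generated from the password “admin”. This is expected: a random salt is mixed in each time, so the same password never produces the same hash twice. The sketch below shows this, and shows that check_password_hash still verifies the password against either hash.

```python
from werkzeug.security import check_password_hash, generate_password_hash

first_hash = generate_password_hash("admin")
second_hash = generate_password_hash("admin")

# A fresh random salt means two hashes of one password do not match
hashes_differ = first_hash != second_hash

# The salt is stored inside the hash string, so either hash can be checked
first_accepts = check_password_hash(first_hash, "admin")
second_accepts = check_password_hash(second_hash, "admin")

# A wrong password is rejected
wrong_rejected = not check_password_hash(first_hash, "letmein")

print(hashes_differ, first_accepts, second_accepts, wrong_rejected)
```

This is why we can never compare two hashes directly when logging a user in, and must always go through check_password_hash.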
Logging in
With an admin user created, we can now create a simple login screen and update our view accordingly. Create a folder named admin
in your templates
folder, then create two files inside of it - login.html
and index.html
.
Let’s start with our login form, so open up login.html
:
<h1>Log in</h1>
{% for msg in get_flashed_messages() %}<h2 style="color:red">{{msg}}</h2>
{% endfor %}
<hr />
<form name="login_form" method="POST">
<label for="username">Username</label>
<input type="text" name="username" id="username">
<br />
<br />
<label for="password">Password</label>
<input type="password" name="password" id="password">
<br />
<br />
<input type="submit" value="Log In">
</form>
This is mostly a standard HTML form. At the top you will notice a call to get_flashed_messages
. A flash message is a short message which should be shown to the user only once. They are typically used to convey the success or failure of a single action. We use a style
attribute to colour ours red, since it will only show when the login attempt is unsuccessful.
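To see the “only once” behaviour in isolation, here is a minimal sketch using a throwaway app with two hypothetical routes, /fail and /page, which share a one-line template. The first request flashes a message and shows it, and the second finds nothing left to show.

```python
from flask import Flask, flash, render_template_string

app = Flask(__name__)
app.secret_key = "only for this sketch"

# A tiny template which prints each flashed message in square brackets
TEMPLATE = "{% for msg in get_flashed_messages() %}[{{msg}}]{% endfor %}done"


@app.route("/fail")
def fail():
    flash("Please try again")
    return render_template_string(TEMPLATE)


@app.route("/page")
def page():
    return render_template_string(TEMPLATE)


client = app.test_client()
first = client.get("/fail").get_data(as_text=True)
second = client.get("/page").get_data(as_text=True)

print(first)
print(second)
```

Calling get_flashed_messages removes the messages as it reads them, so they will not reappear if the user refreshes the page.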
Now we have the template, let’s get our view rendering it. Open up web/views/admin/admin.py
:
from flask import request, session, redirect, url_for, render_template, flash
from werkzeug.security import check_password_hash

from models.admin_user import AdminUser
from web.blueprints import admin_blueprint


@admin_blueprint.route("/")
def admin_index():
    return render_template("admin/index.html")


@admin_blueprint.route("/login", methods=["GET", "POST"])
def admin_login():
    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")

        user = AdminUser.get_or_none(AdminUser.username == username)
        if user:
            password_correct = check_password_hash(user.password, password)

            if password_correct:
                session["logged_in"] = True
                session["admin_user_id"] = user.id

                return redirect(url_for("admin.admin_index"))
            else:
                flash("Please try again!")
        else:
            flash("Please try again")

    return render_template("admin/login.html")
We add a couple of new imports from flask
, our old friend render_template
and the new flash
function to display the aforementioned flash messages.
The check_password_hash
function is brought in from werkzeug
so that we can test the password hash from our new AdminUser
instance.
Our admin_index
function is changed to render a template rather than returning a string.
The admin_login
route is now fully fleshed-out. We begin by checking the method
of our request
object so that we can tell whether we are on the initial page-load (a GET request) or if we have the data from our form submitted (a POST request). If we have a POST request, we use the form
attribute of our request
object to extract the POSTed data. The get
method acts like it does for a normal dict: it returns the value if present, or None
if not.
To validate the credentials, we try and grab an AdminUser
from our table by matching on the username
. If we have a user with this username, we then use check_password_hash
, passing the AdminUser
’s stored password and the password supplied by the form. If the passwords match then the submitted details are both correct, and we can log the user in. We achieve this by setting the "logged_in"
session variable, and also store the AdminUser
’s ID, then redirect the user off to the index.
If the username was not found, or the passwords did not match, the flash
function is used to display an error message, and the login.html
template will be rendered once again.
Before we can test this, let’s put a heading into admin/index.html
so that we can see when we’re logged in successfully:
<h1>Welcome to the admin!</h1>
<p>You are logged in</p>
Re-run your web_server.py
file and head over to http://localhost:5000/admin
. You should get redirected to the login form.
First try entering incorrect details to have a look at the flash
message. Then put in your correct login details and see yourself sent back to the index page. Hooray!
Editing Data
Now that our admin page is guarded by the login system, we can start adding the functionality to create, read, update, and delete our database records. This is often referred to by the acronym CRUD.
Let’s start with “read” by showing a list of Product
s on the admin index. Change your view to the following:
# Product is new to this file, so import it alongside AdminUser
from models.product import Product
...
@admin_blueprint.route("/")
def admin_index():
    all_products = Product.select().order_by(Product.name.asc())

    return render_template("admin/index.html", all_products=all_products)
...
We are passing a query for all products (which Peewee will run when the template loops over it), ordered by their name
attribute, to our template. Now let’s display them on admin/index.html
:
<h1>Welcome to the admin!</h1>
<hr />
<h2>All Products</h2>
<br />
<table>
<thead>
<tr>
<th>Name</th>
<th>Price</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for product in all_products %}<tr>
<td>
<a href="/admin/{{product.id}}">
{{product.name|title}}</a>
</td>
<td>
£{{"%.2f"|format(product.price|float)}}</td>
<td>
<button onclick="deleteProduct({{product.id}})">Delete</button>
</td>
</tr>
{% endfor %}</tbody>
</table>
<script>
function deleteProduct(productId) {
console.log("deleting product", productId);
}</script>
After our welcome message we display a table of all available products, showing their name, price, and some actions. Clicking a product’s name will take you to its update page (which we will create next), and each product has a “Delete” button which we can use later to remove a product from our database.
For the moment, this button will just log the clicked product’s ID to the javascript console.
Reload your admin index and you should see your table with your three products listed.
Let’s start by building the edit page, so we can click on those product names. Create a new template file templates/admin/products/edit_product.html
:
<h1>Edit {{product.name}}</h1>
<hr>
<form name="product_form" action="{{url_for('admin.save_product')}}" method="POST">
<input type="hidden" name="product_id" value="{{product.id}}">
<label for="name">Name:</label>
<input type="text" name="name" id="name" value="{{product.name}}">
<br/>
<br/>
<label for="price">Price:</label>
<input type="text" name="price" id="price" value="{{product.price}}">
<br/>
<br/>
<input type="submit" value="Save">
</form>
A basic HTML form with two text inputs, one for the product’s name and one for its price. We also have a hidden input which will send this product’s ID.
We can’t use this page yet, as the admin.save_product
view doesn’t exist, so hop back over to views/admin/admin.py
:
...
@admin_blueprint.route("/<int:product_id>")
def edit_product(product_id: int):
    product = Product.get_or_none(Product.id == product_id)
    if not product:
        flash("Product not found")

        return redirect(url_for("admin.admin_index"))

    return render_template("admin/products/edit_product.html", product=product)


@admin_blueprint.route("/save-product", methods=["POST"])
def save_product():
    product = Product()

    product_id = request.form.get("product_id")
    if product_id:
        product = Product.get_or_none(Product.id == product_id)
        if not product:
            return redirect(url_for("admin.admin_index"))

    name = request.form.get("name")
    price = request.form.get("price")

    product.name = name
    product.price = price

    product.save()

    return redirect(url_for("admin.edit_product", product_id=product.id))
First our edit_product
view. This view takes a product ID from the URL and makes sure it is an integer. We then use Peewee to try and fetch a product with this ID from the database. If it’s not found, we send the user back to the admin index, since we can’t show its form. If we find it, we then render the edit_product.html
template, passing the product as context.
The save_product
view begins by creating a new Product
instance. This will allow us to re-use this view later when we build the “create” functionality. If a product_id
is passed in our POST data, then the blank product is replaced by the result of a get_or_none
on the provided ID. If no product is found, we return the user to the admin index.
Once we have our Product
instance, we can get the POSTed data from the request.form
object and assign the values to the instance’s name
and price
attributes. We then call save()
to write the changes to our database, before reloading the edit_product
page.
Your site should be in working condition now, so go ahead and restart your web_server.py
command and view http://localhost:5000/admin
in your browser. After logging in, click a product’s name and you’ll see its edit form. Try changing the name and price and pressing Save. You should see your changes persist after saving, and back on the admin index page.
Groovy! Now let’s add the “create” functionality. Put this very simple new view into views/admin/admin.py
:
@admin_blueprint.route("/create-product")
def create_product():
    return render_template("admin/products/create_product.html")
And now create yourself a template at templates/admin/products/create_product.html
:
<h1>Create a Product</h1>
<hr>
<form name="product_form" action="{{url_for('admin.save_product')}}" method="POST">
<label for="name">Name:</label>
<input type="text" name="name" id="name">
<br/>
<br/>
<label for="price">Price:</label>
<input type="text" name="price" id="price">
<br/>
<br/>
<input type="submit" value="Save">
</form>
This should look familiar, since it’s mostly just edit_product.html
again, but without the hidden input or value
attributes.
Now we need a link to this on our index, so open up templates/admin/index.html
and add this below your table:
...
<hr />
<a href="{{url_for('admin.create_product')}}">Create new Product</a>
Reload your admin index and try clicking on this link. You should be greeted with a blank form. Fill it in and create yourself a new product. After saving, your product should appear on the admin index amongst the others. Neat!
Just one step left now - “delete”. Still in your admin index template, alter the deleteProduct
javascript function at the bottom to the following:
async function deleteProduct(productId) {
    var fd = new FormData();
    fd.set('product_id', productId);

    var response = await fetch(
        '{{url_for("admin.delete_product")}}',
        {
            method: 'POST',
            body: fd,
        }
    );

    var res_json = await response.json();
    alert(res_json.message);
    location.reload();
}
This function uses the fetch
API to send a POST request to the admin.delete_product
view containing form data with the selected product’s ID. That view doesn’t exist yet, so open up views/admin/admin.py
for one final time and add the following:
@admin_blueprint.route("/delete-product", methods=["POST"])
def delete_product():
    product_id = request.form.get("product_id")
    product = Product.get_or_none(Product.id == product_id)
    if not product:
        return {"success": False, "message": "Product not found"}, 400

    Product.delete().where(Product.id == product_id).execute()

    return {"success": True, "message": "Product Deleted"}
This view grabs the product_id
from the POST data and uses it to fetch a matching Product
instance. If not successful, an error response is returned (the 400
after the dict is the HTTP status code of the response). If the product was found, Peewee’s delete().where().execute()
functionality is used to delete the product from our database.
Reload your web_server
and site then give one of those delete buttons a whack. You should see a popup saying “Product Deleted” and the page should update with the product no longer in your table.
Great - that’s all parts of the CRUD functionality completed! With this out of the way, we can now go on to the most important part of any shop - the ability to make sales.
Chapter 5
Adding Purchase Abilities
Now that we have the ability to create products to list on our shop, let’s add the ability for a customer to purchase them. We’ll create a Cart which the visitor can use to store items, then a Checkout page where they can enter an email address to complete their order.
Creating a Cart
We can use Flask’s session
capabilities to give each user a Cart which will follow them around the site. The session
is essentially just a regular python dict
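which will be stored as a cryptographically signed cookie in the user’s browser, and thus will be available in every request they make. The signature (made with our app.secret_key) stops the user from altering the data, but the data is not encrypted, so avoid putting secrets in it. This allows us to store data semi-permanently against each individual user.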
To use Flask’s session
we simply import it and start assigning values. We have seen this in action when creating our login system for the admin.
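It is worth knowing exactly what protection this cookie has. Flask signs the session with the app’s secret_key so that it cannot be tampered with, but the contents are only encoded, not hidden. The sketch below is an illustration of that point: it sets a cart in a throwaway app, then reads the data straight back out of the cookie without using the secret key at all. The read_session_payload helper is my own, written for this example, and is not part of Flask.

```python
import base64
import json
import zlib

from flask import Flask, session

app = Flask(__name__)
app.secret_key = "only for this sketch"


@app.route("/")
def index():
    session["cart"] = [1, 2, 3]
    return "ok"


def read_session_payload(cookie_value):
    """Decode the data part of a session cookie without the secret key."""
    # A leading dot marks a payload which was compressed before encoding
    compressed = cookie_value.startswith(".")
    # The value is "payload.timestamp.signature"; we only want the payload
    payload = cookie_value.lstrip(".").split(".")[0]
    # Restore the base64 padding which is stripped from the cookie
    raw = base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4))
    if compressed:
        raw = zlib.decompress(raw)
    return json.loads(raw)


response = app.test_client().get("/")
# The header looks like "session=<value>; HttpOnly; Path=/"
cookie_value = response.headers["Set-Cookie"].split(";")[0].split("=", 1)[1]
payload = read_session_payload(cookie_value)
print(payload)
```

A list of product IDs is harmless for anyone to read, so the session is a fine home for our Cart. Anything sensitive belongs in the database instead.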
Let’s now display our cart on the site. Open up templates/base.html
and change the contents of the <body>
tag to the following:
<body>
<h1>Welcome to my shop!</h1>
<hr>
Items in Cart:
<span id="cart-items">{{cart|length}}</span>
<a href="/checkout">Checkout</a>
<hr>
{% block content %}
{% endblock%}</body>
Now the number of items in our cart will display underneath our header. Let’s create and pass a Cart through in our index page.
from flask import ..., session


@site_blueprint.route("/")
def index():
    products = Product.select()
    if "cart" not in session:
        session["cart"] = "1"

    return render_template("index.html", products=products, cart=session["cart"])
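Reload your site’s index page and you should see “Items in Cart: 1” at the top, as well as a link to a checkout page. Now try changing the "1" to "123". Because the value is only set when the session has no cart yet, you will need to clear your cookies for localhost (or temporarily remove the if check) before reloading, and you should then see “Items in Cart: 3”.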
Great, now we can keep track of the number of items in our Cart. However, if you load another page, you’ll get an error from Jinja because cart
is missing. To avoid having to remember to pass this variable in every single view, let’s create a global template context for our site views, so we will always have our cart
.
Global Jinja Context
Open up views/site/__init__.py
and change it to this:
from . import site
from web.blueprints import site_blueprint


@site_blueprint.context_processor
def inject_variables():
    from flask import session

    if "cart" not in session:
        session["cart"] = []

    return {"cart": session["cart"]}
Here we use the context_processor
decorator to inject variables into all templates under the site_blueprint
. This function grabs the session
from Flask and adds an empty list named "cart"
if it doesn’t already exist. Then it returns a dictionary, which will be the default context for any templates using Flask’s render_template
function within this blueprint.
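Here is a minimal sketch of the context processor working on its own. It uses a throwaway app and render_template_string, so that no template files are needed. Note that the view never passes a cart variable itself.

```python
from flask import Blueprint, Flask, render_template_string, session

site_blueprint = Blueprint("site", __name__)


@site_blueprint.context_processor
def inject_variables():
    if "cart" not in session:
        session["cart"] = []

    return {"cart": session["cart"]}


@site_blueprint.route("/")
def index():
    # No cart argument here; the context processor supplies it
    return render_template_string("Items in Cart: {{cart|length}}")


app = Flask(__name__)
app.secret_key = "only for this sketch"
app.register_blueprint(site_blueprint)

client = app.test_client()
empty_page = client.get("/").get_data(as_text=True)

# Put three items in the session's cart, then load the page again
with client.session_transaction() as sess:
    sess["cart"] = [1, 1, 2]
full_page = client.get("/").get_data(as_text=True)

print(empty_page)
print(full_page)
```

Any value passed explicitly to render_template will take priority over one with the same name from a context processor, so individual views can still override it if they need to.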
Save this and open a product page on your site. You should now still see your Cart items in the header. Now we can remove the Cart-related code from our index
view again.
@site_blueprint.route("/")
def index():
    products = Product.select()
    return render_template("index.html", products=products)
Adding To Cart
Now we have a Cart, let’s get items added. Firstly, we’ll need to clear the string-version of our session variable. If you know how, clear the cookies for localhost
on your browser. If you don’t, you can add the line session["cart"] = []
to your index
view, load the index page, then remove the line.
With that taken care of, open up templates/products/view_product.html
and change your content
block to the following:
<h2>{{product.name}}</h2>
<p>This product costs £{{"%.2f"|format(product.price|float)}}</p>
<button onclick="addToCart({{product.id}})">Add to Cart</button>
<script>
async function addToCart(productId) {
    var fd = new FormData();
    fd.set('product_id', productId);

    var response = await fetch(
        '{{url_for("site.add_product_to_cart")}}',
        {
            method: 'POST',
            body: fd,
        }
    );

    var res_json = await response.json();
    var cartItems = res_json.cart_items;
    document.getElementById("cart-items").innerText = cartItems;
}
</script>
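This template now reads from a product variable, so the view_product view must pass product=product to render_template. Beyond that, we’re adding a button underneath our product’s information which will add this product to our Cart. This is achieved using an onclick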
function called addToCart
which will call some javascript to send a request to our backend.
The addToCart
function creates a FormData
object to wrap up our chosen Product’s ID and then POSTs it to a new endpoint called add_product_to_cart
using fetch
. The server’s response is then parsed and we update the count of our Cart items using the value returned as cart_items
.
Let’s create this new endpoint now. Open up views/site/site.py
and add the following:
# request and session must both be imported from flask at the top of this file
@site_blueprint.route("/add-product-to-cart", methods=["POST"])
def add_product_to_cart():
    product_id = request.form.get("product_id", type=int)
    if not product_id:
        return {"success": False, "cart_items": len(session["cart"])}

    current_cart = session["cart"]
    current_cart.append(product_id)
    session["cart"] = current_cart

    return {"success": True, "cart_items": len(session["cart"])}
In this endpoint we check request.form
for the product_id
sent by our Javascript’s FormData
object and parse it as an integer using type=int
. If it isn’t found, we return an unsuccessful response which does not change the items in the Cart.
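The type argument is a feature of the dictionary class which werkzeug uses for form data. A quick sketch of how it behaves is shown below, using a plain MultiDict in place of a real request.form so that it can run outside of a request.

```python
from werkzeug.datastructures import MultiDict

# Stand-ins for request.form under three different submissions
form = MultiDict({"product_id": "7"})
bad_form = MultiDict({"product_id": "seven"})
empty_form = MultiDict()

# Form values always arrive as strings; type=int converts for us
parsed = form.get("product_id", type=int)

# If the conversion fails we get the default (None) rather than an exception
unparsable = bad_form.get("product_id", type=int)

# A missing key also gives the default
absent = empty_form.get("product_id", type=int)

print(parsed, unparsable, absent)
```

This means a single "if not product_id" check covers both a missing ID and one which isn’t a number.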
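If the ID is present we can then add it to our Cart. We do this by reading the cart from our session into a current_cart
temporary list, appending the Product’s ID to it, then assigning this back to the session’s Cart. The assignment matters: Flask only marks the session as changed when a key is set, not when a list stored inside it is altered in place.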
We then return a successful response which includes the cart_items
key which the Javascript can use to update the count in our header.
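You may wonder why the view assigns the list back to session["cart"] after appending to it. Flask only saves the session cookie when it knows the session has changed, and it tracks this with a modified flag on the session object. The sketch below resets that flag by hand (something you would not do in a real view) to show which operations set it.

```python
from flask import Flask, session

app = Flask(__name__)
app.secret_key = "only for this sketch"

with app.test_request_context():
    session["cart"] = []
    # Reset the flag, as if the cart had just been loaded from the cookie
    session.modified = False

    # Appending changes the list, but the session itself does not notice
    session["cart"].append(1)
    after_append = session.modified

    # Setting a key on the session is what marks it as changed
    session["cart"] = session["cart"]
    after_assignment = session.modified

print(after_append, after_assignment)
```

Without the assignment, the appended item could be lost when the response is sent, since Flask would see no reason to write a new cookie.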
With this new endpoint in place, go ahead and load a product page in your browser. Click the “Add to Cart” button a few times and see your “Items in Cart” heading increase. Navigate to another Product page and see your Cart’s status follow you around.
Great! Now we have a working Cart, so let’s add a way for a user to Checkout.
Checking Out
When checking out, we will display all of the items in our user’s Cart and sum up the total price. There will then be a place for the user to enter their email address and complete the purchase. In a real online-shop this would be where the user makes an account and enters their payment information, but we’ve already learned about account creation, and taking payments is out of scope for this book.
We’ll need a new database table for storing any completed orders, so let’s go ahead and create that now. Make a file in your migrations
folder called V3__add_order_table.sql
containing the following:
create table "order" (id integer primary key autoincrement, timestamp_created text, email text, products text);
Here "order"
is in double-quotes because it is a reserved word in SQL.
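You can see the difference the quotes make using python’s built-in sqlite3 module and an in-memory database. This sketch tries the statement without quotes first, then with them. The table here has fewer columns than our real one, purely to keep the example short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Without quotes, SQLite reads "order" as the keyword from ORDER BY
try:
    conn.execute("create table order (id integer primary key autoincrement)")
    unquoted_worked = True
except sqlite3.OperationalError:
    unquoted_worked = False

# With double-quotes it is treated as a name, and must be quoted in every query
conn.execute(
    'create table "order" (id integer primary key autoincrement, email text)'
)
conn.execute('insert into "order" (email) values (?)', ("shopper@example.com",))
emails = [row[0] for row in conn.execute('select email from "order"')]

print(unquoted_worked, emails)
```

Thankfully Peewee takes care of quoting table names for us, so we won’t need to think about this again outside of our migrations.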
Run flyway/flyway migrate
to create yourself a new table. Now we’ll need the Peewee model, too. Make a file in your models
folder called order.py
and add this new class:
from models import BaseModel
from playhouse.sqlite_ext import *


class Order(BaseModel):
    id = AutoField()
    timestamp_created = DateTimeField()
    email = TextField()
    products = JSONField()
We haven’t seen DateTimeField
or JSONField
yet. These are used to tell Peewee that the string values in the database can be interpreted as datetime
objects or json
dicts, respectively. The data in SQLite will remain a regular string, however.
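To make this more concrete, here is roughly what these two fields save us from doing by hand. This sketch uses only python’s standard library, with no Peewee involved, to write a datetime and a dict into SQLite as strings and then convert them back. The email address and products are made up for the example.

```python
import json
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute(
    'create table "order" (id integer primary key autoincrement, '
    "timestamp_created text, email text, products text)"
)

products = {"Toothbrush": {"total": 4.0, "quantity": 2}}
created = datetime(2021, 10, 4, 12, 30, 0)

# Both values must be turned into strings before they can be stored
conn.execute(
    'insert into "order" (timestamp_created, email, products) values (?, ?, ?)',
    (
        created.strftime("%Y-%m-%d %H:%M:%S"),
        "shopper@example.com",
        json.dumps(products),
    ),
)

# Reading them back gives us plain strings...
raw_timestamp, raw_products = conn.execute(
    'select timestamp_created, products from "order"'
).fetchone()

# ...which we have to convert back into python objects ourselves
parsed_timestamp = datetime.strptime(raw_timestamp, "%Y-%m-%d %H:%M:%S")
parsed_products = json.loads(raw_products)

print(repr(raw_timestamp), parsed_timestamp)
print(repr(raw_products), parsed_products)
```

With DateTimeField and JSONField on our model, Peewee performs these conversions in both directions whenever we save or load an Order.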
Now that we can store a record of our orders, we’re in position to add the functionality to our shop. Let’s create a new view in our views/site/site.py
file for checking out:
@site_blueprint.route("/checkout", methods=["GET", "POST"])
def checkout():
    cart_items = []
    cart_products = {}
    total_price = 0
    for product_id in session["cart"]:
        p = Product.get_or_none(Product.id == product_id)
        if p is None:
            # The product may have been deleted since it was added to the Cart
            continue
        if p.name not in cart_products:
            cart_products[p.name] = {"total": float(p.price), "quantity": 1}
        else:
            cart_products[p.name]["total"] += float(p.price)
            cart_products[p.name]["quantity"] += 1

    for name, price_info in cart_products.items():
        cart_items.append(
            {
                "name": name,
                "quantity": price_info["quantity"],
                "total_price": price_info["total"],
            }
        )
        total_price += price_info["total"]

    return render_template(
        "checkout.html", cart_items=cart_items, total_price=total_price
    )
There’s quite a lot of logic here, but it’s mostly just processing the Cart items into a user-readable format.
We create three variables called cart_items
, cart_products
and total_price
. Next we loop through all of the Product IDs in our Cart and fetch the Product
instance for each.
If we haven’t come across this Product before we create a dict
inside of cart_products
which pairs the Product’s name
to another dict
of its price
and a quantity
of 1. If we already have this product in our cart_products
we simply increase the total price by its unit cost and increase the quantity by one.
Once we’ve built up this mapping of Product names to price info, we can loop through our cart_products
and append a dict
displaying the information to our cart_items
list, as well as summing up the cost of each Product in our total_price
variable.
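If the grouping logic is hard to follow in the middle of a view, it may help to see it on its own. The sketch below performs the same steps as a plain function, with a small hard-coded CATALOGUE dict standing in for our Product table. Both the function name and the catalogue are made up for this example.

```python
# Hypothetical catalogue standing in for the Product table: ID -> (name, price)
CATALOGUE = {1: ("Toothbrush", "2.00"), 2: ("Toothpaste", "1.50")}


def summarise_cart(cart):
    """Group a list of product IDs by name, with a quantity and total for each."""
    cart_products = {}
    for product_id in cart:
        if product_id not in CATALOGUE:
            continue  # skip IDs with no matching product

        name, price = CATALOGUE[product_id]
        if name not in cart_products:
            # First time we have seen this product
            cart_products[name] = {"total": float(price), "quantity": 1}
        else:
            # Seen before, so add one more to the running figures
            cart_products[name]["total"] += float(price)
            cart_products[name]["quantity"] += 1

    total_price = sum(info["total"] for info in cart_products.values())
    return cart_products, total_price


# Two toothbrushes, one toothpaste, and one ID which doesn't exist
cart_products, total_price = summarise_cart([1, 2, 1, 99])
print(cart_products)
print(total_price)
```

Each repeated ID in the Cart raises the quantity and total of a single entry, rather than producing a second row for the same product.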
With the data processed, we can then pass our cart_items
and total_price
variables to a new template. Speaking of which, let’s create templates/checkout.html
now:
{% extends "base.html" %}
{% block content %}<h1>Checkout</h1>
<hr>
<table>
<tr>
<th>Product</th>
<th>Quantity</th>
<th>Total</th>
</tr>
{% for item in cart_items %}<tr>
<td>{{item.name}}</td>
<td>{{item.quantity}}</td>
<td>£{{"%.2f"|format(item.total_price)}}</td>
</tr>
{% endfor %}<tr>
<td>-</td>
<td>-</td>
<td>-</td>
</tr>
<tr>
<td><b>Total</b></td>
<td></td>
<td><b>£{{"%.2f"|format(total_price)}}</b></td>
</tr>
</table>
{% endblock %}
In this template we can loop through our cart_items
and display them neatly in a table. At the bottom of the table, we add a row full of dashes as a separator, then the total price in bold.
If you want to make the table look a bit nicer, you can add the following styles above your h1
heading:
<style>
table {
    border-collapse: collapse;
}

table tr td {
    border: 1px solid black;
    text-align: center;
    padding: 8px;
}
</style>
Load up your shop and add some items to your Cart, then click the “Checkout” link in the header. You should see a nice breakdown of all items in your Cart, and a total price at the bottom.
Now we need a way for the purchase to complete, so add the following underneath your table:
<br>
<hr>
<h2>Complete your Purchase</h2>
<form name="checkout-form" method="POST">
<label for="email">Email Address</label>
<input type="email" name="email" id="email">
<br>
<br>
<input type="submit" value="Checkout">
</form>
This will add a form which takes the user’s email address. We’ll now need to process this on the server, so head back to views/site/site.py
and add the following lines just above your return
statement:
    if request.method == "POST":
        user_email = request.form.get("email")
        order = Order()
        order.email = user_email
        order.products = cart_products
        order.timestamp_created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        order.save()
        session["recent_order_id"] = order.id

        return redirect(url_for("site.complete"))
Then add an import at the top of the file:
from datetime import datetime
On a POST request, generated by a user submitting the form, we grab the provided email
from request.form
and create an instance of our Order
class. This order is populated with the email address, our cart_products
dict
, and the current date and time.
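To make the shape of that stored data concrete, here is a minimal, standard-library-only sketch of how a Cart of product IDs could collapse into the kind of dict the order holds. The build_order_payload function and the catalogue lookup are hypothetical stand-ins for the view code and the Product table; the "total" and "quantity" keys mirror the ones read by the complete.html template.

```python
from datetime import datetime

# Hypothetical stand-in for the Product table: id -> (name, price).
catalogue = {1: ("Floss", 1.50), 2: ("Toothbrush", 2.99)}


def build_order_payload(email, cart, now=None):
    """Group a list of product IDs into per-product quantities and totals."""
    products = {}
    for product_id in cart:
        name, price = catalogue[product_id]
        entry = products.setdefault(name, {"total": 0.0, "quantity": 0})
        entry["quantity"] += 1
        entry["total"] = round(price * entry["quantity"], 2)
    # Same timestamp format as the view uses.
    timestamp = (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
    return {"email": email, "products": products, "timestamp_created": timestamp}


payload = build_order_payload(
    "a@a.com", [1, 1, 2], now=datetime(2022, 5, 2, 7, 31, 27)
)
print(payload["products"])
print(payload["timestamp_created"])
```

Passing a fixed now value is only there to make the sketch repeatable; the real view simply uses the current time.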
The ID assigned to the order is then saved to the session
and we redirect to the site.complete
endpoint, which we can write now:
@site_blueprint.route("/complete")
def complete():
    if "recent_order_id" not in session:
        return redirect(url_for("site.index"))

    order = Order.get_or_none(Order.id == session["recent_order_id"])

    session.pop("recent_order_id")
    session["cart"] = []

    return render_template("complete.html", order=order)
If there’s no recent order we just redirect the user back to the index, as there’s no use for this page. Otherwise, we fetch the order details from our database using the ID we saved in the session
, then remove the recent_order_id
using pop
and clear out the user’s Cart.
Our templates/complete.html
template will be as follows:
{% extends "base.html" %}
{% block content %}
<h1>Order Complete!</h1>
<hr>
<h2>Thanks for your order!</h2>
<br>
<h3>Confirmation will be sent to {{order.email}}</h3>
<hr>
<table>
<tr>
<th>Product</th>
<th>Quantity</th>
<th>Price</th>
</tr>
{% for name, price_info in order.products.items() %}
<tr>
<td>{{name}}</td>
<td>{{price_info["quantity"]}}</td>
<td>£{{"%.2f"|format(price_info["total"])}}</td>
</tr>
{% endfor %}
</table>
<hr>
<a href="/">Continue Shopping</a>
{% endblock %}
The complete.html
template simply thanks the user for their order, notifies them about incoming email confirmation, and renders the same table of their purchased products as the Checkout page.
Load up your shop in your browser and visit the Checkout page again. Enter something into the email input at the bottom and press the “Checkout” button. You should see the confirmation of your purchase.
That’s it for our online shop! We’ve now got a Cart which follows the user around between page-loads, and a checkout system which collects and stores orders along with the customer’s email address.
The next chapter will focus on writing some more automated tests for this new functionality to ensure it always works as expected.
Chapter 6
More Testing
Now that our admin and site are fully-functional, we should add some automated tests to ensure that our site’s core functionality will still work as we add features in the future.
We’ll start with our admin section, since we need to ensure it’s only available when a user is logged in.
Testing the Admin
As we have two distinct parts to our site, we should split our test suite up to match. Create two folders in your tests
folder - site
and admin
. Then move your test_index.py
file into site
and rename it to test_site.py
. Now create a test_admin.py
file inside your tests/admin
folder and add the following:
from models.admin_user import AdminUser
from models.product import Product
from tests.helpers import *
from tests import testing_app
def test_index_redirects_to_login_if_not_logged_in():
    res = testing_app.get("/admin/")

    assert res.status_code == 302
    assert b"/login" in res.get_data()
This first test tries to load the /admin
page and asserts that the response is a 302 (redirect). We also see the URL we are being sent to, and we assert that it is the /login
page. You will notice the b
in front of our "/login"
string - this is because HTTP responses are bytes. You can check a response’s content with either a bytes string (like above) or by adding .decode("utf-8")
to res.get_data()
(as we did in chapter 3). We’ll be using bytes strings in this chapter, as it’s shorter to write.
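The two ways of checking a response body can be compared side by side with plain Python. This is a small sketch using a hypothetical body value in place of a real res.get_data() result; it also shows that mixing str and bytes raises an error rather than silently failing to match.

```python
# A hypothetical response body, similar to what res.get_data() returns.
body = b'<a href="/admin/login">/admin/login</a>'

# Option 1: compare bytes with bytes.
assert b"/login" in body

# Option 2: decode once, then compare str with str.
text = body.decode("utf-8")
assert "/login" in text

# Mixing the two types is an error rather than a silent mismatch.
try:
    "/login" in body
    mixed_types_allowed = True
except TypeError:
    mixed_types_allowed = False

print(mixed_types_allowed)
```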
def test_index_does_not_redirect_if_logged_in():
    with testing_app.session_transaction() as s:
        s.clear()
        s["logged_in"] = True
        s["admin_user_id"] = 1

    res = testing_app.get("/admin/")

    assert res.status_code == 200
This test uses the session_transaction
method to let us modify the Flask session before our request. We use this to set the "logged_in"
and "admin_user_id"
values so that we are now logged in as an admin. We once again load the /admin
route and assert that it is successful (rather than a redirect).
Logging in with the session_transaction
method will be needed for a lot of our tests, so let’s create a helper at the top of this file to do this for us.
def admin_login():
    with testing_app.session_transaction() as s:
        s.clear()
        s["logged_in"] = True
        s["admin_user_id"] = 1
Before we use this, however, we should make sure our login functionality works as intended:
from models.admin_user import AdminUser
...
@with_test_db((AdminUser,))
def test_login_logs_user_in_with_correct_details():
    with testing_app.session_transaction() as s:
        s.clear()

    create_test_admin_user("admin", "admin")

    form_data = {"username": "admin", "password": "admin"}

    res = testing_app.post("/admin/login", data=form_data, follow_redirects=True)

    assert res.status_code == 200
    assert b"Welcome to the admin" in res.get_data()

@with_test_db((AdminUser,))
def test_login_shows_error_with_incorrect_details():
    with testing_app.session_transaction() as s:
        s.clear()

    create_test_admin_user("admin", "admin")

    form_data = {"username": "admin", "password": "password"}

    res = testing_app.post("/admin/login", data=form_data, follow_redirects=True)

    assert res.status_code == 200
    assert b"Please try again!" in res.get_data()
Before testing our login system we need to ensure there is no session, so we use session_transaction
to ensure the session has been cleared. We then need to make an AdminUser
which can be authenticated against. In the first test, we create our form data with the correct login details and use the testing_app.post
method to send this to the login URL. The data
argument is used to send our form_data
as POST data, and the follow_redirects
argument will make the res
response contain the page we are redirected to, rather than a 302 status code. We then check the page we are taken to and assert
that it contains the text from our admin header.
On the second test, since we send the wrong details, we check that the response contains our flash
ed error message.
Before we can run these first few tests we’ll need that helper function to create an AdminUser
. Open up tests/helpers.py
and we’ll add helpers for our other models:
from datetime import datetime
from models.admin_user import AdminUser
from models.order import Order
...
def create_test_admin_user(username: str, password: str):
    from werkzeug.security import generate_password_hash

    u = AdminUser()
    u.username = username
    u.password = generate_password_hash(password)
    u.save()

    return u

def create_test_order(email: str, products: dict):
    o = Order()
    o.email = email
    # datetime objects are formatted with strftime; they have no format() method.
    o.timestamp_created = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    o.products = products
    o.save()

    return o
Now we have the ability to make AdminUser
and Order
instances easily. Back to tests/admin/test_admin.py
, let’s write tests for the CRUD functionality for Product
s.
@with_test_db((Product,))
def test_create_product_shows_form():
    admin_login()

    res = testing_app.get("/admin/create-product")
    res_data = res.get_data()

    assert b'<form name="product_form"' in res_data

@with_test_db((Product,))
def test_edit_product_shows_populated_form():
    admin_login()

    p = create_test_product("Floss", "1.50")

    res = testing_app.get(f"/admin/{p.id}")
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"1.50" in res_data
    assert b'<form name="product_form"' in res_data
To check creating we use our admin_login
helper to set up our session, then load the /create-product
URL and check that our form is there.
For editing we first need a product, so we create one using the helper. We then visit the admin URL for that product and check the name and price are shown, as well as the form.
@with_test_db((Product,))
def test_save_product_as_create():
    admin_login()

    assert Product.select().count() == 0

    form_data = {"name": "Floss", "price": "1.50"}

    testing_app.post("/admin/save-product", data=form_data)

    assert Product.select().count() == 1

    p = Product.get()
    assert p.name == "Floss"
    assert p.price == "1.50"

@with_test_db((Product,))
def test_save_product_as_edit():
    admin_login()

    p = create_test_product("Toothbrush", "2.99")

    assert Product.select().count() == 1

    form_data = {"product_id": p.id, "name": "Floss", "price": "1.50"}

    testing_app.post("/admin/save-product", data=form_data)

    assert Product.select().count() == 1

    p = Product.get()
    assert p.name == "Floss"
    assert p.price == "1.50"
We test saving in the context of both a create and an edit. For create we first check there are no instances in our database, using Peewee’s select().count()
, then POST some form data to our /save-product
URL. We now check that the database has an entry, and that the data in that entry is the same as from our form data.
Updating requires an existing product, so we create one and assert that it’s the only record in our database. Then we post form data with different values and check that a new instance was not created. We finish by asserting that the form data has replaced the original data for this record.
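The behaviour these two tests pin down can be sketched without Flask or Peewee. The example below uses the standard library's sqlite3 module directly, and its save_product function is a hypothetical guess at the contract being tested (update when a product_id is supplied, insert otherwise), not the book's actual view code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (id INTEGER PRIMARY KEY, name TEXT, price TEXT)")


def save_product(form_data):
    """Update the row named by product_id if present, otherwise insert a new one."""
    if form_data.get("product_id"):
        conn.execute(
            "UPDATE product SET name = ?, price = ? WHERE id = ?",
            (form_data["name"], form_data["price"], form_data["product_id"]),
        )
    else:
        conn.execute(
            "INSERT INTO product (name, price) VALUES (?, ?)",
            (form_data["name"], form_data["price"]),
        )


def count_products():
    return conn.execute("SELECT COUNT(*) FROM product").fetchone()[0]


save_product({"name": "Toothbrush", "price": "2.99"})  # create
count_after_create = count_products()

save_product({"product_id": 1, "name": "Floss", "price": "1.50"})  # edit
count_after_edit = count_products()

row = conn.execute("SELECT name, price FROM product WHERE id = 1").fetchone()
print(count_after_create, count_after_edit, row)
```

The row count staying at one after the second call is exactly what test_save_product_as_edit asserts: an edit replaces data and never adds a row.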
@with_test_db((Product,))
def test_delete_product():
    admin_login()

    p = create_test_product("Floss", "1.50")

    assert Product.select().count() == 1

    form_data = {"product_id": p.id}

    testing_app.post("/admin/delete-product", data=form_data)

    assert Product.select().count() == 0
Finally, deleting a product is tested by creating an instance then POSTing its ID to our /delete-product
URL. We then check that the row has been removed from our database.
Great, that’s all of our admin section tested! We can now be sure that adding new functionality won’t change the expected behaviour of any of these URLs.
Testing the Shop
Since we last wrote tests for the shop we’ve added a Cart and Checkout functionality. Let’s get tests for these features going. Open up tests/site/test_site.py
:
@with_test_db((Product,))
def test_add_product_to_cart():
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    with testing_app as app_with_session:
        app_with_session.get("/")

        from flask import session

        assert "cart" in session
        assert session["cart"] == []

        res = app_with_session.post("/add-product-to-cart", data={"product_id": p.id})

        assert session["cart"] == [p.id]
        assert res.get_json()["cart_items"] == 1

        res = app_with_session.post("/add-product-to-cart", data={"product_id": p2.id})

        assert session["cart"] == [p.id, p2.id]
        assert res.get_json()["cart_items"] == 2
In order to test our Cart we need some products to add to it, so we create two using our helper.
The Flask session is not usually available to be imported in tests, but we can use a with
block around our testing_app
which will allow us to import and check the contents of the session
whilst inside.
Using this, we fire a request to the index page in order to create an empty cart
in our session
. We can then import this session
and check for its presence.
A couple of requests to /add-product-to-cart
are fired off and the session
’s cart
is checked for their IDs.
@with_test_db((Order, Product))
def test_checkout_get():
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    with testing_app.session_transaction() as s:
        s.clear()
        s["cart"] = [p.id, p.id, p2.id]

    res = testing_app.get("/checkout")
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"Toothbrush" in res_data
    assert b"3.00" in res_data
    assert b"2.99" in res_data
To test our Checkout page we create two products and use a session_transaction
to put them into our Cart. We then load the /checkout
page and get its contents. The names and prices of our products are checked for inside (we look for "3.00"
since we have 2 instances of our Floss at 1.50).
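The reason the test looks for "3.00" rather than "3.0" comes down to the two-decimal format applied in the template. A minimal sketch with hypothetical values, using the same printf-style pattern as the template's "%.2f" filter call:

```python
cart_prices = [1.50, 1.50]  # two Floss at 1.50 each

# Plain string conversion drops the trailing zero.
plain = str(sum(cart_prices))

# The same printf-style pattern the template uses keeps two decimal places.
rendered_total = "%.2f" % sum(cart_prices)

# Response bodies are bytes, so the test compares against a bytes literal.
page = ("<td>£" + rendered_total + "</td>").encode("utf-8")

print(plain, rendered_total, b"3.00" in page)
```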
@with_test_db((Order, Product))
def test_checkout_post():
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    assert Order.select().count() == 0

    with testing_app.session_transaction() as s:
        s.clear()
        s["cart"] = [p.id, p.id, p2.id]

    res = testing_app.post(
        "/checkout", data={"email": "a@a.com"}, follow_redirects=True
    )
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"Toothbrush" in res_data
    assert b"3.00" in res_data
    assert b"2.99" in res_data
    assert b"sent to a@a.com" in res_data

    assert Order.select().count() == 1

    o = Order.get()
    assert o.email == "a@a.com"
    assert o.products == {
        "Floss": {"total": 3.0, "quantity": 2},
        "Toothbrush": {"total": 2.99, "quantity": 1},
    }
Placing an order is done by a POST request to /checkout
with products in our Cart and an email address sent via the form. We begin by creating our products again and adding them to our Cart. We also check the Order
table to ensure it’s empty.
After our POST request containing our email address, we check again for the same product information, as well as the email confirmation message.
We finish by ensuring the Order
has been created, and check it holds the correct data.
With that, our site is fully tested! We have the ability to quickly and easily check that the core behaviours of our site and admin remain intact as we add or remove features.
To finish off our project, we will have a brief look at how we can run code which makes use of our site’s data but outside of the context of a web request, using a library called Celery.
Chapter 7
Background Tasks with Celery
Sometimes we may have tasks which do not necessarily need to run before we provide feedback to the user. For example, a lot of online shops will not make customers sit and wait while their system sends the confirmation email. This allows developers to do potentially-slow data processing tasks without leaving the user staring at a “loading…” message. One way we achieve this with a Flask website is to make use of a python library named Celery.
Celery allows us to take advantage of separate “workers” running tasks independently of the web application but still using its data. These workers run in separate processes, meaning heavy processing tasks will not affect any web requests.
Heavy Processing
Let’s emulate a heavy processing task by adding a confirmation email to our Checkout endpoint. Create a folder in the root of your project called tasks
and inside make a python file called send_email.py
:
import time

def send_confirmation_email(email: str):
    time.sleep(5)  # emulate processing
    print(f"sending email to {email}")
This function sleeps for 5 seconds to emulate a complex database query, then prints a message about emailing the provided email
. Sending an email is complex, and therefore out of scope for the purposes of this book, so we’ll make do with this print
statement.
Now open up web/views/site/site.py
and add a call to this function in your complete
view:
from tasks.send_email import send_confirmation_email
...
def complete():
    ...
    send_confirmation_email(order.email)

    return render_template("complete.html", order=order)
Open up your shop’s website and add some items to your Cart, then Checkout. You will notice your site hangs for a while before showing you the confirmation page. This is likely to be frustrating to users, who may leave the site, or refresh the page thinking something is wrong and accidentally place two orders.
Let’s now add Celery to our project so that we can remedy this potential issue.
Adding Celery
In order to run background tasks Celery itself needs a way to manage a queue. For this example we will be using Redis since it is very easy to set up and use.
In a new terminal window, source your virtualenv with source env/bin/activate
then run poetry add celery celery[redis]
to install Celery and its dependencies.
While this is installing we can bring in Redis. Specific instructions for doing this depend on the Operating System you are using, so search the web for “Redis install my_operating_system
” and find the official docs. On Linux, Redis is most likely available in your distro’s repositories. MacOS can use homebrew
, and Windows users can use the WSL terminal they are using to follow along with this book.
With Redis installed, running the server is as easy as executing the command redis-server
. You can leave this running while we set up Celery in our code.
Stop your web server if you haven’t already and open up web_server.py
in your editor. Change the beginning of the file to look like so:
from celery import Celery
from flask import Flask

app = Flask(__name__)
app.secret_key = "very secret"

celery = Celery("My Shop", broker="redis://localhost:6379/0")

from web.views import admin, site
...
Now open your tasks/send_email.py
file and we can turn the send_confirmation_email
function into a Celery task:
import time

from web_server import celery

@celery.task
def send_confirmation_email(email: str):
    time.sleep(5)
    print(f"sending email to {email}")
Turning a synchronous function into an async one is as easy as importing celery
and decorating the function with @celery.task
. Now our Celery worker can see this task and will be able to process it independently of our web server.
To run a Celery worker we’ll need an entry point to our application. Create a file called run.py
in the root of the project:
from web_server import celery
And now we are set up to run a Celery worker! Open up a new terminal and source your virtualenv. Now run the following command:
celery -A run worker -l info -E
This points Celery to our new run.py
file, tells it to run a worker
, sets the loglevel
to info
so that we can see what it’s doing, and finally uses -E
to tell the worker to emit task events about each job, which monitoring tools can pick up.
One last thing now - we need to tell our complete
view to call this function asynchronously using the Celery worker. We do this by calling the delay
function which has now been added by the @celery.task
decorator. Open up web/views/site/site.py
and change the line in complete
:
send_confirmation_email.delay(order.email)
Now our slow function will be executed by the Celery worker, meaning the customer will see the Complete page without waiting for their confirmation email to be sent.
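The idea of handing work off and returning immediately can be felt without installing anything. The sketch below is only an analogy: it uses a thread pool from the standard library, which is a different mechanism from Celery's separate worker processes and Redis queue, and the shortened sleep and the sent list are assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

sent = []


def send_confirmation_email(email):
    time.sleep(0.5)  # emulate slow processing
    sent.append(email)


executor = ThreadPoolExecutor(max_workers=1)

start = time.monotonic()
# Roughly the role .delay() plays: queue the work and carry on.
future = executor.submit(send_confirmation_email, "test@example.com")
handoff_seconds = time.monotonic() - start  # time the "view" spent waiting

future.result()  # only here do we wait for the background work to finish
executor.shutdown()

print(f"handed off in {handoff_seconds:.3f}s, sent: {sent}")
```

The hand-off time is far shorter than the half-second task, which is the same effect the customer sees when the Complete page loads without waiting for the email.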
Start up your web server with python3 web_server.py
and buy some products on your website. You should notice the Complete page does not hang any more. Open the terminal running your celery
command and you should see a message similar to this:
[2022-05-02 07:31:27,561: WARNING/ForkPoolWorker-8] sending email to test@example.com
Congratulations, you have successfully run a background task from your website!
Adjusting the Tests
Since we should not have to run Redis in order to test our application, we’ll need to stop our server from trying to connect during tests. We can detect if we are testing using this line:
if "pytest" in sys.modules:
Open up web_server.py
and change the creation of the celery
instance to the following:
import sys
...
if "pytest" in sys.modules:
    celery = Celery("My Shop", broker="memory://")
else:
    celery = Celery("My Shop", broker="redis://localhost:6379/0")
This will tell Celery to use an in-memory queue during unit tests. This does not work when actually trying to use Celery in practice, which is why we had to install Redis.
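The detection logic is easy to try in isolation. In this sketch, pick_broker_url is a hypothetical helper (the book assigns the Celery instance inline instead), and the loaded_modules parameter exists only so that both branches can be exercised without actually running under Pytest.

```python
import sys


def pick_broker_url(loaded_modules=None):
    """Return an in-memory broker under pytest, Redis otherwise."""
    if loaded_modules is None:
        loaded_modules = sys.modules
    if "pytest" in loaded_modules:
        return "memory://"
    return "redis://localhost:6379/0"


print(pick_broker_url({"pytest": object()}))
print(pick_broker_url({}))
```

This works because Python adds every imported module to sys.modules, and Pytest is necessarily imported when it is the program running your tests.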
With Celery taken care of, our tests should now run successfully. However, we can actually now check that the confirmation email task is called when an order completes.
Mocking Celery’s Delay
In a new terminal window, source your virtualenv and run poetry add --dev pytest-mock
. This will allow us to use Mocking to test if a function has been called during the execution of another.
First, we’ll create a mock of the delay
function in our tests/helpers.py
file:
...
__all__ = [
    ...
    "mock_send_confirmation_email_delay",
    "with_test_db",
]
...
def mock_send_confirmation_email_delay(mocker):
    from tasks.send_email import send_confirmation_email

    mocker.patch.object(send_confirmation_email, "delay", autospec=True)
We use the mocker
fixture (which will be passed to this function) to patch
the delay
function with a mock. This will then allow us to inspect whether the function was called during the test.
Now open up tests/site/test_site.py
and change your test_checkout_post
function to the following:
@with_test_db((Order, Product))
def test_checkout_post(mocker):
    mock_send_confirmation_email_delay(mocker)
    p = create_test_product("Floss", "1.50")
    p2 = create_test_product("Toothbrush", "2.99")

    assert Order.select().count() == 0

    with testing_app.session_transaction() as s:
        s.clear()
        s["cart"] = [p.id, p.id, p2.id]

    res = testing_app.post(
        "/checkout", data={"email": "a@a.com"}, follow_redirects=True
    )
    res_data = res.get_data()

    assert b"Floss" in res_data
    assert b"Toothbrush" in res_data
    assert b"3.00" in res_data
    assert b"2.99" in res_data
    assert b"sent to a@a.com" in res_data

    assert Order.select().count() == 1

    o = Order.get()
    assert o.email == "a@a.com"
    assert o.products == {
        "Floss": {"total": 3.0, "quantity": 2},
        "Toothbrush": {"total": 2.99, "quantity": 1},
    }

    from tasks.send_email import send_confirmation_email
    send_confirmation_email.delay.assert_called_once()
    assert send_confirmation_email.delay.call_args[0][0] == "a@a.com"
The test now takes an argument called mocker
, which will be injected by the pytest-mock
library we installed. This is passed to the mock_send_confirmation_email_delay
helper function to carry out the mocking.
Once we have asserted that the contents of the page are as expected, we have two new asserts at the end of our test. We first use assert_called_once
to check that our send_confirmation_email
function was sent to the Celery worker. Then we can use its call_args
attribute to check that the argument passed to this function was the email address we entered. The [0][0]
is used to get the tuple of positional arguments from the most recent call, then the first argument in that tuple. Since there is only one call, and only one argument, this is all we need to check.
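The structure of call_args can be explored with the standard library's unittest.mock, which is the module that pytest-mock wraps. This is a small sketch: FakeTask is a hypothetical stand-in for a Celery task, and unlike the book's helper it omits autospec=True to keep the example minimal.

```python
from unittest import mock


class FakeTask:
    """Hypothetical stand-in for a Celery task object with a delay method."""

    def delay(self, email):
        raise RuntimeError("would contact the broker")


send_confirmation_email = FakeTask()

with mock.patch.object(send_confirmation_email, "delay") as delay_mock:
    send_confirmation_email.delay("a@a.com")  # what the view would do

    delay_mock.assert_called_once()

    # call_args describes the most recent call as (positional, keyword).
    positional_args, keyword_args = delay_mock.call_args
    first_argument = delay_mock.call_args[0][0]

print(positional_args, keyword_args, first_argument)
```

Because the real delay method was replaced for the duration of the with block, the RuntimeError is never raised, just as the real task is never queued during the test.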
Fantastic, we’ve now got a way of running background tasks outside of a web request to avoid slowing down the user’s page load, and we have an automated test to ensure they are called with the correct arguments!
With that, our project is now complete! We have ourselves a website split into two sections - an admin protected by a login, and a public-facing shop. We can process and store customer orders in a database, and process that data outside of a web request context using Celery. Our whole application has a suite of automated tests which ensure that the critical functionality is behaving as it should.
Appendix 1 - Peewee
In these appendices I will be covering some more complex parts of the mentioned libraries which I couldn’t fit into our simple example, but still deem important to know. These can be read either before following the book’s examples, or after finishing the project.
As always, the official Peewee docs are going to be more accurate than what is written here.
These examples are correct for Peewee version 3.14.8.
Basics
Selecting
Call the .select
method on your model class:
Book.select()
This is the equivalent of SELECT * FROM book;
.
This function returns an iterable
of instances of your Model class (Book
in this example). To make use of them, you will then need to consume the iterable with either a for
loop or list comprehension:
books = Book.select()

for book in books:
    # `book` is now an instance of the `Book` model class.
    print(book.title)

# OR:
books = Book.select()
all_books = [b for b in books]
Selecting specific fields
All examples in this book’s project select all fields from our tables, as they are small. This can be a performance hit as tables grow larger.
To select only specific columns, pass the model name, a dot, and the attribute as arguments to your .select
call. For example:
Book.select(Book.author, Book.title)
This query selects only the author and title columns.
Conditions
To filter what you are selecting, pass regular python conditional statements to a .where
call:
Book.select().where(Book.title == "Flask by Example")
Book.select().where(Book.price >= 5.00)
To pass multiple conditions, wrap each in brackets and separate them with an ampersand &
for AND
, or a pipe |
for OR.
For example:
Book.select().where( (Book.price >= 5.00) & (Book.published == True) )
Is the equivalent of:
SELECT * FROM book WHERE book.price >= 5.00 AND book.published = true;
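The brackets around each condition are required by Python itself, because & and | bind more tightly than comparison operators such as >= and ==. The sketch below shows the difference using plain integers (hypothetical values, not Peewee fields), but the same parsing rules apply to Peewee expressions.

```python
price = 6
copies = 2

# Bracketed: each comparison is evaluated first, then combined with &.
bracketed = (price >= 5) & (copies == 2)

# Unbracketed: Python evaluates 5 & copies first (bitwise AND, giving 0),
# then the chained comparison price >= 0 == 2, which is False.
unbracketed = price >= 5 & copies == 2

print(bracketed, unbracketed)
```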
For an OR
query:
Book.select().where( (Book.title == "Flask By Example") | (Book.title == "Tkinter By Example") )
Is the equivalent of:
SELECT * FROM book WHERE book.title = 'Flask By Example' OR book.title = 'Tkinter By Example';
You can combine both AND
and OR
using brackets, just like in regular SQL:
Book.select().where(
    (Book.date_published > "2020-01-01")
    | (
        (Book.date_published < "2020-01-01") & (Book.revenue >= 100)
    )
)
The above selects all books which are either newer than Jan 1st 2020, or both older than Jan 1st 2020 and have a revenue of at least 100. In SQL this would be:
SELECT * FROM book WHERE (book.date_published > '2020-01-01') OR (book.date_published < '2020-01-01' AND book.revenue >= 100);
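To see which rows such a query keeps, the SQL can be run against a throwaway in-memory database. This sketch uses the standard library's sqlite3 module rather than Peewee, with a hypothetical three-row book table, and stores dates as ISO-formatted text so that string comparison matches date order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE book (title TEXT, date_published TEXT, revenue REAL)")
conn.executemany(
    "INSERT INTO book VALUES (?, ?, ?)",
    [
        ("New, low revenue", "2021-06-01", 10),
        ("Old, high revenue", "2019-03-01", 250),
        ("Old, low revenue", "2019-03-01", 10),
    ],
)

rows = conn.execute(
    "SELECT title FROM book "
    "WHERE (date_published > '2020-01-01') "
    "OR (date_published < '2020-01-01' AND revenue >= 100) "
    "ORDER BY title"
).fetchall()
titles = [title for (title,) in rows]
print(titles)
```

Only the old, low-revenue book is filtered out, since it fails both sides of the OR.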
Getting a Single Instance
If there should only be a single instance of a row matching your condition, you can use the .get
method to grab it directly. Arguments passed to this method are the same conditionals as would be passed to a .where
call.
For example:
flask_book = Book.get(Book.title == "Flask By Example")
Here your flask_book
variable will be an instance of your Book
model.
This will throw a DoesNotExist
exception if the criteria do not match and no record could be found.
If you aren’t certain that there is a matching row in your database, you can instead use .get_or_none
, which will return a Model
instance if a match is found, or None
if not.
maybe_book = Book.get_or_none(Book.title == "Flask By Example")

if not maybe_book:
    print("No book with that name")
Advanced
Aliases
You can append .alias
to an attribute inside a call to .select
to alias the field. This is useful in situations where multiple tables have a column with the same name. For example:
Book.select(Book.author, Book.title.alias("book_title"))
This selects a Book’s author, and aliases its title to book_title
.
Joins
Join two tables by appending a call to .join
after a .select
. The call to join takes two arguments - the Model class to join with, and a condition on which to perform the join, named on
. To select fields from a joined table, simply pass the Model’s name and attribute to select
as normal.
For example:
Book.select(
    Book.title, Book.author, Author.date_of_birth
).join(
    Author, on=(Author.name == Book.author)
)
This example performs the query:
SELECT book.title, book.author, author.date_of_birth
FROM book
JOIN author ON author.name = book.author;
Join types
Join types are available as constants which can be imported from Peewee like so:
from peewee import JOIN
These are then passed as the second argument to your .join
call:
from peewee import JOIN

Book.select(
    Book.title, Book.author, Author.date_of_birth
).join(
    Author, JOIN.INNER, on=(Author.name == Book.author)
)
See the Peewee documentation for an up-to-date list of available joins.
Functions
SQL functions are accessed through a helper object named fn
. If you know the name of the function you would like to use in regular SQL, it is probably available under the same name on this helper.
For example, to use a MAX
function:
from peewee import fn
most_copies_sold = Book.select(fn.MAX(Book.copies_sold))
You can also alias these function calls:
from peewee import fn
books = Book.select(fn.SUM(Book.revenue).alias("total_revenue"), Book.title).group_by(Book.title)
Group By, Order By, Limit, Offset
These functionalities are all offered by functions of the same name. For example:
books = Book.select(Book.title, fn.SUM(Book.revenue)).group_by(Book.title).order_by(Book.title.asc()).limit(5).offset(5)
As above, ordering is done by specifying the Model class and attribute, then appending .asc()
or .desc()
for ascending or descending. To alter where NULLs appear, pass nulls="LAST"
or nulls="FIRST"
to .asc
or .desc
.
Appendix 2 - Pytest
In these appendices I will be covering some more complex parts of the mentioned libraries which I couldn’t fit into our simple example, but still deem important to know. These can be read either before following the book’s examples, or after finishing the project.
As always, the official Pytest docs are going to be more accurate than what is written here.
These examples are correct for Pytest version 7.0.0.
Basics
Tests by default live in a folder called tests
. Within that folder, any python files which begin with test_
will be run by Pytest. Similarly, any functions inside those files which begin with test_
will be detected and executed as tests.
Running tests is achieved by executing the command pytest
in the same directory as your tests
folder. If you pass a file path as an argument, e.g. pytest tests/models/test_models.py
, only that file will be run.
Some of the flags you may want to pass to the command are:
-s Show the output of print statements.
-x Stop after a test fails.
-v Display the file and function of each test as it runs.
-k Only run tests whose name matches a provided string. For example, pytest -k email will only run tests if their function name contains the word “email”.
You can pass multiple flags in one go, for example pytest -sxv
.
Advanced
Fixtures
Fixtures are a rather deep topic, which you can find on the Pytest docs in more detail than I could ever write. However, for the purpose of understanding this book, you can think of a fixture as an object injected into a test which requires it.
The two fixtures I find myself using the most are mocker
for Mocking and monkeypatch
for Patching.
Mocking
Mocking is replacing a piece of code with a dummy object which keeps track of how many times it was called and with what arguments. This dummy function will not execute the original code.
Mocking in Pytest is made very easy with the pytest-mock
library. With this installed simply add a parameter named mocker
to your test functions and a Mock
object will be injected.
Mocking a Function
To mock a function, pass a string representing the import path to the function to mocker.patch
, like so:
mocker.patch("helpers.emails.send_email")
Now in your test, import this function and use a method to check it was / wasn’t called:
def test_send_email_called(mocker):
    mocker.patch("helpers.emails.send_email")

    function_which_should_send_email()

    from helpers.emails import send_email
    send_email.assert_called_once()
Mocking an Object
To mock a method of an object, rather than a standalone function, use mocker.patch.object
. Pass the object as the first argument, and a string of the method to mock as the second.
mocker.patch.object(Book, "get_total_revenue")
You can now check the method was called in much the same way as a function:
def test_get_total_revenue(mocker):
    from models.book import Book
    mocker.patch.object(Book, "get_total_revenue")

    get_all_book_revenues()

    Book.get_total_revenue.assert_called_once()
Spying
Spying is like Mocking, except the original functionality of the mocked function will execute. This is useful during integration tests, where subsequent function calls rely on the results of the one you wish to mock.
Spying is also handled by the pytest-mock
module, and is available in the mocker
fixture.
def test_get_total_revenue(mocker):
    from models.book import Book
    book_spy = mocker.spy(Book, "get_total_revenue")

    get_all_book_revenues()

    book_spy.assert_called_once()
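The difference between a spy and a plain mock can be demonstrated with the standard library alone. This sketch does not use mocker.spy; it reproduces the same idea with the wraps argument of unittest.mock, and the Book class and get_all_book_revenues function here are hypothetical stand-ins.

```python
from unittest import mock


class Book:
    """Hypothetical model with one method worth spying on."""

    @staticmethod
    def get_total_revenue():
        return 50.0


def get_all_book_revenues():
    return [Book.get_total_revenue()]


original = Book.get_total_revenue

# wraps= makes the mock record calls while still running the original code.
with mock.patch.object(Book, "get_total_revenue", wraps=original) as book_spy:
    revenues = get_all_book_revenues()
    book_spy.assert_called_once()
    calls_seen = book_spy.call_count

print(revenues, calls_seen)
```

A plain mock would have returned a Mock object in place of 50.0, which is why spying suits tests where later code depends on the real return value.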
Patching
Patching is changing the behaviour of a function or method temporarily for the duration of a single test. This is useful for integration tests which rely on functions gathering data which is not static, such as the current date or a request to an external API.
Patching is achieved using the monkeypatch
fixture built in to Pytest.
Patching a function
Patching a function is a bit more complicated than mocking, but the syntax is generally similar. The monkeypatch
object uses the setattr
method to patch a function, which requires the function-to-patch as the first argument, and the replacement function as the second.
Instead of passing the function directly, however, you need to pass its __code__
attribute:
def patched_send_email():
    print("sending")

monkeypatch.setattr("helpers.emails.send_email.__code__", patched_send_email.__code__)
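One likely benefit of swapping __code__ rather than the function itself is that every existing reference to the function picks up the change, including names imported into other modules before the test ran. The sketch below shows this in plain Python without the monkeypatch fixture; the functions are hypothetical and the manual restore stands in for the clean-up that monkeypatch performs automatically.

```python
def send_email():
    return "really sending"


def patched_send_email():
    return "patched"


# Another module that ran `from helpers.emails import send_email` would hold
# a reference like this one, which rebinding the module attribute cannot reach.
imported_elsewhere = send_email

original_code = send_email.__code__
send_email.__code__ = patched_send_email.__code__
during_patch = imported_elsewhere()

send_email.__code__ = original_code  # monkeypatch undoes this automatically
after_restore = imported_elsewhere()

print(during_patch, after_restore)
```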
Patching an Object
Patching an object does not require the use of __code__
, and is also achieved via setattr
. The first argument is the object, the second is a string of the method to patch, and the third is the replacement function:
from models.book import Book

def patched_get_total_rev():
    return 50.00

monkeypatch.setattr(Book, "get_total_revenue", patched_get_total_rev)