Flask App Authentication with JWT

Hello, and welcome back to the third post in my Flask series. This post continues from the previous one; use the links below to catch up on any post you missed.

OUTLINE OF FLASK SERIES

 

The code for this post can be found on my GitHub account under the branch feat/jwt-auth-model

 

By the end of this post, you will have learned the following:

  • Manipulating a database using SQLAlchemy and creating database tables using Flask-Migrate.
  • Hashing a password before saving it to the database.
  • Using a regular expression to check whether a submitted password meets the requirements enforced in this post: it must be 7 to 16 characters long and include at least one letter and one number.
  • Basic usage of webargs.
  • Basic knowledge of JWT (JSON Web Tokens).
  • Creating an access token using Flask-JWT-Extended.
  • Working with a refresh token.
  • Restricting access using a token blacklist (logging users out).

 

We will install some new dependencies for this post:

  • Flask-SQLAlchemy: adds support for the SQLAlchemy ORM to Flask applications.
    This extension simplifies common SQLAlchemy tasks within a Flask application. SQLAlchemy itself is a very popular ORM package that lets us work with objects rather than database tables.
  • Flask-Migrate: uses the Alembic package to handle SQLAlchemy database migrations for Flask applications. We will use Flask-Migrate to set up the tables in our MySQL database.
  • PyMySQL: a Python MySQL driver.
  • Flask-JWT-Extended: a user authentication package that provides the create_access_token() function for making new access JWTs.
    It also provides the @jwt_required decorator for protecting API endpoints (checking whether users have logged in).
    In addition, the get_jwt_identity() function returns the identity stored in the JWT inside a protected endpoint, which tells us who the authenticated user is.

In this post, we will be using MySQL as our backend database for development.

If you don’t have MySQL installed on your system, you can use any database of your choice or use the default dev.sqlite provided.

Open your terminal and do the following to create a new database.

mysql -u root

mysql> CREATE USER 'diary_admin'@'localhost' IDENTIFIED BY 'password2020';
Query OK, 0 rows affected (0.00 sec)

mysql> CREATE DATABASE dev_db;
Query OK, 1 row affected (0.00 sec)

mysql> GRANT ALL PRIVILEGES ON dev_db . * TO 'diary_admin'@'localhost';
Query OK, 0 rows affected (0.00 sec)

Manipulating a database using SQLAlchemy and creating database tables.

Before we begin, let me define a few terms.
Object Relational Mapping (ORM) is a programming technique that lets the developer map objects in the programming language to the data model in a database, so there is no need to write raw SQL to persist data.
SQLAlchemy is one of the most popular ORMs in Python.
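To make the ORM idea concrete, here is a minimal sketch that uses only the standard library. It is not SQLAlchemy: the UserMapper class and its methods are hypothetical names invented for this illustration, and it hand-writes the object-to-row mapping that an ORM would generate for you.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Plain Python object that mirrors one row of the `user` table."""
    username: str
    email: str
    id: Optional[int] = None


class UserMapper:
    """Hand-rolled stand-in for what an ORM automates (illustration only)."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS user ("
            "id INTEGER PRIMARY KEY, "
            "username TEXT UNIQUE NOT NULL, "
            "email TEXT UNIQUE NOT NULL)"
        )

    def save(self, user: User) -> User:
        # Object -> row: the SQL lives here, not in the application code.
        cur = self.conn.execute(
            "INSERT INTO user (username, email) VALUES (?, ?)",
            (user.username, user.email),
        )
        self.conn.commit()
        user.id = cur.lastrowid
        return user

    def get_by_username(self, username: str) -> Optional[User]:
        # Row -> object: rebuild a User from the selected columns.
        row = self.conn.execute(
            "SELECT id, username, email FROM user WHERE username = ?",
            (username,),
        ).fetchone()
        if row is None:
            return None
        return User(id=row[0], username=row[1], email=row[2])


mapper = UserMapper(sqlite3.connect(":memory:"))
mapper.save(User(username="oluchiy", email="oluch@gmail.com"))
found = mapper.get_by_username("oluchiy")
print(found)
```

With a real ORM such as SQLAlchemy, you declare the columns once on the model class and the library produces the equivalent of save and get_by_username for you.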

JWT stands for JSON Web Token.
A JWT is used for user authentication and is passed between the user and the server.
It works by encoding the user's identity and signing it digitally, which makes the token hard to forge without the signing key. The application can later
control access for the user based on their identity.
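The signing step can be sketched with the standard library alone. This is a simplified illustration of an HMAC-SHA256 (HS256) signature, not the implementation used by Flask-JWT-Extended; the secret "demo-secret" and the "identity" claim are placeholder values chosen for the example.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _hmac_sha256(message: str, secret: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of message under secret."""
    return hmac.new(secret.encode("utf-8"), message.encode("utf-8"),
                    hashlib.sha256).digest()


def sign(payload: dict, secret: str) -> str:
    """Build a token of the form header.payload.signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return signing_input + "." + b64url(_hmac_sha256(signing_input, secret))


def verify(token: str, secret: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False  # no "." separator, so this is not a signed token
    expected = b64url(_hmac_sha256(signing_input, secret))
    return hmac.compare_digest(expected.encode("utf-8"),
                               signature.encode("utf-8"))


token = sign({"identity": 1}, "demo-secret")  # placeholder secret and claim
print(verify(token, "demo-secret"))   # True: the signature matches
print(verify(token, "wrong-secret"))  # False: signed with a different key
```

Anyone can read the payload, but changing it invalidates the signature unless the attacker also knows the secret, which is why the secret key must stay private.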

A JWT is a string composed of the header, payload, and signature. Those three parts are separated by a period (.).

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.
eyJpc3MiOiJ0b3B0YWwuY29tIiwiZXhwIjoxNDI2NDIwODAwLCJodHRwOi8vdG9wdGFsLmNvbS9qd3RfY2xhaW1zL2lzX2FkbWluIjp0cnVlLCJjb21wYW55IjoiVG9wdGFsIiwiYXdlc29tZSI6dHJ1ZX0.
yRQYnWzskCZUxPwaQupWkiUzKELZ49eM7oWxAQK_ZXw
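To see what the first two parts contain, you can base64url-decode them. The sketch below decodes the sample token shown above with the standard library only; decode_segment is a helper name made up for this example. Note that it only reads the token and does not verify the signature.

```python
import base64
import json

# The three segments of the sample token shown above.
TOKEN = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJpc3MiOiJ0b3B0YWwuY29tIiwiZXhwIjoxNDI2NDIwODAwLCJodHRwOi8vdG9wdGFsLmNvbS9qd3RfY2xhaW1zL2lzX2FkbWluIjp0cnVlLCJjb21wYW55IjoiVG9wdGFsIiwiYXdlc29tZSI6dHJ1ZX0."
    "yRQYnWzskCZUxPwaQupWkiUzKELZ49eM7oWxAQK_ZXw"
)


def decode_segment(segment: str) -> dict:
    """Base64url-decode one JWT segment and parse it as JSON."""
    padded = segment + "=" * (-len(segment) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


header_b64, payload_b64, signature_b64 = TOKEN.split(".")
header = decode_segment(header_b64)
payload = decode_segment(payload_b64)
print(header)   # the signing algorithm and token type
print(payload)  # the claims, readable by anyone who holds the token
```

Because the payload is only encoded, not encrypted, never put secrets such as passwords inside a JWT.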

 

Defining Our Model
Models are like schemas in the database. We will implement a User model.

Create a new models.py file in the api folder and add the following code to it.

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError

# Shared database handle; it is bound to the app later via db.init_app(app).
db = SQLAlchemy()


class BaseModel(db.Model):
    """Define the base model for all other models."""

    __abstract__ = True

    id = db.Column(db.Integer(), primary_key=True)
    created_on = db.Column(db.DateTime(), server_default=db.func.now(),
                           nullable=False)
    updated_on = db.Column(db.DateTime(), nullable=False,
                           server_default=db.func.now(),
                           onupdate=db.func.now())

    def save(self):
        """Save an instance of the model to the database."""
        try:
            db.session.add(self)
            db.session.commit()
        except SQLAlchemyError:
            # IntegrityError is a subclass of SQLAlchemyError, so one handler
            # covers both. The error is swallowed after the rollback.
            db.session.rollback()

    def update(self):
        """Commit pending changes to this instance."""
        return db.session.commit()

    def delete(self):
        """Delete an instance of the model from the database."""
        try:
            db.session.delete(self)
            db.session.commit()
        except SQLAlchemyError:
            db.session.rollback()


class User(BaseModel):
    __tablename__ = 'user'

    username = db.Column(db.String(80), nullable=False, unique=True)
    email = db.Column(db.String(200), nullable=False, unique=True)
    password = db.Column(db.String(200), nullable=False)

    @classmethod
    def get_by_username(cls, username):
        return cls.query.filter_by(username=username).first()

    @classmethod
    def get_by_email(cls, email):
        return cls.query.filter_by(email=email).first()

    @classmethod
    def get_by_id(cls, id):
        return cls.query.filter_by(id=id).first()





 

First, db = SQLAlchemy() instantiates the db object from the SQLAlchemy class; it provides access to all the functionality of Flask-SQLAlchemy.
Before creating the User model, we create a BaseModel that the User model inherits from. The base model contains the following fields:

id: The identity of an object (primary key)
created_on: The creation time of the object
updated_on: The last update time of the object

and three methods:

  • save: This is to persist the data to the database and handles errors that may occur.
  • update: This is to update information in the database.
  • delete: This is to delete information from the database.

The User model will be mapped to the user table in the database.
The fields we defined for the User model are as follows:
  • username: The username of the user. The maximum length allowed is 80 characters. It can’t be null and is a unique field.
  • email: The user’s email. The maximum length allowed is 200. It can’t be null and is a unique field.
  • password: The user’s hashed password. The maximum length allowed is 200. It can’t be null.
  • is_active: Indicates whether the account has been activated by email. It is a Boolean field with a default value of False. Note that the model code above does not define this column yet.

We also define three class methods on the User model:
get_by_username: This method searches for the user by username.
get_by_email: This method searches for the user by email.
get_by_id: This method searches for the user by id.

Update your .env file

Add your database URL and set your secret key in the .env file.

The secret key is used to sign data such as session cookies and, because no separate JWT_SECRET_KEY is configured here, the JWTs issued by Flask-JWT-Extended. Make sure it is a long, random string that is hard to guess.

DEV_DATABASE_URL='mysql+pymysql://diary_admin:password2020@localhost/dev_db'
FLASK_APP=main.py 
FLASK_DEBUG=1 
FLASK_ENV=development
SECRET_KEY='GGFGFJ77Thh68jbbb&'
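If you would rather not invent a key by hand, Python's standard secrets module can generate one. This is a small sketch; generate_secret_key is a helper name made up for this example, and 32 bytes is an assumed length, not a requirement of Flask.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a URL-safe random string built from num_bytes of randomness."""
    return secrets.token_urlsafe(num_bytes)


key = generate_secret_key()
# Paste the printed line into your .env file.
print(f"SECRET_KEY='{key}'")
```

Run it once, copy the output into .env, and avoid committing that file to your repository.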

Next, make a small change to app.py: pass the db object imported from models.py, along with a Migrate object, into the create_app function.

CREATE OUR RESOURCES (VIEW)

Create an instance of Flask-JWT-Extended. First import the JWTManager class from flask_jwt_extended, then instantiate it by calling JWTManager() and assign the result to the jwt variable. Inside create_app, jwt.init_app(app) binds it to the application.

from flasgger import Swagger
from flask import Flask
from flask_migrate import Migrate
from flask_restful import Api
from webargs.flaskparser import abort, parser

from api.config import env_config
from api.models import db
from utils import errors
from werkzeug import exceptions


from flask_jwt_extended import JWTManager

api = Api()

jwt = JWTManager()

def create_app(config_name):

    #......................
   
    db.init_app(app)   # Add db session, new code here
 
    Migrate(app, db) #new code here
    jwt.init_app(app)
    
    #--------------------------
        
    return app

The next step is to run the migrations. We will use Flask-Migrate to generate a script that creates the user table. Open your terminal and run:

flask db init

You should see output like the following, and a new folder named migrations is created.

Creating directory /home/oluchi/Desktop/blog_Notes/diary_app/migrations ...  done
  Creating directory /home/oluchi/Desktop/blog_Notes/diary_app/migrations/versions ...  done
  Generating /home/oluchi/Desktop/blog_Notes/diary_app/migrations/script.py.mako ...  done
  Generating /home/oluchi/Desktop/blog_Notes/diary_app/migrations/alembic.ini ...  done
  Generating /home/oluchi/Desktop/blog_Notes/diary_app/migrations/README ...  done
  Generating /home/oluchi/Desktop/blog_Notes/diary_app/migrations/env.py ...  done
  Please edit configuration/connection/logging settings in '/home/oluchi/Desktop/blog_Notes/diary_app/migrations/alembic.ini' before proceeding.

Next, run:

flask db migrate -m "create user table"

This command compares the models with the database and generates a migration script; it does not create the tables yet. The -m flag attaches a message (create user table) to the migration.

INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.autogenerate.compare] Detected added table 'user'
  Generating /home/oluchi/Desktop/blog_Notes/diary_app/migrations/versions/37fb92a6fcad_create_user_table.py ...  done

Now check migrations/versions/37fb92a6fcad_create_user_table.py, which Flask-Migrate generated.
Your revision ID will probably differ. Review the file before you run the flask db upgrade command,
because Alembic's autogeneration does not detect every change you make to your models.

Lastly, run the command below

flask db upgrade
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 37fb92a6fcad, create user table

Now let’s view our DB to see if everything is in check.

mysql> use dev_db;
Database changed
mysql> show tables;
+------------------+
| Tables_in_dev_db |
+------------------+
| alembic_version  |
| user             |
+------------------+
2 rows in set (0.00 sec)

Yes, everything is in check.

The next step is to create our resources (view classes).

We are going to create five resources, one per endpoint.

  • UserRegistrationResource: Creates a new user.
  • UserLoginResource: Logs in the user and issues tokens for the user's identity using create_access_token() and create_refresh_token() from Flask-JWT-Extended.
  • UserInfoResource: Shows the user's biodata. The client sends the access token obtained from UserLoginResource, and the token is verified before the user can access their biodata.
  • RefreshAccessTokenResource: Issues a new access token in exchange for a refresh token.
  • RevokeAccessTokenResource: Logs the user out by placing the token on a blacklist.

 

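Create a new file, user.py, in the resources folder (app.py imports it as resources.user), and copy the code below into it. The code uses the Flask-JWT-Extended 3.x API (@jwt_required without parentheses, jwt_refresh_token_required, get_raw_jwt); the 4.x releases renamed these, so install a 3.x version to run it as written.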

from http import HTTPStatus

from flask_jwt_extended import (create_access_token, create_refresh_token,
                                get_jwt_identity, get_raw_jwt,
                                jwt_refresh_token_required, jwt_required)
from flask_restful import Resource
from webargs import validate
from webargs.fields import Email, Str
from webargs.flaskparser import use_kwargs
from werkzeug.security import check_password_hash, generate_password_hash

# Do not import api from api.app here: app.py imports this module, so that
# would create a circular import.
from api.models import User

# In-memory store of revoked token IDs (jti); it is cleared on restart.
black_list = set()

# At least one digit, at least one letter, 7 to 16 characters in total.
PASSWORD_VALIDATION = validate.Regexp(
    r"^(?=.*[0-9]+.*)(?=.*[a-zA-Z]+.*).{7,16}$",
    error="Password must contain at least one letter, at"
    " least one number, be longer than six characters "
    "and no longer than 16.")



class UserRegistrationResource(Resource):
    """Define endpoints for user registration."""

    form_validation = {
        "username": Str(required=True, location="json"),
        "email": Email(required=True, location="json"),
        "password": Str(required=True, location="json",
                        validate=PASSWORD_VALIDATION)
    }

    @use_kwargs(form_validation)
    def post(self, username, email, password):
        """Create a new user."""

        # Check whether the username or email already exists before creation
        if User.get_by_username(username):
            return ({'message': 'username already exist'},
                    HTTPStatus.BAD_REQUEST)

        if User.get_by_email(email):
            return {'message': 'email already exist'}, HTTPStatus.BAD_REQUEST

        # Store only the salted hash, never the plain-text password
        password = generate_password_hash(password)

        user = User(
            username=username,
            email=email,
            password=password
        )

        user.save()

        data = {
            'id': user.id,
            'username': user.username,
            'email': user.email
        }

        return data, HTTPStatus.CREATED
        



class UserLoginResource(Resource):
    """Define endpoints for user login."""

    user_login = {
        "email":Email(required=True,location="json"),
        "password":Str(required=True, location="json")
        
    }

    @use_kwargs(user_login)
    def post(self, email, password):
        """Log in a user and issue access and refresh tokens."""
        
        user = User.get_by_email(email)
        if user and check_password_hash(user.password, password):
            return {
                "status": "success",
                "data": {
                    "user_id": user.id,
                    "email": user.email,
                    "access_token": create_access_token(identity=user.id, 
                     fresh=True),
                    "refresh_token": create_refresh_token(identity=user.id)
                }
            } , HTTPStatus.OK
        return {
            "status": "fail",
            "data": {
                "msg": "Unable to authenticate user: Invalid credentials"
            }
        }, HTTPStatus.UNAUTHORIZED



        

class UserInfoResource(Resource):

    @jwt_required
    def get(self):

        user = User.get_by_id(id=get_jwt_identity())
        if user:

            data = {
                'message':"welcome to your biodata page",
                'id': user.id,
                'username': user.username,
                'email': user.email,
            }

            return data, HTTPStatus.OK
        return {"status":"fail"}, HTTPStatus.UNAUTHORIZED



class RefreshAccessTokenResource(Resource):

    @jwt_refresh_token_required
    def post(self):
        
        current_user = get_jwt_identity()
        if current_user:

            token = create_access_token(identity=current_user, fresh=False)

            return {'token': token}, HTTPStatus.OK
        return {"message": "invalid user"}, HTTPStatus.UNAUTHORIZED


class RevokeAccessTokenResource(Resource):

    @jwt_required
    def post(self):
        jti = get_raw_jwt()['jti']

        if jti:

            black_list.add(jti)

            return {'message': 'Successfully logged out'}, HTTPStatus.OK
        return {"message":"Something bad occurred while trying to log you out"
            }, HTTPStatus.BAD_REQUEST



Explaining the code above: we import use_kwargs from webargs.flaskparser to validate the client input, then check whether the user already exists in the database with User.get_by_username(username).

If such an entry is found, that means the user has already registered and we will simply return an error message. We will also perform the same check on email as well.

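We defined PASSWORD_VALIDATION, a regular-expression validator, to check password strength: the password must be 7 to 16 characters long and contain at least one letter and one number.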
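You can try the same pattern outside the app with Python's re module. This sketch reuses the regular expression from user.py; is_valid_password is a helper name made up for this example, and the sample passwords include the two used later in this post.

```python
import re

# Same pattern as PASSWORD_VALIDATION in user.py.
PASSWORD_PATTERN = re.compile(r"^(?=.*[0-9]+.*)(?=.*[a-zA-Z]+.*).{7,16}$")


def is_valid_password(password: str) -> bool:
    """Return True if the password has a digit, a letter, and 7-16 chars."""
    return PASSWORD_PATTERN.match(password) is not None


for candidate in ("445hD#@%yu78", "445h", "abcdefgh", "12345678"):
    print(candidate, is_valid_password(candidate))
```

Only the first candidate passes: the second is too short, the third has no digit, and the fourth has no letter.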
We hashed the password with generate_password_hash from Werkzeug and persisted the user to the database with user.save(). At login, check_password_hash compares the submitted password against the stored hash.
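If you are curious what salted hashing looks like, here is a standard-library sketch of the idea. It is not Werkzeug's implementation or storage format; hash_password, check_password, the salt$digest layout, and the iteration count are all assumptions made for this illustration.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str) -> str:
    """Return 'salt$digest' (both hex) for the given password."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"),
                                 salt, ITERATIONS)
    return salt.hex() + "$" + digest.hex()


def check_password(stored: str, candidate: str) -> bool:
    """Re-hash the candidate with the stored salt and compare the digests."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac("sha256", candidate.encode("utf-8"),
                                 bytes.fromhex(salt_hex), ITERATIONS)
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("445hD#@%yu78")
print(check_password(stored, "445hD#@%yu78"))  # True
print(check_password(stored, "wrong-password"))  # False
```

Because the salt is random, hashing the same password twice gives two different stored values, yet both still verify. In the app itself, keep using Werkzeug's functions.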

 

UPDATE THE CONFIG.PY FILE

Next, add the following code to config.py. It enables the blacklist feature, tells the application to check both access and refresh tokens against the blacklist, and sets an expiry time for our access tokens.

Your updated config.py will look like this:

import os

basedir = os.path.abspath(os.path.dirname(__file__))




class Config:
    SECRET_KEY = os.environ.get("SECRET_KEY")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    JWT_ERROR_MESSAGE_KEY = 'message'
    JWT_BLACKLIST_ENABLED = True   
    JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']
    # Access token lifetime in seconds (default: 14400, i.e. 4 hours)
    JWT_ACCESS_TOKEN_EXPIRES = int(
        os.getenv("JWT_ACCESS_TOKEN_EXPIRES", 14400))


    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DEV_DATABASE_URL"
    ) or "sqlite:///" + os.path.join(basedir, "dev.sqlite")


class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "TEST_DATABASE_URL"
    ) or "sqlite:///" + os.path.join(basedir, "test.sqlite")


class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL"
    ) or "sqlite:///" + os.path.join(basedir, "prod.sqlite")


env_config = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
}
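To get a feel for the expiry setting, the sketch below reads the same environment variable and converts it to a timedelta. It assumes the integer is interpreted as a number of seconds; access_token_lifetime is a helper name made up for this example and is not part of config.py.

```python
import os
from datetime import timedelta

DEFAULT_LIFETIME_SECONDS = 14400  # same default as in config.py


def access_token_lifetime() -> timedelta:
    """Read the lifetime in seconds from the environment, with a fallback."""
    raw = os.getenv("JWT_ACCESS_TOKEN_EXPIRES", DEFAULT_LIFETIME_SECONDS)
    return timedelta(seconds=int(raw))


print(access_token_lifetime())
```

Setting JWT_ACCESS_TOKEN_EXPIRES=900 in your .env file would shorten the lifetime to 15 minutes under the same assumption.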

We will then import black_list in app.py and add a check_if_token_in_blacklist() function, registered with @jwt.token_in_blacklist_loader, that checks whether a token's jti (its unique ID) is on the blacklist.
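The blacklist logic itself is small. Here is a standalone sketch of it without Flask: issue_jti, revoke, and is_revoked are hypothetical helper names, and the dictionary stands in for the decoded token that Flask-JWT-Extended passes to the loader.

```python
import uuid

# In-memory store of revoked token IDs, as in user.py.
black_list = set()


def issue_jti() -> str:
    """Return a unique token ID, similar to the jti claim inside a JWT."""
    return str(uuid.uuid4())


def revoke(jti: str) -> None:
    """Log out: remember the token ID so later requests are rejected."""
    black_list.add(jti)


def is_revoked(decoded_token: dict) -> bool:
    """Mirror of check_if_token_in_blacklist: look up the jti in the set."""
    return decoded_token["jti"] in black_list


token = {"jti": issue_jti(), "identity": 1}  # stand-in for a decoded JWT
print(is_revoked(token))  # False: the token is still accepted
revoke(token["jti"])
print(is_revoked(token))  # True: the token is rejected after logout
```

A Python set lives in process memory, so the blacklist is emptied whenever the server restarts; a database table or a cache would be needed to make logouts survive a restart.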

Your updated app.py file will look like this.

from flasgger import Swagger
from flask import Flask
from flask_migrate import Migrate
from flask_restful import Api
from webargs.flaskparser import abort, parser

from api.config import env_config
from api.models import db
from utils import errors
from werkzeug import exceptions
from resources.user import (RefreshAccessTokenResource,
                            RevokeAccessTokenResource, UserInfoResource,
                            UserLoginResource, UserRegistrationResource,
                            black_list)
from resources.default import DefaultResource

from flask_jwt_extended import JWTManager

api = Api()

jwt = JWTManager()


def create_app(config_name):

    import resources

    app = Flask(__name__)

    app.config.from_object(env_config[config_name])
    
   
    db.init_app(app)   # Add db session, new code here
 
    Migrate(app, db) #new code here
    #register api 
    api.init_app(app)
    Swagger(app)
    
    jwt.init_app(app)
    #error handling
    @jwt.token_in_blacklist_loader
    def check_if_token_in_blacklist(decrypted_token):
        jti = decrypted_token['jti']
        return jti in black_list

    app.register_error_handler(exceptions.NotFound,
                               errors.handle_404_errors)

    app.register_error_handler(exceptions.InternalServerError,
                               errors.handle_server_errors)

    app.register_error_handler(exceptions.BadRequest,
                               errors.handle_400_errors)

    app.register_error_handler(FileNotFoundError,
                               errors.handle_400_errors)

    app.register_error_handler(TypeError, errors.handle_400_errors)

    app.register_error_handler(KeyError, errors.handle_404_errors)

    app.register_error_handler(AttributeError,
                               errors.handle_400_errors)

    app.register_error_handler(ValueError, errors.handle_400_errors)

    app.register_error_handler(AssertionError,
                               errors.handle_400_errors)
    
    
    #new code
    @parser.error_handler
    def handle_request_parsing_error(err, req, schema, *, error_status_code,
                                     error_headers):
        """webargs error handler that uses Flask-RESTful's abort function to
        return a JSON error response to the client.
        """
        abort(error_status_code, errors=err.messages)
        
    return app


 #register our urls for user module
api.add_resource(UserRegistrationResource,
                     "/v1/user/register/",
                     endpoint="user_registration")
api.add_resource(UserLoginResource,
                     "/v1/user/login/",
                     endpoint="user_login")
api.add_resource(UserInfoResource,
                     "/v1/user/user_info/",
                     endpoint="user_info")

api.add_resource(RefreshAccessTokenResource,
                     "/v1/user/refresh_token/",
                     endpoint="refresh_token")

api.add_resource(RevokeAccessTokenResource,
                     "/v1/user/signout_access/",
                     endpoint="signout_access")

#register url for default

api.add_resource(DefaultResource, "/", endpoint="home")

At the bottom of the file, I registered the URL for each resource imported from the resources module. For example, api.add_resource(UserRegistrationResource, "/v1/user/register/", endpoint="user_registration") registers the registration endpoint, and the other resources follow the same pattern.

Testing Web Services with HTTPie

We will use HTTPie, which we installed earlier, to test our endpoints. In your terminal, start the app by running flask run, then open a new terminal
and paste in the following commands.

  • Create a new user
http POST :5000/v1/user/register/ username="oluchiy" email="oluch@gmail.com" password="445hD#@%yu78"

The response shows that we successfully created a new user.

  •  Let’s try creating a user with a weak password and an existing username.

Run the following commands on your terminal.

http POST :5000/v1/user/register/ username="olu" email="olu@gmail.com" password="445h"

http POST :5000/v1/user/register/ username="oluchiy" email="oluchlynda@gmail.com" password="445hD#@%yu78"

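Neither request succeeded: the first is rejected because the password fails validation, and the second because the username already exists.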

 

  • Log in our user and get access tokens
http POST :5000/v1/user/login/ email="oluch@gmail.com" password="445hD#@%yu78"


  • Get our user biodata. Note that this URL requires an access token, so copy the access_token generated above.
http GET http://127.0.0.1:5000/v1/user/user_info/  "Authorization:Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1OTUxODcwOTYsIm5iZiI6MTU5NTE4NzA5NiwianRpIjoiYzJkMmY1ZDEtMzAzYS00ZmMxLTgwMTYtNzFiYmRkZjlhOWVkIiwiZXhwIjoxNTk1MTg3OTk2LCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.bl45kpKOTpcWk0c9AE7_WogGQjxgRu9jORQ5Sp8zjh0"


 

  • Obtaining a new access token using a refresh token

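Copy the refresh token generated at login and send it in the Authorization header. The endpoint reads only the token, so the email and password fields in the command below are optional.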

http POST http://127.0.0.1:5000/v1/user/refresh_token/  "email=oluch@gmail.com"  "password=445hD#@%yu78" "Authorization:Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1OTUxODcwOTYsIm5iZiI6MTU5NTE4NzA5NiwianRpIjoiNDRiMzJlOTMtMDg2NC00MzMxLTk0NWQtY2ZlYzgxMmYyNmY2IiwiZXhwIjoxNTk3Nzc5MDk2LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.t_ukMbvboODibtslOm-scfJeHfsD22Gvu88pxnZeV8U"


  • Finally, log out our user and blacklist the access token.

Copy the access token generated above and run this in your terminal. As with the refresh endpoint, only the Authorization header is required.

http POST http://127.0.0.1:5000/v1/user/signout_access/  "email=oluch@gmail.com"  "password=445hD#@%yu78" "Authorization:Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1OTUxODg3MTYsIm5iZiI6MTU5NTE4ODcxNiwianRpIjoiYmQ2M2E3MTctMGQ5Ny00ZTZiLTlkZWEtOWIzOWNiZGIwYTY5IiwiZXhwIjoxNTk1MTg5NjE2LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.nVScMyaJBAd0ynSvuQMjbRIFdaEpRMQo3OlziHw0HHo"


That concludes the third post in my Flask series. In the next post, I will implement access control on our CRUD operations, meaning only registered users can sign in and create, update, delete, and view diary notes.

The link to the GitHub tutorial is here

Until then happy coding 😊

2 Comments

  • Hi oluchi, I think your articles are great .They’ve been insightful thus far, however I think they still need reviewing, just to check for bugs. I ran into a circular import error, in the app.py module. Also there’s a typo
    “Create a new file user.py in the *api folder, and copy the code below into it.” instead it was imported from the resources directory. Thanks for the articles I’m going to follow them through to the end 😉,
    Cheers!
