FlaskAPIでJWT認証を実装する(後編)

はじめに

Pythonでお馴染みの(そろそろ言いたかった)エンジニアの眞嶋です。

今回は初の2部構成でJWT認証についてお送りしております。

前編ではJWTとは何かについてお話ししましたので、後編ではいよいよ本題のFlaskAPIへの組み込み実装について解説していきます。

FlaskAPIの構成

FlaskでAPIサーバを構築するとき、大きく2つのルーティング実装があります。

  1. Blueprint(Flask)によるルーティング
  2. Resource(Flask-Restful)によるルーティング

今回は拡張性などを踏まえて2のResourceを使った実装で説明していきます。

ベースのプログラム

from flask import Flask, make_response, jsonify
from flask_restx import Api, Resource


def create_app():
    app = Flask(__name__)

    return app


app = create_app()
api = Api(app, doc='/documents/')


@api.route('/no-secure')
class NoSecureController(Resource):
    def get(self):
        return make_response(jsonify({'message': 'This is NonSecure Endpoint.'}), 200)


if __name__ == '__main__':
    app.run(debug=True)

さて、最初に「Flask-Restful」と言ってたのになんか違うくない!?と思ったあなた、正解です。

実は「Flask-Restful」自体の開発・サポートは終了してしまっているため、その後継である「Flask-Restx」を使用しています。

既存の記事ではRestfulの使用がまだまだ多いため、通りは良いですが、実装する際にはRestxの方を使うようにしましょう。

ユーザー認証の下準備

通常は複数のファイルに分けてディレクトリ構成を考えるのですが、説明の都合上、1つのファイルに全て詰め込んで説明していきます。

1.ユーザー情報

本来はデータベースから取得しますが、サンプルコードの簡易化のために簡易的なユーザー情報を用意します。

users = [
    {'user_id': 'U0000001', 'login_id': 'user1@example.com', 'password': 'password123', 'name': '山田太郎'},
    {'user_id': 'U0000002', 'login_id': 'user2@example.com', 'password': 'password456', 'name': '鈴木一郎'},
    {'user_id': 'U0000003', 'login_id': 'user3@example.com', 'password': 'password789', 'name': '斉藤花子'},
]

2.認証処理

def authenticate(login_id, password):
    auth_user = next(
        (user for user in users
            if (user.get('login_id') == login_id and user.get('password') == password)),
        None)

    return auth_user

先ほど作成したユーザーリストを順番に検証し、ログインIDとパスワードが一致しているユーザーを取得して返却するメソッドです。

認証の結果、対象のユーザーがいなかった場合を検知するため、該当するユーザーがいないときはNoneを返すようにしています。

ユーザー認証を実装してみる

先ほど作成したメソッドを使って、実際にユーザー認証を実装してみましょう。

まずはログイン用のエントリポイントを作成します。

この段階では認証処理(authenticateメソッド)が正常に動いているかを確認するだけで良いので、以下のような簡易のものでOKです。

@api.route('/login')
class LoginController(Resource):
    def post(self):
        input_data = request.json
        auth_user = authenticate(input_data.get('login_id'), input_data.get('password'))
        if auth_user is None:
            return make_response(jsonify({'message': 'Invalid User.'}), 401)

        return make_response(jsonify(auth_user), 200)

http://127.0.0.1:5000/loginにPOSTメソッドでリクエストを送ると、成功時にはユーザー情報、失敗時には「Invalid User.」というメッセージが返ってくることを確認してみましょう。

正常に動作していたら、ついにアクセストークンの発行を行っていきます。

といっても実はアクセストークンの発行自体はメソッドが用意されているため、subクレームに設定する一意識別子だけ指定すれば簡単に発行することができます。

from flask_jwt_extended import create_access_token

@api.route('/login')
class LoginController(Resource):
    def post(self):
        input_data = request.json
        auth_user = authenticate(input_data.get('login_id'), input_data.get('password'))
        if auth_user is None:
            return make_response(jsonify({'message': 'Invalid User.'}), 401)

        access_token = create_access_token(identity=auth_user['user_id'])
        return make_response({'access_token': access_token}, 200)

アクセストークンを利用した認証

認証必須とするにはエントリポイントに「@jwt_required()」と追記するだけでOKです。

下記のような感じですね。

from flask_jwt_extended import jwt_required

@api.route('/secure')
class SecureController(Resource):
    @jwt_required()
    def get(self):
        return make_response(jsonify({'message': 'This is Secure Endpoint.'}), 200)

http://127.0.0.1:5000/secureにアクセスしてみるとエラーレスポンスが返却されていることが確認できます。

このエントリポイントにアクセスするためには、先ほど作成した/loginエントリポイントからアクセストークンを取得し、リクエストヘッダーに設定したうえでアクセスする必要があります。

Bearerトークンの形式でセットしてリクエストしてみましょう。

正常にアクセスできていることが確認できました。

まとめ

ここまでを集約したプログラム全体を記載しておきます。

from flask import Flask, make_response, jsonify, request
from flask_restx import Api, Resource
from flask_jwt_extended import JWTManager, jwt_required, create_access_token


def create_app():
    app = Flask(__name__)
    app.config['JWT_SECRET_KEY'] = 'SecretKey'

    return app


app = create_app()
api = Api(app, doc='/documents/')
jwt = JWTManager(app)

users = [
    {'user_id': 'U0000001', 'login_id': 'user1@example.com','password': 'password123', 'name': '山田太郎'},
    {'user_id': 'U0000002', 'login_id': 'user2@example.com','password': 'password456', 'name': '鈴木一郎'},
    {'user_id': 'U0000003', 'login_id': 'user3@example.com','password': 'password789', 'name': '斉藤花子'},
]


def authenticate(login_id, password):
    auth_user = next(
        (user for user in users
            if (user.get('login_id') == login_id and user.get('password') == password)),
        None)

    return auth_user


@api.route('/no-secure')
class NoSecureController(Resource):
    def get(self):
        return make_response(jsonify({'message': 'This is NonSecure Endpoint.'}), 200)

@api.route('/login')
class LoginController(Resource):
    def post(self):
        input_data = request.json
        auth_user = authenticate(input_data.get('login_id'), input_data.get('password'))
        if auth_user is None:
            return make_response(jsonify({'message': 'Invalid User.'}), 401)

        access_token = create_access_token(identity=auth_user['user_id'])
        return make_response({'access_token': access_token}, 200)

@api.route('/secure')
class SecureController(Resource):
    @jwt_required()
    def get(self):
        return make_response(jsonify({'message': 'This is Secure Endpoint.'}), 200)

if __name__ == '__main__':
    app.run(debug=True)

おまけ

各エントリポイントの動作検証に使用したPostmanのリクエスト集を添付しておきますので、インポートして試してみてください。

成功時、失敗時のレスポンス情報も設定してあるので見るだけでもイメージがわくと思います。

https://drive.google.com/uc?id=1U3vh1ZpyvFZxB9Nw7aNt-SaywQ3SeSsP

さいごに

長くなってしまいましたが、最後までご覧いただきありがとうございました。

今後もPythonやAPIサーバに関する記事を中心に投稿していきますので、引き続きよろしくお願いいたします。

関連記事

プロジェクトストーリー

技術

コメント

この記事へのコメントはありません。

カテゴリー

TOP
TOP