Flask-JWT-ExtendedでSPA(Vue.js+Flask)のAPI認可を実装する

こんにちは。CTO室の神沼(@t_kanuma)です。
本稿はPythonのWebフレームワーク”Flask“のJWT周りのライブラリである
Flask-JWT-Extended“を使ったAPI認可の実装に関する記事です。

題材

過去にVuetifyを試したときに作った簡単なメモアプリを流用しました。
そのバックエンドをFlaskで実装します。


アプリケーションの構成

バージョン

・ Python : 3.9.8
・ Flask : 2.0.2
・ Flask-JWT-Extended : 4.3.1

インストール

筆者、pipenvを使っています。

pipenv install flask-jwt-extended

初期化

SQLAlchemyやMarshmallowと同様です。

# main/__init__.py (app.pyです)
from flask_jwt_extended import JWTManager

jwt = JWTManager()
def create_app():
    app = Flask(__name__)
    app.config.from_object('config')
    # omit...
    jwt.init_app(app)
    return app

コンフィグ

このアプリをデプロイして公開する気は全くないのですが、XSSとCSRF対策のためJWTをCookieに保管します。(保管場所のセキュリティについては、この記事がとてもためになりました。)

# config.py

# 署名鍵(対称鍵)
# デフォルトのアルゴリズムはHS256
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY')

# デフォルトは15分
JWT_ACCESS_TOKEN_EXPIRES = datetime.timedelta(minutes=10)

JWT_TOKEN_LOCATION = ['cookies']

# CSRF対策 - same siteにのみCookieを送信する。
JWT_COOKIE_SAMESITE = 'Strict'

# XSS対策 - デフォルトでhttponly=Trueのため、設定不要

# httpsでのみCookieを送信する。ローカル環境のためFalseの設定。
JWT_COOKIE_SECURE = os.getenv('JWT_COOKIE_SECURE')

ログイン処理

JWT生成

# auth/controller.py
from flask_jwt_extended import (create_access_token, set_access_cookies,
                                unset_access_cookies, verify_jwt_in_request)
from main import jwt   # 上で宣言済み

@auth.route('/login', methods=['POST'])
def authenticate() -> Tuple[Response, int]:
    app.logger.info('login started')

    req_data: Any = request.get_json()

    user_name_to_auth: Optional[str] = req_data['user_name']
    password_to_auth: Optional[str] = req_data['password']

    user = User.query.filter(User.user_name == user_name_to_auth)\
        .filter(User.password == password_to_auth).one_or_none()

    if user is not None:
        app.logger.info('login succeeded')
        # アクセストークンの生成
        access_token = create_access_token(
            identity=user_schema.dump(user))
        response = make_response()
        # クッキーにアクセストークンを配置する。
        set_access_cookies(response, access_token)
        return response, 200
    else:
        app.logger.info('login failed')
        return jsonify({'msg': 'Wrong login id or password.'}), 401

コールバック登録

このライブラリではアクセストークン作成時やリソースアクセス時に起動するデコレーター群を用意しています。(詳細はこちらをご確認ください。)

# auth/controller.py

# JWTのsubject claimを決める。
# returnの内容がjwtの"sub"に入る。
# create_access_token(..)実行時に起動する。
@jwt.user_identity_loader
def identity_user(user):
    return user['id']

# claimを追加する。
# create_access_token(..)実行時に起動する。
@jwt.additional_claims_loader
def add_claims_to_access_token(identity):
    return {
        'department': 'CTO Office',
        'manager_role_attached': False
    }

# @jwt_requiredが付与されている関数の実行前に起動する。
# (つまりリソースにアクセスした時)
# @jwt_requiredを付与した関数の中で、current_user変数でここでreturnする値を参照できる。
@jwt.user_lookup_loader
def lookup_user(_jwt_header, jwt_data):
    identity = jwt_data["sub"]
    return User.query.filter_by(id=identity).one_or_none()

その他

CORS対応

Flask-CORSをインストールして使います。

# python側
# main/__init__.py (app.pyです)
from flask_cors import CORS

def create_app():
    app = Flask(__name__)

    # omit...

    # origins -> レスポンスヘッダにAccess-Control-Allow-Originを付与する。
    # supports_credentials -> レスポンスヘッダにAccess-Control-Allow-Credentialsを付与する。 フロントエンドがCookie(アクセストークン)をAPIに送信できるようになる。
    CORS(app, resources={
     '/*': {'origins': os.getenv('FRONTEND_ORIGIN'), 'supports_credentials': True}})

    # omit...
// Vue側のログイン処理
// views/Login.vue

// omit...
methods: {
    async login() {
      if (this.$refs.form.validate()) {
          const response = await fetch(
            `${process.env.VUE_APP_API_BASE_URL}/auth/login`,
            {
              method: "POST",
             // Cookie(アクセストークン)をAPIに送るため
              credentials: "include",
              headers: {
                "Content-Type": "application/json",
              },
              body: JSON.stringify({
                user_name: this.loginName,
                password: this.password,
              }),
            }
          );

          if (response.ok) {
            this.$router.push("/");
          } else {
            // TODO 401でログイン画面にエラーメッセージを表示する。
            // その他はエラーページに遷移する。
          }
      }
    },
    // omit...

ログイン処理の検証

ログイン処理成功後、APIは以下の2つのCookie項目を返します。

access_token_cookie=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTYzODg1MDU0NiwianRpIjoiYmZjZDdkZGItNGI0NC00MzI4LWJlZDAtZDdjN2I3ZGRmN2U1IiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MSwibmJmIjoxNjM4ODUwNTQ2LCJjc3JmIjoiNmE2YWVkZDktNjgxMi00NzU0LTljOTQtNTYxMDBkMWI4YTFmIiwiZXhwIjoxNjM4ODUxMTQ2LCJkZXBhcnRtZW50IjoiQ1RPIE9mZmljZSIsIm1hbmFnZXJfcm9sZV9hdHRhY2hlZCI6ZmFsc2V9.awA_wTX9mLnOBVHvVGCbVDmP0xBgOiKYVyUtwZVVrkQ; Secure; HttpOnly; Path=/; SameSite=Strict

csrf_access_token=6a6aedd9-6812-4754-9c94-56100d1b8a1f; Secure; Path=/; SameSite=Strict

JWT形式のアクセストークン

1つ目はJWT形式のアクセストークンです。以下、このサイトでデコードした結果です。  
(ヘッダーとペイロード)

{
  "typ": "JWT",
  "alg": "HS256"
}
{
  "fresh": false,
  "iat": 1638850546,
  "jti": "bfcd7ddb-4b44-4328-bed0-d7c7b7ddf7e5",
  "type": "access",
  "sub": 1,
  "nbf": 1638850546,
  "csrf": "6a6aedd9-6812-4754-9c94-56100d1b8a1f",
  "exp": 1638851146,
  "department": "CTO Office",
  "manager_role_attached": false
}

subにユーザIDが入っていること、追加したclaimが下部に入っていることが確認できます。

CSRFトークン

このライブラリはデフォルトでCSRFトークンを付与してくれます。
またJWTの方にも同じCSRFトークンが入っていることが確認できます。

これを次のリクエストでヘッダーの”X-CSRF-TOKEN”に設定して送信する必要があります。 そうすることで、サーバ側でこのライブラリが、リクエストヘッダーとJWTの中のCSRFトークンが同じであることチェックすることができるわけです。これによりそのリクエストがクライアントにとって意図したものであると確証が取れます。

ここで2種類のクライアントを考えてみます。

1. クライアントとサーバのドメインが同じ

このメモアプリサイトと同じ構成です。JWTはXSS対策でHttpOnlyがついていますが、CSRFトークンにはついていないため、このメモアプリサイトのJavaScriptから取得可能です。

ただ、今回のケースで言えば、CookieにSameSite=Strictを設定しており、かつクライアントがこのフロントエンドしか存在しないため、このCSRFトークンの仕組みがなくても、CSRF攻撃は起きません。下記No.2のようにSame Site以外のクライアントを持つ場合、このCSRFトークンが必要になると考えます。

2. クライアントとサーバのドメインが異なる

OAuthでの3rd Party Clientがクライアントになる場合を考えます。この場合、CookieではSameSite=Noneにして、CSRFトークンの仕組みを導入する必要があると考えます。

注意点として、このライブラリのデフォルトの設定だとCSRFトークンがCookieとして返ってきてしまうため、異なるドメインの3rd Party ClientがCSRFトークンを参照することができません。そこでコンフィグにてJWT_CSRF_IN_COOKIES = Falseを設定する必要があると考えます。そうした上でサーバサイドでCSRFトークンをHTTPレスポンスボディもしくはカスタムレスポンスヘッダに設定して返すことで、正当な3rd Party Clientからは正常にリソースにアクセスすることができ、攻撃者からのアクセスを防げると考えます。

リソースアクセス

認可処理

jwt_requiredアノテーションを付与することで、それを付与した関数の実行前に、 JWTが有効あるかを検証してくれます。

またcurrent_userで上記のコールバック(jwt.user_lookup_loader)で返却する値を取得できます。以下ではJWTの中のユーザIDを元にDBアクセスをしています。

コメントアウトしていますが(e.g. authorizationの部分)、Claimを取得してその内容を元に権限チェックができます。

# Python側
# memo/controller.py
from flask_jwt_extended import current_user, jwt_required

# omit...

@memo.route('', methods=['GET'])
@jwt_required()
def get_memos() -> Tuple[Response, int]:
    app.logger.info('get_memos started')

    # e.g. authorization
    # claims = get_jwt()
    # if claims['manager_role_attached'] is False:
    #     return jsonify({'msg': 'access to this resource is not allowed'}), 403

    memos: List[Memo] = Memo.query.filter(Memo.user_id == current_user.id).\
        filter(Memo.archived == false()).order_by(Memo.id.asc()).all()
    return jsonify(memo_schema.dump(memos)), 200
// Vue側
// views/MemoList.vue

// omit...
  methods: {
    async showMemos() {
        const response = await fetch(
          `${process.env.VUE_APP_API_BASE_URL}/memos`,
          {
            cache: "no-store",
            method: "GET",
            credentials: "include",
            headers: {
              "X-CSRF-TOKEN": this.$cookies.get("csrf_access_token"),
            },
          }
        );

        if (response.ok) {
             this.memos = await reponse.json();
        } else {
            // TODO 不正なトークンにより401が発生する場合があり得る。
            // その場合ログイン画面に遷移、その他の場合はエラー投げる
        }
    },

カスタムデコレーター

各関数に同じ認可処理を実装するためにはカスタムデコレータが有用です。
自作のアノテーションを付与することで認可処理をAOPできます。
詳細はここをご確認ください。

ログアウト(JWTの破棄)

ログアウトしたのにCookieにまだ有効なJWTを残すのはセキュリティ上よくないと考えます。
ここでXSS対策によりJavaScriptからCookie(JWT)を読み取ることはできません。
フロントエンドから要求して、API側で破棄します。

# auth/controller.py
from flask_jwt_extended import (create_access_token, set_access_cookies,
                                unset_access_cookies, verify_jwt_in_request)

# omit...

@auth.route('/logout', methods=['POST'])
def logout():
    app.logger.info('logout started.')
    response: Response = make_response()
    # cookieから取り除く。
    unset_access_cookies(response)
    return response, 200

画面遷移などのイベントでの有効性チェック

JWTが無効(有効期限切れ、未発行など)な状態での画面遷移、URL直打ちについてもログイン画面に遷移させています。(ここではそうではありませんが、良いUXのために有効期限切れの場合はリフレッシュトークンでアクセストークンをリフレッシュするのが通例だと思います。)

以下、Vueで画面遷移またはURL直打ちの際に、遷移先の処理を開始する前にインターセプトしてAPI側で有効性をチェックしています。無効の場合、ログイン画面にリダイレクトします。

// router/index.js
import auth from "@/modules/auth.js";

// omit...
router.beforeEach(async (to, from, next) => {
  if (to.matched.some((record) => record.meta.requiresAuth)) {
    if (await isLoggedIn()) {
      next();
    } else {
      next({
        path: "/login",
      });
    }
  } else {
    next();
  }
});
// modules/auth.js
export default {
  _getCookie: (name) => {
    const value = `; ${document.cookie}`;
    const parts = value.split(`; ${name}=`);
    if (parts.length === 2) return parts.pop().split(";").shift();
  },
  async isLoggedIn() {
      const response = await fetch(
        `${process.env.VUE_APP_API_BASE_URL}/auth/status`,
        {
          cache: "no-store",
          method: "GET",
          credentials: "include",
          headers: {
            "X-CSRF-TOKEN": this._getCookie("csrf_access_token"),
          },
        });
      return response.ok;
  },
// omit...
# auth/controller.py
from flask_jwt_extended import (create_access_token, set_access_cookies,
                                unset_access_cookies, verify_jwt_in_request)

# omit...

@auth.route('/status', methods=['GET'])
def isLoggedIn():
    app.logger.info('isloggedIn started.')
    # アノテーションでなく、以下のメソッド呼び出しで検証する。
    verify_jwt_in_request()
    return make_response(), 200

公開鍵ペアによる署名と検証

OAuthのような認可サーバとリソースサーバが別れている場合は、対称鍵ではなく公開鍵ペアでの署名と検証をすると思います。そのような場合は、コンフィグのJWT_PRIVATE_KEY
とJWT_PUBLIC_KEY項目で鍵を設定できます。

リフレッシュトークン

本稿ではリフレッシュトークンについてはスコープ外としました。

以上でこの記事を終わります。
どなたかのお役に立てたならば幸いです。
最後までお読みいただき、ありがとうございました。