don't stop believing

Flask로 API 서버 만들기 (6) - Security and Authentication 본문

Python/Flask

Flask로 API 서버 만들기 (6) - Security and Authentication

Tongchun 2018. 11. 12. 19:30

Developing API Sample Server by Flask 

Original Post: How to structure a Flask-RESTPlus web service for production builds

Github: https://github.com/cosmic-byte/flask-restplus-boilerplate


Flask로 API 서버 만들기 (1) - 개발 환경 준비

Flask로 API 서버 만들기 (2) - config 와 실행 확인

Flask로 API 서버 만들기 (3) - User 테이블 만들기

Flask로 API 서버 만들기 (4) - Testing

Flask로 API 서버 만들기 (5) - User Operations

Flask로 API 서버 만들기 (6) - Security and Authentication

Flask로 API 서버 만들기 (7) - Route protection and Authorization

Flask로 API 서버 만들기 (8) - Extra tips (Makefiles)


이번에는 인증 토큰을 이용한 로그인과 로그아웃, 그리고 블랙리스트 등록을 해보겠습니다.

먼저 블랙리스트 db만들겠습니다. models 폴더 아래 blacklist.py 파일을 만들어 줍니다.

$ sudo vim ./app/main/model/blacklist.py

blacklist.py 파일에 아래와 같이 추가합니다.

from app.main import db
import datetime


class BlacklistToken(db.Model):
    """
    Token Model for storing JWT tokens
    """
    __tablename__ = 'blacklist_tokens'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(500), unique=True, nullable=False)
    blacklisted_on = db.Column(db.DateTime, nullable=False)

    def __init__(self, token):
        self.token = token
        self.blacklisted_on = datetime.datetime.now()

    def __repr__(self):
        return '<id: token: {}'.format(self.token)

    @staticmethod
    def check_blacklist(auth_token):
        # check whether auth token has been blacklisted
        res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
        if res:
            return True
        else:
            return False

blacklist_tokens라는 테이블을 만들고 id, token, blacklisted_on 컬럼을 만듭니다.


manage.py 파일에 blacklist 모듈을 import해줍니다.

from app.main.model import user, blacklist

$ sudo vim manage.py

from app.main.model import user 다음에 blacklist를 추가해 줍니다.

import os
import unittest

from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager

from app import blueprint
from app.main import create_app, db
from app.main.model import user, blacklist


app = create_app(os.getenv('BOILERPLATE_ENV') or 'dev')
app.register_blueprint(blueprint)

app.app_context().push()

manager = Manager(app)

migrate = Migrate(app, db)

manager.add_command('db', MigrateCommand)

@manager.command
def run():
        app.run(host='0.0.0.0')

@manager.command
def test():
        """Runs the unit tests."""
        tests = unittest.TestLoader().discover('app/test', pattern='test*.py')
        result = unittest.TextTestRunner(verbosity=2).run(tests)
        if result.wasSuccessful():
                return 0
        return 1

if __name__ == '__main__':
        manager.run()

db를 변경했으니 migrate와 upgrade 명령을 실행해줍니다.

$ python manage.py db migrate --message 'add blacklist table'
$ python manage.py db upgrade

그 다음 service 폴더 아래 blacklist_service.py 파일을 만들어 줍니다.

$ sudo vim ./app/main/service/blacklist_service.py

blacklist_ervice.py 파일에 아래와 같이 추가합니다.

from app.main import db
from app.main.model.blacklist import BlacklistToken


def save_token(token):
    blacklist_token = BlacklistToken(token=token)
    try:
        # insert the token
        db.session.add(blacklist_token)
        db.session.commit()
        response_object = {
            'status': 'success',
            'message': 'Successfully logged out.'
        }
        return response_object, 200
    except Exception as e:
        response_object = {
            'status': 'fail',
            'message': e
        }
        return response_object, 200

save_token 함수를 호출하면 blacklist_token 테이블에 token을 저장합니다.


이번에는 model 폴더 아래있는 user.py에 token을 encoding 과 decoding 하는 method를 추가합니다.

encode/decode에 필요한 모듈도 추가해 줍니다.


import datetime

import jwt

from app.main.model.blacklist import BlacklistToken

from ..config import key

$ sudo vim ./app/main/model/user.py

추가되는 method는 def encode_auth_token(self, user_id):, def decode_auth_token(auth_token): 두 개 입니다.

from .. import db, flask_bcrypt
import datetime
from app.main.model.blacklist import BlacklistToken
from ..config import key
import jwt


class User(db.Model):
    """ User Model for storing user related details """
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)
    public_id = db.Column(db.String(100), unique=True)
    username = db.Column(db.String(50), unique=True)
    password_hash = db.Column(db.String(100))

    @property
    def password(self):
        raise AttributeError('password: write-only field')

    @password.setter
    def password(self, password):
        self.password_hash = flask_bcrypt.generate_password_hash(password).decode('utf-8')

    def check_password(self, password):
        return flask_bcrypt.check_password_hash(self.password_hash, password)

  
    def encode_auth_token(self, user_id):
        """
        Generates the Auth Token
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1, seconds=5),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                key,
                algorithm='HS256'
            )
        except Exception as e:
            return e

    @staticmethod
    def decode_auth_token(auth_token):
        """
        Decodes the auth token
        :param auth_token:
        :return: integer|string
        """
        try:
            payload = jwt.decode(auth_token, key)
            is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
            if is_blacklisted_token:
                return 'Token blacklisted. Please log in again.'
            else:
                return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'

    def __repr__(self):
        return "<user '{}'="">".format(self.username)</user>

기능이 추가되었으니 test를 해보겠습니다.

test 폴더에 base.py 파일을 생성합니다.

$ sudo vim ./app/test/base.py

base.py 파일에 아래와 같이 테스에 사용할 session을 만드는 script를 추가합니다.

from flask_testing import TestCase
from app.main import db
from manage import app


class BaseTestCase(TestCase):
    """ Base Tests """

    def create_app(self):
        app.config.from_object('app.main.config.TestingConfig')
        return app

    def setUp(self):
        db.create_all()
        db.session.commit()

    def tearDown(self):
        db.session.remove()
        db.drop_all()

이제 test 폴더 안에 test_user_medol.py 파일을 만듭니다.

$ sudo vim ./app/test/test_user_medol.py

test_user_medol.py에 테스트 script를 추가합니다.

import unittest
import datetime

from app.main import db
from app.main.model.user import User
from app.test.base import BaseTestCase


class TestUserModel(BaseTestCase):

    def test_encode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test',
            registered_on=datetime.datetime.utcnow()
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))

    def test_decode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test',
            registered_on=datetime.datetime.utcnow()
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))
        self.assertTrue(User.decode_auth_token(auth_token.decode("utf-8") ) == 1)


if __name__ == '__main__':
    unittest.main()

파일을 저장하고 python manage.py test 명령으로 test를 실행합니다.

$ python manage.py test
test_app_is_development (test_config.TestDevelopmentConfig) ... ok
test_app_is_production (test_config.TestProductionConfig) ... ok
test_app_is_testing (test_config.TestTestingConfig) ... ok
test_decode_auth_token (test_user_medol.TestUserModel) ... ok
test_encode_auth_token (test_user_medol.TestUserModel) ... ok

----------------------------------------------------------------------
Ran 5 tests in 1.069s

OK


이제 login/logout을 만들 차례입니다.

먼저 dto에 login/out에서 사용할 field를 만들어 줍니다.

$ sudo vim ./app/main/util/dto.py 

dto.py 파일을 열고 아래와 같이 AuthDto 클래스를 추가해 줍니다.

from flask_restplus import Namespace, fields
  
class UserDto:
    api = Namespace('user', description='user related operations')
    user = api.model('user', {
        'email': fields.String(required=True, description='user email address'),
        'username': fields.String(required=True, description='user username'),
        'password': fields.String(required=True, description='user password'),
        'public_id': fields.String(description='user Identifier')
    })
    
class AuthDto:
    api = Namespace('auth', description='authentication related operations')
    user_auth = api.model('auth_details', {
        'email': fields.String(required=True, description='The email address'),
        'password': fields.String(required=True, description='The user password '),
    })

다음으로 로그인 로직을 처리하는 auth helper를 만들어 줍니다.

$ sudo vim ./app/main/service/auth_helper.py

아래와 같이 스크립트를 작성합니다.

유저가 로그아웃을 하면 사용된 토큰은 blacklist로 들어가고 같은 토큰으로는 사용할 수 없게 됩니다.

from app.main.model.user import User
from ..service.blacklist_service import save_token


class Auth:

    @staticmethod
    def login_user(data):
        try:
            # fetch the user data
            user = User.query.filter_by(email=data.get('email')).first()
            if user and user.check_password(data.get('password')):
                auth_token = user.encode_auth_token(user.id)
                if auth_token:
                    response_object = {
                        'status': 'success',
                        'message': 'Successfully logged in.',
                        'Authorization': auth_token.decode()
                    }
                    return response_object, 200
            else:
                response_object = {
                    'status': 'fail',
                    'message': 'email or password does not match.'
                }
                return response_object, 401

        except Exception as e:
            print(e)
            response_object = {
                'status': 'fail',
                'message': 'Try again'
            }
            return response_object, 500

    @staticmethod
    def logout_user(data):
        if data:
            auth_token = data.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                # mark the token as blacklisted
                return save_token(token=auth_token)
            else:
                response_object = {
                    'status': 'fail',
                    'message': resp
                }
                return response_object, 401
        else:
            response_object = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return response_object, 403

 handler를 만들었으니 이제는 controller를 만들 차례입니다.

controller 폴더에 auth_controller.py를 만들어 줍니다.

$ sudo vim ./app/main/controller/auth_controller.py

API 서버에서 사용할 login과 logout을 만들어 줍니다.

from flask import request
from flask_restplus import Resource

from app.main.service.auth_helper import Auth
from ..util.dto import AuthDto

api = AuthDto.api
user_auth = AuthDto.user_auth


@api.route('/login')
class UserLogin(Resource):
    """
        User Login Resource
    """
    @api.doc('user login')
    @api.expect(user_auth, validate=True)
    def post(self):
        # get the post data
        post_data = request.json
        return Auth.login_user(data=post_data)


@api.route('/logout')
class LogoutAPI(Resource):
    """
    Logout Resource
    """
    @api.doc('logout a user')
    def post(self):
        # get auth token
        auth_header = request.headers.get('Authorization')
        return Auth.logout_user(data=auth_header)

마지막으로 swagger로 API 문서를 추가하기 위해 ./app/__init__.py에 auth_controller 패키지의 api 모듈을 추가해 줍니다.

$ sudo vim ./app/__init__.py

추가되는 module과 namespace는 아래와 같습니다.

from .main.controller.auth_controller import api as auth_ns

...

api.add_namespace(auth_ns)

# app/__init__.py
from flask_restplus import Api
from flask import Blueprint

from .main.controller.user_controller import api as user_ns
from .main.controller.auth_controller import api as auth_ns

blueprint = Blueprint('api', __name__)

api = Api(blueprint,
          title='FLASK RESTPLUS API BOILER-PLATE WITH JWT',
          version='1.0',
          description='a boilerplate for flask restplus web service'
          )

api.add_namespace(user_ns, path='/user')
api.add_namespace(auth_ns)

이제 API 서버를 실행하고 5000번 포트로 접속해 봅니다.

$ python manage.py run
 * Serving Flask app "app.main" (lazy loading)
 * Environment: development
 * Debug mode: on
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 145-484-411

브라우저를 열고 API 서버의 5000포트로 접속합니다.

추가한 auth의 login과 logout이 보입니다.


login/logout 기능이 추가되었으니 테스트를 해봐야겠죠. 테스트를 하기 전에 user_service.py에 generate_token method를 아래와 같이 추가해 줍니다.

$ sudo vim ./app/main/service/user_service.py
def generate_token(user):
    try:
        # generate the auth token
        auth_token = user.encode_auth_token(user.id)
        response_object = {
            'status': 'success',
            'message': 'Successfully registered.',
            'Authorization': auth_token.decode()
        }
        return response_object, 201
    except Exception as e:
        response_object = {
            'status': 'fail',
            'message': 'Some error occurred. Please try again.'
        }
        return response_object, 401

그 다음 user_service.py의 save_new_user method의 return 값을

response_object = {

    'status': 'success',

    'message': 'Successfully registered.'

}

return response_object, 201

이것이 아닌 아래와 같이 수정합니다.

return generate_token(new_user)


전체 user_service.py는 아래와 같습니다.

import uuid
import datetime

from app.main import db
from app.main.model.user import User


def save_new_user(data):
    user = User.query.filter_by(email=data['email']).first()
    if not user:
        new_user = User(
            public_id=str(uuid.uuid4()),
            email=data['email'],
            username=data['username'],
            password=data['password'],
            registered_on=datetime.datetime.utcnow()
        )
        save_changes(new_user)
        return generate_token(new_user)
    else:
        response_object = {
            'status': 'fail',
            'message': 'User already exists. Please Log in.',
        }
        return response_object, 409


def get_all_users():
    return User.query.all()


def get_a_user(public_id):
    return User.query.filter_by(public_id=public_id).first()

def generate_token(user):
    try:
        # generate the auth token
        auth_token = user.encode_auth_token(user.id)
        response_object = {
            'status': 'success',
            'message': 'Successfully registered.',
            'Authorization': auth_token.decode()
        }
        return response_object, 201
    except Exception as e:
        response_object = {
            'status': 'fail',
            'message': 'Some error occurred. Please try again.'
        }
        return response_object, 401

def save_changes(data):
    db.session.add(data)
    db.session.commit()

login/logout의 테스트 스크립트를 작성합니다.

개발에 있어서 테스트는 필수이며 자동화 되어야 합니다.

test 폴더에 test_auth.py를 생성합니다.

$ sudo vim ./app/test/test_auth.py

test_auth.py 파일에 아래와 같이 작성합니다.

import unittest
import json
from app.test.base import BaseTestCase


def register_user(self):
    return self.client.post(
        '/user/',
        data=json.dumps(dict(
            email='example@gmail.com',
            username='username',
            password='123456'
        )),
        content_type='application/json'
    )


def login_user(self):
    return self.client.post(
        '/auth/login',
        data=json.dumps(dict(
            email='example@gmail.com',
            password='123456'
        )),
        content_type='application/json'
    )


class TestAuthBlueprint(BaseTestCase):

    def test_registered_user_login(self):
            """ Test for login of registered-user login """
            with self.client:
                # user registration
                user_response = register_user(self)
                response_data = json.loads(user_response.data.decode())
                self.assertTrue(response_data['Authorization'])
                self.assertEqual(user_response.status_code, 201)

                # registered user login
                login_response = login_user(self)
                data = json.loads(login_response.data.decode())
                self.assertTrue(data['Authorization'])
                self.assertEqual(login_response.status_code, 200)

    def test_valid_logout(self):
        """ Test for logout before token expires """
        with self.client:
            # user registration
            user_response = register_user(self)
            response_data = json.loads(user_response.data.decode())
            self.assertTrue(response_data['Authorization'])
            self.assertEqual(user_response.status_code, 201)

            # registered user login
            login_response = login_user(self)
            data = json.loads(login_response.data.decode())
            self.assertTrue(data['Authorization'])
            self.assertEqual(login_response.status_code, 200)

            # valid token logout
            response = self.client.post(
                '/auth/logout',
                headers=dict(
                    Authorization='Bearer ' + json.loads(
                        login_response.data.decode()
                    )['Authorization']
                )
            )
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'success')
            self.assertEqual(response.status_code, 200)

if __name__ == '__main__':
    unittest.main()

python manage.py test 명령으로 테스트를 해봅니다.

$ python manage.py test
test_registered_user_login (test_auth.TestAuthBlueprint)
Test for login of registered-user login ... ok
test_valid_logout (test_auth.TestAuthBlueprint)
Test for logout before token expires ... ok
test_app_is_development (test_config.TestDevelopmentConfig) ... ok
test_app_is_production (test_config.TestProductionConfig) ... ok
test_app_is_testing (test_config.TestTestingConfig) ... ok
test_decode_auth_token (test_user_medol.TestUserModel) ... ok
test_encode_auth_token (test_user_medol.TestUserModel) ... ok

----------------------------------------------------------------------
Ran 7 tests in 2.617s

OK


여기까지 인증에 대한 개발이었습니다.


Comments