일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- create table
- Jupyter Notebook
- 실행권한
- STF
- ubuntu
- port forwarding
- rethinkdb
- Jupyter
- STF_PortForwarding
- openpyxl
- 28015
- GoCD
- nmap
- nGrinder
- Materials
- ssh
- PYTHON
- mysql
- perfect
- appium server
- insert
- ftp
- centos
- SWIFT
- sshpass
- nohup
- appium
- postgresql
- kitura
- postgres
- Today
- Total
don't stop believing
Flask로 API 서버 만들기 (6) - Security and Authentication 본문
Developing API Sample Server by Flask
Original Post: How to structure a Flask-RESTPlus web service for production builds
Github: https://github.com/cosmic-byte/flask-restplus-boilerplate
Flask로 API 서버 만들기 (1) - 개발 환경 준비
Flask로 API 서버 만들기 (2) - config 와 실행 확인
Flask로 API 서버 만들기 (3) - User 테이블 만들기
Flask로 API 서버 만들기 (4) - Testing
Flask로 API 서버 만들기 (5) - User Operations
Flask로 API 서버 만들기 (6) - Security and Authentication
이번에는 인증 토큰을 이용한 로그인과 로그아웃, 그리고 블랙리스트 등록을 해보겠습니다.
먼저 블랙리스트 db만들겠습니다. models 폴더 아래 blacklist.py 파일을 만들어 줍니다.
$ sudo vim ./app/main/model/blacklist.py
blacklist.py 파일에 아래와 같이 추가합니다.
from app.main import db import datetime class BlacklistToken(db.Model): """ Token Model for storing JWT tokens """ __tablename__ = 'blacklist_tokens' id = db.Column(db.Integer, primary_key=True, autoincrement=True) token = db.Column(db.String(500), unique=True, nullable=False) blacklisted_on = db.Column(db.DateTime, nullable=False) def __init__(self, token): self.token = token self.blacklisted_on = datetime.datetime.now() def __repr__(self): return '<id: token: {}'.format(self.token) @staticmethod def check_blacklist(auth_token): # check whether auth token has been blacklisted res = BlacklistToken.query.filter_by(token=str(auth_token)).first() if res: return True else: return False
blacklist_tokens라는 테이블을 만들고 id, token, blacklisted_on 컬럼을 만듭니다.
manage.py 파일에 blacklist 모듈을 import해줍니다.
from app.main.model import user, blacklist
$ sudo vim manage.py
from app.main.model import user 다음에 blacklist를 추가해 줍니다.
import os import unittest from flask_migrate import Migrate, MigrateCommand from flask_script import Manager from app import blueprint from app.main import create_app, db from app.main.model import user, blacklist app = create_app(os.getenv('BOILERPLATE_ENV') or 'dev') app.register_blueprint(blueprint) app.app_context().push() manager = Manager(app) migrate = Migrate(app, db) manager.add_command('db', MigrateCommand) @manager.command def run(): app.run(host='0.0.0.0') @manager.command def test(): """Runs the unit tests.""" tests = unittest.TestLoader().discover('app/test', pattern='test*.py') result = unittest.TextTestRunner(verbosity=2).run(tests) if result.wasSuccessful(): return 0 return 1 if __name__ == '__main__': manager.run()
db를 변경했으니 migrate와 upgrade 명령을 실행해줍니다.
$ python manage.py db migrate --message 'add blacklist table' $ python manage.py db upgrade
그 다음 service 폴더 아래 blacklist_service.py 파일을 만들어 줍니다.
$ sudo vim ./app/main/service/blacklist_service.py
blacklist_ervice.py 파일에 아래와 같이 추가합니다.
from app.main import db from app.main.model.blacklist import BlacklistToken def save_token(token): blacklist_token = BlacklistToken(token=token) try: # insert the token db.session.add(blacklist_token) db.session.commit() response_object = { 'status': 'success', 'message': 'Successfully logged out.' } return response_object, 200 except Exception as e: response_object = { 'status': 'fail', 'message': e } return response_object, 200
save_token 함수를 호출하면 blacklist_token 테이블에 token을 저장합니다.
이번에는 model 폴더 아래있는 user.py에 token을 encoding 과 decoding 하는 method를 추가합니다.
encode/decode에 필요한 모듈도 추가해 줍니다.
import datetime
import jwt
from app.main.model.blacklist import BlacklistToken
from ..config import key
$ sudo vim ./app/main/model/user.py
추가되는 method는 def encode_auth_token(self, user_id):, def decode_auth_token(auth_token): 두 개 입니다.
from .. import db, flask_bcrypt import datetime from app.main.model.blacklist import BlacklistToken from ..config import key import jwt class User(db.Model): """ User Model for storing user related details """ __tablename__ = "user" id = db.Column(db.Integer, primary_key=True, autoincrement=True) email = db.Column(db.String(255), unique=True, nullable=False) registered_on = db.Column(db.DateTime, nullable=False) admin = db.Column(db.Boolean, nullable=False, default=False) public_id = db.Column(db.String(100), unique=True) username = db.Column(db.String(50), unique=True) password_hash = db.Column(db.String(100)) @property def password(self): raise AttributeError('password: write-only field') @password.setter def password(self, password): self.password_hash = flask_bcrypt.generate_password_hash(password).decode('utf-8') def check_password(self, password): return flask_bcrypt.check_password_hash(self.password_hash, password) def encode_auth_token(self, user_id): """ Generates the Auth Token :return: string """ try: payload = { 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1, seconds=5), 'iat': datetime.datetime.utcnow(), 'sub': user_id } return jwt.encode( payload, key, algorithm='HS256' ) except Exception as e: return e @staticmethod def decode_auth_token(auth_token): """ Decodes the auth token :param auth_token: :return: integer|string """ try: payload = jwt.decode(auth_token, key) is_blacklisted_token = BlacklistToken.check_blacklist(auth_token) if is_blacklisted_token: return 'Token blacklisted. Please log in again.' else: return payload['sub'] except jwt.ExpiredSignatureError: return 'Signature expired. Please log in again.' except jwt.InvalidTokenError: return 'Invalid token. Please log in again.' def __repr__(self): return "<user '{}'="">".format(self.username)</user>
기능이 추가되었으니 test를 해보겠습니다.
test 폴더에 base.py 파일을 생성합니다.
$ sudo vim ./app/test/base.py
base.py 파일에 아래와 같이 테스에 사용할 session을 만드는 script를 추가합니다.
from flask_testing import TestCase from app.main import db from manage import app class BaseTestCase(TestCase): """ Base Tests """ def create_app(self): app.config.from_object('app.main.config.TestingConfig') return app def setUp(self): db.create_all() db.session.commit() def tearDown(self): db.session.remove() db.drop_all()
이제 test 폴더 안에 test_user_medol.py 파일을 만듭니다.
$ sudo vim ./app/test/test_user_medol.py
test_user_medol.py에 테스트 script를 추가합니다.
import unittest import datetime from app.main import db from app.main.model.user import User from app.test.base import BaseTestCase class TestUserModel(BaseTestCase): def test_encode_auth_token(self): user = User( email='test@test.com', password='test', registered_on=datetime.datetime.utcnow() ) db.session.add(user) db.session.commit() auth_token = user.encode_auth_token(user.id) self.assertTrue(isinstance(auth_token, bytes)) def test_decode_auth_token(self): user = User( email='test@test.com', password='test', registered_on=datetime.datetime.utcnow() ) db.session.add(user) db.session.commit() auth_token = user.encode_auth_token(user.id) self.assertTrue(isinstance(auth_token, bytes)) self.assertTrue(User.decode_auth_token(auth_token.decode("utf-8") ) == 1) if __name__ == '__main__': unittest.main()
파일을 저장하고 python manage.py test 명령으로 test를 실행합니다.
$ python manage.py test test_app_is_development (test_config.TestDevelopmentConfig) ... ok test_app_is_production (test_config.TestProductionConfig) ... ok test_app_is_testing (test_config.TestTestingConfig) ... ok test_decode_auth_token (test_user_medol.TestUserModel) ... ok test_encode_auth_token (test_user_medol.TestUserModel) ... ok ---------------------------------------------------------------------- Ran 5 tests in 1.069s OK
이제 login/logout을 만들 차례입니다.
먼저 dto에 login/out에서 사용할 field를 만들어 줍니다.
$ sudo vim ./app/main/util/dto.py
dto.py 파일을 열고 아래와 같이 AuthDto 클래스를 추가해 줍니다.
from flask_restplus import Namespace, fields class UserDto: api = Namespace('user', description='user related operations') user = api.model('user', { 'email': fields.String(required=True, description='user email address'), 'username': fields.String(required=True, description='user username'), 'password': fields.String(required=True, description='user password'), 'public_id': fields.String(description='user Identifier') }) class AuthDto: api = Namespace('auth', description='authentication related operations') user_auth = api.model('auth_details', { 'email': fields.String(required=True, description='The email address'), 'password': fields.String(required=True, description='The user password '), })
다음으로 로그인 로직을 처리하는 auth helper를 만들어 줍니다.
$ sudo vim ./app/main/service/auth_helper.py
아래와 같이 스크립트를 작성합니다.
유저가 로그아웃을 하면 사용된 토큰은 blacklist로 들어가고 같은 토큰으로는 사용할 수 없게 됩니다.
from app.main.model.user import User from ..service.blacklist_service import save_token class Auth: @staticmethod def login_user(data): try: # fetch the user data user = User.query.filter_by(email=data.get('email')).first() if user and user.check_password(data.get('password')): auth_token = user.encode_auth_token(user.id) if auth_token: response_object = { 'status': 'success', 'message': 'Successfully logged in.', 'Authorization': auth_token.decode() } return response_object, 200 else: response_object = { 'status': 'fail', 'message': 'email or password does not match.' } return response_object, 401 except Exception as e: print(e) response_object = { 'status': 'fail', 'message': 'Try again' } return response_object, 500 @staticmethod def logout_user(data): if data: auth_token = data.split(" ")[1] else: auth_token = '' if auth_token: resp = User.decode_auth_token(auth_token) if not isinstance(resp, str): # mark the token as blacklisted return save_token(token=auth_token) else: response_object = { 'status': 'fail', 'message': resp } return response_object, 401 else: response_object = { 'status': 'fail', 'message': 'Provide a valid auth token.' } return response_object, 403
handler를 만들었으니 이제는 controller를 만들 차례입니다.
controller 폴더에 auth_controller.py를 만들어 줍니다.
$ sudo vim ./app/main/controller/auth_controller.py
API 서버에서 사용할 login과 logout을 만들어 줍니다.
from flask import request from flask_restplus import Resource from app.main.service.auth_helper import Auth from ..util.dto import AuthDto api = AuthDto.api user_auth = AuthDto.user_auth @api.route('/login') class UserLogin(Resource): """ User Login Resource """ @api.doc('user login') @api.expect(user_auth, validate=True) def post(self): # get the post data post_data = request.json return Auth.login_user(data=post_data) @api.route('/logout') class LogoutAPI(Resource): """ Logout Resource """ @api.doc('logout a user') def post(self): # get auth token auth_header = request.headers.get('Authorization') return Auth.logout_user(data=auth_header)
마지막으로 swagger로 API 문서를 추가하기 위해 ./app/__init__.py에 auth_controller 패키지의 api 모듈을 추가해 줍니다.
$ sudo vim ./app/__init__.py
추가되는 module과 namespace는 아래와 같습니다.
from .main.controller.auth_controller import api as auth_ns
...
api.add_namespace(auth_ns)
# app/__init__.py from flask_restplus import Api from flask import Blueprint from .main.controller.user_controller import api as user_ns from .main.controller.auth_controller import api as auth_ns blueprint = Blueprint('api', __name__) api = Api(blueprint, title='FLASK RESTPLUS API BOILER-PLATE WITH JWT', version='1.0', description='a boilerplate for flask restplus web service' ) api.add_namespace(user_ns, path='/user') api.add_namespace(auth_ns)
이제 API 서버를 실행하고 5000번 포트로 접속해 봅니다.
$ python manage.py run * Serving Flask app "app.main" (lazy loading) * Environment: development * Debug mode: on * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit) * Restarting with stat * Debugger is active! * Debugger PIN: 145-484-411
브라우저를 열고 API 서버의 5000포트로 접속합니다.
추가한 auth의 login과 logout이 보입니다.
login/logout 기능이 추가되었으니 테스트를 해봐야겠죠. 테스트를 하기 전에 user_service.py에 generate_token method를 아래와 같이 추가해 줍니다.
$ sudo vim ./app/main/service/user_service.py
def generate_token(user): try: # generate the auth token auth_token = user.encode_auth_token(user.id) response_object = { 'status': 'success', 'message': 'Successfully registered.', 'Authorization': auth_token.decode() } return response_object, 201 except Exception as e: response_object = { 'status': 'fail', 'message': 'Some error occurred. Please try again.' } return response_object, 401
그 다음 user_service.py의 save_new_user method의 return 값을
response_object = {
'status': 'success',
'message': 'Successfully registered.'
}
return response_object, 201
이것이 아닌 아래와 같이 수정합니다.
return generate_token(new_user)
전체 user_service.py는 아래와 같습니다.
import uuid import datetime from app.main import db from app.main.model.user import User def save_new_user(data): user = User.query.filter_by(email=data['email']).first() if not user: new_user = User( public_id=str(uuid.uuid4()), email=data['email'], username=data['username'], password=data['password'], registered_on=datetime.datetime.utcnow() ) save_changes(new_user) return generate_token(new_user) else: response_object = { 'status': 'fail', 'message': 'User already exists. Please Log in.', } return response_object, 409 def get_all_users(): return User.query.all() def get_a_user(public_id): return User.query.filter_by(public_id=public_id).first() def generate_token(user): try: # generate the auth token auth_token = user.encode_auth_token(user.id) response_object = { 'status': 'success', 'message': 'Successfully registered.', 'Authorization': auth_token.decode() } return response_object, 201 except Exception as e: response_object = { 'status': 'fail', 'message': 'Some error occurred. Please try again.' } return response_object, 401 def save_changes(data): db.session.add(data) db.session.commit()
login/logout의 테스트 스크립트를 작성합니다.
개발에 있어서 테스트는 필수이며 자동화 되어야 합니다.
test 폴더에 test_auth.py를 생성합니다.
$ sudo vim ./app/test/test_auth.py
test_auth.py 파일에 아래와 같이 작성합니다.
import unittest import json from app.test.base import BaseTestCase def register_user(self): return self.client.post( '/user/', data=json.dumps(dict( email='example@gmail.com', username='username', password='123456' )), content_type='application/json' ) def login_user(self): return self.client.post( '/auth/login', data=json.dumps(dict( email='example@gmail.com', password='123456' )), content_type='application/json' ) class TestAuthBlueprint(BaseTestCase): def test_registered_user_login(self): """ Test for login of registered-user login """ with self.client: # user registration user_response = register_user(self) response_data = json.loads(user_response.data.decode()) self.assertTrue(response_data['Authorization']) self.assertEqual(user_response.status_code, 201) # registered user login login_response = login_user(self) data = json.loads(login_response.data.decode()) self.assertTrue(data['Authorization']) self.assertEqual(login_response.status_code, 200) def test_valid_logout(self): """ Test for logout before token expires """ with self.client: # user registration user_response = register_user(self) response_data = json.loads(user_response.data.decode()) self.assertTrue(response_data['Authorization']) self.assertEqual(user_response.status_code, 201) # registered user login login_response = login_user(self) data = json.loads(login_response.data.decode()) self.assertTrue(data['Authorization']) self.assertEqual(login_response.status_code, 200) # valid token logout response = self.client.post( '/auth/logout', headers=dict( Authorization='Bearer ' + json.loads( login_response.data.decode() )['Authorization'] ) ) data = json.loads(response.data.decode()) self.assertTrue(data['status'] == 'success') self.assertEqual(response.status_code, 200) if __name__ == '__main__': unittest.main()
python manage.py test 명령으로 테스트를 해봅니다.
$ python manage.py test test_registered_user_login (test_auth.TestAuthBlueprint) Test for login of registered-user login ... ok test_valid_logout (test_auth.TestAuthBlueprint) Test for logout before token expires ... ok test_app_is_development (test_config.TestDevelopmentConfig) ... ok test_app_is_production (test_config.TestProductionConfig) ... ok test_app_is_testing (test_config.TestTestingConfig) ... ok test_decode_auth_token (test_user_medol.TestUserModel) ... ok test_encode_auth_token (test_user_medol.TestUserModel) ... ok ---------------------------------------------------------------------- Ran 7 tests in 2.617s OK
여기까지 인증에 대한 개발이었습니다.
'Python > Flask' 카테고리의 다른 글
Flask로 API 서버 만들기 (8) - Extra tips (Makefiles) (0) | 2018.11.18 |
---|---|
Flask로 API 서버 만들기 (7) - Route protection and Authorization (1) | 2018.11.18 |
Flask로 API 서버 만들기 (5) - User Operations (2) | 2018.11.12 |
Flask로 API 서버 만들기 (4) - Testing (0) | 2018.11.12 |
Flask로 API 서버 만들기 (3) - User 테이블 만들기 (2) | 2018.11.12 |