don't stop believing

Flask로 API 서버 만들기 (6) - Security and Authentication 본문

Python/Flask

Flask로 API 서버 만들기 (6) - Security and Authentication

Tongchun 2018. 11. 12. 19:30

Developing API Sample Server by Flask 

Original Post: How to structure a Flask-RESTPlus web service for production builds

Github: https://github.com/cosmic-byte/flask-restplus-boilerplate


Flask로 API 서버 만들기 (1) - 개발 환경 준비

Flask로 API 서버 만들기 (2) - config 와 실행 확인

Flask로 API 서버 만들기 (3) - User 테이블 만들기

Flask로 API 서버 만들기 (4) - Testing

Flask로 API 서버 만들기 (5) - User Operations

Flask로 API 서버 만들기 (6) - Security and Authentication

Flask로 API 서버 만들기 (7) - Route protection and Authorization

Flask로 API 서버 만들기 (8) - Extra tips (Makefiles)


이번에는 인증 토큰을 이용한 로그인과 로그아웃, 그리고 블랙리스트 등록을 해보겠습니다.

먼저 블랙리스트 db만들겠습니다. models 폴더 아래 blacklist.py 파일을 만들어 줍니다.

1
$ sudo vim ./app/main/model/blacklist.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

blacklist.py 파일에 아래와 같이 추가합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from app.main import db
import datetime
class BlacklistToken(db.Model):
"""
Token Model for storing JWT tokens
"""
__tablename__ = 'blacklist_tokens'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
token = db.Column(db.String(500), unique=True, nullable=False)
blacklisted_on = db.Column(db.DateTime, nullable=False)
def __init__(self, token):
self.token = token
self.blacklisted_on = datetime.datetime.now()
def __repr__(self):
return '<id: token: {}'.format(self.token)
@staticmethod
def check_blacklist(auth_token):
# check whether auth token has been blacklisted
res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
if res:
return True
else:
return False
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

blacklist_tokens라는 테이블을 만들고 id, token, blacklisted_on 컬럼을 만듭니다.


manage.py 파일에 blacklist 모듈을 import해줍니다.

from app.main.model import user, blacklist

1
$ sudo vim manage.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

from app.main.model import user 다음에 blacklist를 추가해 줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import os
import unittest
from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager
from app import blueprint
from app.main import create_app, db
from app.main.model import user, blacklist
app = create_app(os.getenv('BOILERPLATE_ENV') or 'dev')
app.register_blueprint(blueprint)
app.app_context().push()
manager = Manager(app)
migrate = Migrate(app, db)
manager.add_command('db', MigrateCommand)
@manager.command
def run():
app.run(host='0.0.0.0')
@manager.command
def test():
"""Runs the unit tests."""
tests = unittest.TestLoader().discover('app/test', pattern='test*.py')
result = unittest.TextTestRunner(verbosity=2).run(tests)
if result.wasSuccessful():
return 0
return 1
if __name__ == '__main__':
manager.run()
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

db를 변경했으니 migrate와 upgrade 명령을 실행해줍니다.

1
2
$ python manage.py db migrate --message 'add blacklist table'
$ python manage.py db upgrade
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

그 다음 service 폴더 아래 blacklist_service.py 파일을 만들어 줍니다.

1
$ sudo vim ./app/main/service/blacklist_service.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

blacklist_ervice.py 파일에 아래와 같이 추가합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from app.main import db
from app.main.model.blacklist import BlacklistToken
def save_token(token):
blacklist_token = BlacklistToken(token=token)
try:
# insert the token
db.session.add(blacklist_token)
db.session.commit()
response_object = {
'status': 'success',
'message': 'Successfully logged out.'
}
return response_object, 200
except Exception as e:
response_object = {
'status': 'fail',
'message': e
}
return response_object, 200
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

save_token 함수를 호출하면 blacklist_token 테이블에 token을 저장합니다.


이번에는 model 폴더 아래있는 user.py에 token을 encoding 과 decoding 하는 method를 추가합니다.

encode/decode에 필요한 모듈도 추가해 줍니다.


import datetime

import jwt

from app.main.model.blacklist import BlacklistToken

from ..config import key

1
$ sudo vim ./app/main/model/user.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

추가되는 method는 def encode_auth_token(self, user_id):, def decode_auth_token(auth_token): 두 개 입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from .. import db, flask_bcrypt
import datetime
from app.main.model.blacklist import BlacklistToken
from ..config import key
import jwt
class User(db.Model):
""" User Model for storing user related details """
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
email = db.Column(db.String(255), unique=True, nullable=False)
registered_on = db.Column(db.DateTime, nullable=False)
admin = db.Column(db.Boolean, nullable=False, default=False)
public_id = db.Column(db.String(100), unique=True)
username = db.Column(db.String(50), unique=True)
password_hash = db.Column(db.String(100))
@property
def password(self):
raise AttributeError('password: write-only field')
@password.setter
def password(self, password):
self.password_hash = flask_bcrypt.generate_password_hash(password).decode('utf-8')
def check_password(self, password):
return flask_bcrypt.check_password_hash(self.password_hash, password)
def encode_auth_token(self, user_id):
"""
Generates the Auth Token
:return: string
"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1, seconds=5),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
key,
algorithm='HS256'
)
except Exception as e:
return e
@staticmethod
def decode_auth_token(auth_token):
"""
Decodes the auth token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(auth_token, key)
is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
if is_blacklisted_token:
return 'Token blacklisted. Please log in again.'
else:
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
def __repr__(self):
return "<user '{}'="">".format(self.username)</user>
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

기능이 추가되었으니 test를 해보겠습니다.

test 폴더에 base.py 파일을 생성합니다.

1
$ sudo vim ./app/test/base.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

base.py 파일에 아래와 같이 테스에 사용할 session을 만드는 script를 추가합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask_testing import TestCase
from app.main import db
from manage import app
class BaseTestCase(TestCase):
""" Base Tests """
def create_app(self):
app.config.from_object('app.main.config.TestingConfig')
return app
def setUp(self):
db.create_all()
db.session.commit()
def tearDown(self):
db.session.remove()
db.drop_all()
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

이제 test 폴더 안에 test_user_medol.py 파일을 만듭니다.

1
$ sudo vim ./app/test/test_user_medol.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

test_user_medol.py에 테스트 script를 추가합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import unittest
import datetime
from app.main import db
from app.main.model.user import User
from app.test.base import BaseTestCase
class TestUserModel(BaseTestCase):
def test_encode_auth_token(self):
user = User(
email='test@test.com',
password='test',
registered_on=datetime.datetime.utcnow()
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
def test_decode_auth_token(self):
user = User(
email='test@test.com',
password='test',
registered_on=datetime.datetime.utcnow()
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
self.assertTrue(User.decode_auth_token(auth_token.decode("utf-8") ) == 1)
if __name__ == '__main__':
unittest.main()
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

파일을 저장하고 python manage.py test 명령으로 test를 실행합니다.

1
2
3
4
5
6
7
8
9
10
11
$ python manage.py test
test_app_is_development (test_config.TestDevelopmentConfig) ... ok
test_app_is_production (test_config.TestProductionConfig) ... ok
test_app_is_testing (test_config.TestTestingConfig) ... ok
test_decode_auth_token (test_user_medol.TestUserModel) ... ok
test_encode_auth_token (test_user_medol.TestUserModel) ... ok
----------------------------------------------------------------------
Ran 5 tests in 1.069s
OK
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX


이제 login/logout을 만들 차례입니다.

먼저 dto에 login/out에서 사용할 field를 만들어 줍니다.

1
$ sudo vim ./app/main/util/dto.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

dto.py 파일을 열고 아래와 같이 AuthDto 클래스를 추가해 줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask_restplus import Namespace, fields
class UserDto:
api = Namespace('user', description='user related operations')
user = api.model('user', {
'email': fields.String(required=True, description='user email address'),
'username': fields.String(required=True, description='user username'),
'password': fields.String(required=True, description='user password'),
'public_id': fields.String(description='user Identifier')
})
class AuthDto:
api = Namespace('auth', description='authentication related operations')
user_auth = api.model('auth_details', {
'email': fields.String(required=True, description='The email address'),
'password': fields.String(required=True, description='The user password '),
})
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

다음으로 로그인 로직을 처리하는 auth helper를 만들어 줍니다.

1
$ sudo vim ./app/main/service/auth_helper.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

아래와 같이 스크립트를 작성합니다.

유저가 로그아웃을 하면 사용된 토큰은 blacklist로 들어가고 같은 토큰으로는 사용할 수 없게 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from app.main.model.user import User
from ..service.blacklist_service import save_token
class Auth:
@staticmethod
def login_user(data):
try:
# fetch the user data
user = User.query.filter_by(email=data.get('email')).first()
if user and user.check_password(data.get('password')):
auth_token = user.encode_auth_token(user.id)
if auth_token:
response_object = {
'status': 'success',
'message': 'Successfully logged in.',
'Authorization': auth_token.decode()
}
return response_object, 200
else:
response_object = {
'status': 'fail',
'message': 'email or password does not match.'
}
return response_object, 401
except Exception as e:
print(e)
response_object = {
'status': 'fail',
'message': 'Try again'
}
return response_object, 500
@staticmethod
def logout_user(data):
if data:
auth_token = data.split(" ")[1]
else:
auth_token = ''
if auth_token:
resp = User.decode_auth_token(auth_token)
if not isinstance(resp, str):
# mark the token as blacklisted
return save_token(token=auth_token)
else:
response_object = {
'status': 'fail',
'message': resp
}
return response_object, 401
else:
response_object = {
'status': 'fail',
'message': 'Provide a valid auth token.'
}
return response_object, 403
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

 handler를 만들었으니 이제는 controller를 만들 차례입니다.

controller 폴더에 auth_controller.py를 만들어 줍니다.

1
$ sudo vim ./app/main/controller/auth_controller.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

API 서버에서 사용할 login과 logout을 만들어 줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from flask import request
from flask_restplus import Resource
from app.main.service.auth_helper import Auth
from ..util.dto import AuthDto
api = AuthDto.api
user_auth = AuthDto.user_auth
@api.route('/login')
class UserLogin(Resource):
"""
User Login Resource
"""
@api.doc('user login')
@api.expect(user_auth, validate=True)
def post(self):
# get the post data
post_data = request.json
return Auth.login_user(data=post_data)
@api.route('/logout')
class LogoutAPI(Resource):
"""
Logout Resource
"""
@api.doc('logout a user')
def post(self):
# get auth token
auth_header = request.headers.get('Authorization')
return Auth.logout_user(data=auth_header)
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

마지막으로 swagger로 API 문서를 추가하기 위해 ./app/__init__.py에 auth_controller 패키지의 api 모듈을 추가해 줍니다.

1
$ sudo vim ./app/__init__.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

추가되는 module과 namespace는 아래와 같습니다.

from .main.controller.auth_controller import api as auth_ns

...

api.add_namespace(auth_ns)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# app/__init__.py
from flask_restplus import Api
from flask import Blueprint
from .main.controller.user_controller import api as user_ns
from .main.controller.auth_controller import api as auth_ns
blueprint = Blueprint('api', __name__)
api = Api(blueprint,
title='FLASK RESTPLUS API BOILER-PLATE WITH JWT',
version='1.0',
description='a boilerplate for flask restplus web service'
)
api.add_namespace(user_ns, path='/user')
api.add_namespace(auth_ns)
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

이제 API 서버를 실행하고 5000번 포트로 접속해 봅니다.

1
2
3
4
5
6
7
8
$ python manage.py run
* Serving Flask app "app.main" (lazy loading)
* Environment: development
* Debug mode: on
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 145-484-411
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

브라우저를 열고 API 서버의 5000포트로 접속합니다.

추가한 auth의 login과 logout이 보입니다.


login/logout 기능이 추가되었으니 테스트를 해봐야겠죠. 테스트를 하기 전에 user_service.py에 generate_token method를 아래와 같이 추가해 줍니다.

1
$ sudo vim ./app/main/service/user_service.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def generate_token(user):
try:
# generate the auth token
auth_token = user.encode_auth_token(user.id)
response_object = {
'status': 'success',
'message': 'Successfully registered.',
'Authorization': auth_token.decode()
}
return response_object, 201
except Exception as e:
response_object = {
'status': 'fail',
'message': 'Some error occurred. Please try again.'
}
return response_object, 401
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

그 다음 user_service.py의 save_new_user method의 return 값을

response_object = {

    'status': 'success',

    'message': 'Successfully registered.'

}

return response_object, 201

이것이 아닌 아래와 같이 수정합니다.

return generate_token(new_user)


전체 user_service.py는 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import uuid
import datetime
from app.main import db
from app.main.model.user import User
def save_new_user(data):
user = User.query.filter_by(email=data['email']).first()
if not user:
new_user = User(
public_id=str(uuid.uuid4()),
email=data['email'],
username=data['username'],
password=data['password'],
registered_on=datetime.datetime.utcnow()
)
save_changes(new_user)
return generate_token(new_user)
else:
response_object = {
'status': 'fail',
'message': 'User already exists. Please Log in.',
}
return response_object, 409
def get_all_users():
return User.query.all()
def get_a_user(public_id):
return User.query.filter_by(public_id=public_id).first()
def generate_token(user):
try:
# generate the auth token
auth_token = user.encode_auth_token(user.id)
response_object = {
'status': 'success',
'message': 'Successfully registered.',
'Authorization': auth_token.decode()
}
return response_object, 201
except Exception as e:
response_object = {
'status': 'fail',
'message': 'Some error occurred. Please try again.'
}
return response_object, 401
def save_changes(data):
db.session.add(data)
db.session.commit()
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

login/logout의 테스트 스크립트를 작성합니다.

개발에 있어서 테스트는 필수이며 자동화 되어야 합니다.

test 폴더에 test_auth.py를 생성합니다.

1
$ sudo vim ./app/test/test_auth.py
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

test_auth.py 파일에 아래와 같이 작성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import unittest
import json
from app.test.base import BaseTestCase
def register_user(self):
return self.client.post(
'/user/',
data=json.dumps(dict(
email='example@gmail.com',
username='username',
password='123456'
)),
content_type='application/json'
)
def login_user(self):
return self.client.post(
'/auth/login',
data=json.dumps(dict(
email='example@gmail.com',
password='123456'
)),
content_type='application/json'
)
class TestAuthBlueprint(BaseTestCase):
def test_registered_user_login(self):
""" Test for login of registered-user login """
with self.client:
# user registration
user_response = register_user(self)
response_data = json.loads(user_response.data.decode())
self.assertTrue(response_data['Authorization'])
self.assertEqual(user_response.status_code, 201)
# registered user login
login_response = login_user(self)
data = json.loads(login_response.data.decode())
self.assertTrue(data['Authorization'])
self.assertEqual(login_response.status_code, 200)
def test_valid_logout(self):
""" Test for logout before token expires """
with self.client:
# user registration
user_response = register_user(self)
response_data = json.loads(user_response.data.decode())
self.assertTrue(response_data['Authorization'])
self.assertEqual(user_response.status_code, 201)
# registered user login
login_response = login_user(self)
data = json.loads(login_response.data.decode())
self.assertTrue(data['Authorization'])
self.assertEqual(login_response.status_code, 200)
# valid token logout
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
login_response.data.decode()
)['Authorization']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertEqual(response.status_code, 200)
if __name__ == '__main__':
unittest.main()
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

python manage.py test 명령으로 테스트를 해봅니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ python manage.py test
test_registered_user_login (test_auth.TestAuthBlueprint)
Test for login of registered-user login ... ok
test_valid_logout (test_auth.TestAuthBlueprint)
Test for logout before token expires ... ok
test_app_is_development (test_config.TestDevelopmentConfig) ... ok
test_app_is_production (test_config.TestProductionConfig) ... ok
test_app_is_testing (test_config.TestTestingConfig) ... ok
test_decode_auth_token (test_user_medol.TestUserModel) ... ok
test_encode_auth_token (test_user_medol.TestUserModel) ... ok
----------------------------------------------------------------------
Ran 7 tests in 2.617s
OK
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX


여기까지 인증에 대한 개발이었습니다.