일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- appium server
- Jupyter Notebook
- SWIFT
- centos
- ssh
- kitura
- 실행권한
- nohup
- STF_PortForwarding
- mysql
- rethinkdb
- sshpass
- create table
- openpyxl
- ftp
- appium
- Materials
- 28015
- nGrinder
- nmap
- Jupyter
- PYTHON
- ubuntu
- postgresql
- insert
- port forwarding
- STF
- perfect
- GoCD
- postgres
- Today
- Total
don't stop believing
Flask로 API 서버 만들기 (6) - Security and Authentication 본문
Developing API Sample Server by Flask
Original Post: How to structure a Flask-RESTPlus web service for production builds
Github: https://github.com/cosmic-byte/flask-restplus-boilerplate
Flask로 API 서버 만들기 (1) - 개발 환경 준비
Flask로 API 서버 만들기 (2) - config 와 실행 확인
Flask로 API 서버 만들기 (3) - User 테이블 만들기
Flask로 API 서버 만들기 (4) - Testing
Flask로 API 서버 만들기 (5) - User Operations
Flask로 API 서버 만들기 (6) - Security and Authentication
이번에는 인증 토큰을 이용한 로그인과 로그아웃, 그리고 블랙리스트 등록을 해보겠습니다.
먼저 블랙리스트 db만들겠습니다. models 폴더 아래 blacklist.py 파일을 만들어 줍니다.
1$ sudo vim ./app/main/model/blacklist.py
blacklist.py 파일에 아래와 같이 추가합니다.
1234567891011121314151617181920212223242526272829from app.main import dbimport datetimeclass BlacklistToken(db.Model):"""Token Model for storing JWT tokens"""__tablename__ = 'blacklist_tokens'id = db.Column(db.Integer, primary_key=True, autoincrement=True)token = db.Column(db.String(500), unique=True, nullable=False)blacklisted_on = db.Column(db.DateTime, nullable=False)def __init__(self, token):self.token = tokenself.blacklisted_on = datetime.datetime.now()def __repr__(self):return '<id: token: {}'.format(self.token)@staticmethoddef check_blacklist(auth_token):# check whether auth token has been blacklistedres = BlacklistToken.query.filter_by(token=str(auth_token)).first()if res:return Trueelse:return False
blacklist_tokens라는 테이블을 만들고 id, token, blacklisted_on 컬럼을 만듭니다.
manage.py 파일에 blacklist 모듈을 import해줍니다.
from app.main.model import user, blacklist
1$ sudo vim manage.py
from app.main.model import user 다음에 blacklist를 추가해 줍니다.
12345678910111213141516171819202122232425262728293031323334353637import osimport unittestfrom flask_migrate import Migrate, MigrateCommandfrom flask_script import Managerfrom app import blueprintfrom app.main import create_app, dbfrom app.main.model import user, blacklistapp = create_app(os.getenv('BOILERPLATE_ENV') or 'dev')app.register_blueprint(blueprint)app.app_context().push()manager = Manager(app)migrate = Migrate(app, db)manager.add_command('db', MigrateCommand)@manager.commanddef run():app.run(host='0.0.0.0')@manager.commanddef test():"""Runs the unit tests."""tests = unittest.TestLoader().discover('app/test', pattern='test*.py')result = unittest.TextTestRunner(verbosity=2).run(tests)if result.wasSuccessful():return 0return 1if __name__ == '__main__':manager.run()
db를 변경했으니 migrate와 upgrade 명령을 실행해줍니다.
12$ python manage.py db migrate --message 'add blacklist table'$ python manage.py db upgrade
그 다음 service 폴더 아래 blacklist_service.py 파일을 만들어 줍니다.
1$ sudo vim ./app/main/service/blacklist_service.py
blacklist_ervice.py 파일에 아래와 같이 추가합니다.
123456789101112131415161718192021from app.main import dbfrom app.main.model.blacklist import BlacklistTokendef save_token(token):blacklist_token = BlacklistToken(token=token)try:# insert the tokendb.session.add(blacklist_token)db.session.commit()response_object = {'status': 'success','message': 'Successfully logged out.'}return response_object, 200except Exception as e:response_object = {'status': 'fail','message': e}return response_object, 200
save_token 함수를 호출하면 blacklist_token 테이블에 token을 저장합니다.
이번에는 model 폴더 아래있는 user.py에 token을 encoding 과 decoding 하는 method를 추가합니다.
encode/decode에 필요한 모듈도 추가해 줍니다.
import datetime
import jwt
from app.main.model.blacklist import BlacklistToken
from ..config import key
1$ sudo vim ./app/main/model/user.py
추가되는 method는 def encode_auth_token(self, user_id):, def decode_auth_token(auth_token): 두 개 입니다.
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071from .. import db, flask_bcryptimport datetimefrom app.main.model.blacklist import BlacklistTokenfrom ..config import keyimport jwtclass User(db.Model):""" User Model for storing user related details """__tablename__ = "user"id = db.Column(db.Integer, primary_key=True, autoincrement=True)email = db.Column(db.String(255), unique=True, nullable=False)registered_on = db.Column(db.DateTime, nullable=False)admin = db.Column(db.Boolean, nullable=False, default=False)public_id = db.Column(db.String(100), unique=True)username = db.Column(db.String(50), unique=True)password_hash = db.Column(db.String(100))@propertydef password(self):raise AttributeError('password: write-only field')@password.setterdef password(self, password):self.password_hash = flask_bcrypt.generate_password_hash(password).decode('utf-8')def check_password(self, password):return flask_bcrypt.check_password_hash(self.password_hash, password)def encode_auth_token(self, user_id):"""Generates the Auth Token:return: string"""try:payload = {'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1, seconds=5),'iat': datetime.datetime.utcnow(),'sub': user_id}return jwt.encode(payload,key,algorithm='HS256')except Exception as e:return e@staticmethoddef decode_auth_token(auth_token):"""Decodes the auth token:param auth_token::return: integer|string"""try:payload = jwt.decode(auth_token, key)is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)if is_blacklisted_token:return 'Token blacklisted. Please log in again.'else:return payload['sub']except jwt.ExpiredSignatureError:return 'Signature expired. Please log in again.'except jwt.InvalidTokenError:return 'Invalid token. Please log in again.'def __repr__(self):return "<user '{}'="">".format(self.username)</user>
기능이 추가되었으니 test를 해보겠습니다.
test 폴더에 base.py 파일을 생성합니다.
1$ sudo vim ./app/test/base.py
base.py 파일에 아래와 같이 테스에 사용할 session을 만드는 script를 추가합니다.
12345678910111213141516171819from flask_testing import TestCasefrom app.main import dbfrom manage import appclass BaseTestCase(TestCase):""" Base Tests """def create_app(self):app.config.from_object('app.main.config.TestingConfig')return appdef setUp(self):db.create_all()db.session.commit()def tearDown(self):db.session.remove()db.drop_all()
이제 test 폴더 안에 test_user_medol.py 파일을 만듭니다.
1$ sudo vim ./app/test/test_user_medol.py
test_user_medol.py에 테스트 script를 추가합니다.
123456789101112131415161718192021222324252627282930313233343536import unittestimport datetimefrom app.main import dbfrom app.main.model.user import Userfrom app.test.base import BaseTestCaseclass TestUserModel(BaseTestCase):def test_encode_auth_token(self):user = User(email='test@test.com',password='test',registered_on=datetime.datetime.utcnow())db.session.add(user)db.session.commit()auth_token = user.encode_auth_token(user.id)self.assertTrue(isinstance(auth_token, bytes))def test_decode_auth_token(self):user = User(email='test@test.com',password='test',registered_on=datetime.datetime.utcnow())db.session.add(user)db.session.commit()auth_token = user.encode_auth_token(user.id)self.assertTrue(isinstance(auth_token, bytes))self.assertTrue(User.decode_auth_token(auth_token.decode("utf-8") ) == 1)if __name__ == '__main__':unittest.main()
파일을 저장하고 python manage.py test 명령으로 test를 실행합니다.
1234567891011$ python manage.py testtest_app_is_development (test_config.TestDevelopmentConfig) ... oktest_app_is_production (test_config.TestProductionConfig) ... oktest_app_is_testing (test_config.TestTestingConfig) ... oktest_decode_auth_token (test_user_medol.TestUserModel) ... oktest_encode_auth_token (test_user_medol.TestUserModel) ... ok----------------------------------------------------------------------Ran 5 tests in 1.069sOK
이제 login/logout을 만들 차례입니다.
먼저 dto에 login/out에서 사용할 field를 만들어 줍니다.
1$ sudo vim ./app/main/util/dto.py
dto.py 파일을 열고 아래와 같이 AuthDto 클래스를 추가해 줍니다.
1234567891011121314151617from flask_restplus import Namespace, fieldsclass UserDto:api = Namespace('user', description='user related operations')user = api.model('user', {'email': fields.String(required=True, description='user email address'),'username': fields.String(required=True, description='user username'),'password': fields.String(required=True, description='user password'),'public_id': fields.String(description='user Identifier')})class AuthDto:api = Namespace('auth', description='authentication related operations')user_auth = api.model('auth_details', {'email': fields.String(required=True, description='The email address'),'password': fields.String(required=True, description='The user password '),})
다음으로 로그인 로직을 처리하는 auth helper를 만들어 줍니다.
1$ sudo vim ./app/main/service/auth_helper.py
아래와 같이 스크립트를 작성합니다.
유저가 로그아웃을 하면 사용된 토큰은 blacklist로 들어가고 같은 토큰으로는 사용할 수 없게 됩니다.
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758from app.main.model.user import Userfrom ..service.blacklist_service import save_tokenclass Auth:@staticmethoddef login_user(data):try:# fetch the user datauser = User.query.filter_by(email=data.get('email')).first()if user and user.check_password(data.get('password')):auth_token = user.encode_auth_token(user.id)if auth_token:response_object = {'status': 'success','message': 'Successfully logged in.','Authorization': auth_token.decode()}return response_object, 200else:response_object = {'status': 'fail','message': 'email or password does not match.'}return response_object, 401except Exception as e:print(e)response_object = {'status': 'fail','message': 'Try again'}return response_object, 500@staticmethoddef logout_user(data):if data:auth_token = data.split(" ")[1]else:auth_token = ''if auth_token:resp = User.decode_auth_token(auth_token)if not isinstance(resp, str):# mark the token as blacklistedreturn save_token(token=auth_token)else:response_object = {'status': 'fail','message': resp}return response_object, 401else:response_object = {'status': 'fail','message': 'Provide a valid auth token.'}return response_object, 403
handler를 만들었으니 이제는 controller를 만들 차례입니다.
controller 폴더에 auth_controller.py를 만들어 줍니다.
1$ sudo vim ./app/main/controller/auth_controller.py
API 서버에서 사용할 login과 logout을 만들어 줍니다.
123456789101112131415161718192021222324252627282930313233from flask import requestfrom flask_restplus import Resourcefrom app.main.service.auth_helper import Authfrom ..util.dto import AuthDtoapi = AuthDto.apiuser_auth = AuthDto.user_auth@api.route('/login')class UserLogin(Resource):"""User Login Resource"""@api.doc('user login')@api.expect(user_auth, validate=True)def post(self):# get the post datapost_data = request.jsonreturn Auth.login_user(data=post_data)@api.route('/logout')class LogoutAPI(Resource):"""Logout Resource"""@api.doc('logout a user')def post(self):# get auth tokenauth_header = request.headers.get('Authorization')return Auth.logout_user(data=auth_header)
마지막으로 swagger로 API 문서를 추가하기 위해 ./app/__init__.py에 auth_controller 패키지의 api 모듈을 추가해 줍니다.
1$ sudo vim ./app/__init__.py
추가되는 module과 namespace는 아래와 같습니다.
from .main.controller.auth_controller import api as auth_ns
...
api.add_namespace(auth_ns)
1234567891011121314151617# app/__init__.pyfrom flask_restplus import Apifrom flask import Blueprintfrom .main.controller.user_controller import api as user_nsfrom .main.controller.auth_controller import api as auth_nsblueprint = Blueprint('api', __name__)api = Api(blueprint,title='FLASK RESTPLUS API BOILER-PLATE WITH JWT',version='1.0',description='a boilerplate for flask restplus web service')api.add_namespace(user_ns, path='/user')api.add_namespace(auth_ns)
이제 API 서버를 실행하고 5000번 포트로 접속해 봅니다.
12345678$ python manage.py run* Serving Flask app "app.main" (lazy loading)* Environment: development* Debug mode: on* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)* Restarting with stat* Debugger is active!* Debugger PIN: 145-484-411
브라우저를 열고 API 서버의 5000포트로 접속합니다.
추가한 auth의 login과 logout이 보입니다.
login/logout 기능이 추가되었으니 테스트를 해봐야겠죠. 테스트를 하기 전에 user_service.py에 generate_token method를 아래와 같이 추가해 줍니다.
1$ sudo vim ./app/main/service/user_service.py
12345678910111213141516def generate_token(user):try:# generate the auth tokenauth_token = user.encode_auth_token(user.id)response_object = {'status': 'success','message': 'Successfully registered.','Authorization': auth_token.decode()}return response_object, 201except Exception as e:response_object = {'status': 'fail','message': 'Some error occurred. Please try again.'}return response_object, 401
그 다음 user_service.py의 save_new_user method의 return 값을
response_object = {
'status': 'success',
'message': 'Successfully registered.'
}
return response_object, 201
이것이 아닌 아래와 같이 수정합니다.
return generate_token(new_user)
전체 user_service.py는 아래와 같습니다.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354import uuidimport datetimefrom app.main import dbfrom app.main.model.user import Userdef save_new_user(data):user = User.query.filter_by(email=data['email']).first()if not user:new_user = User(public_id=str(uuid.uuid4()),email=data['email'],username=data['username'],password=data['password'],registered_on=datetime.datetime.utcnow())save_changes(new_user)return generate_token(new_user)else:response_object = {'status': 'fail','message': 'User already exists. Please Log in.',}return response_object, 409def get_all_users():return User.query.all()def get_a_user(public_id):return User.query.filter_by(public_id=public_id).first()def generate_token(user):try:# generate the auth tokenauth_token = user.encode_auth_token(user.id)response_object = {'status': 'success','message': 'Successfully registered.','Authorization': auth_token.decode()}return response_object, 201except Exception as e:response_object = {'status': 'fail','message': 'Some error occurred. Please try again.'}return response_object, 401def save_changes(data):db.session.add(data)db.session.commit()
login/logout의 테스트 스크립트를 작성합니다.
개발에 있어서 테스트는 필수이며 자동화 되어야 합니다.
test 폴더에 test_auth.py를 생성합니다.
1$ sudo vim ./app/test/test_auth.py
test_auth.py 파일에 아래와 같이 작성합니다.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475import unittestimport jsonfrom app.test.base import BaseTestCasedef register_user(self):return self.client.post('/user/',data=json.dumps(dict(email='example@gmail.com',username='username',password='123456')),content_type='application/json')def login_user(self):return self.client.post('/auth/login',data=json.dumps(dict(email='example@gmail.com',password='123456')),content_type='application/json')class TestAuthBlueprint(BaseTestCase):def test_registered_user_login(self):""" Test for login of registered-user login """with self.client:# user registrationuser_response = register_user(self)response_data = json.loads(user_response.data.decode())self.assertTrue(response_data['Authorization'])self.assertEqual(user_response.status_code, 201)# registered user loginlogin_response = login_user(self)data = json.loads(login_response.data.decode())self.assertTrue(data['Authorization'])self.assertEqual(login_response.status_code, 200)def test_valid_logout(self):""" Test for logout before token expires """with self.client:# user registrationuser_response = register_user(self)response_data = json.loads(user_response.data.decode())self.assertTrue(response_data['Authorization'])self.assertEqual(user_response.status_code, 201)# registered user loginlogin_response = login_user(self)data = json.loads(login_response.data.decode())self.assertTrue(data['Authorization'])self.assertEqual(login_response.status_code, 200)# valid token logoutresponse = self.client.post('/auth/logout',headers=dict(Authorization='Bearer ' + json.loads(login_response.data.decode())['Authorization']))data = json.loads(response.data.decode())self.assertTrue(data['status'] == 'success')self.assertEqual(response.status_code, 200)if __name__ == '__main__':unittest.main()
python manage.py test 명령으로 테스트를 해봅니다.
123456789101112131415$ python manage.py testtest_registered_user_login (test_auth.TestAuthBlueprint)Test for login of registered-user login ... oktest_valid_logout (test_auth.TestAuthBlueprint)Test for logout before token expires ... oktest_app_is_development (test_config.TestDevelopmentConfig) ... oktest_app_is_production (test_config.TestProductionConfig) ... oktest_app_is_testing (test_config.TestTestingConfig) ... oktest_decode_auth_token (test_user_medol.TestUserModel) ... oktest_encode_auth_token (test_user_medol.TestUserModel) ... ok----------------------------------------------------------------------Ran 7 tests in 2.617sOK
여기까지 인증에 대한 개발이었습니다.
'Python > Flask' 카테고리의 다른 글
Flask로 API 서버 만들기 (8) - Extra tips (Makefiles) (0) | 2018.11.18 |
---|---|
Flask로 API 서버 만들기 (7) - Route protection and Authorization (1) | 2018.11.18 |
Flask로 API 서버 만들기 (5) - User Operations (2) | 2018.11.12 |
Flask로 API 서버 만들기 (4) - Testing (0) | 2018.11.12 |
Flask로 API 서버 만들기 (3) - User 테이블 만들기 (2) | 2018.11.12 |