Experiments with Rest Authentication, Flask Restful and SQLAlchemy

I have been experimenting with REST Authentication since some time now. I was inspired with this article RESTful Authentication with Flask by Miguel Grinberg to try out REST Authentication with Flask. I have used Flask Blueprints, Flask Restful and Flask-SQLAlchemy libraries for my current blog. Though I have not followed the best practices while coding but have tried to capture the important parts of HTTP Authentication.

Github Repo: testRestFlask

Required Libraries:

Flask==0.10.1
Flask-Script==2.0.5
Flask-HTTPAuth==2.2.0
Flask-SQLAlchemy==1.0
Jinja2==2.8
MarkupSafe==0.23
SQLAlchemy==0.9.3
Werkzeug==0.9.4
itsdangerous==0.24
passlib==1.6.1
Flask-RESTful==0.3.5

The Model:

from flask import current_app, g, Flask
from flask.ext.sqlalchemy import SQLAlchemy
from flask.ext.httpauth import HTTPBasicAuth
from passlib.apps import custom_app_context as pwd_context
from itsdangerous import (TimedJSONWebSignatureSerializer
as Serializer, BadSignature, SignatureExpired)

app = Flask(__name__)
db = SQLAlchemy(app)
auth = HTTPBasicAuth()

app.config['SECRET_KEY'] = 'the quick brown fox jumps over the lazy dog'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite'
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True

class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(32), index=True)
password_hash = db.Column(db.String(64))

# Generate password hash for a plain text password and save in DB
def hash_password(self, password):
self.password_hash = pwd_context.encrypt(password)

# Verify if the password is correct
def verify_password(self, password):
return pwd_context.verify(password, self.password_hash)

 # Generate authentication token based for an expiration period
def generate_auth_token(self, expiration=600):
s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'id': self.id})

# Verify authentication token
@staticmethod
def verify_auth_token(token):
s = Serializer(app.config['SECRET_KEY'])
try:
data = s.loads(token)
except SignatureExpired:
return None # valid token, but expired
except BadSignature:
return None # invalid token
user = User.query.get(data['id'])
return user

# Static metod to verify either the authentication token or username password combination
@auth.verify_password
def verify_password(username_or_token, password):

# first try to authenticate by token
user = User.verify_auth_token(username_or_token)
if not user:
# try to authenticate with username/password
user = User.query.filter_by(username=username_or_token).first()
if not user or not user.verify_password(password):
return False
g.user = user
return True

The API:

from flask import render_template,jsonify,current_app,g
from flask import Blueprint
from flask_restful import reqparse, abort, Api, Resource,marshal
from flask.ext.httpauth import HTTPBasicAuth
from models import *


testRest = Blueprint('testRest', __name__,url_prefix='/api')
api = Api()
api.init_app(testRest)

parser = reqparse.RequestParser()
parser.add_argument('username')
parser.add_argument('password')

# Create a new resource
class Users(Resource):
  def post(self):
    data = parser.parse_args()
    username = data['username']
    password = data['password']
    if username is None or password is None:
        print "missing argument"
        abort(400,message='missing argument')    # missing argument
    if User.query.filter_by(username=username).first() is not None:
        print "existing user"
        abort(400,message='existing user')    # existing user
    user = User(username=username)
    user.hash_password(password)
    db.session.add(user)
    db.session.commit()
    return {'username':user.username}

#Get the User details
class UserRes(Resource):
  def get(self,tid):
    user = User.query.get(tid)
    if not user:
        abort(400,message='user not found')
    return {'username': user.username}

# Generate an authentication token
class AuthToken(Resource):
  @auth.login_required
  def get(self):
    token = g.user.generate_auth_token(600)
    return {'token': token.decode('ascii'), 'duration': 600}

# Get a user resource after authentication
class HResource(Resource):
  @auth.login_required
  def get(self):
    return {'data': 'Hello, %s!' %g.user.username}

api.add_resource(Users, '/users')
api.add_resource(UserRes, '/user/')
api.add_resource(AuthToken, '/token')
api.add_resource(HResource, '/resource')
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s