Python基于Flask实现二次认证FTP文件上传功能

代码块

<code># _*_ coding:utf-8 _*_
''' 用于文件上传下载 '''


import sys
reload(sys)
sys.setdefaultencoding('utf-8')

import os
basedir = os.path.dirname(os.path.abspath(__file__))
libdirs = os.path.join(basedir,'lib')
confdir = os.path.join(basedir,'conf')
logpath = os.path.join(basedir,'logs')
inifile = os.path.join(confdir,'__init__.py')
appfile = os.path.join(confdir,'appconf.py')
sys.path.insert(0,libdirs)

import time
import hmac
import base64
import random
import string
import hashlib
from werkzeug.utils import secure_filename
from flask import Flask,render_template,jsonify,request,send_from_directory,redirect,url_for,make_response

app=Flask(__name__)
app.debug=True

#生成随机8位字符串
SALT = ''.join(random.sample(string.ascii_letters + string.digits, 8))

#生成默认Conf配置文件
if os.path.isdir(confdir) is False:
os.makedirs(confdir)
with open(inifile,'w') as f:
f.write('from appconf import init\\nConf = init()\\n')
if os.path.exists(appfile) is False:
with open(appfile,'w') as f:
f.write('''# _*_ coding:utf-8 _*_
def init():
return {
'global':{
'default_key':'000000',
'upload_dir':'upload', #默认上传路径
'secret_key':'key%s', #Token加盐salt值
'expire_time':3600, #Token过期时间
'limit_size': 100 * 1024 * 1024,
'allowed_extensions':['sh','py','gz','tar.gz','zip','tar'] #支持上传的文件类型
},

'ulogin':{
'admin': '%s' #默认登录账户密码
}
}
''' % (int(time.time()),SALT))

from conf import Conf

URINFO=Conf['ulogin']
SECKEY=Conf['global']['secret_key']
EXPIRE=Conf['global']['expire_time']
UPLOAD_FOLDER=Conf['global']['upload_dir']
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH']=Conf['global']['limit_size']
ALLOWED_EXTENSIONS = set(Conf['global']['allowed_extensions'])

# 用于判断文件后缀
def allowed_file(filename):
return '.' in filename and filename.rsplit('.',1)[1] in ALLOWED_EXTENSIONS

#登录接口(需要二次登录通过Seckey登录)
@app.route('/login',methods=['GET','POST'])
def do_login():
username = password = None
try:
if request.method == 'GET':
username=request.args.get('username',None)
password=request.args.get('password',None)
if request.method == 'POST':
username=request.json.get('username',None)
password=request.json.get('password',None)
if username is None:
return jsonify({"code":1,"message":"The username is empty"})
if password is None:
return jsonify({"code":1,"message":"The password is empty"})
if username not in URINFO:
return jsonify({"code":1,"message":"The username is not exists"})
if URINFO.get(username) not in password:
return jsonify({"code": 1,"message": "The password is error"})
tmp_code=Conf['global']['default_key']
if request.cookies.has_key('Seckey'):
tmp_code=request.cookies.get('Seckey',tmp_code)
if password != URINFO.get(username)+ tmp_code:
tmp_code = ''.join(random.sample(string.ascii_letters + string.digits,16))
response = make_response(jsonify({"code": 0,"seckey":tmp_code}))
response.set_cookie('Seckey', tmp_code)
response.set_cookie('Username', username)

return response
Token=encrypt_token(key=username,expire=EXPIRE)
response=make_response(jsonify({"code":0,"message":"Login successful"}))
response.set_cookie('Username',username)
response.set_cookie('Token',Token)
return response
except Exception,e:
response = make_response(jsonify({"code":1,"message":"Login failed"}))
response.set_cookie('Username',username)
return response

#注销接口
@app.route('/logout',methods=['GET'])
def do_logout():
Token=request.cookies.get('Token',None)
Username=request.cookies.get('Username',None)
if decrypt_token(token=Token,key=Username) is False:
return jsonify({"code":1,"message":"User logout fails,Token authentication fails"})
try:
response=make_response(jsonify({"code":0,"message":"User logout successful"}))
response.delete_cookie('Username',Username)
response.delete_cookie('Token',Token)
return response
except Exception:
return jsonify({"code": 1, "message": "User logout failed"})

#生成Token
def encrypt_token(key=SECKEY,expire=EXPIRE):
time_str=str(time.time() + expire).encode('utf-8')
hmac_str=hmac.new(key.encode('utf-8'),time_str,hashlib.md5().update(SECKEY)).hexdigest()
hash_str=hmac_str.encode('utf-8') + ':' + time_str
token_str=base64.urlsafe_b64encode(hash_str).encode('utf-8')
return token_str

#验证Token
def decrypt_token(token,key=SECKEY):
try:
token_str=base64.urlsafe_b64decode(token.encode('utf-8'))
hash_str=token_str.split(':')
if len(hash_str) != 2:
return False
hmac_str=hash_str[0]
time_str=hash_str[1]
if time.time() > float(time_str):
return False
old_hmac_str=hmac.new(key.encode('utf-8'),time_str.encode('utf-8'),hashlib.md5().update(SECKEY)).hexdigest()
if hmac_str != old_hmac_str:
return False
return True
except Exception:

return False

#上传页面
@app.route('/upload')
def upload_render():
Token=request.cookies.get('Token',None)
Username=request.cookies.get('Username',None)
if decrypt_token(token=Token,key=Username) is False:
return jsonify({"code":1,"message":"Token Authentication failed"})
return render_template('upload.html')

#下载接口
@app.route('/download',methods=['GET'],strict_slashes=False)
def download():
Token=request.cookies.get('Token',None)
Username = request.cookies.get('Username', None)
if decrypt_token(token=Token,key=Username) is False:
return jsonify({"code":1,"message":"Token Authentication failed"})
filename=request.args.get('file',None)
if filename is None:
return jsonify({"code":1,"message":"File is None"})
if os.path.isfile(os.path.join('upload', filename)):
return send_from_directory('upload',filename,as_attachment=True)
return jsonify({"code": 1, "message": "File does not exist"})

#上传接口
@app.route('/api/upload',methods=['POST','GET'],strict_slashes=False)
def api_upload():
Token=request.cookies.get('Token',None)
Username = request.cookies.get('Username', None)
if decrypt_token(token=Token,key=Username) is False:
return jsonify({"code":1,"message":"Token Authentication failed"})
if request.method == 'GET':
return render_template('upload.html')
file_dir=os.path.join(basedir,app.config['UPLOAD_FOLDER'])
if not os.path.exists(file_dir):
os.makedirs(file_dir)
f=request.files['myfile'] # 从表单的file字段获取文件,myfile为该表单的name值
try:
if f.filename == '':
return render_template('upload.html')
if f and allowed_file(f.filename): # 判断是否是允许上传的文件类型
fname=secure_filename(f.filename)
fname=str(fname).encode('utf-8')
if os.path.exists(os.path.join(file_dir,fname)) is False:
f.save(os.path.join(file_dir,fname))

else:
eot = fname.split('.',1)[0].encode('utf-8')
ext = fname.split('.',1)[1] # 获取文件后缀
unix_time = int(time.time())
new_filename=eot+'_'+str(unix_time)+'.'+ext # 修改了上传的文件名
f.save(os.path.join(file_dir,new_filename)) #保存文件到upload目录
time.sleep(3)
return redirect(url_for('upload_render'))
else:
return jsonify({"code":1,"message":"File type not allowed for upload"})
except Exception:
return jsonify({"code": 1, "message": "File type not allowed for upload"})

if __name__ == '__main__':
app.run(host='0.0.0.0',port=5678)/<code>

目录结构

Python基于Flask实现二次认证FTP文件上传功能

文件/目录结构说明:

main.py #主程序

start.sh #启停脚本

upload #文件上传目录,上传的文件会保存至此路径

templates/upload.html #模板文件,文件上传页面

lib #库文件

conf/appconf.py #配置文件

测试上传

1.启动服务


Python基于Flask实现二次认证FTP文件上传功能

服务启动完成以后会生成appconf.py,查看内容获取初始登录用户名/密码

2.登录系统

说明:需二次登录验证,第一次登录成功获取seckey用于第二次登录

http://127.0.0.1:5678/login?username=用户名&password=密码

Python基于Flask实现二次认证FTP文件上传功能

http://127.0.0.1:5678/login?username=用户名&password=密码+seckey

Python基于Flask实现二次认证FTP文件上传功能

二次登录成功以后生成Cookie,接口返回如下:


Python基于Flask实现二次认证FTP文件上传功能

3.文件上传

说明:该接口会拿取登录态进行Token验证,未登录会验证失败

http://127.0.0.1:5678/upload


Python基于Flask实现二次认证FTP文件上传功能

测试上传一个文件


Python基于Flask实现二次认证FTP文件上传功能

到upload目录确认文件是否上传成功


Python基于Flask实现二次认证FTP文件上传功能

测试错误文件格式上传(因为配置文件限制上传的文件格式,所以文件格式不符合会报错)


Python基于Flask实现二次认证FTP文件上传功能


Python基于Flask实现二次认证FTP文件上传功能

4.退出登录

说明:注销动作会清除Cookie以达到注销的目的

http://127.0.0.1:5678/logout

Python基于Flask实现二次认证FTP文件上传功能

5.由于本人是在本地测试,如果其它环境请修改IP地址即可


分享到:


相關文章: