- create hello.py with class User(db.Model)
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.sqlite3' app.config['SECRET_KEY'] = "op23413kjkjljlksjfaksjfdlksajfkjlkjlkjk" app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) class User(db.Model): __tablename__ = 'users' username = db.Column(db.String(64), primary_key=True) passwd = db.Column(db.String(64)) def __repr__(self): return '<User %r>' % self.username if __name__ == "__main__": db.create_all() app.run("0.0.0.0", 8090, debug=True)
2. test hello.py with python3 shell
zhub2@apache-vm1:~ $ python3 >>> from hello import db >>> db.create_all() >>> from hello import User ### INSERT NEW User >>> user_john = User(username='john', passwd='carlos23') >>> db.session.add(user_john) >>> db.session.commit() ### LIST ALL User >>> User.query.all() [<User 'john'>] ### UPDATE User WHERE username='john' >>> user_john = User.query.filter_by(username='john').first() >>> user_john.passwd='test123' >>> db.session.add(user_john) >>> db.session.commit()
import os
from os import path
from flask import jsonify, send_from_directory, render_template, session, url_for, flash, escape, redirect, request, abort, make_response, g
import ldap3
import subprocess
from functools import wraps
import pathlib
from datetime import datetime
from werkzeug.utils import secure_filename
from app import app
import base64
from flask_sqlalchemy import SQLAlchemy
import logging
from logging.handlers import TimedRotatingFileHandler
from flask import Flask
from app.config import Config
app = Flask(__name__)
app.config.from_object(Config)
app.config[‘SQLALCHEMY_DATABASE_URI’] = ‘sqlite:///users.sqlite3’
app.config[‘SQLALCHEMY_TRACK_MODIFICATIONS’] = False
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = ‘users’
username = db.Column(db.String(100), primary_key=True)
passwd = db.Column(db.String(100))
def __init__(self, username, passwd):
self.username = username
self.passwd = passwd
def __repr__(self):
return ” % self.username
logHandler = TimedRotatingFileHandler(filename=”audit.log”, when=’midnight’, backupCount=30)
logFormatter = logging.Formatter(‘%(asctime)s %(message)s’)
logHandler.setFormatter( logFormatter )
logger = logging.getLogger( ‘MyLogger’ )
logger.addHandler( logHandler )
logger.setLevel( logging.INFO )
EXEMPT_METHODS = set([‘OPTIONS’])
PATH_UPLOADS = app.config[‘PATH_UPLOADS’] if app.config[‘PATH_UPLOADS’][0] == “/” else os.path.join(app.root_path, app.config[‘PATH_UPLOADS’])
SSH_PASS = “/home/zhub2/sshpass”
@app.before_request
def before_request_func():
g.ip_addr = “127.0.0.1”
print(“before_request is running!”)
def api(cmd):
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = p.communicate()
return stdout
def remote_ssh(username, passwd, ip_addr, cmd):
ssh_cmd = SSH_PASS + ” -p ” + passwd + ” ssh ” + username + “@” + ip_addr + ” -o ConnectTimeout=2 -o StrictHostKeyChecking=no -o LogLevel=Error ” + cmd
return ssh_cmd
def remote_bash(username, passwd, ip_addr):
bash_cmd = SSH_PASS + ” -p ” + passwd + ” ssh ” + username + “@” + ip_addr + ” -o ConnectTimeout=2 -o StrictHostKeyChecking=no -o LogLevel=Error bash -s — <./" + username + ".SAMPLE.sh"
return bash_cmd
def get_passwd(username):
session_query = User.query.filter_by(username=username).first()
passwd = str(session_query.passwd)
return passwd
@app.template_filter()
def b64encode(data):
return str(base64.urlsafe_b64encode(data.encode("utf-8")), "utf-8")
@app.template_filter()
def b64decode(data):
return str(base64.urlsafe_b64decode(data), "utf-8")
@app.context_processor
def inject_user():
if logged_in():
return dict(user=session['username'], role=session['role'])
return dict()
def logged_in():
if 'username' in session:
return True
return False
def humanbytes(B):
'Return the given bytes as a human friendly KB, MB, GB, or TB string'
B = float(B)
KB = float(1024)
MB = float(KB ** 2) # 1,048,576
GB = float(KB ** 3) # 1,073,741,824
TB = float(KB ** 4) # 1,099,511,627,776
if B 1 else ‘ B’)
elif KB <= B < MB:
return '{0:.2f} KB'.format(B/KB)
elif MB <= B < GB:
return '{0:.2f} MB'.format(B/MB)
elif GB <= B < TB:
return '{0:.2f} GB'.format(B/GB)
elif TB /dev/null 2>&1″ + os.linesep)
f.write(“sudo su – wasadm” + os.linesep)
f.write(“export WAS_USER=” + username + os.linesep)
f.write(“export WAS_PASS=” + passwd + os.linesep)
f.write(cmd)
full_cmd = remote_bash(username, passwd, ip_addr)
logger.info(“%s run command: %s” % (session[‘username’], full_cmd))
return render_template(“wascontrol.html”, subprocess_output=api(full_cmd))
@app.route(“/jboss”)
@login_required
def jboss():
return render_template(“jboss.html”)
@app.route(“/jboss”, methods = [“POST”, “GET”])
@login_required
def jboss_after():
ip_addr = request.form.get(‘ip_addr’)
g.ip_addr = ip_addr
cmd_old = request.form.get(‘name’)
username = session[‘username’]
passwd = get_passwd(username)
cmd = “/opt/rh/eap7/root/usr/share/wildfly/bin/jboss-cli.sh –timeout=90000 –connect controller=127.0.0.1:9999 –command='” + cmd_old + “‘”
with open(username + “.SAMPLE.sh”, “w”) as f:
f.write(“echo ” + passwd + ” |sudo -S su – jboss >/dev/null 2>&1″ + os.linesep)
f.write(“sudo su – jboss” + os.linesep)
f.write(cmd)
full_cmd = remote_bash(username, passwd, ip_addr)
print(full_cmd)
logger.info(“%s run CLI command: %s on %s” % (session[‘username’], cmd_old, ip_addr))
return render_template(“jboss.html”, subprocess_output=api(full_cmd))
@app.route(“/pm2”)
@login_required
def pm2():
return render_template(“pm2.html”)
@app.route(“/pm2”, methods = [“POST”, “GET”])
@login_required
def pm2_after():
ip_addr = request.form.get(‘ip_addr’)
g.ip_addr = ip_addr
cmd = request.form.get(‘name’)
username = session[‘username’]
passwd = get_passwd(username)
with open(username + “.SAMPLE.sh”, “w”) as f:
f.write(“echo ” + passwd + ” |sudo -S su – pm2 >/dev/null 2>&1″ + os.linesep)
f.write(“sudo su – pm2” + os.linesep)
f.write(cmd)
full_cmd = remote_bash(username, passwd, ip_addr)
print(full_cmd)
logger.info(“%s run PM2 command: %s on %s” % (session[‘username’], cmd, ip_addr))
return render_template(“pm2.html”, subprocess_output=api(full_cmd))
@app.route(“/shell_cmd”)
@login_required
def shell_cmd():
return render_template(“shell_cmd.html”)
@app.route(“/shell_cmd”, methods = [“POST”, “GET”])
@login_required
def shell_cmd_after():
ip_addr = request.form.get(‘ip_addr’)
g.ip_addr = ip_addr
cmd = request.form.get(‘name’)
passwd = get_passwd(session[‘username’])
full_cmd = remote_ssh(session[‘username’], passwd, ip_addr, cmd)
print(full_cmd)
logger.info(“%s run command: %s” % (session[‘username’], full_cmd))
return render_template(“shell_cmd.html”, subprocess_output=api(full_cmd))
@app.route(“/home”)
@login_required
def home():
if ‘userfiles’ not in session:
return redirect(url_for(‘logout’))
pathlib.Path(session[‘userfiles’]).mkdir(exist_ok=True)
files_list = get_files(session[‘userfiles’])
return render_template(‘home.htm’, files = files_list)
@app.route(“/home-admin”)
@login_required
def home_admin():
if session[‘role’] != ‘admin’:
return redirect(url_for(‘logout’))
tenants_list = get_files(PATH_UPLOADS)
for i in range(len(tenants_list)):
tenants_list[i][‘files’] = get_files(PATH_UPLOADS+’/’+tenants_list[i][‘name’])
return render_template(‘home-admin.htm’, tenants = tenants_list)
def allowed_file(filename):
return ‘.’ in filename and \
filename.rsplit(‘.’, 1)[1].lower() in app.config[‘ALLOWED_EXTENSIONS’]
@app.route(“/upload”, methods=[“GET”, “POST”])
@login_required
def upload():
if request.method == “POST”:
file = request.files[“file”]
if file.filename == ”:
return make_response(jsonify({“alert”: “warning”,”message”: “No file selected”}), 200)
if not allowed_file(file.filename):
return make_response(jsonify({“alert”: “warning” ,”message”: “Extension not allowed”}), 200)
if file:
try:
filename = secure_filename(file.filename)
file.save(os.path.join(session[‘userfiles’], filename))
return make_response(jsonify({“alert”: “success”, “message”: “File uploaded”}), 200)
except:
return make_response(jsonify({“alert”: “danger”, “message”: “File uploading error”}), 200)
return render_template(“upload.htm”, extensions = app.config[‘ALLOWED_EXTENSIONS’])
@app.route(“/delete/”)
@login_required
def delete(filename):
if ‘userfiles’ not in session:
return redirect(url_for(‘logout’))
file = session[‘userfiles’]+’/’+str(base64.urlsafe_b64decode(filename), “utf-8”)
if os.path.exists(file):
os.remove(file)
return redirect(url_for(‘home’))
@app.route(“/delete-admin//”)
@login_required
def delete_admin(tenant,filename):
if session[‘role’] != ‘admin’:
return redirect(url_for(‘logout’))
file = PATH_UPLOADS+’/’+tenant+’/’+str(base64.urlsafe_b64decode(filename), “utf-8”)
if os.path.exists(file):
os.remove(file)
return redirect(url_for(‘home_admin’))
@app.route(‘/files//’)
def files(dirname,filename):
return send_from_directory(PATH_UPLOADS+’/’+dirname, filename)
@app.after_request
def after_request_func(response):
ip_addr = g.ip_addr
return response
if __name__ == “__main__”:
db.create_all()
app.run(“0.0.0.0”, 8080, debug=True)
It’s nearly impossible to find experienced
people on this topic, but you sound like you know what
you’re talking about! Thanks