flask_sqlalchemy insert or update record

  1. create hello.py with class User(db.Model)
import os

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)

# Store the data in the SQLite file users.sqlite3.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.sqlite3'
# Prefer a key supplied through the SECRET_KEY environment variable so the
# real key does not have to live in the source; the literal is the fallback
# that keeps the example runnable without any setup.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY', "op23413kjkjljlksjfaksjfdlksajfkjlkjlkjk")
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Bind Flask-SQLAlchemy to this application.
db = SQLAlchemy(app)
 
class User(db.Model):
    """Row of the ``users`` table: a username (primary key) and a password."""

    __tablename__ = 'users'

    # Both columns are strings declared with a length of 64.
    username = db.Column(db.String(64), primary_key=True)
    passwd = db.Column(db.String(64))

    def __repr__(self):
        """Return a debug representation such as ``<User 'john'>``."""
        return '<User %r>' % (self.username,)
 
if __name__ == "__main__":
    # Create any missing tables before serving. Recent Flask-SQLAlchemy
    # releases require an active application context for create_all(); the
    # explicit context also works on older releases.
    with app.app_context():
        db.create_all()
    # NOTE(review): debug=True while listening on 0.0.0.0 exposes the
    # interactive debugger to the network; keep this for local experiments.
    app.run("0.0.0.0", 8090, debug=True)

2. test hello.py with python3 shell

zhub2@apache-vm1:~ $ python3
>>> from hello import db
>>> db.create_all()
>>> from hello import User
### INSERT NEW User
>>> user_john = User(username='john', passwd='carlos23')
>>> db.session.add(user_john)
>>> db.session.commit()
### LIST ALL User
>>> User.query.all()
[<User 'john'>]
### UPDATE User WHERE username='john'
>>> user_john = User.query.filter_by(username='john').first()
>>> user_john.passwd='test123'
>>> db.session.add(user_john)
>>> db.session.commit()

2 Replies to “flask_sqlalchemy insert or update record”

  1. import os
    from os import path
    from flask import jsonify, send_from_directory, render_template, session, url_for, flash, escape, redirect, request, abort, make_response, g
    import ldap3
    import subprocess
    from functools import wraps
    import pathlib
    from datetime import datetime
    from werkzeug.utils import secure_filename
    from app import app
    import base64
    from flask_sqlalchemy import SQLAlchemy

    import logging
    from logging.handlers import TimedRotatingFileHandler
    from flask import Flask
    from app.config import Config

    # NOTE(review): this rebinds the name ``app`` that was imported from the
    # ``app`` package above; confirm which application object is intended.
    app = Flask(__name__)
    app.config.from_object(Config)
    # Store the data in the SQLite file users.sqlite3.
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.sqlite3'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db = SQLAlchemy(app)

    class User(db.Model):
        """Row of the ``users`` table: a username (primary key) and a password."""

        __tablename__ = 'users'

        # Both columns are strings declared with a length of 100.
        username = db.Column(db.String(100), primary_key=True)
        passwd = db.Column(db.String(100))

        def __init__(self, username, passwd):
            """Create a user with the given name and password."""
            self.username = username
            self.passwd = passwd

        def __repr__(self):
            """Return a debug representation such as ``<User 'john'>``."""
            return '<User %r>' % self.username

    # Audit log: written to audit.log, rotated at midnight, keeping 30 old files.
    logHandler = TimedRotatingFileHandler(filename="audit.log", when='midnight', backupCount=30)
    # Each record is the timestamp followed by the message.
    logFormatter = logging.Formatter('%(asctime)s %(message)s')
    logHandler.setFormatter(logFormatter)
    logger = logging.getLogger('MyLogger')
    logger.addHandler(logHandler)
    logger.setLevel(logging.INFO)

    EXEMPT_METHODS = {'OPTIONS'}
    # Use the configured upload path as-is when it starts with "/", otherwise
    # resolve it against the application root. startswith() avoids the
    # IndexError that indexing [0] raises for an empty setting.
    PATH_UPLOADS = (
        app.config['PATH_UPLOADS']
        if app.config['PATH_UPLOADS'].startswith("/")
        else os.path.join(app.root_path, app.config['PATH_UPLOADS'])
    )
    # Location of the sshpass binary used to build the remote commands.
    SSH_PASS = "/home/zhub2/sshpass"

    @app.before_request
    def before_request_func():
        """Give ``g.ip_addr`` a default value before each request is handled."""
        g.ip_addr = "127.0.0.1"
        print("before_request is running!")

    def api(cmd):
        """Run ``cmd`` through the local shell and return its standard output.

        Standard error is captured and discarded. The output is returned as
        text (``universal_newlines=True``).
        """
        # NOTE(review): shell=True hands ``cmd`` to the shell verbatim, so the
        # caller is responsible for quoting anything that is user-supplied.
        completed = subprocess.run(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        return completed.stdout

    def remote_ssh(username, passwd, ip_addr, cmd):
        """Build the shell command line that runs ``cmd`` on ``ip_addr`` over ssh.

        The line invokes the sshpass binary at SSH_PASS with ``passwd`` and logs
        in as ``username``. Only the string is built here; nothing is executed.
        """
        import shlex  # local import keeps this block self-contained

        # Every caller-supplied piece is quoted so the local shell passes it on
        # as a single word instead of interpreting metacharacters in it; the
        # remote shell still interprets ``cmd`` as a command line.
        # NOTE(review): ``sshpass -p`` puts the password on the command line,
        # where other local users can see it in the process list.
        return (
            SSH_PASS
            + " -p " + shlex.quote(passwd)
            + " ssh " + shlex.quote(username + "@" + ip_addr)
            + " -o ConnectTimeout=2 -o StrictHostKeyChecking=no -o LogLevel=Error "
            + shlex.quote(cmd)
        )

    def remote_bash(username, passwd, ip_addr):
        """Build the shell command line that feeds a local script to a remote bash.

        The script ``./<username>.SAMPLE.sh`` is redirected into ``bash -s --``
        on ``ip_addr``, logging in as ``username`` through the sshpass binary at
        SSH_PASS. Only the string is built here; nothing is executed.
        """
        import shlex  # local import keeps this block self-contained

        # Quote the caller-supplied pieces so the local shell treats each as a
        # single word. The option separator is the ASCII "--".
        return (
            SSH_PASS
            + " -p " + shlex.quote(passwd)
            + " ssh " + shlex.quote(username + "@" + ip_addr)
            + " -o ConnectTimeout=2 -o StrictHostKeyChecking=no -o LogLevel=Error bash -s -- <"
            + shlex.quote("./" + username + ".SAMPLE.sh")
        )

    def get_passwd(username):
        """Return the stored password of ``username`` as a string.

        Raises:
            LookupError: if no row with that username exists.
        """
        user = User.query.filter_by(username=username).first()
        if user is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise LookupError("no stored password for user %r" % (username,))
        return str(user.passwd)

    @app.template_filter()
    def b64encode(data):
        """Template filter: URL-safe base64-encode the text ``data``, returning text."""
        encoded = base64.urlsafe_b64encode(data.encode("utf-8"))
        return encoded.decode("utf-8")

    @app.template_filter()
    def b64decode(data):
        """Template filter: decode URL-safe base64 ``data`` into UTF-8 text."""
        decoded = base64.urlsafe_b64decode(data)
        return decoded.decode("utf-8")

    @app.context_processor
    def inject_user():
        """Expose ``user`` and ``role`` to templates while a user is logged in."""
        if not logged_in():
            return {}
        return {"user": session['username'], "role": session['role']}

    def logged_in():
        """Return True when the session carries a ``username`` entry."""
        return 'username' in session

    def humanbytes(B):
    'Return the given bytes as a human friendly KB, MB, GB, or TB string'
    B = float(B)
    KB = float(1024)
    MB = float(KB ** 2) # 1,048,576
    GB = float(KB ** 3) # 1,073,741,824
    TB = float(KB ** 4) # 1,099,511,627,776

    if B 1 else ‘ B’)
    elif KB <= B < MB:
    return '{0:.2f} KB'.format(B/KB)
    elif MB <= B < GB:
    return '{0:.2f} MB'.format(B/MB)
    elif GB <= B < TB:
    return '{0:.2f} GB'.format(B/GB)
    elif TB /dev/null 2>&1″ + os.linesep)
    f.write(“sudo su – wasadm” + os.linesep)
    f.write(“export WAS_USER=” + username + os.linesep)
    f.write(“export WAS_PASS=” + passwd + os.linesep)
    f.write(cmd)
    full_cmd = remote_bash(username, passwd, ip_addr)
    logger.info(“%s run command: %s” % (session[‘username’], full_cmd))
    return render_template(“wascontrol.html”, subprocess_output=api(full_cmd))

    @app.route("/jboss")
    @login_required
    def jboss():
        """Render the JBoss page."""
        return render_template("jboss.html")

    @app.route("/jboss", methods=["POST", "GET"])
    @login_required
    def jboss_after():
        """Run a JBoss CLI command on the submitted host and render its output.

        Reads ``ip_addr`` and ``name`` (the CLI command) from the form, writes a
        helper script ``<username>.SAMPLE.sh`` in the working directory, and
        feeds it to the remote host through ``remote_bash``/``api``.
        """
        import shlex  # local import keeps this block self-contained

        ip_addr = request.form.get('ip_addr')
        cmd_old = request.form.get('name')
        if not ip_addr or not cmd_old:
            # Without both fields the string building below would fail with a
            # TypeError; answer with a client error instead.
            abort(400)
        g.ip_addr = ip_addr
        username = session['username']
        passwd = get_passwd(username)
        # The CLI options use ASCII double hyphens; the submitted command is
        # quoted so it reaches jboss-cli.sh as one --command value.
        cmd = (
            "/opt/rh/eap7/root/usr/share/wildfly/bin/jboss-cli.sh"
            " --timeout=90000 --connect controller=127.0.0.1:9999"
            " --command=" + shlex.quote(cmd_old)
        )
        # NOTE(review): the script holds the plaintext password and is left on
        # disk after the request; consider a private temporary file.
        with open(username + ".SAMPLE.sh", "w") as f:
            f.write("echo " + shlex.quote(passwd) + " |sudo -S su - jboss >/dev/null 2>&1" + os.linesep)
            f.write("sudo su - jboss" + os.linesep)
            f.write(cmd)
        # The script must be closed before the shell redirects it to ssh.
        full_cmd = remote_bash(username, passwd, ip_addr)
        # full_cmd embeds the password, so it is neither printed nor logged.
        logger.info("%s run CLI command: %s on %s", session['username'], cmd_old, ip_addr)
        return render_template("jboss.html", subprocess_output=api(full_cmd))

    @app.route("/pm2")
    @login_required
    def pm2():
        """Render the PM2 page."""
        return render_template("pm2.html")

    @app.route("/pm2", methods=["POST", "GET"])
    @login_required
    def pm2_after():
        """Run a PM2 command on the submitted host and render its output.

        Reads ``ip_addr`` and ``name`` (the command) from the form, writes a
        helper script ``<username>.SAMPLE.sh`` in the working directory, and
        feeds it to the remote host through ``remote_bash``/``api``.
        """
        import shlex  # local import keeps this block self-contained

        ip_addr = request.form.get('ip_addr')
        cmd = request.form.get('name')
        if not ip_addr or not cmd:
            # Without both fields f.write(cmd) / the command builder would fail
            # with a TypeError; answer with a client error instead.
            abort(400)
        g.ip_addr = ip_addr
        username = session['username']
        passwd = get_passwd(username)
        # NOTE(review): the script holds the plaintext password and is left on
        # disk after the request; consider a private temporary file.
        with open(username + ".SAMPLE.sh", "w") as f:
            f.write("echo " + shlex.quote(passwd) + " |sudo -S su - pm2 >/dev/null 2>&1" + os.linesep)
            f.write("sudo su - pm2" + os.linesep)
            f.write(cmd)
        # The script must be closed before the shell redirects it to ssh.
        full_cmd = remote_bash(username, passwd, ip_addr)
        # full_cmd embeds the password, so it is neither printed nor logged.
        logger.info("%s run PM2 command: %s on %s", session['username'], cmd, ip_addr)
        return render_template("pm2.html", subprocess_output=api(full_cmd))

    @app.route("/shell_cmd")
    @login_required
    def shell_cmd():
        """Render the shell-command page."""
        return render_template("shell_cmd.html")

    @app.route("/shell_cmd", methods=["POST", "GET"])
    @login_required
    def shell_cmd_after():
        """Run the submitted command on the submitted host and render its output.

        Reads ``ip_addr`` and ``name`` (the command) from the form and executes
        the command line built by ``remote_ssh`` through ``api``.
        """
        ip_addr = request.form.get('ip_addr')
        cmd = request.form.get('name')
        if not ip_addr or not cmd:
            # Without both fields the command builder would fail with a
            # TypeError; answer with a client error instead.
            abort(400)
        g.ip_addr = ip_addr
        passwd = get_passwd(session['username'])
        full_cmd = remote_ssh(session['username'], passwd, ip_addr, cmd)
        # full_cmd embeds the password, so only the command and the target host
        # go to the audit log, and nothing is printed to stdout.
        logger.info("%s run command: %s on %s", session['username'], cmd, ip_addr)
        return render_template("shell_cmd.html", subprocess_output=api(full_cmd))

    @app.route("/home")
    @login_required
    def home():
        """List the files in the session's ``userfiles`` directory.

        Redirects to ``logout`` when the session has no ``userfiles`` entry.
        """
        if 'userfiles' not in session:
            return redirect(url_for('logout'))
        # Create the directory on first use; parents=True also covers an
        # uploads root that does not exist yet.
        pathlib.Path(session['userfiles']).mkdir(parents=True, exist_ok=True)
        files_list = get_files(session['userfiles'])
        return render_template('home.htm', files=files_list)

    @app.route("/home-admin")
    @login_required
    def home_admin():
        """Render the admin overview: each entry under PATH_UPLOADS with its files.

        Sessions whose role is not ``admin`` are redirected to ``logout``.
        """
        # .get() sends a session without a role to logout instead of raising
        # KeyError.
        if session.get('role') != 'admin':
            return redirect(url_for('logout'))
        tenants_list = get_files(PATH_UPLOADS)
        # Each entry is indexed by 'name' and gains a 'files' listing.
        for tenant in tenants_list:
            tenant['files'] = get_files(PATH_UPLOADS + '/' + tenant['name'])
        return render_template('home-admin.htm', tenants=tenants_list)

    def allowed_file(filename):
        """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

        The extension is the text after the last dot, compared in lower case.
        """
        if '.' not in filename:
            return False
        extension = filename.rsplit('.', 1)[1].lower()
        return extension in app.config['ALLOWED_EXTENSIONS']

    @app.route("/upload", methods=["GET", "POST"])
    @login_required
    def upload():
        """Show the upload form (GET) or store the submitted file (POST).

        POST answers with a JSON object holding ``alert`` and ``message``; the
        file is saved under the session's ``userfiles`` directory using the
        name returned by ``secure_filename``.
        """
        if request.method != "POST":
            return render_template("upload.htm", extensions=app.config['ALLOWED_EXTENSIONS'])

        uploaded = request.files["file"]
        if uploaded.filename == '':
            return make_response(jsonify({"alert": "warning", "message": "No file selected"}), 200)
        if not allowed_file(uploaded.filename):
            return make_response(jsonify({"alert": "warning", "message": "Extension not allowed"}), 200)

        filename = secure_filename(uploaded.filename)
        try:
            uploaded.save(os.path.join(session['userfiles'], filename))
        except Exception:
            # Still answer with the JSON error, but record the cause rather
            # than swallowing it; unlike a bare except this lets
            # SystemExit/KeyboardInterrupt propagate.
            logger.exception("upload of %r failed", filename)
            return make_response(jsonify({"alert": "danger", "message": "File uploading error"}), 200)
        return make_response(jsonify({"alert": "success", "message": "File uploaded"}), 200)

    @app.route("/delete/<filename>")
    @login_required
    def delete(filename):
        """Delete one file from the session's ``userfiles`` directory.

        ``filename`` is the URL-safe base64 encoding of the file name. Redirects
        to ``logout`` without a ``userfiles`` session entry, otherwise back to
        ``home``.
        """
        if 'userfiles' not in session:
            return redirect(url_for('logout'))
        try:
            name = base64.urlsafe_b64decode(filename).decode("utf-8")
        except ValueError:
            # Covers malformed base64 and bytes that are not UTF-8.
            abort(400)
        base_dir = os.path.realpath(session['userfiles'])
        target = os.path.realpath(os.path.join(base_dir, name))
        # Refuse names that resolve outside the user's directory (e.g. "../x").
        if os.path.commonpath([base_dir, target]) != base_dir:
            abort(403)
        if os.path.isfile(target):
            os.remove(target)
        return redirect(url_for('home'))

    @app.route("/delete-admin/<tenant>/<filename>")
    @login_required
    def delete_admin(tenant, filename):
        """Delete one file from the ``tenant`` folder under PATH_UPLOADS (admin only).

        ``filename`` is the URL-safe base64 encoding of the file name. Sessions
        whose role is not ``admin`` are redirected to ``logout``; otherwise the
        response redirects to ``home_admin``.
        """
        if session.get('role') != 'admin':
            return redirect(url_for('logout'))
        try:
            name = base64.urlsafe_b64decode(filename).decode("utf-8")
        except ValueError:
            # Covers malformed base64 and bytes that are not UTF-8.
            abort(400)
        uploads_root = os.path.realpath(PATH_UPLOADS)
        target = os.path.realpath(os.path.join(uploads_root, tenant, name))
        # Refuse tenant/name combinations that resolve outside the uploads root.
        if os.path.commonpath([uploads_root, target]) != uploads_root:
            abort(403)
        if os.path.isfile(target):
            os.remove(target)
        return redirect(url_for('home_admin'))

    @app.route('/files/<dirname>/<filename>')
    def files(dirname, filename):
        """Send ``filename`` from the ``dirname`` folder under PATH_UPLOADS."""
        # NOTE(review): unlike the other file routes this one carries no
        # @login_required; confirm that public access is intended.
        # Passing both URL parts as the relative path lets
        # send_from_directory's safe join reject a ".." dirname that would
        # otherwise step out of the uploads root.
        return send_from_directory(PATH_UPLOADS, dirname + '/' + filename)

    @app.after_request
    def after_request_func(response):
        """Return ``response`` unchanged after each request."""
        # The previous body copied g.ip_addr into a local that was never used.
        return response

    if __name__ == "__main__":
        # Create any missing tables before serving. Recent Flask-SQLAlchemy
        # releases require an active application context for create_all(); the
        # explicit context also works on older releases.
        with app.app_context():
            db.create_all()
        # NOTE(review): debug=True while listening on 0.0.0.0 exposes the
        # interactive debugger to the network; keep this for local use only.
        app.run("0.0.0.0", 8080, debug=True)

Leave a Reply

Your email address will not be published. Required fields are marked *