Sqlalchemy project – example of architecture

Author: Rafal Marguzewicz
Published:
Categories: Python3
Tags:


The first file, schema.py, contains all the classes with the table schemas. This example is pseudocode.

packagename/models/db.py


from sqlalchemy import create_engine
from packagename.config.config_parser import settings


# Debug mode: when enabled in the settings, install the SQL timing listeners
# (packagename/logs/sql.py) before the engine is created.
if settings['db']['debug']:
    from packagename.logs.sql import debug_mode
    debug_mode(settings['db']['logger'])

db = settings['db']
# NOTE(review): the credentials are interpolated into the URL unescaped; a
# password containing characters such as '@' or '/' would presumably break
# the URL - confirm and consider building it with sqlalchemy.engine.URL.
engine = create_engine(
    f'postgresql+psycopg2://{db["user"]}:{db["password"]}@{db["host"]}:{db["port"]}/{db["dbname"]}',
    echo=db['echo'],
)
# Module-level connection opened at import time and shared by importers.
connect = engine.connect()

packagename/models/schema.py


from packagename.models.db import engine
from sqlalchemy import insert, update, Table, MetaData, Column, text, ForeignKey, UniqueConstraint
from sqlalchemy import Integer, DateTime, String, Numeric, SmallInteger, Date


# Table registry bound to the engine; the models below call .execute()
# directly on statements and create_all()/drop_all() without passing a bind.
# NOTE(review): `bind=` and statement-level .execute() are legacy
# SQLAlchemy 1.x API - confirm the pinned SQLAlchemy version.
metadata = MetaData(bind=engine)
# Module-level connection opened at import time; used by Main.find_all and
# imported by the model modules.
connect = engine.connect()


class Main(object):
    """Base class with table-level helpers shared by the model classes.

    Subclasses are expected to define ``table`` (a ``Table``) and
    ``table_name`` (the table's name); every helper below reads one of them.
    """

    # Opened once, when the class body is executed. No method of this class
    # uses it; it is kept so existing references to ``Main.conn`` still work.
    conn = engine.connect()

    def create_table(self):
        """Create ``self.table``; ``checkfirst=True`` skips an existing table."""
        self.table.create(engine, checkfirst=True)

    def drop_table(self):
        """Drop ``self.table``; ``checkfirst=True`` skips a missing table."""
        self.table.drop(engine, checkfirst=True)

    def truncate_table(self):
        """Remove every row by issuing an unfiltered DELETE on ``self.table``."""
        self.table.delete().execute()

    def find_all(self):
        """Fetch all rows of the table.

        :returns: list of rows
        """
        sql = text(f"select * from {self.table_name}")
        return connect.execute(sql).fetchall()

    def insert(self, data):
        """Insert ``data`` into the table.

        :data: values accepted by ``Insert.values`` (the worker example
               passes a list of dicts)
        """
        # Errors propagate unchanged; the former try/except only re-raised.
        insert(self.table).values(data).execute()

    def update(self, data=None):
        """Update the table with ``data``.

        There is no WHERE clause, so the values apply to every row.

        :data: dict mapping column names to new values (default: no values)
        """
        # ``None`` replaces the former shared mutable default ``{}``.
        update(self.table).values(**(data or {})).execute()


class Class1(Main):
    """Schema of the ``table1`` table."""

    table_name = 'table1'
    table = Table(
        table_name,
        metadata,
        Column('id', Integer, primary_key=True),
        Column('rssi', SmallInteger),
        Column('status', SmallInteger, default=0),
        Column('lat', Numeric),
        Column('lon', Numeric),
        Column('internaltemp', Numeric, nullable=False),
        Column('datetime', DateTime(timezone=False), nullable=False),
    )


class Class2(Main):
    """Schema of the ``table2`` table."""

    table_name = 'table2'
    table = Table(
        table_name,
        metadata,
        Column('id', Integer, primary_key=True),
        Column('names', String(22), unique=True),
        Column('address', String(90)),
    )


class Class3(Main):
    """Schema of the ``class3`` table; ``class_id`` references ``table1.id``."""

    table_name = 'class3'
    table = Table(
        table_name,
        metadata,
        Column('id', Integer, primary_key=True, unique=True),
        Column('class_id', Integer, ForeignKey("table1.id")),
        Column('name', String(90), comment='Name owner'),
        Column('voltage', Numeric),
        Column('active', SmallInteger, default=0),
        Column('last_activity', DateTime(timezone=False)),
        # Filled in by the database when the row is inserted.
        Column(
            'created',
            Date,
            nullable=False,
            server_default=text("current_timestamp"),
        ),
        Column('updated', Date),
        UniqueConstraint('id', 'class_id', name='unique_w_in_gateway'),
    )


if __name__ == "__main__":
    # Running this module as a script rebuilds the schema: every table
    # registered on ``metadata`` is dropped first, so existing data is lost.
    metadata.drop_all()
    # Create the tables again.
    metadata.create_all()

Next: inheriting classes

packagename/models/class1.py


from sqlalchemy import text
from packagename.models.shema import connect, Class1


class Class1(Class1):
    """Query helpers on top of the ``Class1`` schema class."""

    def count(self):
        """Count the rows of this model's table.

        :returns: int
        """
        # The original queried the literal name "table" (a reserved word);
        # the model's own table name is what the f-string was meant to use.
        sql = text(f"select count(1) from {self.table_name}")
        # Unwrap the single column so the documented int is returned.
        return connect.execute(sql).fetchone()[0]

packagename/models/class2.py


from packagename.models.db import engine, connect
from packagename.models.shema import Class2
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert


class Class2(Class2):
    """Query helpers on top of the ``Class2`` schema class."""

    def upsert(self, data):
        """Insert ``data``; on a conflict on ``id`` update the existing row.

        :data: dict mapping column names to values
        """
        stmt = insert(self.table).values(**data)
        stmt = stmt.on_conflict_do_update(index_elements=['id'], set_=data)

        # The context manager closes the connection even when execute()
        # raises; the original only closed it on success.
        with engine.connect() as conn:
            conn.execute(stmt)

    def last_activity(self, id=None):
        """Return the ``last_activity`` value of the row with the given id.

        :id: primary key of the row
        :returns: the stored value, or None when no row matches
        """
        # ``id`` is sent as a bound parameter instead of being pasted into
        # the SQL string.
        sql = text(f"select last_activity from {self.table_name} where id = :id")
        row = connect.execute(sql, {'id': id}).fetchone()
        return None if row is None else row[0]

packagename/models/class3.py


from sqlalchemy import text, update
from sqlalchemy.dialects.postgresql import insert

from packagename.models.db import engine
from packagename.models.shema import connect, Class3
from packagename.config.config_parser import config_yaml

# 'worker' section of the configuration returned by config_yaml().
settings = config_yaml()['worker']
# Default window timeout for Class3.openedWindows, which interpolates it as a
# number of seconds. Falls back to 300 when the configured value is falsy.
# NOTE(review): a missing 'timeout_window' key is not covered by the fallback
# if settings is a plain dict - confirm.
time_out = settings['timeout_window'] or 300


class Class3(Class3):
    """Window and activity helpers on top of the ``Class3`` schema class."""

    def parentUse(self, data=None):
        """Insert one row through the inherited ``insert``.

        :data: dict providing the keys ``gateway_id``, ``id`` and ``date_from``
        """
        super().insert({'gateway_id': data['gateway_id'], 'wristband_id': data['id'], 'date_from': data['date_from']})

    def closeWindow(self, id, last_activity):
        """Set ``date`` to ``last_activity`` on the row ``id`` whose name is NULL.

        :id: value matched against the ``id`` column
        :last_activity: value written to the ``date`` column
        """
        update(self.table) \
            .values(date=last_activity) \
            .where(self.table.c.id == id) \
            .where(self.table.c.name.is_(None)).execute()

    def closeAllWindows(self):
        """Close every window reported by ``openedWindows``."""
        opened = self.openedWindows()
        if opened:
            print(opened)
            for row in opened:
                # row is (wristband_id, last_activity), as selected below.
                self.closeWindow(row[0], row[1])

    def lastActivity(self, wristband_id=None):
        """Return the ``last_activity`` value stored for a wristband.

        :wristband_id: str
        :returns: the stored value, or None when no row matches
        """
        # Bound parameter instead of pasting the id into the SQL string.
        sql = text("select last_activity from wristbands where wristband_id = :wristband_id limit 1")
        row = connect.execute(sql, {'wristband_id': wristband_id}).fetchone()
        return None if row is None else row[0]

    def currentTime(self):
        """Return the database server's ``current_time``."""
        sql = text("select current_time")
        # A dedicated connection is opened as before, but now it is closed
        # again instead of being leaked on every call.
        with engine.connect() as conn:
            return conn.execute(sql).fetchone()[0]

    def isOpenWindow(self, wristband_id=None):
        """List the wristbands that have a window without ``date_to``.

        :wristband_id: str, restricts the search to one wristband; None
                       searches all of them
        :returns: list of rows, each holding one ``wristband_id``
        """
        conditions = "date_to is NULL"
        params = {}
        if wristband_id is not None:
            # Bound parameter instead of pasting the id into the SQL string.
            conditions = "wristband_id = :wristband_id and " + conditions
            params['wristband_id'] = wristband_id
        sql = text(f"select wristband_id from {self.table_name} where {conditions}")
        return connect.execute(sql, params).fetchall()

    def openedWindows(self, time=time_out):
        """Select the open windows whose wristband has been inactive too long.

        :time: int, inactivity threshold in seconds
        :returns: list of ``(wristband_id, last_activity)`` rows
        """
        sql = text(f'''
            SELECT h.wristband_id, w.last_activity
            FROM history h, wristbands w where h.wristband_id=w.id and h.date_to is NULL and w.last_activity < (current_timestamp - interval '{time}' second)''')
        # The original built the query and opened a connection but never ran
        # it, so closeAllWindows always received None. Execute it, hand the
        # rows back and close the connection.
        with engine.connect() as conn:
            return conn.execute(sql).fetchall()

You can use the following code to log the execution time of statements. Only statements that take longer than 2 ms (the 0.002 s threshold in the code) are logged.
packagename/logs/sql.py


from time import time
from sqlalchemy import event
from sqlalchemy.engine import Engine
from telemetry.logs.log import setup_logger


def debug_mode(settings, threshold=0.002):
    """Log SQL statements whose execution takes longer than ``threshold``.

    The listeners are registered on the ``Engine`` class, not on a single
    engine instance.

    :settings: logger configuration, passed straight to ``setup_logger``
    :threshold: float, minimum duration in seconds for a statement to be
                logged (default 0.002, the previously hard-coded value)
    """
    # Local import: a monotonic clock suits duration measurement because,
    # unlike time(), it is not affected by system clock adjustments.
    from time import perf_counter

    lo = setup_logger(settings)

    @event.listens_for(Engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        # Start times are stacked per connection: pushed here, popped in the
        # after-hook.
        conn.info.setdefault('query_start_time', []).append(perf_counter())

    @event.listens_for(Engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        t = perf_counter() - conn.info['query_start_time'].pop(-1)
        if t > threshold:
            lo.debug(f"{t:f} {statement} {parameters}")

packagename/logs/sql.py


#date | level | time execution | statement | params
2018-07-24 11:49:36,431|DEBUG|0.002100 select wristband_id from history where wristband_id = 'BB:BB:BB:45:01' and date_to is NULL {}
2018-07-24 11:49:39,434|DEBUG|0.002064 select wristband_id from history where wristband_id = 'BB:BB:BB:51:01' and date_to is NULL {}
2018-07-24 11:50:10,983|DEBUG|0.002095 select wristband_id from history where wristband_id = 'BB:BB:BB:27:01' and date_to is NULL {}
2018-07-24 11:50:21,617|DEBUG|0.002792 UPDATE history SET date_to=%(date_to)s WHERE history.wristband_id = %(wristband_id_1)s AND history.date_to IS NULL {'date_to': datetime.datetime(2018, 7, 24, 11, 49, 6), 'wristband_id_1': 'BB:BB:BB:73:02'}

            SELECT h.wristband_id, w.last_activity
            FROM history h, wristbands w where h.wristband_id=w.id and h.date_to is NULL and w.last_activity < (current_timestamp - interval '60' second) {}

Use

packagename/workers/worker.py


from telemetry.models.class2 import Class1
from telemetry.models.class2 import Class2
from telemetry.models.wristbands import Wristbands as Model

# Usage example (the article marks its code as pseudocode).
# NOTE(review): in the snippets shown, closeWindow is defined on Class3, not
# on Class2 - confirm which model is meant here.
Class2().closeWindow(1, '2018-01-01')
# NOTE(review): `dict` is the builtin type, presumably standing in for a real
# mapping of column names to values - upsert unpacks its argument with **.
Model().upsert(dict)

# insert() hands its argument to Insert.values(); here a list of dicts.
list_of_dicts = [{},{}]
Class1().insert(list_of_dicts)

Sqlalchemy project – example of architecture
0 / 0 vote


Your email address will not be published. Required fields are marked *

Name *
Email *

Read previous post:
Yii2 and MongoDb configuration with PHP7.0-fpm and NGiNX

List to do: - Install MongoDB on Debian/Linux - Install requires MongoDB PHP Extension - Install MongoDB Extension for Yii...

Close