Sqlalchemy project – example of architecture
Author: Rafal Marguzewicz | Published:
Categories: Python3
Tags: architecture • sqlalchemy
The file schema.py contains all the classes with the table schemas. This example is pseudocode.
packagename/models/db.py
from sqlalchemy import create_engine
from packagename.config.config_parser import settings
# Debug mode: when enabled in the settings, install the SQL timing listeners
# before the engine is created. (The original marker here was an unterminated
# triple-quoted string, which is a syntax error.)
if settings['db']['debug']:
    from packagename.logs.sql import debug_mode
    debug_mode(settings['db']['logger'])

# Connection settings read from the project configuration.
db = settings['db']

# NOTE(review): the credentials are interpolated into the URL as-is; a password
# containing characters such as '@' or '/' would presumably need URL-escaping
# first -- confirm how the configuration stores it.
engine = create_engine(
    f'postgresql+psycopg2://{db["user"]}:{db["password"]}@{db["host"]}:{db["port"]}/{db["dbname"]}',
    echo=db['echo'],
)

# Module-level connection shared by the modules that import it.
connect = engine.connect()
packagename/models/schema.py
from packagename.models.db import engine
from sqlalchemy import insert, update, Table, MetaData, Column, text, ForeignKey, UniqueConstraint
from sqlalchemy import Integer, DateTime, String, Numeric, SmallInteger, Date
# Table registry bound to the engine; the models below call .execute() directly
# on statements built from tables registered here.
# NOTE(review): MetaData(bind=...) looks like the pre-2.0 SQLAlchemy API --
# confirm the SQLAlchemy version this project pins.
metadata = MetaData(bind=engine)
# Module-level connection used by the raw text() queries in the model classes.
connect = engine.connect()
class Main(object):
    """Base class with the table-level helpers shared by every model.

    Subclasses must define two class attributes: ``table`` (the ``Table``
    object) and ``table_name`` (its name as a string).
    """

    # Connection opened once, when the class body is evaluated.
    conn = engine.connect()

    def create_table(self):
        """Create this model's table unless it already exists."""
        self.table.create(engine, checkfirst=True)

    def drop_table(self):
        """Drop this model's table if it exists."""
        self.table.drop(engine, checkfirst=True)

    def truncate_table(self):
        """Delete every row of this model's table (unfiltered DELETE)."""
        self.table.delete().execute()

    def find_all(self):
        """Return all rows of this model's table.

        :returns: list of rows
        """
        # table_name is a class attribute set by the subclass, not caller input.
        sql = text(f"select * from {self.table_name}")
        return connect.execute(sql).fetchall()

    def insert(self, data):
        """Insert ``data`` into this model's table.

        :data: the values handed to ``insert(...).values(data)``
        """
        # Inside the method body ``insert`` resolves to the module-level
        # SQLAlchemy function, not to this method. Exceptions propagate to the
        # caller unchanged (the former ``except ...: raise e`` did nothing else).
        insert(self.table).values(data).execute()

    def update(self, data=None):
        """Update this model's table with the column values in ``data``.

        The statement has no WHERE clause, so it applies to every row.

        :data: mapping of column name to new value; ``None`` means no values
        """
        # ``None`` replaces the former mutable ``{}`` default, which would be
        # shared between calls.
        values = {} if data is None else data
        update(self.table).values(**values).execute()
class Class1(Main):
    """Model for ``table1``; the generic helpers come from ``Main``."""

    table_name = 'table1'
    table = Table(
        table_name,
        metadata,
        Column('id', Integer, primary_key=True),
        Column('rssi', SmallInteger),
        Column('status', SmallInteger, default=0),
        Column('lat', Numeric),
        Column('lon', Numeric),
        Column('internaltemp', Numeric, nullable=False),
        Column('datetime', DateTime(timezone=False), nullable=False),
    )
class Class2(Main):
    """Model for ``table2``; the generic helpers come from ``Main``."""

    table_name = 'table2'
    table = Table(
        table_name,
        metadata,
        Column('id', Integer, primary_key=True),
        Column('names', String(22), unique=True),
        Column('address', String(90)),
    )
class Class3(Main):
    """Model for the ``class3`` table; each row references a ``table1`` row."""

    table_name = 'class3'
    table = Table(
        table_name,
        metadata,
        # NOTE(review): ``unique=True`` on the primary key, and the composite
        # constraint below that includes it, look redundant -- confirm intent.
        Column('id', Integer, primary_key=True, unique=True),
        Column('class_id', Integer, ForeignKey("table1.id")),
        Column('name', String(90), comment='Name owner'),
        Column('voltage', Numeric),
        Column('active', SmallInteger, default=0),
        Column('last_activity', DateTime(timezone=False)),
        Column(
            'created',
            Date,
            nullable=False,
            server_default=text("current_timestamp"),
        ),
        Column('updated', Date),
        UniqueConstraint('id', 'class_id', name='unique_w_in_gateway'),
    )
if __name__ == "__main__":
    # Running this module as a script rebuilds the schema: first drop every
    # table registered on the metadata ...
    metadata.drop_all()
    # ... then create them all again.
    metadata.create_all()
Next: inheriting classes
packagename/models/class1.py
from sqlalchemy import text
from packagename.models.shema import connect, Class1
class Class1(Class1):
    """Class1 schema model extended with query helpers."""

    def count(self):
        """Count the rows of this model's table.

        :returns: a single-column row; its first element is the row count
        """
        # The original queried the literal name "table" through an f-string
        # with no placeholder; use the model's own table name instead.
        sql = text(f"select count(1) from {self.table_name}")
        return connect.execute(sql).fetchone()
packagename/models/class2.py
from packagename.models.db import engine, connect
from packagename.models.shema import Class2
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
class Class2(Class2):
    """Class2 schema model extended with an upsert and a last-activity lookup."""

    def upsert(self, data):
        """Insert ``data`` as a row, or update the row that has the same ``id``.

        :data: mapping of column name to value
        """
        stmt = insert(self.table).values(**data)
        stmt = stmt.on_conflict_do_update(index_elements=['id'], set_=data)
        # The context manager closes the connection even when execute() raises;
        # the original only closed it on success. Exceptions still propagate.
        with engine.connect() as conn:
            conn.execute(stmt)

    def last_activity(self, id=None):
        """Return the ``last_activity`` value of the row with the given id.

        NOTE(review): the Class2 table shown in this article declares no
        ``last_activity`` column -- confirm against the real schema.

        :id: primary key of the row
        :returns: the stored value, or None when no row matches
        """
        # Bind the id as a parameter instead of formatting it into the SQL text.
        sql = text(f"select last_activity from {self.table_name} where id = :id")
        row = connect.execute(sql, {'id': id}).fetchone()
        return None if row is None else row[0]
packagename/models/class3.py
from sqlalchemy import text, update
from sqlalchemy.dialects.postgresql import insert
from packagename.models.db import engine
from packagename.models.shema import connect, Class3
from packagename.config.config_parser import config_yaml
# Worker section of the YAML configuration.
settings = config_yaml()['worker']
# Timeout window used as the default for Class3.openedWindows(); falls back to
# 300 when the configured value is falsy (None, 0, empty). A missing
# 'timeout_window' key still raises here.
time_out = settings['timeout_window'] or 300
class Class3(Class3):
    """Class3 schema model extended with activity-window helpers.

    NOTE(review): several methods reference tables and columns (``history``,
    ``wristbands``, ``date_to``, ``gateway_id``, ``date``) that the Class3
    table shown in this article does not declare; the article calls itself
    pseudocode -- confirm against the real schema.
    """

    def parentUse(self, data=None):
        """Insert a row through the inherited ``insert`` helper.

        :data: mapping with the keys ``gateway_id``, ``id`` and ``date_from``
        """
        super().insert({
            'gateway_id': data['gateway_id'],
            'wristband_id': data['id'],
            'date_from': data['date_from'],
        })

    def closeWindow(self, id, last_activity):
        """Set ``date`` to ``last_activity`` on the row ``id`` whose name is NULL.

        :id: primary key of the row to update
        :last_activity: value written to the ``date`` column
        """
        stmt = (
            update(self.table)
            .values(date=last_activity)
            .where(self.table.c.id == id)
            # is_(None) renders IS NULL, like the former ``== None``.
            .where(self.table.c.name.is_(None))
        )
        stmt.execute()

    def closeAllWindows(self):
        """Close every window reported by ``openedWindows``."""
        opened = self.openedWindows()
        if not opened:
            return
        print(opened)
        for wristband_id, last_activity in opened:
            self.closeWindow(wristband_id, last_activity)

    def lastActivity(self, wristband_id=None):
        """Return the ``last_activity`` value stored for a wristband.

        :wristband_id: str
        :returns: the stored value, or None when no row matches
        """
        # Bind the id as a parameter instead of formatting it into the SQL text.
        sql = text(
            "select last_activity from wristbands "
            "where wristband_id = :wristband_id limit 1"
        )
        row = connect.execute(sql, {'wristband_id': wristband_id}).fetchone()
        return None if row is None else row[0]

    def currentTime(self):
        """Return the database server's ``current_time``."""
        sql = text("select current_time")
        # Use a short-lived connection and always give it back.
        with engine.connect() as conn:
            return conn.execute(sql).fetchone()[0]

    def isOpenWindow(self, wristband_id=None):
        """Return the wristband ids whose ``date_to`` is NULL.

        :wristband_id: str, optional; restricts the search to one wristband
        :returns: list of rows (empty when nothing matches)
        """
        params = {}
        where = ''
        if wristband_id is not None:
            # Bound parameter instead of a value formatted into the SQL text.
            where = " wristband_id = :wristband_id and"
            params['wristband_id'] = wristband_id
        sql = text(f"select wristband_id from {self.table_name} where{where} date_to is NULL")
        return connect.execute(sql, params).fetchall()

    def openedWindows(self, time=time_out):
        """Select the open windows whose wristband has been inactive too long.

        :time: int, inactivity threshold in seconds
        :returns: list of ``(wristband_id, last_activity)`` rows
        """
        # Coerce to int so only a number can reach the interval literal.
        seconds = int(time)
        sql = text(f'''
            SELECT h.wristband_id, w.last_activity
            FROM history h, wristbands w where h.wristband_id=w.id and h.date_to is NULL and w.last_activity < (current_timestamp - interval '{seconds}' second)''')
        # The original built the statement but never ran it, so callers always
        # received None; execute it and return the rows.
        with engine.connect() as conn:
            return conn.execute(sql).fetchall()
You can use the following code to log the execution time of statements. Only statements that take longer than 2 ms are logged.
packagename/logs/sql.py
from time import time
from sqlalchemy import event
from sqlalchemy.engine import Engine
from telemetry.logs.log import setup_logger
def debug_mode(settings, threshold=0.002):
    """Log every SQL statement whose execution takes longer than ``threshold``.

    Registers two listeners on ``Engine``: one records a start time before
    each cursor execution, the other computes the elapsed time afterwards and
    writes slow statements to the debug log.

    :settings: passed to ``setup_logger`` to build the logger
    :threshold: minimum duration in seconds for a statement to be logged
    """
    # Monotonic clock: unlike time(), it is not affected by system clock
    # adjustments, so a measured duration can never come out negative.
    from time import perf_counter

    lo = setup_logger(settings)

    @event.listens_for(Engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        # A stack of start times, so overlapping executions on the same
        # connection are paired correctly.
        conn.info.setdefault('query_start_time', []).append(perf_counter())

    @event.listens_for(Engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        t = perf_counter() - conn.info['query_start_time'].pop(-1)
        if t > threshold:
            # Lazy %-formatting; the output matches the former f-string.
            lo.debug("%f %s %s", t, statement, parameters)
packagename/logs/sql.py
#date | level | time execution | statement | params
2018-07-24 11:49:36,431|DEBUG|0.002100 select wristband_id from history where wristband_id = 'BB:BB:BB:45:01' and date_to is NULL {}
2018-07-24 11:49:39,434|DEBUG|0.002064 select wristband_id from history where wristband_id = 'BB:BB:BB:51:01' and date_to is NULL {}
2018-07-24 11:50:10,983|DEBUG|0.002095 select wristband_id from history where wristband_id = 'BB:BB:BB:27:01' and date_to is NULL {}
2018-07-24 11:50:21,617|DEBUG|0.002792 UPDATE history SET date_to=%(date_to)s WHERE history.wristband_id = %(wristband_id_1)s AND history.date_to IS NULL {'date_to': datetime.datetime(2018, 7, 24, 11, 49, 6), 'wristband_id_1': 'BB:BB:BB:73:02'}
SELECT h.wristband_id, w.last_activity
FROM history h, wristbands w where h.wristband_id=w.id and h.date_to is NULL and w.last_activity < (current_timestamp - interval '60' second) {}
Usage
packagename/workers/worker.py
from telemetry.models.class2 import Class1
from telemetry.models.class2 import Class2
from telemetry.models.wristbands import Wristbands as Model
# NOTE(review): in this article closeWindow() is defined on the Class3 model,
# not on Class2 -- confirm which model is meant here.
Class2().closeWindow(1, '2018-01-01')
# NOTE(review): ``dict`` is the builtin type, presumably a placeholder for a
# mapping of column names to values.
Model().upsert(dict)
# insert() hands the list of row dicts to a single INSERT statement.
list_of_dicts = [{},{}]
Class1().insert(list_of_dicts)
Popular search terms:
| |
Wow, very interesting concepts