data-transport/transport/disk.py

251 lines
7.0 KiB
Python
Raw Normal View History

import os
2020-05-18 02:57:18 +00:00
import sys
2020-05-18 02:57:18 +00:00
if sys.version_info[0] > 2 :
from transport.common import Reader, Writer #, factory
else:
from common import Reader,Writer
2023-01-04 15:12:14 +00:00
# import nujson as json
2023-01-04 15:12:55 +00:00
import json
2020-12-11 09:22:49 +00:00
# from threading import Lock
2020-07-28 02:47:02 +00:00
import sqlite3
2020-12-11 08:46:31 +00:00
import pandas as pd
2020-12-11 09:22:49 +00:00
from multiprocessing import Lock
2023-12-09 00:19:46 +00:00
from transport.common import Reader, Writer, IEncoder
class DiskReader(Reader):
    """
    This class is designed to read delimited text data from disk (location on hard drive)
    @pre : isready() == True
    """

    def __init__(self, **params):
        """
        @param path         absolute path of the file to be read
        @param delimiter    field separator, defaults to ','
        """
        Reader.__init__(self)
        self.path = params.get('path')
        self.delimiter = params.get('delimiter', ',')

    def isready(self):
        """
        Returns True when a path was provided and it exists on disk
        """
        #
        # os.path.exists raises a TypeError on None, a missing path simply means "not ready"
        return self.path is not None and os.path.exists(self.path)

    def meta(self, **_args):
        """
        Returns an empty list, no metadata is extracted from a file on disk
        """
        return []

    def read(self, **args):
        """
        This function loads the file into a pandas data-frame
        @param path         overrides the path given to the constructor
        @param delimiter    overrides the delimiter given to the constructor
        """
        _path = args.get('path', self.path)
        _delimiter = args.get('delimiter', self.delimiter)
        #
        # The delimiter passed by the caller must take precedence over the instance default
        return pd.read_csv(_path, delimiter=_delimiter)

    def stream(self, **args):
        """
        This function reads the rows from a designated location on disk, one at a time
        @param size number of rows to be read, -1 (or any value below 1) suggests all rows

        Every row is yielded exactly once :
            - as a list of fields if a delimiter is set
            - as the raw line otherwise
        The line terminator is kept as found in the file
        """
        size = -1 if 'size' not in args else int(args['size'])
        #
        # The context manager closes the file even if the consumer abandons the generator.
        # Plain 'r' already applies universal newlines, the 'U' flag was removed in python 3.11
        with open(self.path, 'r') as _handle:
            for _count, row in enumerate(_handle):
                if 0 < size <= _count:
                    break
                yield row.split(self.delimiter) if self.delimiter else row
class DiskWriter(Writer):
    """
    This class writes tabular data to a text file in a designated location on disk.
        - If a delimiter is provided it will be used to generate a xchar-delimited file
        - If not then a comma is used
    Writes are serialized through a lock shared by all instances of the class
    """
    THREAD_LOCK = Lock()

    def __init__(self, **params):
        """
        @param path         location of the file to be written (required)
        @param delimiter    field separator (optional)
        @param mode         file mode handed to pandas, defaults to 'w' (overwrite)
        """
        super().__init__()
        self._path = params['path']
        self._delimiter = params.get('delimiter')
        self._mode = params.get('mode', 'w')

    def write(self, info, **_args):
        """
        This function writes a data-frame to a designated file
        @param info         data-frame, a single record (dict) or a list of records
        @param delimiter    overrides the delimiter given to the constructor
        @param path         overrides the path given to the constructor
        @param mode         overrides the mode given to the constructor

        Errors are reported on the standard output and are not raised to the caller
        """
        _delim = _args.get('delimiter', self._delimiter)
        _path = _args.get('path', self._path)
        _mode = _args.get('mode', self._mode)
        if not _delim:
            #
            # pandas rejects an empty separator, a comma is the csv default
            _delim = ','
        with DiskWriter.THREAD_LOCK:
            try:
                if not isinstance(info, pd.DataFrame):
                    #
                    # A dictionary is considered to be a single record
                    info = pd.DataFrame([info] if isinstance(info, dict) else info)
                #
                # When appending to a file that already has content the header must not be repeated
                _append = ('a' in _mode
                           and isinstance(_path, (str, bytes, os.PathLike))
                           and os.path.exists(_path)
                           and os.path.getsize(_path) > 0)
                info.to_csv(_path, index=False, sep=_delim, mode=_mode, header=not _append)
            except Exception as e:
                #
                # Writing is best-effort, the error is reported instead of being silently dropped
                print(e)
2022-01-29 17:15:45 +00:00
class SQLite:
    """
    This class holds a sqlite3 connection and the operations shared by the sqlite reader and writer
    """

    def __init__(self, **_args):
        """
        @param database|path    location of the sqlite database file (database takes precedence)
        @param fields           list of field names (optional, defaults to an empty list)
        """
        self.path = _args['database'] if 'database' in _args else _args['path']
        self.conn = sqlite3.connect(self.path, isolation_level="IMMEDIATE")
        self.conn.row_factory = sqlite3.Row
        self.fields = _args.get('fields', [])

    def has(self, **_args):
        """
        This function determines if a table can be queried
        @param table    name of the table
        @return True if one row could be selected from the table and it has columns, False otherwise
        """
        table = _args.get('table')
        if not table:
            return False
        try:
            sql = "SELECT * FROM :table limit 1".replace(":table", table)
            _df = pd.read_sql(sql, self.conn)
            return bool(_df.columns.size > 0)
        except Exception:
            #
            # Any failure (typically a table that does not exist) means the table is not available
            return False

    def close(self):
        """
        Closes the connection, an error is reported on the standard output
        """
        try:
            self.conn.close()
        except Exception as e:
            print(e)

    def apply(self, sql):
        """
        This function executes an arbitrary SQL statement
        @param sql  statement to be executed
        @return a data-frame if the statement is a SELECT, None otherwise (or if an error occurred)

        Errors are reported on the standard output and are not raised to the caller
        """
        try:
            #
            # Leading white spaces/new lines must not prevent a query from being recognized
            if sql.strip().lower().startswith('select'):
                return pd.read_sql(sql, self.conn)
            cursor = self.conn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
            self.conn.commit()
        except Exception as e:
            print(e)
            try:
                #
                # A failed statement must not leave the implicit transaction (and its lock) open
                self.conn.rollback()
            except Exception:
                pass
class SQLiteReader(SQLite, DiskReader):
    """
    This class reads data from a sqlite database into a pandas data-frame
    """

    def __init__(self, **args):
        """
        @param database|path    location of the sqlite database file
        @param table            default table to read from (optional)
        """
        super().__init__(**args)
        self.table = args.get('table')

    def read(self, **args):
        """
        This function runs a query against the database and returns a data-frame
        @param sql      full query to be executed, takes precedence over filter
        @param filter   SQL condition applied (WHERE clause) to the default table
        @param fields   fields to select when a filter is given (string or list), defaults to *
        @param limit    maximum number of rows to be returned

        NOTE: sql, filter and fields are inserted in the query as is, they must come from a trusted source
        """
        if 'sql' in args:
            sql = args['sql']
        elif 'filter' in args:
            _fields = args.get('fields', '*')
            if isinstance(_fields, (list, tuple)):
                _fields = ",".join(_fields)
            #
            # The query must be a single string, pandas can not execute a tuple of fragments
            sql = " ".join(["SELECT", _fields, "FROM", self.table, "WHERE (" + args['filter'] + ")"])
        else:
            sql = ' '.join(['SELECT * FROM ', self.table])
        if 'limit' in args:
            #
            # The limit can be provided as an integer or a string
            sql = sql + " LIMIT " + str(args['limit'])
        return pd.read_sql(sql, self.conn)

    def close(self):
        """
        Closes the connection, errors are ignored
        """
        try:
            self.conn.close()
        except Exception:
            #
            # Closing is best-effort, there is nothing to recover from at this point
            pass
2022-01-29 17:15:45 +00:00
class SQLiteWriter(SQLite, DiskWriter):
    """
    This class writes records to a table of a sqlite database.
    Writes are serialized through a lock shared by all instances of the class
    """
    connection = None
    LOCK = Lock()

    def __init__(self, **args):
        """
        @param database|path    location of the sqlite database file
        @param table            name of the table to write to
        @param fields           list of field names, used to create the table if it does not exist
        """
        super().__init__(**args)
        self.table = args.get('table')
        if self.fields and not self.isready() and self.table:
            self.init(self.fields)
        SQLiteWriter.connection = self.conn

    def init(self, fields):
        """
        This function creates the table (if it does not exist) with the fields provided
        @param fields   list of field names
        """
        self.fields = fields
        sql = " ".join(["CREATE TABLE IF NOT EXISTS ", self.table, " (", ",".join(self.fields), ")"])
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()
        self.conn.commit()

    def isready(self):
        """
        This function determines if the table is declared in the database
        @return True if the table is found in sqlite_master, False otherwise (or if an error occurred)
        """
        try:
            cursor = self.conn.cursor()
            try:
                #
                # The name is bound as a parameter so quotes in a table name can not break the query
                r = cursor.execute("SELECT count(*) FROM sqlite_master where name=?", (self.table,)).fetchall()
            finally:
                cursor.close()
            return r[0][0] != 0
        except Exception:
            return False

    @staticmethod
    def _encode(value):
        """
        Converts a value into something sqlite3 can bind, complex values are serialized with IEncoder
        """
        if value is None or isinstance(value, (int, float, str, bytes)):
            return value
        _text = json.dumps(value, cls=IEncoder)
        _plain = json.loads(_text)
        #
        # Scalars are stored as such, objects and lists are stored as their JSON text
        return _plain if _plain is None or isinstance(_plain, (int, float, str)) else _text

    @staticmethod
    def _values(row, fields):
        """
        Returns the values of a record in the order of the fields (positional order if the keys don't match)
        """
        _raw = [row[name] for name in fields] if all(name in row for name in fields) else list(row.values())
        return [SQLiteWriter._encode(value) for value in _raw]

    def write(self, info, **_args):
        """
        This function inserts records into the table, the batch is committed as a whole
        @param info     data-frame, a single record (dict) or a list of records (list of dict)

        If no fields were provided the table is created from the keys of the first record.
        Errors on insert are reported on the standard output and the batch is rolled back
        """
        if isinstance(info, dict):
            info = [info]
        elif isinstance(info, pd.DataFrame):
            info = info.fillna('').to_dict(orient='records')
        #
        # There is nothing to do with an empty batch (or a batch that starts with an empty record)
        if not info or not info[0]:
            return
        if not self.fields:
            self.init(list(info[0].keys()))

        with SQLiteWriter.LOCK:
            try:
                _fields = list(self.fields)
                #
                # Values are bound as parameters: the driver handles quoting, unicode and special characters
                _holders = ",".join(["?"] * len(_fields))
                sql = " ".join(["INSERT INTO ", self.table, "(", ",".join(_fields), ")", "values(" + _holders + ")"])
                self.conn.executemany(sql, [SQLiteWriter._values(row, _fields) for row in info])
                self.conn.commit()
            except Exception as e:
                print(e)
                try:
                    #
                    # A failed batch must not leave the implicit transaction (and its lock) open
                    self.conn.rollback()
                except Exception:
                    pass