diff --git a/setup.py b/setup.py index af06eda..8a6a504 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.1.2", + "version":"1.1.6", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/disk.py b/transport/disk.py index 00c4a87..3fbaecc 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -5,13 +5,15 @@ if sys.version_info[0] > 2 : else: from common import Reader,Writer import json +from threading import Lock +import sqlite3 class DiskReader(Reader) : """ This class is designed to read data from disk (location on hard drive) @pre : isready() == True """ - + def __init__(self,**params): """ @param path absolute path of the file to be read @@ -40,12 +42,13 @@ class DiskReader(Reader) : yield row f.close() class DiskWriter(Writer): + """ This function writes output to disk in a designated location. The function will write a text to a text file - If a delimiter is provided it will use that to generate a xchar-delimited file - If not then the object will be dumped as is """ - + THREAD_LOCK = Lock() def __init__(self,**params): Writer.__init__(self) self.cache['meta'] = {'cols':0,'rows':0,'delimiter':None} @@ -81,8 +84,84 @@ class DiskWriter(Writer): @param label @param row row to be written """ - f = open(self.path,'a') - f.write(self.format(info)) - f.close() + try: + DiskWriter.THREAD_LOCK.acquire() + f = open(self.path,'a') + if self.delimiter : + if type(info) == list : + for row in info : + f.write(self.format(row)) + else: + f.write(self.format(info)) + else: + if not type(info) == str : + f.write(json.dumps(info)) + else: + f.write(info) + f.close() + except Exception as e: + # + # Not sure what should be done here ... + pass + finally: + DiskWriter.THREAD_LOCK.release() +class SQLiteWriter(DiskWriter) : + def __init__(self,**args): + """ + :path + :fields json|csv + """ + DiskWriter.__init__(self,**args) + self.table = args['table'] + self.conn = sqlite3.connect(self.path,isolation_level=None) + self.conn.row_factory = sqlite3.Row + self.fields = args['fields'] if 'fields' in args else [] + + if self.fields and not self.isready(): + self.init(self.fields) + + def init(self,fields): + self.fields = fields; + sql = " ".join(["CREATE TABLE IF NOT EXISTS ",self.table," (", ",".join(self.fields),")"]) + + cursor = self.conn.cursor() + cursor.execute(sql) + cursor.close() + self.conn.commit() + def isready(self): + try: + sql = "SELECT count(*) FROM sqlite_master where name=':table'" + sql = sql.replace(":table",self.table) + cursor = self.conn.cursor() + + r = cursor.execute(sql) + r = r.fetchall() + cursor.close() + + return r[0][0] + except Exception as e: + pass + return 0 + # + # If the table doesn't exist we should create it + # + def write(self,info): + """ + """ + + if not self.fields : + self.init(list(info.keys())) + + if type(info) != list : + info = [info] + cursor = self.conn.cursor() + + + sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"]) + for row in info : + cursor.execute(sql.replace(":values",json.dumps(row))) + # self.conn.commit() + # print (sql) +