From 98eaa99820fd3bfbd82e17088eb996b63b015f71 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 11 Apr 2022 18:34:32 -0500 Subject: [PATCH] bug fixes and enhancements, added console output(for logging) --- bin/transport | 167 ++++++------------------------------------ setup.py | 4 +- transport/__init__.py | 40 +++++----- transport/common.py | 24 ++++++ transport/disk.py | 12 ++- 5 files changed, 81 insertions(+), 166 deletions(-) diff --git a/bin/transport b/bin/transport index 6680f64..7200e72 100755 --- a/bin/transport +++ b/bin/transport @@ -53,148 +53,29 @@ if len(sys.argv) > 1: i += 2 -class Post(Process): - def __init__(self,**args): - super().__init__() - - if 'provider' not in args['target'] : - self.PROVIDER = args['target']['type'] - self.writer = transport.factory.instance(**args['target']) - else: - self.PROVIDER = args['target']['provider'] - args['target']['context'] = 'write' - self.store = args['target'] - # self.writer = transport.instance(**args['target']) - # - # If the table doesn't exists maybe create it ? - # - self.rows = args['rows'].fillna('') - - - def run(self): - _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows - ltypes = self.rows.dtypes.values - columns = self.rows.dtypes.index.tolist() - # if not self.writer.has() : - - - # self.writer.make(fields=columns) - # self.log(module='write',action='make-table',input={"name":self.writer.table}) - for name in columns : - if _info[name].dtype in ['int32','int64','int','float','float32','float64'] : - value = 0 - else: - value = '' - _info[name] = _info[name].fillna(value) - writer = transport.factory.instance(**self.store) - writer.write(_info) - writer.close() - - -class ETL (Process): - def __init__(self,**_args): - super().__init__() - - self.name = _args['id'] - if 'provider' not in _args['source'] : - #@deprecate - self.reader = transport.factory.instance(**_args['source']) - else: - # - # This is the new interface - _args['source']['context'] = 'read' - - self.reader = transport.instance(**_args['source']) - # - # do we have an sql query provided or not .... - # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None - self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None - self._oargs = _args['target'] #transport.factory.instance(**_args['target']) - self.JOB_COUNT = _args['jobs'] - self.jobs = [] - # self.logger = transport.factory.instance(**_args['logger']) - def log(self,**_args) : - _args['name'] = self.name - print (_args) - def run(self): - if self.cmd : - idf = self.reader.read(**self.cmd) - else: - idf = self.reader.read() - idf = pd.DataFrame(idf) - # idf = idf.replace({np.nan: None}, inplace = True) - - idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] - self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) - - # - # writing the data to a designated data source - # - try: - - - self.log(module='write',action='partitioning') - rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) - - # - # @TODO: locks - for i in np.arange(self.JOB_COUNT) : - _id = 'segment # '.join([str(i),' ',self.name]) - indexes = rows[i] - segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') - if segment.shape[0] == 0 : - continue - proc = Post(target = self._oargs,rows = segment,name=_id) - self.jobs.append(proc) - proc.start() - - self.log(module='write',action='working',segment=_id) - # while poc : - # proc = [job for job in proc if job.is_alive()] - # time.sleep(1) - except Exception as e: - print (e) - - def is_done(self): - self.jobs = [proc for proc in self.jobs if proc.is_alive()] - return len(self.jobs) == 0 -def apply(_args) : - """ - This function will apply a set of commands against a data-store. The expected structure is as follows : - {"store":...,"apply":[]} - """ - handler = transport.factory.instance(**_args['store']) - for cmd in _args['apply'] : - handler.apply(cmd) - handler.close() if __name__ == '__main__' : - _info = json.loads(open (SYS_ARGS['config']).read()) - index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None - procs = [] - for _config in _info : - if 'source' in SYS_ARGS : - _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} - - _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) - etl = ETL (**_config) - if index is None: - - etl.start() - procs.append(etl) - - elif _info.index(_config) == index : - - # print (_config) - procs = [etl] - etl.start() - break # - # - N = len(procs) - while procs : - procs = [thread for thread in procs if not thread.is_done()] - if len(procs) < N : - print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) - N = len(procs) - time.sleep(1) - print ("We're done !!") \ No newline at end of file + # Load information from the file ... + if 'help' in SYS_ARGS : + print (__doc__) + else: + try: + _info = json.loads(open(SYS_ARGS['config']).read()) + if 'index' in SYS_ARGS : + _index = int(SYS_ARGS['index']) + _info = [_item for _item in _info if _info.index(_item) == _index] + pass + + procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs']) + jobs = transport.factory.instance(provider='etl',info=_info,procs=procs) + while jobs : + x = len(jobs) + jobs = [_job for _job in jobs if _job.is_alive()] + if x != len(jobs) : + print ([len(jobs),'... jobs running']) + time.sleep(1) + except Exception as e: + + print (e) + + \ No newline at end of file diff --git a/setup.py b/setup.py index 971f538..288e3f7 100644 --- a/setup.py +++ b/setup.py @@ -8,12 +8,12 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.4.6", + "version":"1.4.8", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','sqlalchemy','pandas','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pymongo','sqlalchemy','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : diff --git a/transport/__init__.py b/transport/__init__.py index 1feca91..d21e412 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -28,22 +28,24 @@ import importlib import sys import sqlalchemy if sys.version_info[0] > 2 : - from transport.common import Reader, Writer #, factory - from transport import disk + from transport.common import Reader, Writer,Console #, factory + from transport import disk - from transport import s3 as s3 - from transport import rabbitmq as queue - from transport import couch as couch - from transport import mongo as mongo - from transport import sql as sql + from transport import s3 as s3 + from transport import rabbitmq as queue + from transport import couch as couch + from transport import mongo as mongo + from transport import sql as sql + from transport import etl as etl else: - from common import Reader, Writer #, factory - import disk - import queue - import couch - import mongo - import s3 - import sql + from common import Reader, Writer,Console #, factory + import disk + import queue + import couch + import mongo + import s3 + import sql + import etl import psycopg2 as pg import mysql.connector as my from google.cloud import bigquery as bq @@ -51,9 +53,12 @@ import nzpy as nz #--- netezza drivers import os + class factory : TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} PROVIDERS = { + "etl":{"class":{"read":etl.instance}}, + "console":{"class":{"write":Console,"read":Console}}, "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}}, "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}}, "postgresql":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}}, @@ -140,8 +145,9 @@ def instance(**_args): # # Let us try to establish an sqlalchemy wrapper try: + host = '' - if provider not in ['bigquery','mongodb','couchdb','sqlite'] : + if provider not in ['bigquery','mongodb','couchdb','sqlite','console','etl','file'] : # # In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery username = args['username'] if 'username' in args else '' @@ -159,7 +165,7 @@ def instance(**_args): account = '' host = '' database = args['path'] if 'path' in args else args['database'] - if provider not in ['mongodb','couchdb','bigquery'] : + if provider not in ['mongodb','couchdb','bigquery','console','etl','file'] : uri = ''.join([provider,"://",account,host,'/',database]) e = sqlalchemy.create_engine (uri,future=True) @@ -170,6 +176,6 @@ def instance(**_args): except Exception as e: print (e) - return pointer(**args) + return pointer(**args) return None diff --git a/transport/common.py b/transport/common.py index f706e52..377d9a6 100644 --- a/transport/common.py +++ b/transport/common.py @@ -21,6 +21,7 @@ __author__ = 'The Phi Technology' import numpy as np import json import importlib +from multiprocessing import RLock # import couch # import mongo class IO: @@ -89,6 +90,29 @@ class ReadWriter(Reader,Writer) : This class implements the read/write functions aggregated """ pass +class Console(Writer): + lock = RLock() + def __init__(self,**_args): + self.lock = _args['lock'] if 'lock' in _args else False + self.info = self.write + self.debug = self.write + self.log = self.write + pass + def write (self,info,**_args): + if self.lock : + Console.lock.acquire() + try: + if type(info) == list: + for row in info : + print (row) + else: + print (info) + except Exception as e : + print (e) + finally: + if self.lock : + Console.lock.release() + # class factory : # @staticmethod # def instance(**args): diff --git a/transport/disk.py b/transport/disk.py index 14bb8a0..16e57de 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -21,14 +21,17 @@ class DiskReader(Reader) : """ Reader.__init__(self) - self.path = params['path'] ; + self.path = params['path'] if 'path' in params else None self.delimiter = params['delimiter'] if 'delimiter' in params else ',' + def isready(self): return os.path.exists(self.path) + def meta(self,**_args): + return [] def read(self,**args): _path = self.path if 'path' not in args else args['path'] _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter'] - return pd.read_csv(self.path,delimiter=self.delimiter) + return pd.read_csv(_path,delimiter=self.delimiter) def stream(self,**args): """ This function reads the rows from a designated location on disk @@ -84,15 +87,16 @@ class DiskWriter(Writer): self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys()) self.cache['meta']['rows'] += 1 return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n" - def write(self,info): + def write(self,info,**_args): """ This function writes a record to a designated file @param label @param row row to be written """ try: + _mode = 'a' if 'overwrite' not in _args else 'w' DiskWriter.THREAD_LOCK.acquire() - f = open(self.path,'a') + f = open(self.path,_mode) if self.delimiter : if type(info) == list : for row in info :