bug fixes and enhancements, added console output (for logging)

Steve Nyemba 2022-04-11 18:34:32 -05:00
parent 6c406407b2
commit 98eaa99820
5 changed files with 81 additions and 166 deletions

View File

@@ -53,148 +53,29 @@ if len(sys.argv) > 1:
        i += 2
class Post(Process):
    def __init__(self,**args):
        super().__init__()
        if 'provider' not in args['target'] :
            self.PROVIDER = args['target']['type']
            self.writer = transport.factory.instance(**args['target'])
        else:
            self.PROVIDER = args['target']['provider']
            args['target']['context'] = 'write'
        self.store = args['target']
        # self.writer = transport.instance(**args['target'])
        #
        # If the table doesn't exist, maybe create it ?
        #
        self.rows = args['rows'].fillna('')
    def run(self):
        ltypes = self.rows.dtypes.values
        columns = self.rows.dtypes.index.tolist()
        # if not self.writer.has() :
        #     self.writer.make(fields=columns)
        #     self.log(module='write',action='make-table',input={"name":self.writer.table})
        #
        # Fill missing values on the frame first; wrapping it for couchdb before
        # the fill would break the column lookups below
        for name in columns :
            if self.rows[name].dtype in ['int32','int64','int','float','float32','float64'] :
                value = 0
            else:
                value = ''
            self.rows[name] = self.rows[name].fillna(value)
        _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
        writer = transport.factory.instance(**self.store)
        writer.write(_info)
        writer.close()
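#
# For illustration only, a hypothetical way to exercise a Post worker directly,
# mirroring the call ETL.run makes below; the console target and the one-row
# frame are made-up examples, not values from the project:
#   Post(target={"provider":"console"},rows=pd.DataFrame([{"x":1}]),name="demo").start()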
class ETL (Process):
    def __init__(self,**_args):
        super().__init__()
        self.name = _args['id']
        if 'provider' not in _args['source'] :
            #@deprecate
            self.reader = transport.factory.instance(**_args['source'])
        else:
            #
            # This is the new interface
            _args['source']['context'] = 'read'
            self.reader = transport.instance(**_args['source'])
        #
        # do we have an sql query provided or not ....
        # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
        self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
        self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
        self.JOB_COUNT = _args['jobs']
        self.jobs = []
        # self.logger = transport.factory.instance(**_args['logger'])
    def log(self,**_args) :
        _args['name'] = self.name
        print (_args)
    def run(self):
        if self.cmd :
            idf = self.reader.read(**self.cmd)
        else:
            idf = self.reader.read()
        idf = pd.DataFrame(idf)
        # idf = idf.replace({np.nan: None}, inplace = True)
        idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
        self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
        #
        # writing the data to a designated data source
        #
        try:
            self.log(module='write',action='partitioning')
            rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
            #
            # @TODO: locks
            for i in np.arange(self.JOB_COUNT) :
                _id = ' '.join(['segment #',str(i),self.name])
                indexes = rows[i]
                segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
                if segment.shape[0] == 0 :
                    continue
                proc = Post(target = self._oargs,rows = segment,name=_id)
                self.jobs.append(proc)
                proc.start()
                self.log(module='write',action='working',segment=_id)
            # while proc :
            #     proc = [job for job in proc if job.is_alive()]
            #     time.sleep(1)
        except Exception as e:
            print (e)
    def is_done(self):
        self.jobs = [proc for proc in self.jobs if proc.is_alive()]
        return len(self.jobs) == 0
def apply(_args) :
    """
    This function will apply a set of commands against a data-store. The expected structure is as follows :
    {"store":...,"apply":[]}
    """
    handler = transport.factory.instance(**_args['store'])
    for cmd in _args['apply'] :
        handler.apply(cmd)
    handler.close()
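#
# For illustration only, a hypothetical payload in the shape apply() expects;
# the sqlite path and the SQL statements are made-up examples:
#   apply({"store":{"provider":"sqlite","database":"/tmp/demo.db3","context":"write"},
#          "apply":["DELETE FROM logs WHERE created < '2022-01-01'","VACUUM"]})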
if __name__ == '__main__' :
    _info = json.loads(open (SYS_ARGS['config']).read())
    index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None
    procs = []
    for _pos,_config in enumerate(_info) :
        if 'source' in SYS_ARGS :
            _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
        _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
        etl = ETL (**_config)
        if index is None:
            etl.start()
            procs.append(etl)
        elif _pos == index :
            # print (_config)
            procs = [etl]
            etl.start()
            break
    #
    #
    N = len(procs)
    while procs :
        procs = [thread for thread in procs if not thread.is_done()]
        if len(procs) < N :
            print (["Finished ",(N-len(procs)), " remaining ", len(procs)])
            N = len(procs)
        time.sleep(1)
    print ("We're done !!")
# Load information from the file ...
if 'help' in SYS_ARGS :
    print (__doc__)
else:
    try:
        _info = json.loads(open(SYS_ARGS['config']).read())
        if 'index' in SYS_ARGS :
            _index = int(SYS_ARGS['index'])
            _info = [_info[_index]]
        procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
        jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
        while jobs :
            x = len(jobs)
            jobs = [_job for _job in jobs if _job.is_alive()]
            if x != len(jobs) :
                print ([len(jobs),'... jobs running'])
            time.sleep(1)
    except Exception as e:
        print (e)
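
For context, a minimal sketch of driving the new etl provider programmatically, mirroring the factory call above; the job id, file path, and providers in this configuration are illustrative assumptions, not values from the project:

import transport
_info = [{
    "id":"demo-job",
    "source":{"provider":"file","path":"/tmp/in.csv","delimiter":","},
    "target":{"provider":"console"}
}]
jobs = transport.factory.instance(provider='etl',info=_info,procs=1)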

View File

@@ -8,12 +8,12 @@ def read(fname):
    return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
    "name":"data-transport",
    "version":"1.4.6",
    "version":"1.4.8",
    "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
    "license":"MIT",
    "packages":["transport"]}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pymongo','sqlalchemy','pandas','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
args["install_requires"] = ['pymongo','sqlalchemy','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
if sys.version_info[0] == 2 :

View File

@@ -28,22 +28,24 @@ import importlib
import sys
import sqlalchemy
if sys.version_info[0] > 2 :
    from transport.common import Reader, Writer #, factory
    from transport import disk
    from transport.common import Reader, Writer,Console #, factory
    from transport import disk
    from transport import s3 as s3
    from transport import rabbitmq as queue
    from transport import couch as couch
    from transport import mongo as mongo
    from transport import sql as sql
    from transport import s3 as s3
    from transport import rabbitmq as queue
    from transport import couch as couch
    from transport import mongo as mongo
    from transport import sql as sql
    from transport import etl as etl
else:
    from common import Reader, Writer #, factory
    import disk
    import queue
    import couch
    import mongo
    import s3
    import sql
    from common import Reader, Writer,Console #, factory
    import disk
    import queue
    import couch
    import mongo
    import s3
    import sql
    import etl
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
@@ -51,9 +53,12 @@ import nzpy as nz #--- netezza drivers
import os
class factory :
    TYPE = {"sql":{"providers":["postgresql","mysql","netezza","bigquery","mariadb","redshift"]}}
    PROVIDERS = {
        "etl":{"class":{"read":etl.instance}},
        "console":{"class":{"write":Console,"read":Console}},
        "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}},
        "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}},
        "postgresql":{"port":5432,"host":"localhost","database":os.environ['USER'],"driver":pg,"default":{"type":"VARCHAR"}},
@@ -140,8 +145,9 @@ def instance(**_args):
    #
    # Let us try to establish an sqlalchemy wrapper
    try:
        host = ''
        if provider not in ['bigquery','mongodb','couchdb','sqlite'] :
        if provider not in ['bigquery','mongodb','couchdb','sqlite','console','etl','file'] :
            #
            # In these cases we are assuming RDBMS and thus would exclude NoSQL and BigQuery
            username = args['username'] if 'username' in args else ''
@@ -159,7 +165,7 @@ def instance(**_args):
            account = ''
            host = ''
            database = args['path'] if 'path' in args else args['database']
        if provider not in ['mongodb','couchdb','bigquery'] :
        if provider not in ['mongodb','couchdb','bigquery','console','etl','file'] :
            uri = ''.join([provider,"://",account,host,'/',database])
            e = sqlalchemy.create_engine (uri,future=True)
@@ -170,6 +176,6 @@ def instance(**_args):
    except Exception as e:
        print (e)
    return pointer(**args)
    return pointer(**args)
    return None
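
To make the provider gating above concrete, a hedged sketch of two calls through instance(), assuming RDBMS providers get the sqlalchemy engine while console/etl/file skip it; the database and table names are illustrative:

import transport
db = transport.instance(provider='postgresql',database='demo',table='users',context='read')
log = transport.instance(provider='console',lock=True)
log.write('connected')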

View File

@@ -21,6 +21,7 @@ __author__ = 'The Phi Technology'
import numpy as np
import json
import importlib
from multiprocessing import RLock
# import couch
# import mongo
class IO:
@@ -89,6 +90,29 @@ class ReadWriter(Reader,Writer) :
    This class implements the read/write functions aggregated
    """
    pass
class Console(Writer):
    lock = RLock()
    def __init__(self,**_args):
        # Console.lock (the class-level RLock) guards concurrent writes;
        # self.lock is only a flag that says whether the lock should be used
        self.lock = _args['lock'] if 'lock' in _args else False
        self.info = self.write
        self.debug = self.write
        self.log = self.write
        pass
    def write (self,info,**_args):
        if self.lock :
            Console.lock.acquire()
        try:
            if type(info) == list:
                for row in info :
                    print (row)
            else:
                print (info)
        except Exception as e :
            print (e)
        finally:
            if self.lock :
                Console.lock.release()
# class factory :
# @staticmethod
# def instance(**args):
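
A short usage sketch for the new Console writer (values are made up): lists are printed row by row, and info, debug and log all alias write.

c = Console(lock=True)
c.write(["row 1","row 2"])
c.info({"module":"etl","action":"done"})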

View File

@@ -21,14 +21,17 @@ class DiskReader(Reader) :
"""
Reader.__init__(self)
self.path = params['path'] ;
self.path = params['path'] if 'path' in params else None
self.delimiter = params['delimiter'] if 'delimiter' in params else ','
def isready(self):
return os.path.exists(self.path)
def meta(self,**_args):
return []
def read(self,**args):
_path = self.path if 'path' not in args else args['path']
_delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
return pd.read_csv(self.path,delimiter=self.delimiter)
return pd.read_csv(_path,delimiter=self.delimiter)
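    # For illustration only: with the overrides above, a hypothetical call such as
    # reader.read(path='/tmp/other.csv',delimiter='|') redirects a single read
    # without touching the values captured at construction time.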
    def stream(self,**args):
        """
        This function reads the rows from a designated location on disk
@@ -84,15 +87,16 @@ class DiskWriter(Writer):
        self.cache['meta']['cols'] += len(row) if isinstance(row,list) else len(row.keys())
        self.cache['meta']['rows'] += 1
        return (self.delimiter.join(row) if self.delimiter else json.dumps(row))+"\n"
    def write(self,info):
    def write(self,info,**_args):
        """
        This function writes a record to a designated file
        @param info row(s) to be written
        """
        try:
            _mode = 'a' if 'overwrite' not in _args else 'w'
            DiskWriter.THREAD_LOCK.acquire()
            f = open(self.path,'a')
            f = open(self.path,_mode)
            if self.delimiter :
                if type(info) == list :
                    for row in info :