cli etl tool

This commit is contained in:
Steve Nyemba 2021-07-08 17:31:29 -05:00
parent 7c2e945996
commit 19ab510125
4 changed files with 109 additions and 6 deletions

91
bin/transport Normal file
View File

@ -0,0 +1,91 @@
#!/usr/bin/env python
__doc__ = """
(c) 2018 - 2021 data-transport
steve@the-phi.com, The Phi Technology LLC
https://dev.the-phi.com/git/steve/data-transport.git
This program performs ETL between 9 supported data sources : Couchdb, Mongodb, Mysql, Mariadb, PostgreSQL, Netezza,Redshift, Sqlite, File
Usage :
transport --config <path-to-file.json> --procs <number-procs>
@TODO: Create tables if they don't exist for relational databases
"""
import pandas as pd
import numpy as np
import json
import sys
import transport
import time
from multiprocessing import Process
SYS_ARGS = {}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i][2:] #.replace('-','')
SYS_ARGS[key] = 1
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
if key and value and not value.startswith('--'):
SYS_ARGS[key] = value
i += 2
class Post(Process):
def __init__(self,**args):
super().__init__()
self.PROVIDER = args['target']['type']
self.writer = transport.factory.instance(**args['target'])
self.rows = args['rows']
def run(self):
_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
self.writer.write(_info)
self.writer.close()
class ETL (Process):
def __init__(self,**_args):
super().__init__()
self.name = _args['id']
self.reader = transport.factory.instance(**_args['source'])
self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
self.JOB_COUNT = _args['jobs']
# self.logger = transport.factory.instance(**_args['logger'])
def log(self,**_args) :
_args['name'] = self.name
print (_args)
def run(self):
idf = self.reader.read()
idf = pd.DataFrame(idf)
idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
self.log(rows=idf.shape[0],cols=idf.shape[1])
#
# writing the data to a designated data source
#
try:
self.log(module='write',action='partitioning')
rows = np.array_split(np.arange(idf.shape[0]),self.JOB_COUNT)
jobs = []
for i in rows :
segment = idf.loc[i,:].to_dict(orient='records')
proc = Post(target = self._oargs,rows = segment)
jobs.append(proc)
proc.start()
self.log(module='write',action='working ...')
while jobs :
jobs = [proc for proc in jobs if proc.is_alive()]
time.sleep(2)
self.log(module='write',action='completed')
except Exception as e:
print (e)
if __name__ == '__main__' :
_config = json.loads(open (SYS_ARGS['config']).read())
_config['jobs'] = 10 if 'jobs' not in SYS_ARGS else SYS_ARGS['jobs']
for _config in _info :
etl = ETL (**_config)
etl.start()

View File

@ -8,14 +8,14 @@ def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read() return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = { args = {
"name":"data-transport", "name":"data-transport",
"version":"1.3.8.6.1", "version":"1.3.8.8",
"author":"The Phi Technology LLC","author_email":"info@the-phi.com", "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
"license":"MIT", "license":"MIT",
"packages":["transport"]} "packages":["transport"]}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pymongo','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["install_requires"] = ['pymongo','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
if sys.version_info[0] == 2 : if sys.version_info[0] == 2 :
args['use_2to3'] = True args['use_2to3'] = True
args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import'] args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']

View File

@ -42,7 +42,8 @@ The configuration for the data-store is as follows :
} }
""" """
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
import numpy as np import pandas as pd
import numpy as np
import json import json
import importlib import importlib
import sys import sys
@ -97,6 +98,9 @@ class factory :
print(['Error ',e]) print(['Error ',e])
return anObject return anObject
import time
# class Reader: # class Reader:
# def __init__(self): # def __init__(self):
# self.nrows = 0 # self.nrows = 0

View File

@ -87,13 +87,14 @@ class SQLRW :
# Executing a command i.e no expected return values ... # Executing a command i.e no expected return values ...
cursor.execute(_sql) cursor.execute(_sql)
self.conn.commit() self.conn.commit()
except Exception as e :
print (e)
finally: finally:
self.conn.commit() self.conn.commit()
cursor.close() cursor.close()
def close(self): def close(self):
try: try:
self.connect.close() self.conn.close()
except Exception as error : except Exception as error :
print (error) print (error)
pass pass
@ -112,6 +113,12 @@ class SQLReader(SQLRW,Reader) :
if 'limit' in _args : if 'limit' in _args :
_sql = _sql + " LIMIT "+str(_args['limit']) _sql = _sql + " LIMIT "+str(_args['limit'])
return self.apply(_sql) return self.apply(_sql)
def close(self) :
try:
self.conn.close()
except Exception as error :
print (error)
pass
class SQLWriter(SQLRW,Writer): class SQLWriter(SQLRW,Writer):
def __init__(self,**_args) : def __init__(self,**_args) :
@ -122,7 +129,7 @@ class SQLWriter(SQLRW,Writer):
# NOTE: Proper data type should be set on the target system if their source is unclear. # NOTE: Proper data type should be set on the target system if their source is unclear.
self._inspect = False if 'inspect' not in _args else _args['inspect'] self._inspect = False if 'inspect' not in _args else _args['inspect']
self._cast = False if 'cast' not in _args else _args['cast'] self._cast = False if 'cast' not in _args else _args['cast']
def init(self,fields): def init(self,fields=None):
if not fields : if not fields :
try: try:
self.fields = pd.read_sql("SELECT * FROM :table LIMIT 1".replace(":table",self.table),self.conn).columns.tolist() self.fields = pd.read_sql("SELECT * FROM :table LIMIT 1".replace(":table",self.table),self.conn).columns.tolist()
@ -192,6 +199,7 @@ class SQLWriter(SQLRW,Writer):
# self.conn.commit() # self.conn.commit()
except Exception as e: except Exception as e:
print(e)
pass pass
finally: finally:
self.conn.commit() self.conn.commit()