diff --git a/bin/transport b/bin/transport
index 8edaecc..2225f3b 100755
--- a/bin/transport
+++ b/bin/transport
@@ -14,19 +14,27 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
 Usage :
-	transport --config --procs
-@TODO: Create tables if they don't exist for relational databases
-example of configuration :
+	transport help -- will print this page
-1. Move data from a folder to a data-store
-	transport [--folder ] --config #-- assuming the configuration doesn't have folder
-	transport --folder --provider -- --table|doc
-In this case the configuration should look like :
-	{folder:..., target:{}}
-2. Move data from one source to another
-	transport --config
-	{source:{..},target:{..}} or [{source:{..},target:{..}},{source:{..},target:{..}}]
-
+	transport move [index]
+		path to the configuration file
+		optional index within the configuration file
+
+e.g.: configuration file (JSON formatted)
+ - single source to a single target
+
+	{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},
+	 "target":{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement"}
+	}
+
+ - single source to multiple targets
+	{
+		"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},
+		"target":[
+			{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement"},
+			{"provider":"mongodb","db":"transport-demo","collection":"agreement"}
+		]
+	}
 """
 import pandas as pd
@@ -36,51 +44,111 @@ import sys
 import transport
 import time
 from multiprocessing import Process
-SYS_ARGS = {}
-if len(sys.argv) > 1:
+import typer
+import os
+from transport import etl
+from transport import providers
+
+# SYS_ARGS = {}
+# if len(sys.argv) > 1:
-	N = len(sys.argv)
-	for i in range(1,N):
-		value = None
-		if sys.argv[i].startswith('--'):
-			key = sys.argv[i][2:] #.replace('-','')
-			SYS_ARGS[key] = 1
-			if i + 1 < N:
-				value = sys.argv[i + 1] = sys.argv[i+1].strip()
-			if key and value and not value.startswith('--'):
-				SYS_ARGS[key] = value
+# 	N = len(sys.argv)
+# 	for i in range(1,N):
+# 		value = None
+# 		if sys.argv[i].startswith('--'):
+# 			key = sys.argv[i][2:] #.replace('-','')
+# 			SYS_ARGS[key] = 1
+# 			if i + 1 < N:
+# 				value = sys.argv[i + 1] = sys.argv[i+1].strip()
+# 			if key and value and not value.startswith('--'):
+# 				SYS_ARGS[key] = value
-		i += 2
+# 		i += 2
-if __name__ == '__main__' :
-	#
-	# Load information from the file ...
-	if 'help' in SYS_ARGS :
-		print (__doc__)
-	else:
-		try:
-			_info = json.loads(open(SYS_ARGS['config']).read())
-			if 'index' in SYS_ARGS :
-				_index = int(SYS_ARGS['index'])
-				_info = [_item for _item in _info if _info.index(_item) == _index]
-				pass
-			elif 'id' in SYS_ARGS :
-				_info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']]
+app = typer.Typer()
+
+# @app.command()
+def help() :
+	print (__doc__)
+def wait(jobs):
+	while jobs :
+		jobs = [thread for thread in jobs if thread.is_alive()]
+		time.sleep(1)
+
+@app.command()
+def move (path,index=None):
+
+	_proxy = lambda _object: _object.write(_object.read())
+	if os.path.exists(path):
+		file = open(path)
+		_config = json.loads (file.read() )
+		file.close()
+		if index :
+			_config = _config[ int(index)]
+			etl.instance(**_config)
+		else:
+			etl.instance(config=_config)
+
+	#
+	# if type(_config) == dict :
+	# 	_object = transport.etl.instance(**_config)
+	# 	_proxy(_object)
+	# else:
+	# 	#
+	# 	# here we are dealing with a list of objects (long ass etl job)
+	# 	jobs = []
+	# 	failed = []
+	# 	for _args in _config :
+	# 		if index and _config.index(_args) != index :
+	# 			continue
+
+	# 		_object=transport.etl.instance(**_args)
+	# 		thread = Process(target=_proxy,args=(_object,))
+	# 		thread.start()
+	# 		jobs.append(thread())
+	# 		if _config.index(_args) == 0 :
+	# 			thread.join()
+	# 	wait(jobs)
+
+@app.command()
+def generate (path:str):
+	__doc__="""
+	This command generates a sample configuration file at the given path
+	"""
+	_config = [{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},"target":{"provider":"file","path":"addresses.csv","delimiter":"csv"}}]
+	file = open(path,'w')
+	file.write(json.dumps(_config))
+	file.close()
+
+# if __name__ == '__main__' :
+# 	#
+# 	# Load information from the file ...
+# 	if 'help' in SYS_ARGS :
+# 		print (__doc__)
+# 	else:
+# 		try:
+# 			_info = json.loads(open(SYS_ARGS['config']).read())
+# 			if 'index' in SYS_ARGS :
+# 				_index = int(SYS_ARGS['index'])
+# 				_info = [_item for _item in _info if _info.index(_item) == _index]
+# 				pass
+# 			elif 'id' in SYS_ARGS :
+# 				_info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']]
-			procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
-			jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
-			print ([len(jobs),' Jobs are running'])
-			N = len(jobs)
-			while jobs :
-				x = len(jobs)
-				jobs = [_job for _job in jobs if _job.is_alive()]
-				if x != len(jobs) :
-					print ([len(jobs),'... jobs still running'])
-				time.sleep(1)
-			print ([N,' Finished running'])
-		except Exception as e:
+# 			procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
+# 			jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
+# 			print ([len(jobs),' Jobs are running'])
+# 			N = len(jobs)
+# 			while jobs :
+# 				x = len(jobs)
+# 				jobs = [_job for _job in jobs if _job.is_alive()]
+# 				if x != len(jobs) :
+# 					print ([len(jobs),'...
jobs still running']) +# time.sleep(1) +# print ([N,' Finished running']) +# except Exception as e: - print (e) +# print (e) - \ No newline at end of file + diff --git a/setup.py b/setup.py index 7eff1e4..254bb5c 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ args = { "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pymongo','sqlalchemy<2.0.0','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : diff --git a/transport/__init__.py b/transport/__init__.py index bbb2e50..e139aa5 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -28,7 +28,7 @@ import importlib import sys import sqlalchemy if sys.version_info[0] > 2 : - from transport.common import Reader, Writer,Console #, factory + # from transport.common import Reader, Writer,Console #, factory from transport import disk from transport import s3 as s3 @@ -97,7 +97,7 @@ class factory : TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}} PROVIDERS = { "etl":{"class":{"read":etl.instance,"write":etl.instance}}, - "console":{"class":{"write":Console,"read":Console}}, + # "console":{"class":{"write":Console,"read":Console}}, "file":{"class":{"read":disk.DiskReader,"write":disk.DiskWriter}}, "sqlite":{"class":{"read":disk.SQLiteReader,"write":disk.SQLiteWriter}}, "postgresql":{"port":5432,"host":"localhost","database":None,"driver":pg,"default":{"type":"VARCHAR"},"class":{"read":sql.SQLReader,"write":sql.SQLWriter}}, @@ -124,6 +124,9 @@ class factory : # # Legacy code being returned return factory._instance(**_args); + + + else: return instance(**_args) @staticmethod @@ -175,22 +178,31 @@ def instance(**_pargs): file.close() _provider = _args['provider'] - _group = None + _context = list( set(['read','write','listen']) & set(_args.keys()) ) + if _context : + _context = _context[0] + else: + _context = _args['context'] if 'context' in _args else 'read' + # _group = None - for _id in providers.CATEGORIES : - if _provider in providers.CATEGORIES[_id] : - _group = _id - break - if _group : + # for _id in providers.CATEGORIES : + # if _provider in providers.CATEGORIES[_id] : + # _group = _id + # break + # if _group : + + if _provider in providers.PROVIDERS and _context in providers.PROVIDERS[_provider]: - _classPointer = _getClassInstance(_group,**_args) + # _classPointer = _getClassInstance(_group,**_args) + _classPointer = providers.PROVIDERS[_provider][_context] # # Let us reformat the arguments - if 'read' in _args or 'write' in _args : - _args = _args['read'] if 'read' in _args else _args['write'] - _args['provider'] = _provider - if _group == 'sql' : + # if 'read' in _args or 'write' in _args : + # _args = _args['read'] if 'read' in _args else _args['write'] + # _args['provider'] = _provider + # if _group == 'sql' : + if _provider in providers.CATEGORIES['sql'] : _info = 
_get_alchemyEngine(**_args) _args = dict(_args,**_info) @@ -215,57 +227,68 @@ def _get_alchemyEngine(**_args): This function returns the SQLAlchemy engine associated with parameters, This is only applicable for SQL _items :_args arguments passed to the factory {provider and other} """ - #@TODO: Enable authentication files (private_key) - _username = _args['username'] if 'username' in _args else '' - _password = _args['password'] if 'password' in _args else '' - _account = _args['account'] if 'account' in _args else '' - _database = _args['database'] _provider = _args['provider'] - if _username != '': - _account = _username + ':'+_password+'@' - _host = _args['host'] if 'host' in _args else '' - _port = _args['port'] if 'port' in _args else '' - if _provider in providers.DEFAULT : - _default = providers.DEFAULT[_provider] - _host = _host if _host != '' else (_default['host'] if 'host' in _default else '') - _port = _port if _port != '' else (_default['port'] if 'port' in _default else '') - if _port == '': - _port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else '' - # - - if _host != '' and _port != '' : - _fhost = _host+":"+str(_port) #--formatted hostname + _pargs = {} + if _provider == providers.SQLITE3 : + _path = _args['database'] if 'database' in _args else _args['path'] + uri = ''.join([_provider,':///',_path]) + else: - _fhost = _host - # Let us update the parameters we have thus far + + #@TODO: Enable authentication files (private_key) + _username = _args['username'] if 'username' in _args else '' + _password = _args['password'] if 'password' in _args else '' + _account = _args['account'] if 'account' in _args else '' + _database = _args['database'] if 'database' in _args else _args['path'] + + if _username != '': + _account = _username + ':'+_password+'@' + _host = _args['host'] if 'host' in _args else '' + _port = _args['port'] if 'port' in _args else '' + if _provider in providers.DEFAULT : + _default = providers.DEFAULT[_provider] + _host = _host if _host != '' else (_default['host'] if 'host' in _default else '') + _port = _port if _port != '' else (_default['port'] if 'port' in _default else '') + if _port == '': + _port = providers.DEFAULT['port'] if 'port' in providers.DEFAULT else '' + # + + if _host != '' and _port != '' : + _fhost = _host+":"+str(_port) #--formatted hostname + else: + _fhost = _host + # Let us update the parameters we have thus far # - uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) + uri = ''.join([_provider,"://",_account,_fhost,'/',_database]) + _pargs = {'host':_host,'port':_port,'username':_username,'password':_password} _engine = sqlalchemy.create_engine (uri,future=True) _out = {'sqlalchemy':_engine} - _pargs = {'host':_host,'port':_port,'username':_username,'password':_password} + for key in _pargs : if _pargs[key] != '' : _out[key] = _pargs[key] return _out +@DeprecationWarning def _getClassInstance(_group,**_args): """ This function returns the class instance we are attempting to instanciate :_group items in providers.CATEGORIES.keys() :_args arguments passed to the factory class """ - if 'read' in _args or 'write' in _args : - _context = 'read' if 'read' in _args else _args['write'] - _info = _args[_context] - else: - _context = _args['context'] if 'context' in _args else 'read' - _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group] - if type(_class) == dict and _args['provider'] in _class: - _class = _class[_args['provider']] + # if 'read' in _args or 'write' in _args : + # _context 
= 'read' if 'read' in _args else _args['write'] + # _info = _args[_context] + # else: + # _context = _args['context'] if 'context' in _args else 'read' + # _class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group] + # if type(_class) == dict and _args['provider'] in _class: + # _class = _class[_args['provider']] - return _class + # return _class +@DeprecationWarning def __instance(**_args): """ diff --git a/transport/common.py b/transport/common.py index 39df6a3..59f57ea 100644 --- a/transport/common.py +++ b/transport/common.py @@ -93,29 +93,29 @@ class ReadWriter(Reader,Writer) : This class implements the read/write functions aggregated """ pass -class Console(Writer): - lock = RLock() - def __init__(self,**_args): - self.lock = _args['lock'] if 'lock' in _args else False - self.info = self.write - self.debug = self.write - self.log = self.write - pass - def write (self,logs=None,**_args): - if self.lock : - Console.lock.acquire() - try: - _params = _args if logs is None and _args else logs - if type(_params) == list: - for row in _params : - print (row) - else: - print (_params) - except Exception as e : - print (e) - finally: - if self.lock : - Console.lock.release() +# class Console(Writer): +# lock = RLock() +# def __init__(self,**_args): +# self.lock = _args['lock'] if 'lock' in _args else False +# self.info = self.write +# self.debug = self.write +# self.log = self.write +# pass +# def write (self,logs=None,**_args): +# if self.lock : +# Console.lock.acquire() +# try: +# _params = _args if logs is None and _args else logs +# if type(_params) == list: +# for row in _params : +# print (row) +# else: +# print (_params) +# except Exception as e : +# print (e) +# finally: +# if self.lock : +# Console.lock.release() """ diff --git a/transport/etl.py b/transport/etl.py index 9d520d4..dac58c4 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -35,6 +35,9 @@ import json import sys import transport import time +import os + + from multiprocessing import Process SYS_ARGS = {} if len(sys.argv) > 1: @@ -52,199 +55,301 @@ if len(sys.argv) > 1: i += 2 - -class Post(Process): - def __init__(self,**args): - super().__init__() - self.store = args['target'] - if 'provider' not in args['target'] : - pass - self.PROVIDER = args['target']['type'] - # self.writer = transport.factory.instance(**args['target']) - else: - self.PROVIDER = args['target']['provider'] - self.store['context'] = 'write' - # self.store = args['target'] - self.store['lock'] = True - # self.writer = transport.instance(**args['target']) - # - # If the table doesn't exists maybe create it ? 
-		#
-		self.rows = args['rows']
-		# self.rows = args['rows'].fillna('')
-
-	def log(self,**_args) :
-		if ETL.logger :
-			ETL.logger.info(**_args)
-
-	def run(self):
-		_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
-
-		writer = transport.factory.instance(**self.store)
-		writer.write(_info)
-		writer.close()
-
-
-class ETL (Process):
-	logger = None
+class Transporter(Process):
+	"""
+	The transporter (Jason Statham) moves data from one persistent store to another
+	 - callback functions
+		:onFinish	callback function when finished
+		:onError	callback function when an error occurs
+	:source	source data specification
+	:target	destination(s) to move the data to
+	"""
 	def __init__(self,**_args):
 		super().__init__()
-
-		self.name = _args['id'] if 'id' in _args else 'UNREGISTERED'
-		# if 'provider' not in _args['source'] :
-		# 	#@deprecate
-		# 	self.reader = transport.factory.instance(**_args['source'])
-		# else:
-		# 	#
-		# 	# This is the new interface
-		# 	_args['source']['context'] = 'read'
-
-		# 	self.reader = transport.instance(**_args['source'])
-
-		#
-		# do we have an sql query provided or not ....
-		# self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
-		# self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
-		# self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
-		self._source = _args ['source']
+		# self.onfinish = _args['onFinish']
+		# self._onerror = _args['onError']
+		self._source = _args['source']
 		self._target = _args['target']
-		self._source['context'] = 'read'
-		self._target['context'] = 'write'
+
+		#
+		# Let's ensure we can support multiple targets
+		self._target = [self._target] if type(self._target) != list else self._target
-		self.JOB_COUNT = _args['jobs']
-		self.jobs = []
-		# self.logger = transport.factory.instance(**_args['logger'])
-	def log(self,**_args) :
-		if ETL.logger :
-			ETL.logger.info(**_args)
+		pass
+	def read(self,**_args):
+		"""
+		This function reads data from the source data-store
+		"""
+		_reader = transport.factory.instance(**self._source)
+		#
+		# If arguments are provided then a query is to be executed (not just a table dump)
+		return _reader.read() if 'args' not in self._source else _reader.read(**self._source['args'])
-	def run(self):
-		# if self.cmd :
-		# 	idf = self.reader.read(**self.cmd)
-		# else:
-		# 	idf = self.reader.read()
-		# idf = pd.DataFrame(idf)
-		# # idf = idf.replace({np.nan: None}, inplace = True)
+	def _delegate_write(self,_data,**_args):
+		"""
+		This function will write a data-frame to a designated data-store; the function is built around a delegation design pattern
+		:data	data-frame or object to be written
+		"""
+		for _target in self._target :
+			if 'write' not in _target :
+				_target['context'] = 'write'
+				_target['lock'] = True
+			else:
+				_target['write']['lock'] = True
+			_writer = transport.factory.instance(**_target)
+			_writer.write(_data,**_args)
+			if hasattr(_writer,'close') :
+				_writer.close()
+
+	def write(self,_df,**_args):
+		"""
+		This function writes the data-frame to every target, segmenting very large frames
+		"""
+		SEGMENT_COUNT = 6
+		MAX_ROWS = 1000000
+		# _df = self.read()
+		_segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
+		# _index = 0
+
+
+		for _indexes in _segments :
+			_fwd_args = {} if not _args else _args
+
+			self._delegate_write(_df.iloc[_indexes],**_fwd_args)
+			#
+			# @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
+ pass - # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] - # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) +def instance(**_args): + _proxy = lambda _agent: _agent.write(_agent.read()) + if 'source' in _args and 'target' in _args : + + _agent = Transporter(**_args) + _proxy(_agent) + + else: + _config = _args['config'] + _items = [Transporter(**_item) for _item in _config ] + _MAX_JOBS = 5 + _items = np.array_split(_items,_MAX_JOBS) + for _batch in _items : + jobs = [] + for _item in _batch : + thread = Process(target=_proxy,args = (_item,)) + thread.start() + jobs.append(thread) + while jobs : + jobs = [thread for thread in jobs if thread.is_alive()] + time.sleep(1) + + pass +# class Post(Process): +# def __init__(self,**args): +# super().__init__() +# self.store = args['target'] +# if 'provider' not in args['target'] : +# pass +# self.PROVIDER = args['target']['type'] +# # self.writer = transport.factory.instance(**args['target']) +# else: +# self.PROVIDER = args['target']['provider'] +# self.store['context'] = 'write' +# # self.store = args['target'] +# self.store['lock'] = True +# # self.writer = transport.instance(**args['target']) +# # +# # If the table doesn't exists maybe create it ? +# # +# self.rows = args['rows'] +# # self.rows = args['rows'].fillna('') + +# def log(self,**_args) : +# if ETL.logger : +# ETL.logger.info(**_args) + +# def run(self): +# _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows + +# writer = transport.factory.instance(**self.store) +# writer.write(_info) +# writer.close() - # - # writing the data to a designated data source - # - try: + +# class ETL (Process): +# logger = None +# def __init__(self,**_args): +# super().__init__() + +# self.name = _args['id'] if 'id' in _args else 'UNREGISTERED' +# # if 'provider' not in _args['source'] : +# # #@deprecate +# # self.reader = transport.factory.instance(**_args['source']) +# # else: +# # # +# # # This is the new interface +# # _args['source']['context'] = 'read' + +# # self.reader = transport.instance(**_args['source']) + +# # +# # do we have an sql query provided or not .... +# # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None +# # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None +# # self._oargs = _args['target'] #transport.factory.instance(**_args['target']) +# self._source = _args ['source'] +# self._target = _args['target'] +# self._source['context'] = 'read' +# self._target['context'] = 'write' + +# self.JOB_COUNT = _args['jobs'] +# self.jobs = [] +# # self.logger = transport.factory.instance(**_args['logger']) +# def log(self,**_args) : +# if ETL.logger : +# ETL.logger.info(**_args) + +# def run(self): +# # if self.cmd : +# # idf = self.reader.read(**self.cmd) +# # else: +# # idf = self.reader.read() +# # idf = pd.DataFrame(idf) +# # # idf = idf.replace({np.nan: None}, inplace = True) + +# # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] +# # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) + +# # +# # writing the data to a designated data source +# # +# try: - _log = {"name":self.name,"rows":{"input":0,"output":0}} - _reader = transport.factory.instance(**self._source) - if 'table' in self._source : - _df = _reader.read() - else: - _df = _reader.read(**self._source['cmd']) - _log['rows']['input'] = _df.shape[0] - # - # Let's write the input data-frame to the target ... 
- _writer = transport.factory.instance(**self._target) - _writer.write(_df) - _log['rows']['output'] = _df.shape[0] +# _log = {"name":self.name,"rows":{"input":0,"output":0}} +# _reader = transport.factory.instance(**self._source) +# if 'table' in self._source : +# _df = _reader.read() +# else: +# _df = _reader.read(**self._source['cmd']) +# _log['rows']['input'] = _df.shape[0] +# # +# # Let's write the input data-frame to the target ... +# _writer = transport.factory.instance(**self._target) +# _writer.write(_df) +# _log['rows']['output'] = _df.shape[0] - # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) - # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) +# # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) +# # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) - # # - # # @TODO: locks - # for i in np.arange(self.JOB_COUNT) : - # # _id = ' '.join([str(i),' table ',self.name]) - # indexes = rows[i] - # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') - # _name = "partition-"+str(i) - # if segment.shape[0] == 0 : - # continue +# # # +# # # @TODO: locks +# # for i in np.arange(self.JOB_COUNT) : +# # # _id = ' '.join([str(i),' table ',self.name]) +# # indexes = rows[i] +# # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') +# # _name = "partition-"+str(i) +# # if segment.shape[0] == 0 : +# # continue - # proc = Post(target = self._oargs,rows = segment,name=_name) - # self.jobs.append(proc) - # proc.start() +# # proc = Post(target = self._oargs,rows = segment,name=_name) +# # self.jobs.append(proc) +# # proc.start() - # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) - # while self.jobs : - # jobs = [job for job in proc if job.is_alive()] - # time.sleep(1) - except Exception as e: - print (e) - self.log(**_log) - def is_done(self): - self.jobs = [proc for proc in self.jobs if proc.is_alive()] - return len(self.jobs) == 0 -def instance(**_args): - """ - :path ,index, id - :param _info list of objects with {source,target}` - :param logger - """ - logger = _args['logger'] if 'logger' in _args else None - if 'path' in _args : - _info = json.loads((open(_args['path'])).read()) - - - if 'index' in _args : - _index = int(_args['index']) - _info = _info[_index] - - elif 'id' in _args : - _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']] - _info = _info[0] if _info else _info - else: - _info = _args['info'] +# # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) +# # while self.jobs : +# # jobs = [job for job in proc if job.is_alive()] +# # time.sleep(1) +# except Exception as e: +# print (e) +# self.log(**_log) +# def is_done(self): +# self.jobs = [proc for proc in self.jobs if proc.is_alive()] +# return len(self.jobs) == 0 + + +# def instance (**_args): +# """ +# path to configuration file +# """ +# _path = _args['path'] +# _config = {} +# jobs = [] +# if os.path.exists(_path) : +# file = open(_path) +# _config = json.loads(file.read()) +# file.close() +# if _config and type - if logger and type(logger) != str: - ETL.logger = logger - elif logger == 'console': - ETL.logger = transport.factory.instance(provider='console',context='write',lock=True) - if type(_info) in [list,dict] : - _info = _info if type(_info) != dict else [_info] - # - # The assumption here is that the objects within the list are {source,target} - jobs = [] - for _item in _info : - - _item['jobs'] = 
5 if 'procs' not in _args else int(_args['procs']) - _job = ETL(**_item) - - _job.start() - jobs.append(_job) - return jobs + +# def _instance(**_args): +# """ +# :path ,index, id +# :param _info list of objects with {source,target}` +# :param logger +# """ +# logger = _args['logger'] if 'logger' in _args else None +# if 'path' in _args : +# _info = json.loads((open(_args['path'])).read()) - else: - return None - -if __name__ == '__main__' : - _info = json.loads(open (SYS_ARGS['config']).read()) - index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None - procs = [] - for _config in _info : - if 'source' in SYS_ARGS : - _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} - - _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) - etl = ETL (**_config) - if index is None: + +# if 'index' in _args : +# _index = int(_args['index']) +# _info = _info[_index] - etl.start() - procs.append(etl) - - elif _info.index(_config) == index : +# elif 'id' in _args : +# _info = [_item for _item in _info if '_id' in _item and _item['id'] == _args['id']] +# _info = _info[0] if _info else _info +# else: +# _info = _args['info'] + +# if logger and type(logger) != str: +# ETL.logger = logger +# elif logger == 'console': +# ETL.logger = transport.factory.instance(provider='console',context='write',lock=True) +# if type(_info) in [list,dict] : +# _info = _info if type(_info) != dict else [_info] +# # +# # The assumption here is that the objects within the list are {source,target} +# jobs = [] +# for _item in _info : - # print (_config) - procs = [etl] - etl.start() - break - # - # - N = len(procs) - while procs : - procs = [thread for thread in procs if not thread.is_done()] - if len(procs) < N : - print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) - N = len(procs) - time.sleep(1) - # print ("We're done !!") \ No newline at end of file +# _item['jobs'] = 5 if 'procs' not in _args else int(_args['procs']) +# _job = ETL(**_item) + +# _job.start() +# jobs.append(_job) +# return jobs + +# else: +# return None + +# if __name__ == '__main__' : +# _info = json.loads(open (SYS_ARGS['config']).read()) +# index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None +# procs = [] +# for _config in _info : +# if 'source' in SYS_ARGS : +# _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}} + +# _config['jobs'] = 3 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs']) +# etl = ETL (**_config) +# if index is None: + +# etl.start() +# procs.append(etl) + +# elif _info.index(_config) == index : + +# # print (_config) +# procs = [etl] +# etl.start() +# break +# # +# # +# N = len(procs) +# while procs : +# procs = [thread for thread in procs if not thread.is_done()] +# if len(procs) < N : +# print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) +# N = len(procs) +# time.sleep(1) +# # print ("We're done !!") \ No newline at end of file diff --git a/transport/providers.py b/transport/providers.py index c1c4bae..fc394f3 100644 --- a/transport/providers.py +++ b/transport/providers.py @@ -1,4 +1,4 @@ -from transport.common import Reader, Writer,Console #, factory +# from transport.common import Reader, Writer,Console #, factory from transport import disk import sqlite3 from transport import s3 as s3 @@ -9,6 +9,7 @@ from transport import sql as sql from transport import etl as etl from transport import qlistener from transport import bricks +from transport import session import psycopg2 as pg 
import mysql.connector as my from google.cloud import bigquery as bq @@ -33,6 +34,8 @@ MARIADB = 'mariadb' COUCHDB = 'couch' CONSOLE = 'console' ETL = 'etl' + + # # synonyms of the above BQ = BIGQUERY @@ -54,13 +57,37 @@ CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,C READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader}, 'cloud':{BIGQUERY:sql.BigQueryReader,DATABRICKS:bricks.BricksReader}, 'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}, - 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} + # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader } WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter}, 'cloud':{BIGQUERY:sql.BigQueryWriter,DATABRICKS:bricks.BricksWriter}, - 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener},'cli':{CONSOLE:Console},'memory':{CONSOLE:Console} + 'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}, + # 'cli':{CONSOLE:Console}, + # 'memory':{CONSOLE:Console}, 'http':session.HttpReader } +# SQL_PROVIDERS = [POSTGRESQL,MYSQL,NETEZZA,MARIADB,SQLITE] +PROVIDERS = { + FILE:{'read':disk.DiskReader,'write':disk.DiskWriter}, + SQLITE:{'read':disk.SQLiteReader,'write':disk.SQLiteWriter,'driver':sqlite3}, + + POSTGRESQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, + NETEZZA:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':nz,'default':{'port':5480}}, + REDSHIFT:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':pg,'default':{'host':'localhost','port':5432}}, + RABBITMQ:{'read':queue.QueueReader,'writer':queue.QueueWriter,'context':queue.QueueListener,'default':{'host':'localhost','port':5432}}, + + MYSQL:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + MARIADB:{'read':sql.SQLReader,'write':sql.SQLWriter,'driver':my,'default':{'host':'localhost','port':3306}}, + S3:{'read':s3.s3Reader,'write':s3.s3Writer}, + BIGQUERY:{'read':sql.BigQueryReader,'write':sql.BigQueryWriter}, + QLISTENER:{'read':qlistener.qListener,'write':qlistener.qListener,'default':{'host':'localhost','port':5672}}, + CONSOLE:{'read':qlistener.Console,"write":qlistener.Console}, + HTTP:{'read':session.HttpReader,'write':session.HttpWriter}, + DATABRICKS:{'read':bricks.BricksReader,'write':bricks.BricksWriter}, + MONGODB:{'read':mongo.MongoReader,'write':mongo.MongoWriter,'default':{'port':27017,'host':'localhost'}}, + COUCHDB:{'read':couch.CouchReader,'writer':couch.CouchWriter,'default':{'host':'localhost','port':5984}}, + ETL :{'read':etl.Transporter,'write':etl.Transporter} +} DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}} DEFAULT[MONGODB] = {'port':27017,'host':'localhost'} DEFAULT[REDSHIFT] = DEFAULT[PG] diff --git a/transport/qlistener.py b/transport/qlistener.py index 495b731..26f0ba8 100644 --- a/transport/qlistener.py +++ b/transport/qlistener.py @@ -40,3 +40,8 @@ class qListener : _q = qListener._queue[_id] _q.put(_data) _q.join() +class Console (qListener): + def __init__(self,**_args): + super().__init__(callback=print) + + # self.callback = print \ No newline at end of file diff --git a/transport/session.py b/transport/session.py index 915d2b5..d74669a 100644 --- a/transport/session.py +++ b/transport/session.py @@ -1,54 +1,60 @@ from flask import request, session from datetime import datetime import 
re
-from common import Reader, Writer
+from transport.common import Reader, Writer
 import json
+import requests
+from io import StringIO
+import pandas as pd
-class HttpRequestReader(Reader):
+
+class HttpReader(Reader):
 	"""
 	This class is designed to read data from an Http request file handler provided to us by flask
 	The file will be heald in memory and processed accordingly
 	NOTE: This is inefficient and can crash a micro-instance (becareful)
 	"""
-	def __init__(self,**params):
-		self.file_length = 0
-		try:
-
-			#self.file = params['file']
-			#self.file.seek(0, os.SEEK_END)
-			#self.file_length = self.file.tell()
-
-			#print 'size of file ',self.file_length
-			self.content = params['file'].readlines()
-			self.file_length = len(self.content)
-		except Exception as e:
-			print ("Error ... ",e)
-			pass
+	def __init__(self,**_args):
+		self._url = _args['url']
+		self._headers = None if 'headers' not in _args else _args['headers']
+
+	# def isready(self):
+	# 	return self.file_length > 0
+	def format(self,_response):
+		_mimetype= _response.headers['Content-Type']
+		if 'text/csv' in _mimetype :
+			_content = _response.text
+			return pd.read_csv(StringIO(_content))
+		#
+		# @TODO: Add support for excel, JSON and other file formats that fit into a data-frame
+		#
-	def isready(self):
-		return self.file_length > 0
-	def read(self,size =-1):
-		i = 1
-		for row in self.content:
-			i += 1
-			if size == i:
-				break
-			yield row
+		return _response.text
+	def read(self,**_args):
+		if self._headers :
+			r = requests.get(self._url,headers = self._headers)
+		else:
+			r = requests.get(self._url)
+		return self.format(r)
-class HttpSessionWriter(Writer):
+class HttpWriter(Writer):
 	"""
-	This class is designed to write data to a session/cookie
+	This class is designed to submit data to an endpoint (url)
 	"""
-	def __init__(self,**params):
+	def __init__(self,**_args):
 		"""
 		@param key required session key
 		"""
-		self.session = params['queue']
-		self.session['sql'] = []
-		self.session['csv'] = []
-		self.tablename = re.sub('..+$','',params['filename'])
-		self.session['uid'] = params['uid']
+		self._url = _args['url']
+		self._name = _args['name']
+		self._method = 'post' if 'method' not in _args else _args['method']
+
+		# self.session = params['queue']
+		# self.session['sql'] = []
+		# self.session['csv'] = []
+		# self.tablename = re.sub('..+$','',params['filename'])
+		# self.session['uid'] = params['uid']
 		#self.xchar = params['xchar']
@@ -57,10 +63,26 @@ class HttpSessionWriter(Writer):
 		return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)
 	def isready(self):
 		return True
-	def write(self,**params):
-		label = params['label']
-		row = params ['row']
+	def write(self,_data,**_args):
+		#
+		#
+		_method = self._method if 'method' not in _args else _args['method']
+		_method = _method.lower()
+		_mimetype = 'text/csv'
+		if type(_data) == dict :
+			_mimetype = 'application/json'
+			_content = _data
+		else:
+			_content = _data.to_dict(orient='records')
+		_headers = {'Content-Type':_mimetype}
+		_pointer = getattr(requests,_method)
-		if label == 'usable':
-			self.session['csv'].append(self.format(row,','))
-			self.session['sql'].append(self.format_sql(row))
+		_pointer (self._url,data={self._name:_content},headers=_headers)
+
+
+		# label = params['label']
+		# row = params ['row']
+
+		# if label == 'usable':
+		# 	self.session['csv'].append(self.format(row,','))
+		# 	self.session['sql'].append(self.format_sql(row))
diff --git a/transport/sql.py b/transport/sql.py
index 3c555f5..019db78 100644
---
a/transport/sql.py +++ b/transport/sql.py @@ -291,17 +291,17 @@ class SQLWriter(SQLRW,Writer): """ # inspect = False if 'inspect' not in _args else _args['inspect'] # cast = False if 'cast' not in _args else _args['cast'] - if not self.fields : - if type(info) == list : - _fields = info[0].keys() - elif type(info) == dict : - _fields = info.keys() - elif type(info) == pd.DataFrame : - _fields = info.columns.tolist() - - # _fields = info.keys() if type(info) == dict else info[0].keys() - _fields = list (_fields) - self.init(_fields) + # if not self.fields : + # if type(info) == list : + # _fields = info[0].keys() + # elif type(info) == dict : + # _fields = info.keys() + # elif type(info) == pd.DataFrame : + # _fields = info.columns.tolist() + + # # _fields = info.keys() if type(info) == dict else info[0].keys() + # # _fields = list (_fields) + # self.init(_fields) try: table = _args['table'] if 'table' in _args else self.table diff --git a/transport/version.py b/transport/version.py index ec087c4..5e7e7b7 100644 --- a/transport/version.py +++ b/transport/version.py @@ -1,2 +1,2 @@ __author__ = 'The Phi Technology' -__version__= '1.8.6' +__version__= '1.9.0'
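
Not part of the patch: a minimal usage sketch of the reworked factory. After this change, transport.instance() resolves the class from providers.PROVIDERS[provider][context] (context defaults to 'read' unless a read/write key or an explicit context is passed), and SQL providers are routed through _get_alchemyEngine(). The URL, database file and table below are the sample values from the bin/transport docstring; the exact keyword names each reader/writer accepts (url, path, table) are assumptions based on that example.

# illustrative sketch, assuming the keyword names shown in the bin/transport docstring example
import transport
from transport import providers

# no 'context' argument -> the factory falls back to the 'read' entry of PROVIDERS[provider]
_reader = transport.instance(provider=providers.HTTP,
                             url='https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv')
_df = _reader.read()    # HttpReader.format() loads the CSV response into a pandas DataFrame

# context='write' selects the 'write' entry of PROVIDERS[provider] instead
_writer = transport.instance(provider=providers.SQLITE, context='write',
                             path='transport-demo.sqlite', table='agreement')
_writer.write(_df)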
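
Also not part of the patch: the programmatic equivalent of the new `transport move` command. etl.instance() receives the loaded configuration; when it contains both 'source' and 'target' it builds a single Transporter and runs write(read()), and a 'target' list fans the frame out to several stores. The configuration below is the multi-target example from the bin/transport docstring, so the values are illustrative only.

# illustrative sketch, reusing the sample configuration from the bin/transport docstring
from transport import etl

_config = {
    "source": {"provider": "http",
               "url": "https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},
    "target": [
        {"provider": "sqlite3", "path": "transport-demo.sqlite", "table": "agreement"},
        {"provider": "mongodb", "db": "transport-demo", "collection": "agreement"}
    ]
}
# builds a Transporter(source=..., target=[...]) and runs write(read()) as defined in transport/etl.py
etl.instance(**_config)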