From f5187790ced0b23c820738467476e20fe8c11825 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 10 Jun 2024 00:42:42 -0500 Subject: [PATCH] refactor: etl,better reusability & streamlined and threaded --- bin/transport | 51 +++++++----- info/__init__.py | 8 +- setup.py | 9 +-- transport/__init__.py | 75 ++++++++++++++---- transport/etl.py | 141 ++++++++++++++++------------------ transport/iowrapper.py | 84 +++++++++++++++++++- transport/other/files.py | 5 +- transport/plugins/__init__.py | 9 ++- 8 files changed, 251 insertions(+), 131 deletions(-) diff --git a/bin/transport b/bin/transport index f483d94..fd5d41b 100755 --- a/bin/transport +++ b/bin/transport @@ -44,12 +44,15 @@ import sys import transport import time from multiprocessing import Process -import typer + import os import transport from transport import etl # from transport import providers - +import typer +from typing_extensions import Annotated +from typing import Optional +import time app = typer.Typer() @@ -62,28 +65,33 @@ def wait(jobs): time.sleep(1) @app.command(name="apply") -def apply (path,index=None): +def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], + index:int = typer.Option(help="index of the item of interest, otherwise everything in the file will be processed")): """ This function applies data transport from one source to one or several others - - :path path of the configuration file - - :index index of the _item of interest (otherwise everything will be processed) """ - _proxy = lambda _object: _object.write(_object.read()) + # _proxy = lambda _object: _object.write(_object.read()) if os.path.exists(path): file = open(path) _config = json.loads (file.read() ) file.close() if index : - _config = _config[ int(index)] - etl.instance(**_config) - else: - etl.instance(config=_config) + _config = [_config[ int(index)]] + jobs = [] + for _args in _config : + pthread = etl.instance(**_args) #-- automatically starts the process + jobs.append(pthread) + # + # @TODO: Log the number of processes started and estimated time + while jobs : + jobs = [pthread for pthread in jobs if pthread.is_alive()] + time.sleep(1) + # + # @TODO: Log the job termination here ... @app.command(name="providers") -def supported (format:str="table") : +def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") : """ - This function will print supported providers and their associated classifications + This function will print supported providers/vendors and their associated classifications """ _df = (transport.supported()) if format in ['list','json'] : @@ -94,9 +102,15 @@ def supported (format:str="table") : @app.command() def version(): - print (transport.version.__version__) + """ + This function will display version and license information + """ + + print (transport.__app_name__,'version ',transport.__version__) + print (transport.__license__) + @app.command() -def generate (path:str): +def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]): """ This function will generate a configuration template to give a sense of how to create one """ @@ -104,15 +118,12 @@ def generate (path:str): { "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"}, "target": - [{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}] + [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}] } ] file = open(path,'w') file.write(json.dumps(_config)) file.close() -@app.command() -def usage(): - print (__doc__) if __name__ == '__main__' : app() # # diff --git a/info/__init__.py b/info/__init__.py index 0594d12..f45fdcd 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,8 +1,8 @@ +__app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.0.2' -__license__=""" - - +__version__= '2.0.4' +__email__ = "info@the-phi.com" +__license__=f""" Copyright 2010 - 2024, Steve L. Nyemba Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/setup.py b/setup.py index 8e9de26..002feb8 100644 --- a/setup.py +++ b/setup.py @@ -5,19 +5,16 @@ from setuptools import setup, find_packages import os import sys # from version import __version__,__author__ -from info import __version__, __author__ +from info import __version__, __author__,__app_name__,__license__ -# __author__ = 'The Phi Technology' -# __version__= '1.8.0' - def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { - "name":"data-transport", + "name":__app_name__, "version":__version__, "author":__author__,"author_email":"info@the-phi.com", - "license":"MIT", + "license":__license__, # "packages":["transport","info","transport/sql"]}, "packages": find_packages(include=['info','transport', 'transport.*'])} diff --git a/transport/__init__.py b/transport/__init__.py index 333931b..d7d4518 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -22,8 +22,8 @@ from transport import sql, nosql, cloud, other import pandas as pd import json import os -from info import __version__,__author__ -from transport.iowrapper import IWriter, IReader +from info import __version__,__author__,__email__,__license__,__app_name__ +from transport.iowrapper import IWriter, IReader, IETL from transport.plugins import PluginLoader from transport import providers @@ -32,26 +32,35 @@ def init(): global PROVIDERS for _module in [cloud,sql,nosql,other] : for _provider_name in dir(_module) : - if _provider_name.startswith('__') : + if _provider_name.startswith('__') or _provider_name == 'common': continue PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__} def instance (**_args): """ - type: - read: true|false (default true) - auth_file + This function returns an object of to read or write from a supported database provider/vendor + @provider provider + @context read/write (default is read) + @auth_file: Optional if the database information provided is in a file. Useful for not sharing passwords + kwargs These are arguments that are provider/vendor specific """ global PROVIDERS if 'auth_file' in _args: if os.path.exists(_args['auth_file']) : + # + # @TODO: add encryption module and decryption to enable this to be secure + # + f = open(_args['auth_file']) - _args = dict (_args,** json.loads(f.read()) ) + #_args = dict (_args,** json.loads(f.read()) ) + # + # we overrite file parameters with arguments passed + _args = dict (json.loads(f.read()),**_args ) f.close() else: filename = _args['auth_file'] raise Exception(f" {filename} was not found or is invalid") - if _args['provider'] in PROVIDERS : + if 'provider' in _args and _args['provider'] in PROVIDERS : _info = PROVIDERS[_args['provider']] _module = _info['module'] if 'context' in _args : @@ -62,22 +71,54 @@ def instance (**_args): _agent = _pointer (**_args) # loader = None - if 'plugins' in _args : - _params = _args['plugins'] - - if 'path' in _params and 'names' in _params : - loader = PluginLoader(**_params) - elif type(_params) == list: - loader = PluginLoader() - for _delegate in _params : - loader.set(_delegate) + + # + # @TODO: + # define a logger object here that will used by the wrapper + # this would allow us to know what the data-transport is doing and where/how it fails + # + # if 'plugins' in _args : + # _params = _args['plugins'] + # if 'path' in _params and 'names' in _params : + # loader = PluginLoader(**_params) + # elif type(_params) == list: + # loader = PluginLoader() + # for _delegate in _params : + # loader.set(_delegate) + + loader = None if 'plugins' not in _args else _args['plugins'] return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader) else: + # + # We can handle the case for an ETL object + # raise Exception ("Missing or Unknown provider") pass +class get : + """ + This class is just a wrapper to make the interface (API) more conversational and easy to understand + """ + @staticmethod + def reader (**_args): + _args['context'] = 'read' + return instance(**_args) + @staticmethod + def writer(**_args): + """ + This function is a wrapper that will return a writer to a database. It disambiguates the interface + """ + _args['context'] = 'write' + return instance(**_args) + @staticmethod + def etl (**_args): + if 'source' in _args and 'target' in _args : + return IETL(**_args) + else: + raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes") + def supported (): _info = {} for _provider in PROVIDERS : diff --git a/transport/etl.py b/transport/etl.py index 25750de..2c60e04 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -39,22 +39,22 @@ import os from multiprocessing import Process -SYS_ARGS = {} -if len(sys.argv) > 1: +# SYS_ARGS = {} +# if len(sys.argv) > 1: - N = len(sys.argv) - for i in range(1,N): - value = None - if sys.argv[i].startswith('--'): - key = sys.argv[i][2:] #.replace('-','') - SYS_ARGS[key] = 1 - if i + 1 < N: - value = sys.argv[i + 1] = sys.argv[i+1].strip() - if key and value and not value.startswith('--'): - SYS_ARGS[key] = value +# N = len(sys.argv) +# for i in range(1,N): +# value = None +# if sys.argv[i].startswith('--'): +# key = sys.argv[i][2:] #.replace('-','') +# SYS_ARGS[key] = 1 +# if i + 1 < N: +# value = sys.argv[i + 1] = sys.argv[i+1].strip() +# if key and value and not value.startswith('--'): +# SYS_ARGS[key] = value - i += 2 +# i += 2 class Transporter(Process): """ The transporter (Jason Stathem) moves data from one persistant store to another @@ -74,81 +74,72 @@ class Transporter(Process): # # Let's insure we can support multiple targets self._target = [self._target] if type(self._target) != list else self._target - pass - def read(self,**_args): - """ - This function - """ - _reader = transport.factory.instance(**self._source) + def run(self): + + _reader = transport.get.etl(source=self._source,target=self._target) # - # If arguments are provided then a query is to be executed (not just a table dump) if 'cmd' in self._source or 'query' in self._source : _query = self._source['cmd'] if 'cmd' in self._source else self._source['query'] return _reader.read(**_query) else: return _reader.read() - # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query']) + + # def _read(self,**_args): + # """ + # This function + # """ + # _reader = transport.factory.instance(**self._source) + # # + # # If arguments are provided then a query is to be executed (not just a table dump) + # if 'cmd' in self._source or 'query' in self._source : + # _query = self._source['cmd'] if 'cmd' in self._source else self._source['query'] + # return _reader.read(**_query) + # else: + # return _reader.read() + # # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query']) - def _delegate_write(self,_data,**_args): - """ - This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern - :data data-frame or object to be written - """ - if _data.shape[0] > 0 : - for _target in self._target : - if 'write' not in _target : - _target['context'] = 'write' - # _target['lock'] = True - else: - # _target['write']['lock'] = True - pass - _writer = transport.factory.instance(**_target) - _writer.write(_data,**_args) - if hasattr(_writer,'close') : - _writer.close() + # def _delegate_write(self,_data,**_args): + # """ + # This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern + # :data data-frame or object to be written + # """ + # if _data.shape[0] > 0 : + # for _target in self._target : + # if 'write' not in _target : + # _target['context'] = 'write' + # # _target['lock'] = True + # else: + # # _target['write']['lock'] = True + # pass + # _writer = transport.factory.instance(**_target) + # _writer.write(_data,**_args) + # if hasattr(_writer,'close') : + # _writer.close() - def write(self,_df,**_args): - """ - """ - SEGMENT_COUNT = 6 - MAX_ROWS = 1000000 - # _df = self.read() - _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])]) - # _index = 0 + # def write(self,_df,**_args): + # """ + # """ + # SEGMENT_COUNT = 6 + # MAX_ROWS = 1000000 + # # _df = self.read() + # _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])]) + # # _index = 0 - for _indexes in _segments : - _fwd_args = {} if not _args else _args + # for _indexes in _segments : + # _fwd_args = {} if not _args else _args - self._delegate_write(_df.iloc[_indexes],**_fwd_args) - time.sleep(1) - # - # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) - pass + # self._delegate_write(_df.iloc[_indexes],**_fwd_args) + # time.sleep(1) + # # + # # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?) + # pass def instance(**_args): - _proxy = lambda _agent: _agent.write(_agent.read()) - if 'source' in _args and 'target' in _args : - - _agent = Transporter(**_args) - _proxy(_agent) - - else: - _config = _args['config'] - _items = [Transporter(**_item) for _item in _config ] - _MAX_JOBS = 5 - _items = np.array_split(_items,_MAX_JOBS) - for _batch in _items : - jobs = [] - for _item in _batch : - thread = Process(target=_proxy,args = (_item,)) - thread.start() - jobs.append(thread) - while jobs : - jobs = [thread for thread in jobs if thread.is_alive()] - time.sleep(1) - + pthread = Transporter (**_args) + pthread.start() + return pthread pass # class Post(Process): # def __init__(self,**args): @@ -360,4 +351,4 @@ def instance(**_args): # print (["Finished ",(N-len(procs)), " remaining ", len(procs)]) # N = len(procs) # time.sleep(1) -# # print ("We're done !!") \ No newline at end of file +# # print ("We're done !!") diff --git a/transport/iowrapper.py b/transport/iowrapper.py index df6b2ec..e3ff611 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -1,14 +1,39 @@ """ This class is a wrapper around read/write classes of cloud,sql,nosql,other packages -The wrapper allows for application of plugins as pre-post conditions +The wrapper allows for application of plugins as pre-post conditions. +NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing: + - upon initialization we will load plugins + - on read/write we apply a pipeline (if passed as an argument) """ +from transport.plugins import plugin, PluginLoader +import transport +from transport import providers +from multiprocessing import Process +import time + + class IO: """ - Base wrapper class for read/write + Base wrapper class for read/write and support for logs """ def __init__(self,_agent,plugins): self._agent = _agent - self._plugins = plugins + if plugins : + self._init_plugins(plugins) + else: + self._plugins = None + + def _init_plugins(self,_args): + """ + This function will load pipelined functions as a plugin loader + """ + if 'path' in _args and 'names' in _args : + self._plugins = PluginLoader(**_args) + else: + self._plugins = PluginLoader() + [self._plugins.set(_pointer) for _pointer in _args] + # + # @TODO: We should have a way to log what plugins are loaded and ready to use def meta (self,**_args): if hasattr(self._agent,'meta') : return self._agent.meta(**_args) @@ -28,9 +53,14 @@ class IO: return self._agent.apply(_query) return None class IReader(IO): + """ + This is a wrapper for read functionalities + """ def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) def read(self,**_args): + if 'pipeline' in _args : + self._init_plugins(_args['pipeline']) _data = self._agent.read(**_args) if self._plugins and self._plugins.ratio() > 0 : _data = self._plugins.apply(_data) @@ -41,7 +71,55 @@ class IWriter(IO): def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) def write(self,_data,**_args): + if 'pipeline' in _args : + self._init_plugins(_args['pipeline']) if self._plugins and self._plugins.ratio() > 0 : _data = self._plugins.apply(_data) self._agent.write(_data,**_args) + +# +# The ETL object in its simplest form is an aggregation of read/write objects +# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process + +def _ProcessWriter (_data,_args): + writer = transport.get.writer(**_args) + writer.write(_data) + +class IETL(IReader) : + """ + This class performs an ETL operation by ineriting a read and adding writes as pipeline functions + """ + def __init__(self,**_args): + super().__init__(transport.get.reader(**_args['source'])) + if 'target' in _args: + self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] + else: + self._targets = [] + self.jobs = [] + # + # If the parent is already multiprocessing + self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] + def read(self,**_args): + _data = super().read(**_args) + + for _kwargs in self._targets : + self.post(_data,**_kwargs) + # pthread = Process(target=_ProcessWriter,args=(_data,_kwargs)) + # pthread.start() + # self.jobs.append(pthread) + + # if not self._hasParentProcess : + # while self.jobs : + # jobs = [pthread for pthread in self.jobs if pthread.is_alive()] + # time.sleep(1) + + return _data + def post (self,_data,**_args) : + """ + This function returns an instance of a process that will perform the write operation + :_args parameters associated with writer object + """ + writer = transport.get.writer(**_args) + writer.write(_data) + writer.close() \ No newline at end of file diff --git a/transport/other/files.py b/transport/other/files.py index a4e8a08..62ee3c4 100644 --- a/transport/other/files.py +++ b/transport/other/files.py @@ -53,8 +53,8 @@ class Writer (File): """ try: - _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter'] - _path = self._path if 'path' not in _args else _args['path'] + _delim = self.delimiter if 'delimiter' not in _args else _args['delimiter'] + _path = self.path if 'path' not in _args else _args['path'] _mode = self._mode if 'mode' not in _args else _args['mode'] info.to_csv(_path,index=False,sep=_delim) @@ -62,6 +62,7 @@ class Writer (File): except Exception as e: # # Not sure what should be done here ... + print (e) pass finally: # DiskWriter.THREAD_LOCK.release() diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py index 6117664..26e5782 100644 --- a/transport/plugins/__init__.py +++ b/transport/plugins/__init__.py @@ -25,9 +25,9 @@ class plugin : self._name = _args['name'] self._about = _args['about'] self._mode = _args['mode'] if 'mode' in _args else 'rw' - def __call__(self,pointer): - def wrapper(_args): - return pointer(_args) + def __call__(self,pointer,**kwargs): + def wrapper(_args,**kwargs): + return pointer(_args,**kwargs) # # @TODO: # add attributes to the wrapper object @@ -55,6 +55,7 @@ class PluginLoader : self._names = [] if path and os.path.exists(path) and _names: for _name in self._names : + spec = importlib.util.spec_from_file_location('private', path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) #--loads it into sys.modules @@ -101,7 +102,7 @@ class PluginLoader : return _name in self._modules def ratio (self): """ - how many modules loaded vs unloaded given the list of names + This functiion determines how many modules loaded vs unloaded given the list of names """ _n = len(self._names)