From 469c6f89a2a3f7ccc2391831578b266b9f8e7cb4 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 5 Mar 2025 21:24:15 -0600 Subject: [PATCH] fixes with plugin handler --- bin/transport | 70 +++++++---- transport/iowrapper.py | 52 +++++--- transport/registry.py | 264 ++++++++++++++++++++--------------------- 3 files changed, 212 insertions(+), 174 deletions(-) diff --git a/bin/transport b/bin/transport index 6ca01bc..19b664e 100755 --- a/bin/transport +++ b/bin/transport @@ -34,6 +34,8 @@ import time from termcolor import colored from enum import Enum from rich import print +import plugin_ix as pix + app = typer.Typer() app_e = typer.Typer() #-- handles etl (run, generate) @@ -147,7 +149,7 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")], path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), override:bool=typer.Option(default=False,help="override existing configuration or not")): """ - This functiion will initialize the registry and have both application and calling code loading the database parameters by a label + This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label """ try: @@ -179,42 +181,62 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be pass @app_x.command(name='add') def register_plugs ( - alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")], - path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")] + alias:Annotated[str,typer.Argument(help="unique function name within a file")], + path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")], + folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"), + ): """ - This function will register a file and the functions within will be refrences . in a configuration file + This function will register a file and the functions within we are interested in using """ - transport.registry.plugins.init() - _log = transport.registry.plugins.add(alias,path) + if ',' in alias : + alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ] + else: + alias = [alias.strip()] + _pregistry = pix.Registry(folder=folder,plugin_folder='plugins/code') + _log = _pregistry.set(path,alias) + # transport.registry.plugins.init() + # _log = transport.registry.plugins.add(alias,path) _mark = TIMES_MARK if not _log else CHECK_MARK - _msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" + _msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered""" print (f"""{_mark} {_msg}""") @app_x.command(name="list") -def registry_list (): - - transport.registry.plugins.init() - _d = [] - for _alias in transport.registry.plugins._data : - _data = transport.registry.plugins._data[_alias] - _d += [{'alias':_alias,"plugin-count":len(_data['content']),'e.g':'@'.join([_alias,_data['content'][0]]),'plugins':json.dumps(_data['content'])}] - if _d: - print (pd.DataFrame(_d)) +def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")): + """ + This function will list all the plugins (python functions/files) that are registered and can be reused + """ + _pregistry = pix.Registry(folder=folder) + _df = _pregistry.stats() + if _df.empty : + print (f"{TIMES_MARK} registry at {folder} is not ready") else: - print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""") + print (_df) +@app_x.command ("has") +def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")], + folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) : + _pregistry = pix.Registry(folder=folder) + if _pregistry.has(alias) : + _msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry " + else: + _msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry " + print (_msg) + @app_x.command(name="test") -def registry_test (key): +def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")], + folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) : + _pregistry = pix.Registry(folder=folder) """ This function allows to test syntax for a plugin i.e in terms of alias@function """ - _item = transport.registry.plugins.has(key=key) - if _item : - del _item['pointer'] - print (f"""{CHECK_MARK} successfully loaded \033[1m{key}\033[0m found, version {_item['version']}""") - print (pd.DataFrame([_item])) + # _item = transport.registry.plugins.has(key=key) + _pointer = _pregistry.get(alias) if _pregistry.has(alias) else None + + if _pointer: + print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""") + else: - print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered") + print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered") app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file") app.add_typer(app_r,name='registry',help='This function allows labeling database access information') app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies") diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 700b589..396135e 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -11,6 +11,7 @@ from transport import providers from multiprocessing import Process import time +import plugin_ix class IO: """ @@ -21,20 +22,25 @@ class IO: plugins = _args['plugins'] if 'plugins' not in _args else None self._agent = _agent + self._ixloader = plugin_ix.Loader () #-- if plugins : - self._init_plugins(plugins) - else: - self._plugins = None + self.init_plugins(plugins) + # for _ref in plugins : + # self._ixloader.set(_ref) + # if plugins : + # self._init_plugins(plugins) + # else: + # self._plugins = None - def _init_plugins(self,_args): - """ - This function will load pipelined functions as a plugin loader - """ - if 'path' in _args and 'names' in _args : - self._plugins = PluginLoader(**_args) - else: - self._plugins = PluginLoader() - [self._plugins.set(_pointer) for _pointer in _args] + # def _init_plugins(self,_args): + # """ + # This function will load pipelined functions as a plugin loader + # """ + # if 'path' in _args and 'names' in _args : + # self._plugins = PluginLoader(**_args) + # else: + # self._plugins = PluginLoader() + # [self._plugins.set(_pointer) for _pointer in _args] # # @TODO: We should have a way to log what plugins are loaded and ready to use def meta (self,**_args): @@ -62,6 +68,10 @@ class IO: pointer = getattr(self._agent,_name) return pointer(_query) return None + def init_plugins(self,plugins): + for _ref in plugins : + self._ixloader.set(_ref) + class IReader(IO): """ This is a wrapper for read functionalities @@ -71,22 +81,28 @@ class IReader(IO): def read(self,**_args): if 'plugins' in _args : - self._init_plugins(_args['plugins']) + self.init_plugins(_args['plugins']) + _data = self._agent.read(**_args) - if self._plugins and self._plugins.ratio() > 0 : - _data = self._plugins.apply(_data) + # if self._plugins and self._plugins.ratio() > 0 : + # _data = self._plugins.apply(_data) # # output data + + # + # applying the the design pattern + _data = self._ixloader.visitor(_data) return _data class IWriter(IO): def __init__(self,**_args): #_agent,pipeline=None): super().__init__(**_args) #_agent,pipeline) def write(self,_data,**_args): + # if 'plugins' in _args : + # self._init_plugins(_args['plugins']) if 'plugins' in _args : - self._init_plugins(_args['plugins']) - if self._plugins and self._plugins.ratio() > 0 : - _data = self._plugins.apply(_data) + self.init_plugins(_args['plugins']) + self._ixloader.visitor(_data) self._agent.write(_data,**_args) # diff --git a/transport/registry.py b/transport/registry.py index 1f612dc..71909f6 100644 --- a/transport/registry.py +++ b/transport/registry.py @@ -21,161 +21,161 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ : REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH'] REGISTRY_FILE= 'transport-registry.json' DATA = {} -class plugins: - # - # This is a utility function that should enable management of plugins-registry - # The class allows to add/remove elements - # - # @TODO: add read/write properties to the class (better design practice) - # - _data = {} - FOLDER = os.sep.join([REGISTRY_PATH,'plugins']) - CODE = os.sep.join([REGISTRY_PATH,'plugins','code']) - FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json']) - @staticmethod - def init(): +# class plugins: +# # +# # This is a utility function that should enable management of plugins-registry +# # The class allows to add/remove elements +# # +# # @TODO: add read/write properties to the class (better design practice) +# # +# _data = {} +# FOLDER = os.sep.join([REGISTRY_PATH,'plugins']) +# CODE = os.sep.join([REGISTRY_PATH,'plugins','code']) +# FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json']) +# @staticmethod +# def init(): - if not os.path.exists(plugins.FOLDER) : - os.makedirs(plugins.FOLDER) - if not os.path.exists(plugins.CODE): - os.makedirs(plugins.CODE) - if not os.path.exists(plugins.FILE): - f = open(plugins.FILE,'w') - f.write("{}") - f.close() - plugins._read() #-- will load data as a side effect +# if not os.path.exists(plugins.FOLDER) : +# os.makedirs(plugins.FOLDER) +# if not os.path.exists(plugins.CODE): +# os.makedirs(plugins.CODE) +# if not os.path.exists(plugins.FILE): +# f = open(plugins.FILE,'w') +# f.write("{}") +# f.close() +# plugins._read() #-- will load data as a side effect - @staticmethod - def copy (path) : +# @staticmethod +# def copy (path) : - shutil.copy2(path,plugins.CODE) - @staticmethod - def _read (): - f = open(plugins.FILE) - try: - _data = json.loads(f.read()) - f.close() - except Exception as e: - print (f"Corrupted registry, resetting ...") - _data = {} - plugins._write(_data) +# shutil.copy2(path,plugins.CODE) +# @staticmethod +# def _read (): +# f = open(plugins.FILE) +# try: +# _data = json.loads(f.read()) +# f.close() +# except Exception as e: +# print (f"Corrupted registry, resetting ...") +# _data = {} +# plugins._write(_data) - plugins._data = _data - @staticmethod - def _write (_data): - f = open(plugins.FILE,'w') - f.write(json.dumps(_data)) - f.close() - plugins._data = _data +# plugins._data = _data +# @staticmethod +# def _write (_data): +# f = open(plugins.FILE,'w') +# f.write(json.dumps(_data)) +# f.close() +# plugins._data = _data - @staticmethod - def inspect (_path): - _names = [] +# @staticmethod +# def inspect (_path): +# _names = [] - if os.path.exists(_path) : - _filename = _path.split(os.sep)[-1] - spec = importlib.util.spec_from_file_location(_filename, _path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) +# if os.path.exists(_path) : +# _filename = _path.split(os.sep)[-1] +# spec = importlib.util.spec_from_file_location(_filename, _path) +# module = importlib.util.module_from_spec(spec) +# spec.loader.exec_module(module) - # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function'] - for _name in dir(module) : - _pointer = getattr(module,_name) - if hasattr(_pointer,'transport') : - _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')} - _names.append(_item) +# # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function'] +# for _name in dir(module) : +# _pointer = getattr(module,_name) +# if hasattr(_pointer,'transport') : +# _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')} +# _names.append(_item) - return _names - @staticmethod - def add (alias,path): - """ - Add overwrite the registry entries - """ - _names = plugins.inspect (path) - _log = [] +# return _names +# @staticmethod +# def add (alias,path): +# """ +# Add overwrite the registry entries +# """ +# _names = plugins.inspect (path) +# _log = [] - if _names : - # - # We should make sure we have all the plugins with the attributes (transport,name) set - _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ] - if _names : - plugins.copy(path) - _content = [] +# if _names : +# # +# # We should make sure we have all the plugins with the attributes (transport,name) set +# _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ] +# if _names : +# plugins.copy(path) +# _content = [] - for _item in _names : - _key = '@'.join([alias,_item['name']]) - _log.append(_item['name']) - # - # Let us update the registry - # - plugins.update(alias,path,_log) - return _log +# for _item in _names : +# _key = '@'.join([alias,_item['name']]) +# _log.append(_item['name']) +# # +# # Let us update the registry +# # +# plugins.update(alias,path,_log) +# return _log - @staticmethod - def update (alias,path,_log) : - """ - updating the registry entries of the plugins (management data) - """ - # f = open(plugins.FILE) - # _data = json.loads(f.read()) - # f.close() - _data = plugins._data - # _log = plugins.add(alias,path) +# @staticmethod +# def update (alias,path,_log) : +# """ +# updating the registry entries of the plugins (management data) +# """ +# # f = open(plugins.FILE) +# # _data = json.loads(f.read()) +# # f.close() +# _data = plugins._data +# # _log = plugins.add(alias,path) - if _log : - _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]} - plugins._write(_data) #-- will update data as a side effect +# if _log : +# _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]} +# plugins._write(_data) #-- will update data as a side effect - return _log - @staticmethod - def get(**_args) : - # f = open(plugins.FILE) - # _data = json.loads(f.read()) - # f.close() - # if 'key' in _args : - # alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') - # else : - # alias = _args['alias'] - # name = _args['name'] +# return _log +# @staticmethod +# def get(**_args) : +# # f = open(plugins.FILE) +# # _data = json.loads(f.read()) +# # f.close() +# # if 'key' in _args : +# # alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') +# # else : +# # alias = _args['alias'] +# # name = _args['name'] - # if alias in _data : +# # if alias in _data : - # _path = os.sep.join([plugins.CODE,_data[alias]['name']]) - # _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] +# # _path = os.sep.join([plugins.CODE,_data[alias]['name']]) +# # _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] - # _item = _item[0] if _item else None - # if _item : +# # _item = _item[0] if _item else None +# # if _item : - # return _item['pointer'] - # return None - _item = plugins.has(**_args) - return _item['pointer'] if _item else None +# # return _item['pointer'] +# # return None +# _item = plugins.has(**_args) +# return _item['pointer'] if _item else None - @staticmethod - def has (**_args): - f = open(plugins.FILE) - _data = json.loads(f.read()) - f.close() - if 'key' in _args : - alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') - else : - alias = _args['alias'] - name = _args['name'] +# @staticmethod +# def has (**_args): +# f = open(plugins.FILE) +# _data = json.loads(f.read()) +# f.close() +# if 'key' in _args : +# alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') +# else : +# alias = _args['alias'] +# name = _args['name'] - if alias in _data : +# if alias in _data : - _path = os.sep.join([plugins.CODE,_data[alias]['name']]) - _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] +# _path = os.sep.join([plugins.CODE,_data[alias]['name']]) +# _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] - _item = _item[0] if _item else None - if _item : +# _item = _item[0] if _item else None +# if _item : - return copy.copy(_item) - return None - @staticmethod - def synch(): - pass +# return copy.copy(_item) +# return None +# @staticmethod +# def synch(): +# pass def isloaded (): return DATA not in [{},None]