From d0e655e7e3ddbb4e40192e21688a7cf2fd90ef00 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Tue, 29 Oct 2024 09:48:59 -0500
Subject: [PATCH 01/16] update, community edition baseline

---
 transport/registry.py | 172 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 165 insertions(+), 7 deletions(-)

diff --git a/transport/registry.py b/transport/registry.py
index 6764f1b..f3dc8ac 100644
--- a/transport/registry.py
+++ b/transport/registry.py
@@ -3,6 +3,10 @@ import json
 from info import __version__
 import copy
 import transport
+import importlib
+import importlib.util
+import shutil
+
 
 """
 This class manages data from the registry and allows (read only)
@@ -16,28 +20,182 @@ REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
     REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
 REGISTRY_FILE= 'transport-registry.json'
-
 DATA = {}
+class plugins:
+    #
+    # This is a utility function that should enable management of plugins-registry
+    # The class allows to add/remove elements 
+    #
+    # @TODO: add read/write properties to the class (better design practice)
+    #
+    _data = {}
+    FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
+    CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
+    FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
+    @staticmethod
+    def init():
+        
+        if not os.path.exists(plugins.FOLDER) :
+            os.makedirs(plugins.FOLDER)
+        if not os.path.exists(plugins.CODE):
+            os.makedirs(plugins.CODE)
+        if not os.path.exists(plugins.FILE):
+            f = open(plugins.FILE,'w')
+            f.write("{}")
+            f.close()
+        plugins._read() #-- will load data as a side effect
+
+    @staticmethod
+    def copy (path) :
+        
+        shutil.copy2(path,plugins.CODE)
+    @staticmethod
+    def _read ():
+        f = open(plugins.FILE)
+        try:
+            _data = json.loads(f.read())
+            f.close()
+        except Exception as e:
+            print (f"Corrupted registry, resetting ...")
+            _data = {}
+            plugins._write(_data)
+            
+        plugins._data = _data
+    @staticmethod
+    def _write (_data):
+        f = open(plugins.FILE,'w')
+        f.write(json.dumps(_data))
+        f.close()
+        plugins._data = _data
+
+    @staticmethod
+    def inspect (_path):
+        _names = []
+        
+        if os.path.exists(_path) :
+            _filename = _path.split(os.sep)[-1]
+            spec = importlib.util.spec_from_file_location(_filename, _path)
+            module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(module)
+
+            # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
+            for _name in dir(module) :
+                _pointer = getattr(module,_name) 
+                if hasattr(_pointer,'transport') :
+                    _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
+                    _names.append(_item)
+
+            
+        return _names
+    @staticmethod
+    def add (alias,path):
+        """
+        Add overwrite the registry entries
+        """
+        _names = plugins.inspect (path)
+        _log = []
+        
+        if _names :
+            #
+            # We should make sure we have all the plugins with the attributes (transport,name) set
+            _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
+            if _names :
+                plugins.copy(path)
+                _content = []
+                
+                for _item in _names :
+                    _key = '@'.join([alias,_item['name']])
+                    _log.append(_item['name'])
+                #
+                # Let us update the registry 
+                # 
+                plugins.update(alias,path,_log)        
+        return _log
+    
+    @staticmethod
+    def update (alias,path,_log) :
+        """
+        updating the registry entries of the plugins (management data)
+        """
+        # f = open(plugins.FILE)
+        # _data = json.loads(f.read())
+        # f.close()
+        _data = plugins._data
+        # _log = plugins.add(alias,path)
+        
+        if _log :
+            _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
+            plugins._write(_data) #-- will update data as a side effect
+
+        return _log
+    @staticmethod
+    def get(**_args) :
+        # f = open(plugins.FILE)
+        # _data = json.loads(f.read())
+        # f.close()
+        # if 'key' in _args :
+        #     alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+        # else :
+        #     alias = _args['alias']
+        #     name  = _args['name']
+        
+        # if alias in _data :
+            
+        #     _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+        #     _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+            
+        #     _item = _item[0] if _item else None
+        #     if _item :
+                
+        #         return _item['pointer'] 
+        # return None
+        _item = plugins.has(**_args)
+        return _item['pointer'] if _item else None
+    
+    @staticmethod
+    def has (**_args):
+        f = open(plugins.FILE)
+        _data = json.loads(f.read())
+        f.close()
+        if 'key' in _args :
+            alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+        else :
+            alias = _args['alias']
+            name  = _args['name']
+        
+        if alias in _data :
+            
+            _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+            _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+            
+            _item = _item[0] if _item else None
+            if _item :
+                
+                return copy.copy(_item)
+        return None
+    @staticmethod
+    def synch():
+        pass
 
 def isloaded ():
     return DATA not in [{},None]
-def exists (path=REGISTRY_PATH) :
+def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :
     """
     This function determines if there is a registry at all
     """
     p = os.path.exists(path)
-    q = os.path.exists( os.sep.join([path,REGISTRY_FILE]))
+    q = os.path.exists( os.sep.join([path,_file]))
     
     return p and q
-def load (_path=REGISTRY_PATH):
+def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE):
     global DATA
     
     if exists(_path) :
-        path = os.sep.join([_path,REGISTRY_FILE])
+        path = os.sep.join([_path,_file])
         f = open(path)
         DATA = json.loads(f.read())
         f.close()
-def init (email,path=REGISTRY_PATH,override=False):
+def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
     """
     Initializing the registry and will raise an exception in the advent of an issue
     """
@@ -47,7 +205,7 @@ def init (email,path=REGISTRY_PATH,override=False):
         _config = {"email":email,'version':__version__}
         if not os.path.exists(path):
             os.makedirs(path)
-        filename = os.sep.join([path,REGISTRY_FILE])
+        filename = os.sep.join([path,_file])
         if not os.path.exists(filename) or override == True :
 
             f = open(filename,'w')

From 2a72de4cd6a9acc40f66ac16557c4eac9094d048 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Tue, 31 Dec 2024 12:20:22 -0600
Subject: [PATCH 02/16] bug fixes: registry and handling cli parameters as well
 as adding warehousing

---
 bin/transport                   | 114 ++++++++++++++++++------
 setup.py                        |   2 +-
 transport/__init__.py           |  91 +++++++++++++------
 transport/iowrapper.py          |  20 +++--
 transport/plugins/__init__.py   | 109 ++++++++++++++---------
 transport/providers/__init__.py |   8 +-
 transport/registry.py           |   2 +
 transport/warehouse/__init__.py |   7 ++
 transport/warehouse/drill.py    |  55 ++++++++++++
 transport/warehouse/iceberg.py  | 151 ++++++++++++++++++++++++++++++++
 10 files changed, 458 insertions(+), 101 deletions(-)
 create mode 100644 transport/warehouse/__init__.py
 create mode 100644 transport/warehouse/drill.py
 create mode 100644 transport/warehouse/iceberg.py

diff --git a/bin/transport b/bin/transport
index 4053c4e..d2072f7 100755
--- a/bin/transport
+++ b/bin/transport
@@ -24,19 +24,25 @@ from multiprocessing import Process
 
 import os
 import transport
-from transport import etl
+# from transport import etl
+from transport.iowrapper import IETL
 # from transport import providers
 import typer
 from typing_extensions import Annotated
 from typing import Optional
 import time
 from termcolor import colored
+from enum import Enum
+from rich import print
 
 app = typer.Typer()
+app_x = typer.Typer()
+app_i = typer.Typer()
+app_r = typer.Typer()
 REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 REGISTRY_FILE= 'transport-registry.json'
-CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']'])
-TIMES_MARK= ' '.join(['[',colored(u'\u2717','red'),']'])
+CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
+TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
 # @app.command()
 def help() :     
 	print (__doc__)
@@ -44,10 +50,15 @@ def wait(jobs):
     while jobs :
         jobs = [thread for thread in jobs if thread.is_alive()]
         time.sleep(1)
+def wait (jobs):
+    while jobs :
+            jobs = [pthread for pthread in jobs if pthread.is_alive()]
 
-@app.command(name="apply")
+@app.command(name="etl")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
-        index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
+        index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
+        batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
+        ):
     """
     This function applies data transport ETL feature to read data from one source to write it one or several others
     """
@@ -56,23 +67,34 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
         file = open(path)
         _config = json.loads (file.read() )
         file.close()
-        if index :
+        if index is not None:            
             _config = [_config[ int(index)]]
-        jobs = []            
+        jobs = []          
         for _args in _config :
-            pthread = etl.instance(**_args) #-- automatically starts the process
+            # pthread = etl.instance(**_args) #-- automatically starts the process
+            def bootup ():
+                _worker = IETL(**_args)
+                _worker.run()
+            pthread = Process(target=bootup)
+            pthread.start()
             jobs.append(pthread)
+            if len(jobs) == batch :
+                wait(jobs)
+                jobs = []
+        
+        if jobs :
+            wait (jobs)
         #
-        # @TODO: Log the number of processes started and estimated time
-        while jobs :
-             jobs = [pthread for pthread in jobs if pthread.is_alive()]
-             time.sleep(1)
+        # @TODO: Log the number of processes started and estfrom transport impfrom transport impimated time
+        # while jobs :
+        #      jobs = [pthread for pthread in jobs if pthread.is_alive()]
+        #      time.sleep(1)
         #
         # @TODO: Log the job termination here ...
-@app.command(name="providers")
+@app_i.command(name="supported")
 def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
     """
-    This function will print supported providers/vendors and their associated classifications
+    This function will print supported database technologies
     """
     _df =  (transport.supported())
     if format in ['list','json'] :
@@ -81,13 +103,14 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
          print (_df)
     print ()
 
-@app.command()
-def version():
+@app_i.command(name="license")
+def info():
     """
     This function will display version and license information
     """
 
-    print (transport.__app_name__,'version ',transport.__version__)
+    print (f'[bold] {transport.__app_name__} ,version {transport.__version__}[/bold]')
+    print ()
     print (transport.__license__)
 
 @app.command()
@@ -99,18 +122,18 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
             {
                 "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
                 "target":
-            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
+            [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3","database":"sample.db3","table":"addresses"}]
             }
             ]
     file = open(path,'w')
     file.write(json.dumps(_config))
     file.close()
-    print (f"""{CHECK_MARK} Successfully generated a template ETL file at {path}""" )
+    print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold]{path}[/bold]""" )
     print ("""NOTE: Each line (source or target) is the content of an auth-file""")
 
 
 
-@app.command(name="init")
+@app_r.command(name="reset")
 def initregistry (email:Annotated[str,typer.Argument(help="email")],
                   path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), 
                   override:bool=typer.Option(default=False,help="override existing configuration or not")):
@@ -120,24 +143,24 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
     """
     try:
         transport.registry.init(email=email, path=path, override=override)
-        _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
+        _msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold]{path}[/bold] from [bold]{email}[/bold]"""
     except Exception as e:
         _msg = f"{TIMES_MARK} {e}"
     print (_msg)
     print ()
-@app.command(name="register")
+@app_r.command(name="add")
 def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
               auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
               default:bool=typer.Option(default=False,help="set the auth_file as default"),
               path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
     """
-    This function will register an auth-file i.e database connection and assign it a label, 
-    Learn more about auth-file at https://healthcareio.the-phi.com/data-transport
+    This function add  a database label for a given auth-file. which allows access to the database using a label of your choice.
+    
     """
     try:
         if transport.registry.exists(path) :
             transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
-            _msg = f"""{CHECK_MARK} Successfully added label "{label}" to data-transport registry"""
+            _msg = f"""{CHECK_MARK} Successfully added label [bold]"{label}"[/bold] to data-transport registry"""
         else:
             _msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
     except Exception as e:
@@ -145,6 +168,47 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
     print (_msg)
     
     pass
+@app_x.command(name='add') 
+def register_plugs (
+    alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")],
+    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")]
+    ):
+    """
+    This function will register a file and the functions within will be refrences <alias>.<function> in a configuration file
+    """
+    transport.registry.plugins.init()
+    _log = transport.registry.plugins.add(alias,path)
+    _mark = TIMES_MARK if not _log else CHECK_MARK
+    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added"""
+    print (f"""{_mark} {_msg}""")
+@app_x.command(name="list") 
+def registry_list ():
+
+    transport.registry.plugins.init()
+    _d = []
+    for _alias in transport.registry.plugins._data :
+        _data = transport.registry.plugins._data[_alias]
+        _d +=  [{'alias':_alias,"plugin-count":len(_data['content']),'e.g':'@'.join([_alias,_data['content'][0]]),'plugins':json.dumps(_data['content'])}]
+    if _d:
+        print (pd.DataFrame(_d))
+    else:
+        print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""")
+
+@app_x.command(name="test") 
+def registry_test (key):
+    """
+    This function allows to test syntax for a plugin i.e in terms of alias@function
+    """
+    _item = transport.registry.plugins.has(key=key)
+    if _item :
+        del _item['pointer']
+        print (f"""{CHECK_MARK} successfully loaded \033[1m{key}\033[0m found, version {_item['version']}""")
+        print (pd.DataFrame([_item]))
+    else:
+        print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered")
+app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
+app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
+app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
 if __name__ == '__main__' :
      app()
 	
diff --git a/setup.py b/setup.py
index 7bb44e8..f11a6ca 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ args    = {
 
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 # if sys.version_info[0] == 2 :
diff --git a/transport/__init__.py b/transport/__init__.py
index b934760..33a3261 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -18,7 +18,7 @@ Source Code is available under MIT License:
 """
 import numpy as np
 
-from transport import sql, nosql, cloud, other
+from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import json
 import os
@@ -28,21 +28,26 @@ from transport.plugins import PluginLoader
 from transport import providers
 import copy 
 from transport import registry
-
+from transport.plugins import Plugin 
 PROVIDERS = {}
 
 def init():
     global PROVIDERS
-    for _module in [cloud,sql,nosql,other] :
+    for _module in [cloud,sql,nosql,other,warehouse] :
         for _provider_name in dir(_module) :
             if _provider_name.startswith('__') or _provider_name == 'common':
                 continue
             PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
-def _getauthfile (path) :
-    f = open(path)
-    _object = json.loads(f.read())
-    f.close()
-    return _object
+    #
+    # loading the registry
+    if not registry.isloaded() :
+        registry.load()
+
+# def _getauthfile (path) :
+#     f = open(path)
+#     _object = json.loads(f.read())
+#     f.close()
+#     return _object
 def instance (**_args):
     """
     This function returns an object of to read or write from a supported database provider/vendor
@@ -52,16 +57,7 @@ def instance (**_args):
     kwargs      These are arguments that are provider/vendor specific
     """
     global PROVIDERS
-    # if not registry.isloaded () :
-    #     if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
-    #         registry.load() if 'path' not in _args else registry.load(_args['path'])
-    #         print ([' GOT IT'])
-    # if 'label' in _args and registry.isloaded():
-    #     _info = registry.get(_args['label'])
-    #     if _info :
-    #         #
-    #         _args = dict(_args,**_info)
-
+    
     if 'auth_file' in _args:
         if os.path.exists(_args['auth_file']) :
             #
@@ -78,7 +74,7 @@ def instance (**_args):
             filename = _args['auth_file']
             raise Exception(f" {filename} was not found or is invalid")
     if 'provider' not in _args and 'auth_file' not in _args :
-        if not registry.isloaded () :
+        if not registry.isloaded () : 
             if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
                 registry.load() if 'path' not in _args else registry.load(_args['path'])
         _info = {}
@@ -87,8 +83,6 @@ def instance (**_args):
         else:
             _info = registry.get()    
         if _info :
-            #
-            # _args = dict(_args,**_info)
             _args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
 
     if 'provider' in _args and _args['provider'] in PROVIDERS :
@@ -119,8 +113,32 @@ def instance (**_args):
         #         for _delegate in _params :
         #             loader.set(_delegate)
         
-        loader = None if 'plugins' not in _args else _args['plugins']
-        return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader)
+        _plugins = None if 'plugins' not in _args else _args['plugins']
+        
+        # if registry.has('logger') :
+        #     _kwa = registry.get('logger')
+        #     _lmodule = getPROVIDERS[_kwa['provider']]
+            
+        if ( ('label' in _args and _args['label'] != 'logger') and registry.has('logger')):
+            #
+            # We did not request label called logger, so we are setting up a logger if it is specified in the registry
+            #
+            _kwargs = registry.get('logger')
+            _kwargs['context']  = 'write'
+            _kwargs['table']    =_module.__name__.split('.')[-1]+'_logs'
+            # _logger = instance(**_kwargs)
+            _module = PROVIDERS[_kwargs['provider']]['module']
+            _logger = getattr(_module,'Writer')
+            _logger = _logger(**_kwargs)
+        else:
+            _logger = None
+        
+        _kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger}
+        if 'args' in _args :
+            _kwargs['args'] = _args['args']
+        # _datatransport =  IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
+        _datatransport =  IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs)
+        return _datatransport
 
     else:
         #
@@ -137,7 +155,14 @@ class get :
         if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
         _args['context'] = 'read'
-        return instance(**_args)
+        # return instance(**_args)
+        # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
+        
+        _handler =  instance(**_args)
+        # _handler.setLogger(get.logger())
+        return _handler
+
+    
     @staticmethod
     def writer(**_args):
         """
@@ -146,10 +171,26 @@ class get :
         if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
         _args['context'] = 'write'
-        return instance(**_args)
+        # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
+
+        _handler =  instance(**_args)
+        #
+        # Implementing logging with the 'eat-your-own-dog-food' approach
+        # Using dependency injection to set the logger (problem with imports)
+        #
+        # _handler.setLogger(get.logger())
+        return _handler
+    @staticmethod
+    def logger ():
+        if registry.has('logger') :
+            _args = registry.get('logger')
+            _args['context']  = 'write'
+            return instance(**_args)
+        return None
     @staticmethod
     def etl (**_args):
         if 'source' in _args and 'target' in _args :
+
             return IETL(**_args)
         else:
             raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index e3abf6c..e532e7d 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -5,7 +5,7 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
         - upon initialization we will load plugins
         - on read/write we apply a pipeline (if passed as an argument)
 """    
-from transport.plugins import plugin, PluginLoader
+from transport.plugins import Plugin, PluginLoader
 import transport
 from transport import providers
 from multiprocessing import Process
@@ -16,7 +16,10 @@ class IO:
     """
     Base wrapper class for read/write and support for logs
     """
-    def __init__(self,_agent,plugins):
+    def __init__(self,**_args):
+        _agent  = _args['agent']
+        plugins = _args['plugins'] if 'plugins' not in _args else None
+
         self._agent = _agent
         if plugins :
             self._init_plugins(plugins)
@@ -63,8 +66,9 @@ class IReader(IO):
     """
     This is a wrapper for read functionalities
     """
-    def __init__(self,_agent,pipeline=None):
-        super().__init__(_agent,pipeline)
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        
     def read(self,**_args):
         if 'plugins' in _args :
             self._init_plugins(_args['plugins'])
@@ -75,8 +79,8 @@ class IReader(IO):
         # output data 
         return _data
 class IWriter(IO):
-    def __init__(self,_agent,pipeline=None):
-        super().__init__(_agent,pipeline)  
+    def __init__(self,**_args): #_agent,pipeline=None):
+        super().__init__(**_args) #_agent,pipeline)  
     def write(self,_data,**_args):
         if 'plugins' in _args :
             self._init_plugins(_args['plugins'])
@@ -94,7 +98,7 @@ class IETL(IReader) :
     This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
     """
     def __init__(self,**_args):
-        super().__init__(transport.get.reader(**_args['source']))
+        super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
         if 'target' in _args:
             self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
         else:
@@ -110,6 +114,8 @@ class IETL(IReader) :
             self.post(_data,**_kwargs)
 
         return _data
+    def run(self) :
+        return self.read()
     def post (self,_data,**_args) :
         """
         This function returns an instance of a process that will perform the write operation
diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py
index 26e5782..760b66c 100644
--- a/transport/plugins/__init__.py
+++ b/transport/plugins/__init__.py
@@ -11,8 +11,10 @@ import importlib as IL
 import importlib.util
 import sys
 import os
+import pandas as pd
+import time
 
-class plugin :
+class Plugin :
     """
     Implementing function decorator for data-transport plugins (post-pre)-processing
     """
@@ -22,8 +24,9 @@ class plugin :
         :mode   restrict to reader/writer
         :about  tell what the function is about    
         """
-        self._name = _args['name']
-        self._about = _args['about']
+        self._name = _args['name'] if 'name' in _args else None
+        self._version = _args['version'] if 'version' in _args else '0.1'
+        self._doc = _args['doc'] if 'doc' in _args else "N/A"
         self._mode = _args['mode'] if 'mode' in _args else 'rw'
     def __call__(self,pointer,**kwargs):
         def wrapper(_args,**kwargs):
@@ -32,57 +35,64 @@ class plugin :
         # @TODO:
         # add attributes to the wrapper object
         #
+        self._name = pointer.__name__ if not self._name else self._name
         setattr(wrapper,'transport',True)
         setattr(wrapper,'name',self._name)
-        setattr(wrapper,'mode',self._mode)
-        setattr(wrapper,'about',self._about)
+        setattr(wrapper,'version',self._version)
+        setattr(wrapper,'doc',self._doc)
         return wrapper
 
-
 class PluginLoader :
     """
     This class is intended to load a plugin and make it available and assess the quality of the developed plugin
     """
+   
     def __init__(self,**_args):
         """
-        :path   location of the plugin (should be a single file)
-        :_names of functions to load
         """
-        _names = _args['names'] if 'names' in _args else None
-        path = _args['path'] if 'path' in _args else None
-        self._names = _names if type(_names) == list else [_names]
+        # _names = _args['names'] if 'names' in _args else None
+        # path = _args['path'] if 'path' in _args else None
+        # self._names = _names if type(_names) == list else [_names]
         self._modules = {}
         self._names = []
-        if path and os.path.exists(path) and _names:
-            for _name in self._names :
-                
-                spec = importlib.util.spec_from_file_location('private', path)
-                module = importlib.util.module_from_spec(spec)
-                spec.loader.exec_module(module) #--loads it into sys.modules
-                if hasattr(module,_name) :
-                    if self.isplugin(module,_name) :
-                        self._modules[_name] = getattr(module,_name)
-                    else:
-                        print ([f'Found {_name}', 'not plugin'])
-                else:
-                    #
-                    # @TODO: We should log this somewhere some how
-                    print (['skipping ',_name, hasattr(module,_name)])
-                    pass
-        else:
-            #
-            # Initialization is empty
-            self._names = []
+        self._registry = _args['registry']
+
         pass
-    def set(self,_pointer) :
+    def load (self,**_args):
+        self._modules = {}
+        self._names = []
+        path = _args ['path']
+        if os.path.exists(path) :
+            _alias = path.split(os.sep)[-1]
+            spec = importlib.util.spec_from_file_location(_alias, path)
+            module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(module) #--loads it into sys.modules
+            for _name in dir(module) :
+                if self.isplugin(module,_name) :
+                    self._module[_name] = getattr(module,_name)
+                    # self._names [_name]
+    def format (self,**_args):
+        uri = _args['alias'],_args['name']
+    # def set(self,_pointer) :
+    def set(self,_key) :
         """
         This function will set a pointer to the list of modules to be called
         This should be used within the context of using the framework as a library
         """
-        _name = _pointer.__name__
+        if type(_key).__name__ == 'function':
+            #
+            # The pointer is in the code provided by the user and loaded in memory
+            #
+            _pointer = _key
+            _key = 'inline@'+_key.__name__
+            # self._names.append(_key.__name__)
+        else:
+            _pointer = self._registry.get(key=_key)
+
+        if _pointer  :
+            self._modules[_key] = _pointer
+            self._names.append(_key)
         
-        self._modules[_name] = _pointer
-        self._names.append(_name)
     def isplugin(self,module,name):
         """
         This function determines if a module is a recognized plugin
@@ -107,12 +117,31 @@ class PluginLoader :
 
         _n = len(self._names)
         return len(set(self._modules.keys()) & set (self._names)) / _n
-    def apply(self,_data):
+    def apply(self,_data,_logger=[]):
+        _input= {}
+        
         for _name in self._modules :
-            _pointer = self._modules[_name]
-            #
-            # @TODO: add exception handling
-            _data = _pointer(_data)
+            try:
+                _input = {'action':'plugin','object':_name,'input':{'status':'PASS'}}
+                _pointer = self._modules[_name]
+                if type(_data) == list :
+                    _data = pd.DataFrame(_data)
+                _brow,_bcol = list(_data.shape) 
+                
+                #
+                # @TODO: add exception handling
+                _data = _pointer(_data)
+                
+                _input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]}
+            except Exception as e:
+                _input['input']['status'] = 'FAILED'
+                print (e)
+            time.sleep(1)
+            if _logger:
+                try:
+                    _logger(**_input)
+                except Exception as e:
+                    pass    
         return _data
     # def apply(self,_data,_name):
     #     """
diff --git a/transport/providers/__init__.py b/transport/providers/__init__.py
index 6422d74..b4cf37a 100644
--- a/transport/providers/__init__.py
+++ b/transport/providers/__init__.py
@@ -11,7 +11,7 @@ BIGQUERY	='bigquery'
 FILE 	= 'file'
 ETL = 'etl'
 
-SQLITE = 'sqlite'
+SQLITE = 'sqlite3'
 SQLITE3= 'sqlite3'
 DUCKDB = 'duckdb'
 
@@ -44,7 +44,9 @@ PGSQL	= POSTGRESQL
 
 AWS_S3  = 's3'
 RABBIT = RABBITMQ
-
-
+ICEBERG='iceberg'
+APACHE_ICEBERG = 'iceberg'
+DRILL = 'drill'
+APACHE_DRILL = 'drill'
 # QLISTENER = 'qlistener'
     
\ No newline at end of file
diff --git a/transport/registry.py b/transport/registry.py
index f3dc8ac..1f612dc 100644
--- a/transport/registry.py
+++ b/transport/registry.py
@@ -220,6 +220,8 @@ def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
 def lookup (label):
     global DATA
     return label in DATA
+has = lookup 
+
 def get (label='default') :
     global DATA
     return copy.copy(DATA[label]) if label in DATA else {}
diff --git a/transport/warehouse/__init__.py b/transport/warehouse/__init__.py
new file mode 100644
index 0000000..bcd76fd
--- /dev/null
+++ b/transport/warehouse/__init__.py
@@ -0,0 +1,7 @@
+"""
+This namespace/package is intended to handle read/writes against data warehouse solutions like :
+    - apache iceberg
+    - clickhouse (...)
+"""
+
+from . import iceberg, drill
\ No newline at end of file
diff --git a/transport/warehouse/drill.py b/transport/warehouse/drill.py
new file mode 100644
index 0000000..71f0e64
--- /dev/null
+++ b/transport/warehouse/drill.py
@@ -0,0 +1,55 @@
+import sqlalchemy
+import pandas as pd
+from .. sql.common import BaseReader , BaseWriter
+import sqlalchemy as sqa
+
+class Drill :
+    __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
+    def __init__(self,**_args):
+
+        self._host = _args['host'] if 'host' in _args else 'localhost'
+        self._port = _args['port'] if 'port' in _args else self.get_default_port()
+        self._ssl = False if 'ssl' not in _args else _args['ssl']
+        
+        self._table = _args['table'] if 'table' in _args else None
+        if self._table and '.' in self._table :
+            _seg = self._table.split('.')
+            if len(_seg) > 2 :
+                self._schema,self._database = _seg[:2]
+        else:
+            
+            self._database=_args['database']
+            self._schema = self._database.split('.')[0]
+        
+    def _get_uri(self,**_args):
+        return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
+    def get_provider(self):
+        return "drill+sadrill"
+    def get_default_port(self):
+        return "8047"
+    def meta(self,**_args):
+        _table = _args['table'] if 'table' in _args else self._table
+        if '.' in _table :
+            _schema = _table.split('.')[:2]
+            _schema = '.'.join(_schema)
+            _table = _table.split('.')[-1]
+        else:
+            _schema = self._schema
+        
+        # _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
+        _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
+        try:
+            _df  = pd.read_sql(_sql,self._engine)
+            return _df.to_dict(orient='records')
+        except Exception as e:
+            print (e)
+            pass
+        return []
+class Reader (Drill,BaseReader) :
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
+        self._engine= sqa.create_engine(self._get_uri(),future=True)
+class Writer(Drill,BaseWriter):
+    def __init__(self,**_args):
+        super().__init__(self,**_args)
\ No newline at end of file
diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py
new file mode 100644
index 0000000..4e73c62
--- /dev/null
+++ b/transport/warehouse/iceberg.py
@@ -0,0 +1,151 @@
+"""
+dependency:
+    - spark and SPARK_HOME environment variable must be set
+NOTE:
+    When using streaming option, insure that it is inline with default (1000 rows) or increase it in spark-defaults.conf
+
+"""
+from pyspark.sql import SparkSession
+from pyspark import SparkContext
+from pyspark.sql.types import *
+from pyspark.sql.functions import col, to_date, to_timestamp
+import copy
+
+class Iceberg :
+    def __init__(self,**_args):
+        """
+        providing catalog meta information (you must get this from apache iceberg)
+        """
+        #
+        # Turning off logging (it's annoying & un-professional)
+        #
+        # _spconf = SparkContext()
+        # _spconf.setLogLevel("ERROR")
+        #
+        # @TODO:
+        #   Make arrangements for additional configuration elements 
+        #
+        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
+        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
+        # self._session.sparkContext.setLogLevel("ERROR")
+        self._catalog = self._session.catalog
+        self._table = _args['table'] if 'table' in _args else None
+        
+        if 'catalog' in _args :
+            #
+            # Let us set the default catalog
+            self._catalog.setCurrentCatalog(_args['catalog'])
+            
+        else:
+            # No current catalog has been set ...
+            pass
+        if 'database' in _args :
+            self._database = _args['database']
+            self._catalog.setCurrentDatabase(self._database)
+        else:
+            #
+            # Should we set the default as the first one if available ?
+            #
+            pass
+        self._catalogName = self._catalog.currentCatalog()
+        self._databaseName = self._catalog.currentDatabase()
+    def meta (self,**_args) :
+        """
+        This function should return the schema of a table (only)
+        """
+        _schema = []
+        try:
+            _table = _args['table'] if 'table' in _args else self._table
+            _tableName = self._getPrefix(**_args) + f".{_table}"
+            _tmp = self._session.table(_tableName).schema
+            _schema = _tmp.jsonValue()['fields']
+            for _item in _schema :
+                del _item['nullable'],_item['metadata']
+        except Exception as e:
+            
+            pass
+        return _schema
+    def _getPrefix (self,**_args):        
+        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
+        _datName = self._databaseName if 'database' not in _args else _args['database']
+        
+        return '.'.join([_catName,_datName])
+    def apply(self,_query):
+        """
+        sql query/command to run against apache iceberg
+        """
+        return self._session.sql(_query)
+    def has (self,**_args):
+        try:
+            _prefix = self._getPrefix(**_args)
+            if _prefix.endswith('.') :
+                return False
+            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
+        except Exception as e:
+            print (e)
+            return False
+    
+    def close(self):
+        self._session.stop()
+class Reader(Iceberg) :
+    def __init__(self,**_args):
+        super().__init__(**_args)
+    def read(self,**_args):
+        _table = self._table
+        _prefix = self._getPrefix(**_args)        
+        if 'table' in _args or _table:
+            _table = _args['table'] if 'table' in _args else _table
+            _table = _prefix + f'.{_table}'
+            return self._session.table(_table).toPandas()
+        else:
+            sql = _args['sql']
+            return self._session.sql(sql).toPandas()
+        pass
+class Writer (Iceberg):
+    """
+    Writing data to an Apache Iceberg data warehouse (using pyspark)
+    """
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        self._mode = 'append' if 'mode' not in _args else _args['mode']
+        self._table = None if 'table' not in _args else _args['table']
+    def format (self,_schema) :
+        _iceSchema = StructType([])
+        _map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
+                'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
+        for _item in _schema :
+            _name = _item['name']
+            _type = _item['type'].lower()
+            if _type not in _map :
+                _iceType = StringType()
+            else:
+                _iceType = _map[_type]
+            
+            _iceSchema.add (StructField(_name,_iceType,True))
+        return _iceSchema if len(_iceSchema) else []
+    def write(self,_data,**_args):
+        _prefix = self._getPrefix(**_args)
+        if 'table' not in _args and not self._table :
+            raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
+        _schema = self.format(_args['schema']) if 'schema' in _args else []
+        if not _schema :
+            rdd = self._session.createDataFrame(_data,verifySchema=False)
+        else :
+            rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
+        _mode = self._mode if 'mode' not in _args else _args['mode']
+        _table = self._table if 'table' not in _args else _args['table']
+        
+        # print (_data.shape,_mode,_table)
+        
+        if not self._session.catalog.tableExists(_table):
+        #     # @TODO:
+        #     # add partitioning information here 
+            rdd.writeTo(_table).using('iceberg').create()
+            
+        # #     _mode = 'overwrite'
+        # #     rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
+        else:
+            # rdd.writeTo(_table).append()
+        # #     _table = f'{_prefix}.{_table}'
+
+            rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)

From c3627586b3743acb917b852c020acde10f19f8e3 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Tue, 31 Dec 2024 12:32:14 -0600
Subject: [PATCH 03/16] fix: refactor cli switches

---
 bin/transport | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/bin/transport b/bin/transport
index d2072f7..eb8b17a 100755
--- a/bin/transport
+++ b/bin/transport
@@ -36,9 +36,10 @@ from enum import Enum
 from rich import print
 
 app = typer.Typer()
-app_x = typer.Typer()
-app_i = typer.Typer()
-app_r = typer.Typer()
+app_e = typer.Typer()   #-- handles etl (run, generate)
+app_x = typer.Typer()   #-- handles plugins (list,add, test)
+app_i = typer.Typer()   #-- handles information (version, license)
+app_r = typer.Typer()   #-- handles registry    
 REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 REGISTRY_FILE= 'transport-registry.json'
 CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
@@ -54,7 +55,7 @@ def wait (jobs):
     while jobs :
             jobs = [pthread for pthread in jobs if pthread.is_alive()]
 
-@app.command(name="etl")
+@app_e.command(name="run")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
         index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
         batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
@@ -113,7 +114,7 @@ def info():
     print ()
     print (transport.__license__)
 
-@app.command()
+@app_e.command()
 def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
     """
     This function will generate a configuration template to give a sense of how to create one
@@ -206,6 +207,7 @@ def registry_test (key):
         print (pd.DataFrame([_item]))
     else:
         print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered")
+app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
 app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
 app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
 app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")

From 49ebd4a43216d883d4cfe31660a7444b1677b4d0 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Sun, 2 Feb 2025 22:36:28 -0600
Subject: [PATCH 04/16] bug fix: close & etl

---
 transport/iowrapper.py  |  9 +++++++--
 transport/sql/common.py | 40 +++++++++++++++++++++++++---------------
 2 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index e532e7d..cf5d717 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -109,8 +109,10 @@ class IETL(IReader) :
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
     def read(self,**_args):
         _data = super().read(**_args)
-
+        _schema = super().meta()
         for _kwargs in self._targets :
+            if _schema :
+                _kwargs['schema'] = _schema
             self.post(_data,**_kwargs)
 
         return _data
@@ -122,5 +124,8 @@ class IETL(IReader) :
         :_args  parameters associated with writer object
         """
         writer = transport.get.writer(**_args)
-        writer.write(_data)
+        if 'schema' in _args :
+            writer.write(_data,schema=_args['schema'])
+        else:
+            writer.write(_data)
         writer.close()
\ No newline at end of file
diff --git a/transport/sql/common.py b/transport/sql/common.py
index 0a55ed7..1a7e8a3 100644
--- a/transport/sql/common.py
+++ b/transport/sql/common.py
@@ -3,7 +3,7 @@ This file encapsulates common operations associated with SQL databases via SQLAl
 
 """
 import sqlalchemy as sqa
-from sqlalchemy import text 
+from sqlalchemy import text , MetaData, inspect
 
 import pandas as pd
 
@@ -34,20 +34,26 @@ class Base:
         :table  optional name of the table (can be fully qualified)
         """
         _table = self._table if 'table' not in _args else _args['table']
+        _map = {'TINYINT':'INTEGER','BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
         _schema = []
-        if _table :
-            if sqa.__version__.startswith('1.') :
-                _handler = sqa.MetaData(bind=self._engine)
-                _handler.reflect()
-            else:
-                #
-                # sqlalchemy's version 2.+
-                _handler = sqa.MetaData()
-                _handler.reflect(bind=self._engine)
-            #
-            # Let us extract the schema with the native types
-            _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
-            _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
+        # if _table :
+        #     if sqa.__version__.startswith('1.') :
+        #         _handler = sqa.MetaData(bind=self._engine)
+        #         _handler.reflect()
+        #     else:
+        #         #
+        #         # sqlalchemy's version 2.+
+        #         _handler = sqa.MetaData()
+        #         _handler.reflect(bind=self._engine)
+        #     #
+        #     # Let us extract the schema with the native types
+        #     _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
+        #     _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
+        #
+
+        _inspector = inspect(self._engine)
+        _columns = _inspector.get_columns(_table)
+        _schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
         return _schema
     def  has(self,**_args):
         return self.meta(**_args)
@@ -94,7 +100,11 @@ class SQLBase(Base):
         # _uri = [_item.strip() for _item in _uri if _item.strip()]
         # return '/'.join(_uri)
         return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
-
+    def close(self,) :
+        try:
+            self._engine.dispose()
+        except :
+            pass
 class BaseReader(SQLBase):
     def __init__(self,**_args):
         super().__init__(**_args)    

From 1a8112f1521e4aec9d318940834f6358a443f7f8 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Sun, 2 Feb 2025 22:37:07 -0600
Subject: [PATCH 05/16] adding iceberg notebook

---
 notebooks/iceberg.ipynb | 138 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 138 insertions(+)
 create mode 100644 notebooks/iceberg.ipynb

diff --git a/notebooks/iceberg.ipynb b/notebooks/iceberg.ipynb
new file mode 100644
index 0000000..849e088
--- /dev/null
+++ b/notebooks/iceberg.ipynb
@@ -0,0 +1,138 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Writing to Apache Iceberg\n",
+    "\n",
+    "1. Insure you have a Google Bigquery service account key on disk\n",
+    "2. The service key location is set as an environment variable **BQ_KEY**\n",
+    "3. The dataset will be automatically created within the project associated with the service key\n",
+    "\n",
+    "The cell below creates a dataframe that will be stored within Google Bigquery"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "['data transport version ', '2.4.0']\n"
+     ]
+    }
+   ],
+   "source": [
+    "#\n",
+    "# Writing to Google Bigquery database\n",
+    "#\n",
+    "import transport\n",
+    "from transport import providers\n",
+    "import pandas as pd\n",
+    "import os\n",
+    "\n",
+    "PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
+    "DATASET = 'demo'\n",
+    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
+    "# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
+    "bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
+    "bqw.write(_data,if_exists='replace') #-- default is append\n",
+    "print (['data transport version ', transport.__version__])\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Reading from Google Bigquery\n",
+    "\n",
+    "The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
+    "\n",
+    "- Basic read of the designated table (friends) created above\n",
+    "- Execute an aggregate SQL against the table\n",
+    "\n",
+    "**NOTE**\n",
+    "\n",
+    "By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
+    "Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "           name  age\n",
+      "0    James Bond   55\n",
+      "1  Steve Rogers  150\n",
+      "2  Steve Nyemba   44\n",
+      "--------- STATISTICS ------------\n"
+     ]
+    }
+   ],
+   "source": [
+    "\n",
+    "import transport\n",
+    "from transport import providers\n",
+    "import os\n",
+    "PRIVATE_KEY=os.environ['BQ_KEY']\n",
+    "pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
+    "_df = pgr.read(table='friends')\n",
+    "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
+    "_sdf = pgr.read(sql=_query)\n",
+    "print (_df)\n",
+    "print ('--------- STATISTICS ------------')\n",
+    "# print (_sdf)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "An **auth-file** is a file that contains database parameters used to access the database. \n",
+    "For code in shared environments, we recommend \n",
+    "\n",
+    "1. Having the **auth-file** stored on disk \n",
+    "2. and the location of the file is set to an environment variable.\n",
+    "\n",
+    "To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.7"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}

From cdf783143e4b9cdfe2dbe9829370feeec6421be0 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Tue, 11 Feb 2025 12:52:44 -0600
Subject: [PATCH 06/16] ...

---
 transport/plugins/__init__.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py
index 760b66c..93ba11c 100644
--- a/transport/plugins/__init__.py
+++ b/transport/plugins/__init__.py
@@ -59,6 +59,9 @@ class PluginLoader :
 
         pass
     def load (self,**_args):
+        """
+        This function loads a plugin
+        """
         self._modules = {}
         self._names = []
         path = _args ['path']

From 30645e46bd538d4cfd916c662137ed521954d481 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Wed, 19 Feb 2025 23:03:14 -0600
Subject: [PATCH 07/16] bug fix: readonly for duckdb

---
 transport/sql/common.py | 12 +++++++++---
 transport/sql/duckdb.py |  4 +++-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/transport/sql/common.py b/transport/sql/common.py
index 1a7e8a3..f647acb 100644
--- a/transport/sql/common.py
+++ b/transport/sql/common.py
@@ -13,7 +13,13 @@ class Base:
         self._port = None
         self._database = _args['database']
         self._table = _args['table'] if 'table' in _args else None
-        self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
+        _uri = self._get_uri(**_args)
+        if type(_uri) == str :
+            self._engine= sqa.create_engine(_uri,future=True)
+        else:
+            
+            _uri,_kwargs = _uri
+            self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
     def _set_uri(self,**_args) :
         """
         :provider   provider
@@ -64,8 +70,8 @@ class Base:
 
         @TODO: Execution of stored procedures
         """
-        if sql.lower().startswith('select') or sql.lower().startswith('with') :
-
+        if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
+            print (self._engine)
             return pd.read_sql(sql,self._engine) 
         else:
             _handler = self._engine.connect()
diff --git a/transport/sql/duckdb.py b/transport/sql/duckdb.py
index 06f66e5..97fb3fa 100644
--- a/transport/sql/duckdb.py
+++ b/transport/sql/duckdb.py
@@ -15,9 +15,11 @@ class Duck :
     def _get_uri(self,**_args):
         return f"""duckdb:///{self.database}"""
 class Reader(Duck,BaseReader) :
-    def __init__(self,**_args):
+    def __init__(self,**_args):        
         Duck.__init__(self,**_args)
         BaseReader.__init__(self,**_args)
+    def _get_uri(self,**_args):
+        return super()._get_uri(**_args),{'connect_args':{'read_only':True}}
 class Writer(Duck,BaseWriter):
     def __init__(self,**_args):
         Duck.__init__(self,**_args)

From afa442ea8ddb3fc9cf270028f8b3b9b63089a81b Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Wed, 19 Feb 2025 23:07:47 -0600
Subject: [PATCH 08/16] versioning update edition

---
 info/__init__.py | 3 ++-
 setup.py         | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/info/__init__.py b/info/__init__.py
index 3eded86..bbdf8fd 100644
--- a/info/__init__.py
+++ b/info/__init__.py
@@ -1,7 +1,8 @@
 __app_name__  = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.6'
+__version__= '2.2.8'
 __email__  = "info@the-phi.com"
+__edition__= 'ce'
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
 
diff --git a/setup.py b/setup.py
index f11a6ca..e8d2de0 100644
--- a/setup.py
+++ b/setup.py
@@ -5,14 +5,14 @@ from setuptools import setup, find_packages
 import os
 import sys
 # from version import __version__,__author__
-from info import __version__, __author__,__app_name__,__license__
+from info import __version__, __author__,__app_name__,__license__,__edition___
 
 
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":__app_name__,
-    "version":__version__,
+    "version":'-'.join([__version__,__edition__]),
     "author":__author__,"author_email":"info@the-phi.com",
     "license":__license__,
     # "packages":["transport","info","transport/sql"]},

From a1b5f2743ca8a046d5c721841df90427f96a1347 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Wed, 19 Feb 2025 23:26:12 -0600
Subject: [PATCH 09/16] bug fixes ...

---
 info/__init__.py | 9 +++++----
 setup.py         | 8 ++++++--
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/info/__init__.py b/info/__init__.py
index bbdf8fd..32cdcc4 100644
--- a/info/__init__.py
+++ b/info/__init__.py
@@ -14,9 +14,10 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
 
 """
 
-__whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
+__whatsnew__=f"""version {__version__}, 
+1. Added support for read/write logs as well as plugins (when applied)
+2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
+3. support for streaming data, important to use this with large volumes of data
+
 
-    1. simpler syntax to create readers/writers
-    2. auth-file registry that can be referenced using a label
-    3. duckdb support
 """
diff --git a/setup.py b/setup.py
index e8d2de0..a0cfeed 100644
--- a/setup.py
+++ b/setup.py
@@ -5,14 +5,14 @@ from setuptools import setup, find_packages
 import os
 import sys
 # from version import __version__,__author__
-from info import __version__, __author__,__app_name__,__license__,__edition___
+from info import __version__, __author__,__app_name__,__license__,__edition__
 
 
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":__app_name__,
-    "version":'-'.join([__version__,__edition__]),
+    "version":__version__,
     "author":__author__,"author_email":"info@the-phi.com",
     "license":__license__,
     # "packages":["transport","info","transport/sql"]},
@@ -22,6 +22,10 @@ args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write',
 args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
+args['classifiers'] = ['Programming Language :: Python :: 3',
+                    'License :: OSI Approved :: MIT License',
+                     'Operating System :: OS Independent',
+                                ],
 # if sys.version_info[0] == 2 :
 #     args['use_2to3'] = True
 #     args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']

From eaa2b99a2d48c990d44d8cdf07ec8cb1a5b77184 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Mon, 24 Feb 2025 09:26:15 -0600
Subject: [PATCH 10/16] bug fix: schema (postgresql) construct

---
 transport/iowrapper.py  | 12 ++++++------
 transport/sql/common.py | 11 +++++++++--
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index cf5d717..700b589 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -45,12 +45,12 @@ class IO:
     def close(self):
         if hasattr(self._agent,'close') :
             self._agent.close()
-    def apply(self):
-        """
-        applying pre/post conditions given a pipeline expression
-        """
-        for _pointer in self._plugins :
-            _data = _pointer(_data)
+    # def apply(self):
+    #     """
+    #     applying pre/post conditions given a pipeline expression
+    #     """
+    #     for _pointer in self._plugins :
+    #         _data = _pointer(_data)
     def apply(self,_query):
         if hasattr(self._agent,'apply') :
             return self._agent.apply(_query)
diff --git a/transport/sql/common.py b/transport/sql/common.py
index f647acb..304e945 100644
--- a/transport/sql/common.py
+++ b/transport/sql/common.py
@@ -71,7 +71,7 @@ class Base:
         @TODO: Execution of stored procedures
         """
         if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
-            print (self._engine)
+            
             return pd.read_sql(sql,self._engine) 
         else:
             _handler = self._engine.connect()
@@ -83,6 +83,7 @@ class Base:
 class SQLBase(Base):
     def __init__(self,**_args):
         super().__init__(**_args)
+        self._schema = _args.get('schema',None)
     def get_provider(self):
         raise Exception ("Provider Needs to be set ...")
     def get_default_port(self) :
@@ -122,6 +123,8 @@ class BaseReader(SQLBase):
             sql = _args['sql']
         else:
             _table = _args['table'] if 'table' in _args else self._table
+            if self._schema and type(self._schema) == str :
+                _table = f'{self._schema}.{_table}'
             sql = f'SELECT * FROM {_table}'
         return self.apply(sql)
     
@@ -132,6 +135,7 @@ class BaseWriter (SQLBase):
     """
     def __init__(self,**_args):
         super().__init__(**_args)
+        
     def write(self,_data,**_args):
         if type(_data) == dict :
             _df = pd.DataFrame(_data)
@@ -151,5 +155,8 @@ class BaseWriter (SQLBase):
         #     _mode['schema'] = _args['schema']
         # if 'if_exists' in _args :
         #     _mode['if_exists'] = _args['if_exists']
-
+        if 'schema' in _args and type(_args['schema']) == str:
+            self._schema = _args.get('schema',None)
+        if self._schema :
+           _mode['schema'] = self._schema
         _df.to_sql(_table,self._engine,**_mode)
\ No newline at end of file

From dad2956a8c55d90c842616348abc6b5dbc6f2102 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Mon, 24 Feb 2025 09:29:42 -0600
Subject: [PATCH 11/16] version update

---
 info/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/info/__init__.py b/info/__init__.py
index 32cdcc4..9355349 100644
--- a/info/__init__.py
+++ b/info/__init__.py
@@ -1,8 +1,8 @@
 __app_name__  = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.8'
+__version__= '2.2.10'
 __email__  = "info@the-phi.com"
-__edition__= 'ce'
+__edition__= 'community'
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
 

From dd10f6db78db480f83e57332b0f5b5c4a4d0a67d Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Mon, 24 Feb 2025 09:35:36 -0600
Subject: [PATCH 12/16] bug fix: version & cli

---
 bin/transport         | 14 +++++++++++---
 transport/__init__.py |  2 +-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/bin/transport b/bin/transport
index eb8b17a..6ca01bc 100755
--- a/bin/transport
+++ b/bin/transport
@@ -103,14 +103,22 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
     else:
          print (_df)
     print ()
-
+@app_i.command(name="version")
+def version ():
+    """
+    This function will return the version of the data-transport
+    """
+    print()
+    print (f'[bold] {transport.__app_name__} ,[blue] {transport.__edition__} edition [/blue], version {transport.__version__}[/bold]')
+    print ()
+ 
 @app_i.command(name="license")
 def info():
     """
     This function will display version and license information
     """
-
-    print (f'[bold] {transport.__app_name__} ,version {transport.__version__}[/bold]')
+    print()
+    print (f'[bold] {transport.__app_name__} ,{transport.__edition__}, version {transport.__version__}[/bold]')
     print ()
     print (transport.__license__)
 
diff --git a/transport/__init__.py b/transport/__init__.py
index 33a3261..c3bb901 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -22,7 +22,7 @@ from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import json
 import os
-from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__
+from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
 from transport.iowrapper import IWriter, IReader, IETL
 from transport.plugins import PluginLoader
 from transport import providers

From 469c6f89a2a3f7ccc2391831578b266b9f8e7cb4 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Wed, 5 Mar 2025 21:24:15 -0600
Subject: [PATCH 13/16] fixes with plugin handler

---
 bin/transport          |  70 +++++++----
 transport/iowrapper.py |  52 +++++---
 transport/registry.py  | 264 ++++++++++++++++++++---------------------
 3 files changed, 212 insertions(+), 174 deletions(-)

diff --git a/bin/transport b/bin/transport
index 6ca01bc..19b664e 100755
--- a/bin/transport
+++ b/bin/transport
@@ -34,6 +34,8 @@ import time
 from termcolor import colored
 from enum import Enum
 from rich import print
+import plugin_ix as pix
+
 
 app = typer.Typer()
 app_e = typer.Typer()   #-- handles etl (run, generate)
@@ -147,7 +149,7 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
                   path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), 
                   override:bool=typer.Option(default=False,help="override existing configuration or not")):
     """
-    This functiion will initialize the registry and have both application and calling code loading the database parameters by a label
+    This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label
 
     """
     try:
@@ -179,42 +181,62 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
     pass
 @app_x.command(name='add') 
 def register_plugs (
-    alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")],
-    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")]
+    alias:Annotated[str,typer.Argument(help="unique function name within a file")],
+    path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
+    folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
+    
     ):
     """
-    This function will register a file and the functions within will be refrences <alias>.<function> in a configuration file
+    This function will register a file and the functions within we are interested in using
     """
-    transport.registry.plugins.init()
-    _log = transport.registry.plugins.add(alias,path)
+    if ',' in alias :
+        alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ] 
+    else:
+        alias = [alias.strip()]
+    _pregistry  = pix.Registry(folder=folder,plugin_folder='plugins/code')
+    _log = _pregistry.set(path,alias)
+    # transport.registry.plugins.init()
+    # _log = transport.registry.plugins.add(alias,path)
     _mark = TIMES_MARK if not _log else CHECK_MARK
-    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added"""
+    _msg  = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
     print (f"""{_mark} {_msg}""")
 @app_x.command(name="list") 
-def registry_list ():
-
-    transport.registry.plugins.init()
-    _d = []
-    for _alias in transport.registry.plugins._data :
-        _data = transport.registry.plugins._data[_alias]
-        _d +=  [{'alias':_alias,"plugin-count":len(_data['content']),'e.g':'@'.join([_alias,_data['content'][0]]),'plugins':json.dumps(_data['content'])}]
-    if _d:
-        print (pd.DataFrame(_d))
+def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
+    """
+    This function will list all the plugins (python functions/files) that are registered and can be reused
+    """
+    _pregistry  = pix.Registry(folder=folder)
+    _df = _pregistry.stats()
+    if _df.empty :
+        print (f"{TIMES_MARK} registry at {folder} is not ready")
     else:
-        print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""")
+        print (_df)
 
+@app_x.command ("has")
+def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
+                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
+    _pregistry  = pix.Registry(folder=folder)
+    if _pregistry.has(alias) :
+        _msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
+    else:
+        _msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
+    print (_msg)
+    
 @app_x.command(name="test") 
-def registry_test (key):
+def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
+                  folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
+    _pregistry  = pix.Registry(folder=folder)
     """
     This function allows to test syntax for a plugin i.e in terms of alias@function
     """
-    _item = transport.registry.plugins.has(key=key)
-    if _item :
-        del _item['pointer']
-        print (f"""{CHECK_MARK} successfully loaded \033[1m{key}\033[0m found, version {_item['version']}""")
-        print (pd.DataFrame([_item]))
+    # _item = transport.registry.plugins.has(key=key)
+    _pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
+        
+    if _pointer:
+        print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
+        
     else:
-        print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered")
+        print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
 app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
 app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
 app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index 700b589..396135e 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -11,6 +11,7 @@ from transport import providers
 from multiprocessing import Process
 import time
 
+import plugin_ix 
 
 class IO:
     """
@@ -21,20 +22,25 @@ class IO:
         plugins = _args['plugins'] if 'plugins' not in _args else None
 
         self._agent = _agent
+        self._ixloader = plugin_ix.Loader () #--
         if plugins :
-            self._init_plugins(plugins)
-        else:
-            self._plugins = None
+            self.init_plugins(plugins)
+        #     for _ref in plugins :
+        #         self._ixloader.set(_ref)
+        # if plugins :
+        #     self._init_plugins(plugins)
+        # else:
+        #     self._plugins = None
         
-    def _init_plugins(self,_args):
-        """
-        This function will load pipelined functions as a plugin loader
-        """
-        if 'path' in _args and 'names' in _args :
-            self._plugins = PluginLoader(**_args)
-        else:
-            self._plugins = PluginLoader()
-            [self._plugins.set(_pointer) for _pointer in _args]
+    # def _init_plugins(self,_args):
+    #     """
+    #     This function will load pipelined functions as a plugin loader
+    #     """
+    #     if 'path' in _args and 'names' in _args :
+    #         self._plugins = PluginLoader(**_args)
+    #     else:
+    #         self._plugins = PluginLoader()
+    #         [self._plugins.set(_pointer) for _pointer in _args]
         #
         # @TODO: We should have a way to log what plugins are loaded and ready to use
     def meta (self,**_args):
@@ -62,6 +68,10 @@ class IO:
             pointer = getattr(self._agent,_name)
             return pointer(_query)
         return None
+    def init_plugins(self,plugins):
+        for _ref in plugins :
+            self._ixloader.set(_ref)
+
 class IReader(IO):
     """
     This is a wrapper for read functionalities
@@ -71,22 +81,28 @@ class IReader(IO):
         
     def read(self,**_args):
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
+            self.init_plugins(_args['plugins'])
+
         _data = self._agent.read(**_args)
-        if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
+        # if self._plugins and self._plugins.ratio() > 0 :
+        #     _data = self._plugins.apply(_data)
         #
         # output data 
+        
+        #
+        # applying the the design pattern 
+        _data = self._ixloader.visitor(_data)
         return _data
 class IWriter(IO):
     def __init__(self,**_args): #_agent,pipeline=None):
         super().__init__(**_args) #_agent,pipeline)  
     def write(self,_data,**_args):
+        # if 'plugins' in _args :
+        #     self._init_plugins(_args['plugins'])
         if 'plugins' in _args :
-            self._init_plugins(_args['plugins'])
-        if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
+            self.init_plugins(_args['plugins'])
 
+        self._ixloader.visitor(_data)
         self._agent.write(_data,**_args)
 
 #
diff --git a/transport/registry.py b/transport/registry.py
index 1f612dc..71909f6 100644
--- a/transport/registry.py
+++ b/transport/registry.py
@@ -21,161 +21,161 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
     REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
 REGISTRY_FILE= 'transport-registry.json'
 DATA = {}
-class plugins:
-    #
-    # This is a utility function that should enable management of plugins-registry
-    # The class allows to add/remove elements 
-    #
-    # @TODO: add read/write properties to the class (better design practice)
-    #
-    _data = {}
-    FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
-    CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
-    FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
-    @staticmethod
-    def init():
+# class plugins:
+#     #
+#     # This is a utility function that should enable management of plugins-registry
+#     # The class allows to add/remove elements 
+#     #
+#     # @TODO: add read/write properties to the class (better design practice)
+#     #
+#     _data = {}
+#     FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
+#     CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
+#     FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
+#     @staticmethod
+#     def init():
         
-        if not os.path.exists(plugins.FOLDER) :
-            os.makedirs(plugins.FOLDER)
-        if not os.path.exists(plugins.CODE):
-            os.makedirs(plugins.CODE)
-        if not os.path.exists(plugins.FILE):
-            f = open(plugins.FILE,'w')
-            f.write("{}")
-            f.close()
-        plugins._read() #-- will load data as a side effect
+#         if not os.path.exists(plugins.FOLDER) :
+#             os.makedirs(plugins.FOLDER)
+#         if not os.path.exists(plugins.CODE):
+#             os.makedirs(plugins.CODE)
+#         if not os.path.exists(plugins.FILE):
+#             f = open(plugins.FILE,'w')
+#             f.write("{}")
+#             f.close()
+#         plugins._read() #-- will load data as a side effect
 
-    @staticmethod
-    def copy (path) :
+#     @staticmethod
+#     def copy (path) :
         
-        shutil.copy2(path,plugins.CODE)
-    @staticmethod
-    def _read ():
-        f = open(plugins.FILE)
-        try:
-            _data = json.loads(f.read())
-            f.close()
-        except Exception as e:
-            print (f"Corrupted registry, resetting ...")
-            _data = {}
-            plugins._write(_data)
+#         shutil.copy2(path,plugins.CODE)
+#     @staticmethod
+#     def _read ():
+#         f = open(plugins.FILE)
+#         try:
+#             _data = json.loads(f.read())
+#             f.close()
+#         except Exception as e:
+#             print (f"Corrupted registry, resetting ...")
+#             _data = {}
+#             plugins._write(_data)
             
-        plugins._data = _data
-    @staticmethod
-    def _write (_data):
-        f = open(plugins.FILE,'w')
-        f.write(json.dumps(_data))
-        f.close()
-        plugins._data = _data
+#         plugins._data = _data
+#     @staticmethod
+#     def _write (_data):
+#         f = open(plugins.FILE,'w')
+#         f.write(json.dumps(_data))
+#         f.close()
+#         plugins._data = _data
 
-    @staticmethod
-    def inspect (_path):
-        _names = []
+#     @staticmethod
+#     def inspect (_path):
+#         _names = []
         
-        if os.path.exists(_path) :
-            _filename = _path.split(os.sep)[-1]
-            spec = importlib.util.spec_from_file_location(_filename, _path)
-            module = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(module)
+#         if os.path.exists(_path) :
+#             _filename = _path.split(os.sep)[-1]
+#             spec = importlib.util.spec_from_file_location(_filename, _path)
+#             module = importlib.util.module_from_spec(spec)
+#             spec.loader.exec_module(module)
 
-            # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
-            for _name in dir(module) :
-                _pointer = getattr(module,_name) 
-                if hasattr(_pointer,'transport') :
-                    _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
-                    _names.append(_item)
+#             # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
+#             for _name in dir(module) :
+#                 _pointer = getattr(module,_name) 
+#                 if hasattr(_pointer,'transport') :
+#                     _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
+#                     _names.append(_item)
 
             
-        return _names
-    @staticmethod
-    def add (alias,path):
-        """
-        Add overwrite the registry entries
-        """
-        _names = plugins.inspect (path)
-        _log = []
+#         return _names
+#     @staticmethod
+#     def add (alias,path):
+#         """
+#         Add overwrite the registry entries
+#         """
+#         _names = plugins.inspect (path)
+#         _log = []
         
-        if _names :
-            #
-            # We should make sure we have all the plugins with the attributes (transport,name) set
-            _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
-            if _names :
-                plugins.copy(path)
-                _content = []
+#         if _names :
+#             #
+#             # We should make sure we have all the plugins with the attributes (transport,name) set
+#             _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
+#             if _names :
+#                 plugins.copy(path)
+#                 _content = []
                 
-                for _item in _names :
-                    _key = '@'.join([alias,_item['name']])
-                    _log.append(_item['name'])
-                #
-                # Let us update the registry 
-                # 
-                plugins.update(alias,path,_log)        
-        return _log
+#                 for _item in _names :
+#                     _key = '@'.join([alias,_item['name']])
+#                     _log.append(_item['name'])
+#                 #
+#                 # Let us update the registry 
+#                 # 
+#                 plugins.update(alias,path,_log)        
+#         return _log
     
-    @staticmethod
-    def update (alias,path,_log) :
-        """
-        updating the registry entries of the plugins (management data)
-        """
-        # f = open(plugins.FILE)
-        # _data = json.loads(f.read())
-        # f.close()
-        _data = plugins._data
-        # _log = plugins.add(alias,path)
+#     @staticmethod
+#     def update (alias,path,_log) :
+#         """
+#         updating the registry entries of the plugins (management data)
+#         """
+#         # f = open(plugins.FILE)
+#         # _data = json.loads(f.read())
+#         # f.close()
+#         _data = plugins._data
+#         # _log = plugins.add(alias,path)
         
-        if _log :
-            _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
-            plugins._write(_data) #-- will update data as a side effect
+#         if _log :
+#             _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
+#             plugins._write(_data) #-- will update data as a side effect
 
-        return _log
-    @staticmethod
-    def get(**_args) :
-        # f = open(plugins.FILE)
-        # _data = json.loads(f.read())
-        # f.close()
-        # if 'key' in _args :
-        #     alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
-        # else :
-        #     alias = _args['alias']
-        #     name  = _args['name']
+#         return _log
+#     @staticmethod
+#     def get(**_args) :
+#         # f = open(plugins.FILE)
+#         # _data = json.loads(f.read())
+#         # f.close()
+#         # if 'key' in _args :
+#         #     alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+#         # else :
+#         #     alias = _args['alias']
+#         #     name  = _args['name']
         
-        # if alias in _data :
+#         # if alias in _data :
             
-        #     _path = os.sep.join([plugins.CODE,_data[alias]['name']])
-        #     _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+#         #     _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+#         #     _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
             
-        #     _item = _item[0] if _item else None
-        #     if _item :
+#         #     _item = _item[0] if _item else None
+#         #     if _item :
                 
-        #         return _item['pointer'] 
-        # return None
-        _item = plugins.has(**_args)
-        return _item['pointer'] if _item else None
+#         #         return _item['pointer'] 
+#         # return None
+#         _item = plugins.has(**_args)
+#         return _item['pointer'] if _item else None
     
-    @staticmethod
-    def has (**_args):
-        f = open(plugins.FILE)
-        _data = json.loads(f.read())
-        f.close()
-        if 'key' in _args :
-            alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
-        else :
-            alias = _args['alias']
-            name  = _args['name']
+#     @staticmethod
+#     def has (**_args):
+#         f = open(plugins.FILE)
+#         _data = json.loads(f.read())
+#         f.close()
+#         if 'key' in _args :
+#             alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
+#         else :
+#             alias = _args['alias']
+#             name  = _args['name']
         
-        if alias in _data :
+#         if alias in _data :
             
-            _path = os.sep.join([plugins.CODE,_data[alias]['name']])
-            _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
+#             _path = os.sep.join([plugins.CODE,_data[alias]['name']])
+#             _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
             
-            _item = _item[0] if _item else None
-            if _item :
+#             _item = _item[0] if _item else None
+#             if _item :
                 
-                return copy.copy(_item)
-        return None
-    @staticmethod
-    def synch():
-        pass
+#                 return copy.copy(_item)
+#         return None
+#     @staticmethod
+#     def synch():
+#         pass
 
 def isloaded ():
     return DATA not in [{},None]

From 98ef8a848e93abc13634ab7868747d44f45f22a6 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Wed, 5 Mar 2025 22:11:37 -0600
Subject: [PATCH 14/16] bug fixes and dependencies

---
 setup.py               | 2 +-
 transport/iowrapper.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index a0cfeed..6888fe5 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ args    = {
 
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill']
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill',"git+https://github.com/lnyemba/plugins-ix"]
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 args['classifiers'] = ['Programming Language :: Python :: 3',
diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index 396135e..ff49906 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -19,7 +19,7 @@ class IO:
     """
     def __init__(self,**_args):
         _agent  = _args['agent']
-        plugins = _args['plugins'] if 'plugins' not in _args else None
+        plugins = _args['plugins'] if 'plugins' in _args else None
 
         self._agent = _agent
         self._ixloader = plugin_ix.Loader () #--

From 0977ad1b181cf8f1d2e583b36b09d2ccdf569816 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Wed, 5 Mar 2025 22:19:06 -0600
Subject: [PATCH 15/16] setup fixes

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 6888fe5..503a2d6 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ args    = {
 
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill',"git+https://github.com/lnyemba/plugins-ix"]
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill','sqlalchemy_drill','plugin-ix@git+https://github.com/lnyemba/plugins-ix']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 args['classifiers'] = ['Programming Language :: Python :: 3',

From 4b34c746ae80fbbb59ca428c2512b33570457945 Mon Sep 17 00:00:00 2001
From: Steve Nyemba <nyemba@gmail.com>
Date: Thu, 10 Apr 2025 20:51:45 -0500
Subject: [PATCH 16/16] bug fix: missing table

---
 info/__init__.py        |  2 +-
 transport/sql/common.py | 17 ++++++++++++-----
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/info/__init__.py b/info/__init__.py
index 9355349..98966f9 100644
--- a/info/__init__.py
+++ b/info/__init__.py
@@ -1,6 +1,6 @@
 __app_name__  = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.10'
+__version__= '2.2.12'
 __email__  = "info@the-phi.com"
 __edition__= 'community'
 __license__=f"""
diff --git a/transport/sql/common.py b/transport/sql/common.py
index 304e945..7cf303f 100644
--- a/transport/sql/common.py
+++ b/transport/sql/common.py
@@ -56,11 +56,17 @@ class Base:
         #     _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
         #     _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
         #
+        try:
+            if _table :
+                _inspector = inspect(self._engine)
+                _columns = _inspector.get_columns(_table)
+                _schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
+                return _schema
+        except Exception as e:
+            pass
 
-        _inspector = inspect(self._engine)
-        _columns = _inspector.get_columns(_table)
-        _schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
-        return _schema
+        # else:
+        return []
     def  has(self,**_args):
         return self.meta(**_args)
     def apply(self,sql):
@@ -137,8 +143,9 @@ class BaseWriter (SQLBase):
         super().__init__(**_args)
         
     def write(self,_data,**_args):
+        
         if type(_data) == dict :
-            _df = pd.DataFrame(_data)
+            _df = pd.DataFrame([_data])
         elif type(_data) == list :
             _df = pd.DataFrame(_data)
         else: