From eaa2b99a2d48c990d44d8cdf07ec8cb1a5b77184 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 24 Feb 2025 09:26:15 -0600 Subject: [PATCH] bug fix: schema (postgresql) construct --- transport/iowrapper.py | 12 ++++++------ transport/sql/common.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/transport/iowrapper.py b/transport/iowrapper.py index cf5d717..700b589 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -45,12 +45,12 @@ class IO: def close(self): if hasattr(self._agent,'close') : self._agent.close() - def apply(self): - """ - applying pre/post conditions given a pipeline expression - """ - for _pointer in self._plugins : - _data = _pointer(_data) + # def apply(self): + # """ + # applying pre/post conditions given a pipeline expression + # """ + # for _pointer in self._plugins : + # _data = _pointer(_data) def apply(self,_query): if hasattr(self._agent,'apply') : return self._agent.apply(_query) diff --git a/transport/sql/common.py b/transport/sql/common.py index f647acb..304e945 100644 --- a/transport/sql/common.py +++ b/transport/sql/common.py @@ -71,7 +71,7 @@ class Base: @TODO: Execution of stored procedures """ if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'): - print (self._engine) + return pd.read_sql(sql,self._engine) else: _handler = self._engine.connect() @@ -83,6 +83,7 @@ class Base: class SQLBase(Base): def __init__(self,**_args): super().__init__(**_args) + self._schema = _args.get('schema',None) def get_provider(self): raise Exception ("Provider Needs to be set ...") def get_default_port(self) : @@ -122,6 +123,8 @@ class BaseReader(SQLBase): sql = _args['sql'] else: _table = _args['table'] if 'table' in _args else self._table + if self._schema and type(self._schema) == str : + _table = f'{self._schema}.{_table}' sql = f'SELECT * FROM {_table}' return self.apply(sql) @@ -132,6 +135,7 @@ class BaseWriter (SQLBase): """ def __init__(self,**_args): super().__init__(**_args) + def write(self,_data,**_args): if type(_data) == dict : _df = pd.DataFrame(_data) @@ -151,5 +155,8 @@ class BaseWriter (SQLBase): # _mode['schema'] = _args['schema'] # if 'if_exists' in _args : # _mode['if_exists'] = _args['if_exists'] - + if 'schema' in _args and type(_args['schema']) == str: + self._schema = _args.get('schema',None) + if self._schema : + _mode['schema'] = self._schema _df.to_sql(_table,self._engine,**_mode) \ No newline at end of file