diff --git a/transport/iowrapper.py b/transport/iowrapper.py index e532e7d..cf5d717 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -109,8 +109,10 @@ class IETL(IReader) : self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] def read(self,**_args): _data = super().read(**_args) - + _schema = super().meta() for _kwargs in self._targets : + if _schema : + _kwargs['schema'] = _schema self.post(_data,**_kwargs) return _data @@ -122,5 +124,8 @@ class IETL(IReader) : :_args parameters associated with writer object """ writer = transport.get.writer(**_args) - writer.write(_data) + if 'schema' in _args : + writer.write(_data,schema=_args['schema']) + else: + writer.write(_data) writer.close() \ No newline at end of file diff --git a/transport/sql/common.py b/transport/sql/common.py index 0a55ed7..1a7e8a3 100644 --- a/transport/sql/common.py +++ b/transport/sql/common.py @@ -3,7 +3,7 @@ This file encapsulates common operations associated with SQL databases via SQLAl """ import sqlalchemy as sqa -from sqlalchemy import text +from sqlalchemy import text , MetaData, inspect import pandas as pd @@ -34,20 +34,26 @@ class Base: :table optional name of the table (can be fully qualified) """ _table = self._table if 'table' not in _args else _args['table'] + _map = {'TINYINT':'INTEGER','BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} _schema = [] - if _table : - if sqa.__version__.startswith('1.') : - _handler = sqa.MetaData(bind=self._engine) - _handler.reflect() - else: - # - # sqlalchemy's version 2.+ - _handler = sqa.MetaData() - _handler.reflect(bind=self._engine) - # - # Let us extract the schema with the native types - _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} - _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns] + # if _table : + # if sqa.__version__.startswith('1.') : + # _handler = sqa.MetaData(bind=self._engine) + # _handler.reflect() + # else: + # # + # # sqlalchemy's version 2.+ + # _handler = sqa.MetaData() + # _handler.reflect(bind=self._engine) + # # + # # Let us extract the schema with the native types + # _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'} + # _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns] + # + + _inspector = inspect(self._engine) + _columns = _inspector.get_columns(_table) + _schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns] return _schema def has(self,**_args): return self.meta(**_args) @@ -94,7 +100,11 @@ class SQLBase(Base): # _uri = [_item.strip() for _item in _uri if _item.strip()] # return '/'.join(_uri) return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}' - + def close(self,) : + try: + self._engine.dispose() + except : + pass class BaseReader(SQLBase): def __init__(self,**_args): super().__init__(**_args)