# 2020-12-31 17:12:34 +00:00  (extraction residue: git-blame timestamp, kept as a comment)
"""
This file is intended to perform read / writes against an SQL database such as PostgreSQL , Redshift , Mysql , MsSQL . . .
LICENSE ( MIT )
Copyright 2016 - 2020 , The Phi Technology LLC
Permission is hereby granted , free of charge , to any person obtaining a copy of this software and associated documentation files ( the " Software " ) , to deal in the Software without restriction , including without limitation the rights to use , copy , modify , merge , publish , distribute , sublicense , and / or sell copies of the Software , and to permit persons to whom the Software is furnished to do so , subject to the following conditions :
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software .
THE SOFTWARE IS PROVIDED " AS IS " , WITHOUT WARRANTY OF ANY KIND , EXPRESS OR IMPLIED , INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY , FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT . IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM , DAMAGES OR OTHER LIABILITY , WHETHER IN AN ACTION OF CONTRACT , TORT OR OTHERWISE , ARISING FROM , OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE .
"""
import psycopg2 as pg
import mysql . connector as my
import sys
2022-03-03 22:08:24 +00:00
import sqlalchemy
2020-12-31 17:12:34 +00:00
if sys . version_info [ 0 ] > 2 :
from transport . common import Reader , Writer #, factory
else :
from common import Reader , Writer
import json
2021-03-15 17:38:54 +00:00
from google . oauth2 import service_account
2021-03-30 04:30:54 +00:00
from google . cloud import bigquery as bq
2022-03-08 00:50:29 +00:00
from multiprocessing import Lock , RLock
2020-12-31 17:12:34 +00:00
import pandas as pd
2021-01-02 23:39:27 +00:00
import numpy as np
2021-06-29 20:12:28 +00:00
import nzpy as nz #--- netezza drivers
2021-03-30 04:30:54 +00:00
import copy
2021-11-18 21:21:26 +00:00
import os
2021-03-30 04:30:54 +00:00
2020-12-31 17:12:34 +00:00
class SQLRW :
2022-03-08 00:50:29 +00:00
lock = RLock ( )
2021-06-29 20:12:28 +00:00
DRIVERS = { " postgresql " : pg , " redshift " : pg , " mysql " : my , " mariadb " : my , " netezza " : nz }
REFERENCE = {
" netezza " : { " port " : 5480 , " handler " : nz , " dtype " : " VARCHAR(512) " } ,
" postgresql " : { " port " : 5432 , " handler " : pg , " dtype " : " VARCHAR " } ,
" redshift " : { " port " : 5432 , " handler " : pg , " dtype " : " VARCHAR " } ,
" mysql " : { " port " : 3360 , " handler " : my , " dtype " : " VARCHAR(256) " } ,
" mariadb " : { " port " : 3360 , " handler " : my , " dtype " : " VARCHAR(256) " } ,
}
2020-12-31 17:12:34 +00:00
def __init__ ( self , * * _args ) :
_info = { }
2021-06-29 20:12:28 +00:00
_info [ ' dbname ' ] = _args [ ' db ' ] if ' db ' in _args else _args [ ' database ' ]
2021-11-18 21:21:26 +00:00
self . table = _args [ ' table ' ] if ' table ' in _args else None
2020-12-31 17:12:34 +00:00
self . fields = _args [ ' fields ' ] if ' fields ' in _args else [ ]
2022-03-04 21:00:30 +00:00
self . schema = _args [ ' schema ' ] if ' schema ' in _args else ' '
2022-03-03 22:08:24 +00:00
self . _provider = _args [ ' provider ' ] if ' provider ' in _args else None
2021-11-18 21:21:26 +00:00
# _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
_info [ ' host ' ] = _args [ ' host ' ]
_info [ ' port ' ] = _args [ ' port ' ]
# if 'host' in _args :
# _info['host'] = 'localhost' if 'host' not in _args else _args['host']
# # _info['port'] = SQLWriter.PROVIDERS[_args['provider']] if 'port' not in _args else _args['port']
# _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
2020-12-31 17:12:34 +00:00
if ' username ' in _args or ' user ' in _args :
key = ' username ' if ' username ' in _args else ' user '
_info [ ' user ' ] = _args [ key ]
2022-03-03 22:08:24 +00:00
_info [ ' password ' ] = _args [ ' password ' ] if ' password ' in _args else ' '
2020-12-31 17:12:34 +00:00
#
# We need to load the drivers here to see what we are dealing with ...
2021-11-18 21:21:26 +00:00
# _handler = SQLWriter.REFERENCE[_provider]['handler']
_handler = _args [ ' driver ' ] #-- handler to the driver
self . _dtype = _args [ ' default ' ] [ ' type ' ] if ' default ' in _args and ' type ' in _args [ ' default ' ] else ' VARCHAR(256) '
2022-03-08 00:50:29 +00:00
# self._provider = _args['provider']
2021-11-18 21:21:26 +00:00
# self._dtype = SQLWriter.REFERENCE[_provider]['dtype'] if 'dtype' not in _args else _args['dtype']
# self._provider = _provider
2021-06-29 20:12:28 +00:00
if _handler == nz :
_info [ ' database ' ] = _info [ ' dbname ' ]
_info [ ' securityLevel ' ] = 0
del _info [ ' dbname ' ]
2022-03-03 22:08:24 +00:00
if _handler == my :
_info [ ' database ' ] = _info [ ' dbname ' ]
del _info [ ' dbname ' ]
2020-12-31 17:12:34 +00:00
self . conn = _handler . connect ( * * _info )
2022-03-03 22:08:24 +00:00
self . _engine = _args [ ' sqlalchemy ' ] if ' sqlalchemy ' in _args else None
2022-03-04 21:00:30 +00:00
def meta ( self , * * _args ) :
return [ ]
def _tablename ( self , name ) :
return self . schema + ' . ' + name if self . schema not in [ None , ' ' ] and ' . ' not in name else name
2022-01-29 17:15:45 +00:00
def has ( self , * * _args ) :
found = False
try :
2022-03-04 21:00:30 +00:00
2022-03-07 20:17:27 +00:00
table = self . _tablename ( _args [ ' table ' ] ) if ' table ' in _args else self . _tablename ( self . table )
2022-01-29 17:15:45 +00:00
sql = " SELECT * FROM :table LIMIT 1 " . replace ( " :table " , table )
2022-03-03 22:08:24 +00:00
if self . _engine :
_conn = self . _engine . connect ( )
else :
_conn = self . conn
found = pd . read_sql ( sql , _conn ) . shape [ 0 ]
2022-03-02 19:15:35 +00:00
found = True
2022-01-29 17:15:45 +00:00
except Exception as e :
pass
2022-03-03 22:08:24 +00:00
finally :
if self . _engine :
_conn . close ( )
2022-01-29 17:15:45 +00:00
return found
2021-01-02 11:24:12 +00:00
def isready ( self ) :
_sql = " SELECT * FROM :table LIMIT 1 " . replace ( " :table " , self . table )
try :
return pd . read_sql ( _sql , self . conn ) . columns . tolist ( )
except Exception as e :
pass
return False
2020-12-31 17:12:34 +00:00
def apply ( self , _sql ) :
"""
This function applies a command and / or a query against the current relational data - store
: param _sql insert / select statement
@TODO : Store procedure calls
"""
cursor = self . conn . cursor ( )
2021-01-10 15:55:04 +00:00
_out = None
2020-12-31 17:12:34 +00:00
try :
2021-01-02 11:35:49 +00:00
if " select " in _sql . lower ( ) :
2022-03-12 18:25:29 +00:00
# _conn = self._engine if self._engine else self.conn
return pd . read_sql ( _sql , self . conn )
2021-01-02 11:35:49 +00:00
else :
# Executing a command i.e no expected return values ...
cursor . execute ( _sql )
2021-01-02 11:38:06 +00:00
self . conn . commit ( )
2021-07-08 22:31:29 +00:00
except Exception as e :
print ( e )
2020-12-31 17:12:34 +00:00
finally :
2021-01-10 15:55:04 +00:00
self . conn . commit ( )
2020-12-31 17:12:34 +00:00
cursor . close ( )
def close ( self ) :
try :
2021-07-08 22:31:29 +00:00
self . conn . close ( )
2020-12-31 17:12:34 +00:00
except Exception as error :
print ( error )
pass
class SQLReader ( SQLRW , Reader ) :
def __init__ ( self , * * _args ) :
2022-03-03 22:08:24 +00:00
super ( ) . __init__ ( * * _args )
2020-12-31 17:12:34 +00:00
def read ( self , * * _args ) :
if ' sql ' in _args :
_sql = ( _args [ ' sql ' ] )
else :
2022-03-12 18:25:29 +00:00
table = self . table if self . table is not None else _args [ ' table ' ]
_sql = " SELECT :fields FROM " + self . _tablename ( table )
2020-12-31 17:12:34 +00:00
if ' filter ' in _args :
_sql = _sql + " WHERE " + _args [ ' filter ' ]
_fields = ' * ' if not self . fields else " , " . join ( self . fields )
_sql = _sql . replace ( " :fields " , _fields )
if ' limit ' in _args :
_sql = _sql + " LIMIT " + str ( _args [ ' limit ' ] )
return self . apply ( _sql )
2021-07-08 22:31:29 +00:00
def close ( self ) :
try :
self . conn . close ( )
except Exception as error :
print ( error )
pass
2020-12-31 17:12:34 +00:00
class SQLWriter ( SQLRW , Writer ) :
def __init__ ( self , * * _args ) :
super ( ) . __init__ ( * * _args )
2021-01-02 23:39:27 +00:00
#
# In the advent that data typing is difficult to determine we can inspect and perform a default case
# This slows down the process but improves reliability of the data
# NOTE: Proper data type should be set on the target system if their source is unclear.
2022-03-08 00:50:29 +00:00
2021-01-02 23:39:27 +00:00
self . _cast = False if ' cast ' not in _args else _args [ ' cast ' ]
2022-03-03 22:08:24 +00:00
2021-07-08 22:31:29 +00:00
def init ( self , fields = None ) :
2020-12-31 17:12:34 +00:00
if not fields :
try :
2022-03-04 21:00:30 +00:00
table = self . _tablename ( self . table )
self . fields = pd . read_sql_query ( " SELECT * FROM :table LIMIT 1 " . replace ( " :table " , table ) , self . conn ) . columns . tolist ( )
2020-12-31 17:12:34 +00:00
finally :
pass
else :
self . fields = fields ;
2022-03-03 22:08:24 +00:00
def make ( self , * * _args ) :
2022-03-04 21:00:30 +00:00
table = self . _tablename ( self . table ) if ' table ' not in _args else self . _tablename ( _args [ ' table ' ] )
2022-03-03 22:08:24 +00:00
if ' fields ' in _args :
2022-03-04 21:00:30 +00:00
fields = _args [ ' fields ' ]
# table = self._tablename(self.table)
sql = " " . join ( [ " CREATE TABLE " , table , " ( " , " , " . join ( [ name + ' ' + self . _dtype for name in fields ] ) , " ) " ] )
2022-03-07 20:17:27 +00:00
2022-03-03 22:08:24 +00:00
else :
2022-03-07 20:17:27 +00:00
schema = _args [ ' schema ' ] if ' schema ' in _args else ' '
2022-03-04 21:00:30 +00:00
2022-03-03 22:08:24 +00:00
_map = _args [ ' map ' ] if ' map ' in _args else { }
sql = [ ] # ["CREATE TABLE ",_args['table'],"("]
for _item in schema :
_type = _item [ ' type ' ]
if _type in _map :
_type = _map [ _type ]
sql = sql + [ " " . join ( [ _item [ ' name ' ] , ' ' , _type ] ) ]
sql = " , " . join ( sql )
2022-03-04 21:00:30 +00:00
# table = self._tablename(_args['table'])
sql = [ " CREATE TABLE " , table , " ( " , sql , " ) " ]
2022-03-03 22:08:24 +00:00
sql = " " . join ( sql )
# sql = " ".join(["CREATE TABLE",_args['table']," (", ",".join([ schema[i]['name'] +' '+ (schema[i]['type'] if schema[i]['type'] not in _map else _map[schema[i]['type'] ]) for i in range(0,N)]),")"])
2020-12-31 17:12:34 +00:00
cursor = self . conn . cursor ( )
try :
2022-03-03 22:08:24 +00:00
2020-12-31 17:12:34 +00:00
cursor . execute ( sql )
except Exception as e :
2021-06-29 20:12:28 +00:00
print ( e )
2022-03-07 20:17:27 +00:00
# print (sql)
2020-12-31 17:12:34 +00:00
pass
finally :
2022-03-03 22:08:24 +00:00
# cursor.close()
self . conn . commit ( )
pass
2022-03-12 18:25:29 +00:00
def write ( self , info , * * _args ) :
2020-12-31 17:12:34 +00:00
"""
: param info writes a list of data to a given set of fields
"""
2021-01-02 23:39:27 +00:00
# inspect = False if 'inspect' not in _args else _args['inspect']
# cast = False if 'cast' not in _args else _args['cast']
2020-12-31 17:12:34 +00:00
if not self . fields :
2021-07-23 20:22:23 +00:00
if type ( info ) == list :
_fields = info [ 0 ] . keys ( )
elif type ( info ) == dict :
_fields = info . keys ( )
elif type ( info ) == pd . DataFrame :
2022-03-03 22:08:24 +00:00
_fields = info . columns . tolist ( )
2021-07-23 20:22:23 +00:00
# _fields = info.keys() if type(info) == dict else info[0].keys()
2020-12-31 17:12:34 +00:00
_fields = list ( _fields )
self . init ( _fields )
2021-06-29 20:12:28 +00:00
#
# @TODO: Use pandas/odbc ? Not sure b/c it requires sqlalchemy
#
2022-03-03 22:08:24 +00:00
# if type(info) != list :
# #
# # We are assuming 2 cases i.e dict or pd.DataFrame
# info = [info] if type(info) == dict else info.values.tolist()
2022-03-08 00:50:29 +00:00
2020-12-31 17:12:34 +00:00
try :
2022-03-04 21:00:30 +00:00
table = self . _tablename ( self . table )
_sql = " INSERT INTO :table (:fields) VALUES (:values) " . replace ( " :table " , table ) #.replace(":table",self.table).replace(":fields",_fields)
2022-03-08 00:50:29 +00:00
if type ( info ) == list :
_info = pd . DataFrame ( info )
elif type ( info ) == dict :
_info = pd . DataFrame ( [ info ] )
2021-01-02 23:39:27 +00:00
else :
2022-03-08 00:50:29 +00:00
_info = pd . DataFrame ( info )
2022-03-03 22:08:24 +00:00
2022-03-08 00:50:29 +00:00
if _info . shape [ 0 ] == 0 :
2022-03-03 22:08:24 +00:00
2022-03-08 00:50:29 +00:00
return
SQLRW . lock . acquire ( )
if self . _engine is not None :
# pd.to_sql(_info,self._engine)
if self . schema in [ ' ' , None ] :
rows = _info . to_sql ( table , self . _engine , if_exists = ' append ' , index = False )
2022-03-07 20:17:27 +00:00
else :
2022-03-08 00:50:29 +00:00
rows = _info . to_sql ( self . table , self . _engine , schema = self . schema , if_exists = ' append ' , index = False )
else :
_fields = " , " . join ( self . fields )
_sql = _sql . replace ( " :fields " , _fields )
values = " , " . join ( " ? " * len ( self . fields ) ) if self . _provider == ' netezza ' else " , " . join ( [ " %s " for name in self . fields ] )
_sql = _sql . replace ( " :values " , values )
cursor = self . conn . cursor ( )
cursor . executemany ( _sql , _info . values . tolist ( ) )
cursor . close ( )
# cursor.commit()
2021-06-29 20:12:28 +00:00
2021-01-10 15:55:04 +00:00
# self.conn.commit()
2020-12-31 17:12:34 +00:00
except Exception as e :
2021-07-08 22:31:29 +00:00
print ( e )
2021-06-29 20:12:28 +00:00
pass
2020-12-31 17:12:34 +00:00
finally :
2022-03-08 00:50:29 +00:00
if self . _engine is None :
self . conn . commit ( )
SQLRW . lock . release ( )
2022-03-03 22:08:24 +00:00
# cursor.close()
2020-12-31 17:12:34 +00:00
pass
def close ( self ) :
try :
self . conn . close ( )
finally :
pass
2021-03-15 17:38:54 +00:00
class BigQuery :
def __init__ ( self , * * _args ) :
2021-03-30 04:30:54 +00:00
path = _args [ ' service_key ' ] if ' service_key ' in _args else _args [ ' private_key ' ]
2021-03-15 17:38:54 +00:00
self . credentials = service_account . Credentials . from_service_account_file ( path )
2021-03-30 04:30:54 +00:00
self . dataset = _args [ ' dataset ' ] if ' dataset ' in _args else None
self . path = path
2021-06-14 19:31:30 +00:00
self . dtypes = _args [ ' dtypes ' ] if ' dtypes ' in _args else None
2021-11-18 21:21:26 +00:00
self . table = _args [ ' table ' ] if ' table ' in _args else None
2022-03-03 22:08:24 +00:00
self . client = bq . Client . from_service_account_json ( self . path )
2021-03-30 04:30:54 +00:00
def meta ( self , * * _args ) :
"""
This function returns meta data for a given table or query with dataset / table properly formatted
: param table name of the name WITHOUT including dataset
: param sql sql query to be pulled ,
"""
2022-03-07 20:17:27 +00:00
table = _args [ ' table ' ]
2022-03-03 22:08:24 +00:00
ref = self . client . dataset ( self . dataset ) . table ( table )
return self . client . get_table ( ref ) . schema
2022-01-29 17:15:45 +00:00
def has ( self , * * _args ) :
found = False
try :
found = self . meta ( * * _args ) is not None
except Exception as e :
pass
2022-03-03 22:08:24 +00:00
return found
2021-03-15 17:38:54 +00:00
class BQReader ( BigQuery , Reader ) :
def __init__ ( self , * * _args ) :
super ( ) . __init__ ( * * _args )
2022-03-12 18:25:29 +00:00
def apply ( self , sql ) :
self . read ( sql = sql )
2021-03-15 17:38:54 +00:00
pass
def read ( self , * * _args ) :
SQL = None
2021-11-18 21:21:26 +00:00
table = self . table if ' table ' not in _args else _args [ ' table ' ]
2021-03-15 17:38:54 +00:00
if ' sql ' in _args :
SQL = _args [ ' sql ' ]
2021-11-18 21:21:26 +00:00
elif table :
2021-03-15 17:38:54 +00:00
2021-11-18 21:21:26 +00:00
table = " " . join ( [ " ` " , table , " ` " ] ) if ' . ' in table else " " . join ( [ " `:dataset. " , table , " ` " ] )
2021-03-15 17:38:54 +00:00
SQL = " SELECT * FROM :table " . replace ( " :table " , table )
2021-11-18 21:21:26 +00:00
if not SQL :
return None
2021-03-15 17:38:54 +00:00
if SQL and ' limit ' in _args :
SQL + = " LIMIT " + str ( _args [ ' limit ' ] )
2021-03-30 04:30:54 +00:00
if ( ' :dataset ' in SQL or ' :DATASET ' in SQL ) and self . dataset :
SQL = SQL . replace ( ' :dataset ' , self . dataset ) . replace ( ' :DATASET ' , self . dataset )
2021-06-20 23:56:12 +00:00
_info = { ' credentials ' : self . credentials , ' dialect ' : ' standard ' }
2022-03-03 22:08:24 +00:00
return pd . read_gbq ( SQL , * * _info ) if SQL else None
# return self.client.query(SQL).to_dataframe() if SQL else None
2021-11-18 21:21:26 +00:00
2021-03-15 17:38:54 +00:00
class BQWriter ( BigQuery , Writer ) :
2021-04-13 22:29:54 +00:00
lock = Lock ( )
2021-03-15 17:38:54 +00:00
def __init__ ( self , * * _args ) :
super ( ) . __init__ ( * * _args )
self . parallel = False if ' lock ' not in _args else _args [ ' lock ' ]
self . table = _args [ ' table ' ] if ' table ' in _args else None
self . mode = { ' if_exists ' : ' append ' , ' chunksize ' : 900000 , ' destination_table ' : self . table , ' credentials ' : self . credentials }
def write ( self , _info , * * _args ) :
try :
2021-06-14 19:31:30 +00:00
if self . parallel or ' lock ' in _args :
2021-04-13 22:27:23 +00:00
BQWriter . lock . acquire ( )
2022-03-12 18:25:29 +00:00
_args [ ' table ' ] = self . table if ' table ' not in _args else _args [ ' table ' ]
2021-03-15 17:38:54 +00:00
self . _write ( _info , * * _args )
finally :
if self . parallel :
2021-04-13 22:27:23 +00:00
BQWriter . lock . release ( )
2021-03-15 17:38:54 +00:00
def _write ( self , _info , * * _args ) :
_df = None
if type ( _info ) in [ list , pd . DataFrame ] :
if type ( _info ) == list :
_df = pd . DataFrame ( _info )
elif type ( _info ) == pd . DataFrame :
_df = _info
2021-03-30 04:30:54 +00:00
if ' . ' not in _args [ ' table ' ] :
self . mode [ ' destination_table ' ] = ' . ' . join ( [ self . dataset , _args [ ' table ' ] ] )
else :
self . mode [ ' destination_table ' ] = _args [ ' table ' ] . strip ( )
if ' schema ' in _args :
self . mode [ ' table_schema ' ] = _args [ ' schema ' ]
2021-09-03 06:25:45 +00:00
# _mode = copy.deepcopy(self.mode)
_mode = self . mode
2021-03-15 17:38:54 +00:00
_df . to_gbq ( * * self . mode ) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
pass
2021-11-18 21:21:26 +00:00
#
# Aliasing the big query classes allowing it to be backward compatible
#
BigQueryReader = BQReader
BigQueryWriter = BQWriter