2020-12-31 17:12:34 +00:00
"""
This file performs reads and writes against SQL databases such as PostgreSQL, Redshift, MySQL, MariaDB and Netezza, as well as Google BigQuery.
LICENSE ( MIT )
Copyright 2016 - 2020 , The Phi Technology LLC
Permission is hereby granted , free of charge , to any person obtaining a copy of this software and associated documentation files ( the " Software " ) , to deal in the Software without restriction , including without limitation the rights to use , copy , modify , merge , publish , distribute , sublicense , and / or sell copies of the Software , and to permit persons to whom the Software is furnished to do so , subject to the following conditions :
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software .
THE SOFTWARE IS PROVIDED " AS IS " , WITHOUT WARRANTY OF ANY KIND , EXPRESS OR IMPLIED , INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY , FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT . IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM , DAMAGES OR OTHER LIABILITY , WHETHER IN AN ACTION OF CONTRACT , TORT OR OTHERWISE , ARISING FROM , OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE .
2022-08-03 17:07:27 +00:00
@TODO :
- Migrate SQLite to SQL hierarchy
- Include Write in Chunks from pandas
2020-12-31 17:12:34 +00:00
"""
import psycopg2 as pg
import mysql . connector as my
import sys
2022-03-03 22:08:24 +00:00
import sqlalchemy
2020-12-31 17:12:34 +00:00
if sys . version_info [ 0 ] > 2 :
from transport . common import Reader , Writer #, factory
else :
from common import Reader , Writer
import json
2021-03-15 17:38:54 +00:00
from google . oauth2 import service_account
2021-03-30 04:30:54 +00:00
from google . cloud import bigquery as bq
2022-03-08 00:50:29 +00:00
from multiprocessing import Lock , RLock
2020-12-31 17:12:34 +00:00
import pandas as pd
2021-01-02 23:39:27 +00:00
import numpy as np
2021-06-29 20:12:28 +00:00
import nzpy as nz #--- netezza drivers
2021-03-30 04:30:54 +00:00
import copy
2021-11-18 21:21:26 +00:00
import os
2021-03-30 04:30:54 +00:00
2020-12-31 17:12:34 +00:00
class SQLRW:
    """
    Base class shared by the SQL reader and writer.
    It opens the DB-API connection and provides helpers to inspect a table (meta, has, isready)
    and to run a statement against the data-store (apply).
    """
    # Lock shared by every instance; writers acquire it when they are asked to serialize writes
    lock = RLock()
    # Row count above which a single-chunk write is split into several batches
    MAX_CHUNK = 2000000
    DRIVERS = {"postgresql": pg, "redshift": pg, "mysql": my, "mariadb": my, "netezza": nz}
    REFERENCE = {
        "netezza": {"port": 5480, "handler": nz, "dtype": "VARCHAR(512)"},
        "postgresql": {"port": 5432, "handler": pg, "dtype": "VARCHAR"},
        "redshift": {"port": 5432, "handler": pg, "dtype": "VARCHAR"},
        # 3306 is the default MySQL/MariaDB port (the previous value, 3360, was a transposition)
        "mysql": {"port": 3306, "handler": my, "dtype": "VARCHAR(256)"},
        "mariadb": {"port": 3306, "handler": my, "dtype": "VARCHAR(256)"},
    }

    def __init__(self, **_args):
        """
        :param db | database    name of the database (required)
        :param driver           DB-API module used to connect (required), e.g one of SQLRW.DRIVERS
        :param host, port       location of the server, they may also come from the auth_file
        :param username | user  login, the password defaults to an empty string
        :param password         password associated with the login
        :param auth_file        path of a JSON file with user/password and optionally host, port, database, table
        :param table            default table to work with
        :param fields           default list of fields
        :param schema           schema prefixed to table names
        :param chunks           number of batches a write is split into (default 1)
        :param provider         name of the provider (e.g netezza), used to pick the parameter marker
        :param lock             if set, writes are serialized with the class level lock
        :param default          dictionary, its 'type' entry is the column type used by make (default VARCHAR(256))
        :param sqlalchemy       optional SQLAlchemy engine used for reflection and pandas I/O
        """
        _info = {}
        _info['dbname'] = _args['db'] if 'db' in _args else _args['database']
        self.table = _args['table'] if 'table' in _args else None
        self.fields = _args['fields'] if 'fields' in _args else []
        self.schema = _args['schema'] if 'schema' in _args else ''
        self._chunks = int(_args['chunks']) if 'chunks' in _args else 1
        self._provider = _args['provider'] if 'provider' in _args else None
        #
        # host and port are only forwarded when provided, this allows the auth_file to supply them
        for _key in ('host', 'port'):
            if _key in _args:
                _info[_key] = _args[_key]
        #
        # NOTE: this flag shadows the class level lock on the instance, the actual lock is reached via SQLRW.lock
        self.lock = _args['lock'] if 'lock' in _args else False
        if 'username' in _args or 'user' in _args:
            key = 'username' if 'username' in _args else 'user'
            _info['user'] = _args[key]
            _info['password'] = _args['password'] if 'password' in _args else ''
        if 'auth_file' in _args:
            # The file is closed as soon as it has been parsed
            with open(_args['auth_file']) as _file:
                _auth = json.loads(_file.read())
            key = 'username' if 'username' in _auth else 'user'
            _info['user'] = _auth[key]
            _info['password'] = _auth['password'] if 'password' in _auth else ''
            for _key in ('host', 'port'):
                if _key in _auth:
                    _info[_key] = _auth[_key]
            if 'database' in _auth:
                _info['dbname'] = _auth['database']
            self.table = _auth['table'] if 'table' in _auth else self.table
        #
        # We need to load the drivers here to see what we are dealing with ...
        _handler = _args['driver']  # -- handler to the driver
        if 'default' in _args and 'type' in _args['default']:
            self._dtype = _args['default']['type']
        else:
            self._dtype = 'VARCHAR(256)'
        #
        # netezza and mysql expect the keyword 'database' instead of 'dbname'
        if _handler == nz or _handler == my:
            _info['database'] = _info.pop('dbname')
            if _handler == nz:
                _info['securityLevel'] = 0
        self.conn = _handler.connect(**_info)
        self._engine = _args['sqlalchemy'] if 'sqlalchemy' in _args else None

    def meta(self, **_args):
        """
        This function returns the columns of a table as a list of {"name":...,"type":...}
        The reflection requires an SQLAlchemy engine, without one (or upon failure) an empty list is returned
        :param table    name of the table (defaults to the table given to the constructor)
        """
        schema = []
        try:
            if self._engine:
                table = _args['table'] if 'table' in _args else self.table
                # The 'bind' argument of MetaData is only used with the 1.x series, otherwise it goes to reflect
                if sqlalchemy.__version__.startswith('1.'):
                    _metadata = sqlalchemy.MetaData(bind=self._engine)
                    _metadata.reflect()
                else:
                    _metadata = sqlalchemy.MetaData()
                    _metadata.reflect(bind=self._engine)
                schema = [{"name": _attr.name, "type": str(_attr.type)} for _attr in _metadata.tables[table].columns]
                #
                # Some house keeping work: type names are normalized
                # 'DOUBLE PRECISION' is how the type is rendered as a string, the underscore form is kept as is
                _alias = {'BIGINT': 'INTEGER', 'TEXT': 'STRING', 'DOUBLE_PRECISION': 'FLOAT', 'DOUBLE PRECISION': 'FLOAT',
                          'NUMERIC': 'FLOAT', 'DECIMAL': 'FLOAT', 'REAL': 'FLOAT'}
                for _item in schema:
                    if _item['type'] in _alias:
                        _item['type'] = _alias[_item['type']]
        except Exception as e:
            print(e)
        return schema

    def _tablename(self, name):
        """
        This function prefixes a table name with the schema, unless there is no schema or the name already has a dot
        """
        if self.schema not in [None, ''] and '.' not in name:
            return self.schema + '.' + name
        return name

    def has(self, **_args):
        """
        This function returns the output of meta i.e a list that is empty (falsy) when the table was not found
        :param table    name of the table (defaults to the table given to the constructor)
        """
        return self.meta(**_args)

    def isready(self):
        """
        This function returns the list of columns of the default table if it can be queried, False otherwise
        """
        if not self.table:
            return False
        _sql = "SELECT * FROM :table LIMIT 1".replace(":table", self._tablename(self.table))
        try:
            # _engine is always defined by the constructor (possibly None), hence we test its value
            _conn = self._engine if self._engine else self.conn
            return pd.read_sql(_sql, _conn).columns.tolist()
        except Exception as e:
            pass
        return False

    def apply(self, _sql):
        """
        This function applies a command and/or a query against the current relational data-store
        :param _sql     insert/select statement
        :return         a data-frame for a select statement, None for a command or upon failure (the error is printed)
        @TODO: Store procedure calls
        """
        try:
            # Leading white spaces/new lines are tolerated in front of the SELECT keyword
            if _sql.lstrip().lower().startswith('select'):
                _conn = self._engine if self._engine else self.conn
                return pd.read_sql(_sql, _conn)
            else:
                # Executing a command i.e no expected return values ...
                cursor = self.conn.cursor()
                cursor.execute(_sql)
                self.conn.commit()
        except Exception as e:
            print(e)
        finally:
            if not self._engine:
                self.conn.commit()

    def close(self):
        """
        This function closes the connection, an error is printed and not propagated
        """
        try:
            self.conn.close()
        except Exception as error:
            print(error)
class SQLReader(SQLRW, Reader):
    """
    This class reads from a relational data-store, either with an explicit query or from a table
    """
    def __init__(self, **_args):
        super().__init__(**_args)

    def read(self, **_args):
        """
        This function builds (if need be) and runs a query, the result is whatever apply returns
        :param sql      query to run as is, the table/filter/fields parameters are then ignored
        :param table    table to read from (defaults to the table given to the constructor)
        :param filter   condition placed after WHERE
        :param fields   fields to select, either a string or a list of names (defaults to the constructor's fields or *)
        :param limit    maximum number of rows, appended to the query (explicit or not)
        """
        if 'sql' in _args:
            _sql = _args['sql']
        else:
            table = _args['table'] if 'table' in _args else self.table
            _sql = "SELECT :fields FROM " + self._tablename(table)
            if 'filter' in _args:
                _sql = _sql + " WHERE " + _args['filter']
            _fields = _args['fields'] if 'fields' in _args else self.fields
            #
            # The fields may be provided as a list/tuple of names, str.replace requires a string
            if not isinstance(_fields, str):
                _fields = ",".join(_fields) if _fields else ''
            if not _fields:
                _fields = '*'
            _sql = _sql.replace(":fields", _fields)
        #
        # At this point we have a query we can execute gracefully
        if 'limit' in _args:
            _sql = _sql + " LIMIT " + str(_args['limit'])
        #
        # @TODO:
        # It is here that we should inspect to see if there are any pre/post conditions
        #
        return self.apply(_sql)

    def close(self):
        """
        This function closes the connection, an error is printed and not propagated
        """
        try:
            self.conn.close()
        except Exception as error:
            print(error)
2020-12-31 17:12:34 +00:00
class SQLWriter(SQLRW, Writer):
    """
    This class creates tables and writes rows to a relational data-store, either through the
    SQLAlchemy engine (if one was provided) or through the DB-API connection
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        #
        # In the advent that data typing is difficult to determine we can inspect and perform a default case
        # This slows down the process but improves reliability of the data
        # NOTE: Proper data type should be set on the target system if their source is unclear.
        self._cast = _args['cast'] if 'cast' in _args else False

    def init(self, fields=None):
        """
        This function sets the fields to write, they are read from the table when none are provided
        :param fields   list of field names
        """
        if not fields:
            table = self._tablename(self.table)
            _sql = "SELECT * FROM :table LIMIT 1".replace(":table", table)
            self.fields = pd.read_sql_query(_sql, self.conn).columns.tolist()
        else:
            self.fields = fields

    def make(self, **_args):
        """
        This function creates a table, an error is printed and not propagated
        :param table    name of the table (defaults to the table given to the constructor)
        :param fields   list of column names, every column gets the default type of the writer
        :param schema   list of {"name":...,"type":...}, used when fields is not provided
        :param map      dictionary that translates the types found in schema
        """
        _name = _args['table'] if 'table' in _args else self.table
        table = self._tablename(_name)
        if 'fields' in _args:
            _columns = [name + ' ' + self._dtype for name in _args['fields']]
        else:
            schema = _args['schema'] if 'schema' in _args else []
            _map = _args['map'] if 'map' in _args else {}
            _columns = []
            for _item in schema:
                _type = _item['type']
                if _type in _map:
                    _type = _map[_type]
                _columns.append(_item['name'] + ' ' + _type)
        sql = " ".join(["CREATE TABLE", table, "(", ",".join(_columns), ")"])
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql)
        except Exception as e:
            print(e)
        finally:
            self.conn.commit()

    def write(self, info, **_args):
        """
        This function writes rows to a table, in batches if need be. An error is printed and not propagated
        :param info     data to write: a dictionary (one row), a list of dictionaries or a data-frame
        :param table    name of the table (defaults to the table given to the constructor)
        :param schema   schema of the table, NOTE: it replaces the schema of the writer for subsequent calls
        """
        _locked = False  # -- we only release the lock if this call acquired it
        try:
            _info = pd.DataFrame([info]) if isinstance(info, dict) else pd.DataFrame(info)
            if _info.shape[0] == 0:
                return
            if not self.fields:
                self.init(_info.columns.tolist())
            _name = _args['table'] if 'table' in _args else self.table
            self.schema = _args['schema'] if 'schema' in _args else self.schema
            table = self._tablename(_name)
            if self.lock:
                SQLRW.lock.acquire()
                _locked = True
            #
            # we adjust the chunks here for large data, the setting of the writer is left untouched
            _chunks = self._chunks
            if _chunks == 1 and _info.shape[0] > SQLRW.MAX_CHUNK:
                _chunks = 10
            if self._engine is None:
                #
                # The insert statement is the same for every batch, netezza uses ? as parameter marker
                _marker = "?" if self._provider == 'netezza' else "%s"
                _sql = "INSERT INTO :table (:fields) VALUES (:values)".replace(":table", table)
                _sql = _sql.replace(":fields", ",".join(self.fields))
                _sql = _sql.replace(":values", ",".join([_marker] * len(self.fields)))
            #
            # We are enabling writing by chunks/batches because some persistent layers have quotas or limitations on volume of data
            for i in np.array_split(np.arange(_info.shape[0]), _chunks):
                _batch = _info.iloc[i]
                #
                # In case we have an invalid chunk ...
                if _batch.shape[0] == 0:
                    continue
                if self._engine is not None:
                    if self.schema in ['', None]:
                        _batch.to_sql(table, self._engine, if_exists='append', index=False)
                    else:
                        #
                        # Writing with schema information, the table name is the one requested by the caller
                        _batch.to_sql(_name, self._engine, schema=self.schema, if_exists='append', index=False)
                else:
                    cursor = self.conn.cursor()
                    try:
                        cursor.executemany(_sql, _batch.values.tolist())
                    finally:
                        cursor.close()
        except Exception as e:
            print(e)
        finally:
            try:
                if self._engine is None:
                    self.conn.commit()
            finally:
                # The lock is released even if the commit fails
                if _locked:
                    SQLRW.lock.release()

    def close(self):
        """
        This function closes the connection, errors are propagated to the caller
        """
        self.conn.close()
2021-03-15 17:38:54 +00:00
class BigQuery:
    """
    Base class of the BigQuery reader and writer, it loads the credentials of a service account
    and provides helpers to inspect a table (meta, has)
    """
    def __init__(self, **_args):
        """
        :param service_key | private_key    path of the service account file (required)
        :param dataset                      default dataset
        :param table                        default table
        :param dtypes                       stored as is on the instance
        """
        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
        self.credentials = service_account.Credentials.from_service_account_file(path)
        self.dataset = _args['dataset'] if 'dataset' in _args else None
        self.path = path
        self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
        self.table = _args['table'] if 'table' in _args else None
        self.client = bq.Client.from_service_account_json(self.path)

    def meta(self, **_args):
        """
        This function returns meta data for a given table as a list of {"field_name":...,"field_type":...}
        An empty list is returned if there is no table or upon failure (the error is printed)
        :param table    name of the table WITHOUT including dataset (defaults to the table of the constructor)
        :param dataset  name of the dataset (defaults to the dataset of the constructor)
        """
        table = _args['table'] if 'table' in _args else self.table
        if not table:
            return []
        try:
            _dataset = self.dataset if 'dataset' not in _args else _args['dataset']
            sql = f"""SELECT column_name as field_name, data_type as field_type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}'"""
            #
            # The query is run from here (same call as the reader) because the writer has no read function
            _info = {'credentials': self.credentials, 'dialect': 'standard'}
            return pd.read_gbq(sql, **_info).to_dict(orient='records')
        except Exception as e:
            print(e)
            return []

    def has(self, **_args):
        """
        This function returns True if meta data was found for the table, False otherwise
        The parameters are those of the function meta
        """
        found = False
        try:
            # meta always returns a list, the table is found if the list is not empty
            found = len(self.meta(**_args)) > 0
        except Exception as e:
            pass
        return found
2021-03-15 17:38:54 +00:00
class BQReader(BigQuery, Reader):
    """
    This class reads from BigQuery, either with an explicit query or from a table
    """
    def __init__(self, **_args):
        super().__init__(**_args)

    def apply(self, sql):
        """
        This function runs a query and returns the resulting data-frame
        :param sql  query to run
        """
        return self.read(sql=sql)

    def read(self, **_args):
        """
        This function runs a query and returns a data-frame, or None if there is neither a query nor a table
        :param sql      query to run, :dataset or :DATASET is replaced by the dataset of the reader (if any)
        :param table    table to read from (defaults to the table of the constructor), used if sql is not provided
        :param limit    maximum number of rows, appended to the query
        """
        SQL = None
        table = self.table if 'table' not in _args else _args['table']
        if 'sql' in _args:
            SQL = _args['sql']
        elif table:
            # A table without dataset is prefixed with the :dataset placeholder, resolved below
            table = "`" + table + "`" if '.' in table else "`:dataset." + table + "`"
            SQL = "SELECT * FROM :table".replace(":table", table)
        if not SQL:
            return None
        if 'limit' in _args:
            SQL += " LIMIT " + str(_args['limit'])
        if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
            SQL = SQL.replace(':dataset', self.dataset).replace(':DATASET', self.dataset)
        _info = {'credentials': self.credentials, 'dialect': 'standard'}
        return pd.read_gbq(SQL, **_info)
2021-11-18 21:21:26 +00:00
2021-03-15 17:38:54 +00:00
class BQWriter(BigQuery, Writer):
    """
    This class appends rows to a BigQuery table, in batches if need be
    """
    # Lock shared by every writer, used to serialize the writes when requested
    lock = Lock()

    def __init__(self, **_args):
        """
        The parameters of BigQuery apply, in addition to :
        :param lock     if set, every write is serialized with the class level lock
        :param chunks   number of batches a write is split into (default 1)
        """
        super().__init__(**_args)
        self.parallel = False if 'lock' not in _args else _args['lock']
        self.table = _args['table'] if 'table' in _args else None
        self.mode = {'if_exists': 'append', 'chunksize': 900000, 'destination_table': self.table, 'credentials': self.credentials}
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])

    def write(self, _info, **_args):
        """
        This function writes data to a table, under the class level lock if requested
        :param _info    list of rows or data-frame
        :param table    name of the table (defaults to the table of the constructor)
        :param schema   schema handed over to to_gbq as table_schema
        :param lock     if present (whatever its value) the write is serialized
        """
        #
        # The same condition drives acquire and release, otherwise the lock would be left acquired
        _locked = bool(self.parallel) or 'lock' in _args
        if _locked:
            BQWriter.lock.acquire()
        try:
            _args['table'] = self.table if 'table' not in _args else _args['table']
            self._write(_info, **_args)
        finally:
            if _locked:
                BQWriter.lock.release()

    def _write(self, _info, **_args):
        """
        This function performs the actual write, data other than a list or a data-frame is ignored
        :param _info    list of rows or data-frame
        :param table    name of the table, prefixed with the dataset of the writer if it has no dot
        :param schema   schema handed over to to_gbq as table_schema
        """
        if isinstance(_info, list):
            _df = pd.DataFrame(_info)
        elif isinstance(_info, pd.DataFrame):
            _df = _info
        else:
            return
        if '.' not in _args['table']:
            self.mode['destination_table'] = '.'.join([self.dataset, _args['table']])
        else:
            self.mode['destination_table'] = _args['table'].strip()
        if 'schema' in _args:
            self.mode['table_schema'] = _args['schema']
        #
        # Let us adjust the chunking here, the setting of the writer is left untouched
        _chunks = 10 if _df.shape[0] > SQLRW.MAX_CHUNK and self._chunks == 1 else self._chunks
        for i in np.array_split(np.arange(_df.shape[0]), _chunks):
            _batch = _df.iloc[i]
            #
            # With more chunks than rows some batches are empty, they are skipped
            # An empty data-frame is still handed over once, as it was before
            if _batch.shape[0] == 0 and _df.shape[0] > 0:
                continue
            _batch.to_gbq(**self.mode)
2021-11-18 21:21:26 +00:00
#
# Aliasing the big query classes allowing it to be backward compatible :
# BigQueryReader and BigQueryWriter are other names for BQReader and BQWriter (the very same classes)
#
BigQueryReader = BQReader
BigQueryWriter = BQWriter