"""
Implementing support for google ' s bigquery
- cloud . bigquery . Read
- cloud . bigquery . Write
"""
import time
from multiprocessing import RLock

import numpy as np
import pandas as pd
import pandas_gbq as pd_gbq
from google.cloud import bigquery as bq
from google.oauth2 import service_account

# Row-count threshold beyond which a single write is automatically chunked
MAX_CHUNK = 2000000
class BigQuery:
    def __init__(self, **_args):
        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
        self.credentials = service_account.Credentials.from_service_account_file(path)
        self.dataset = _args['dataset'] if 'dataset' in _args else None
        self.path = path
        self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
        self.table = _args['table'] if 'table' in _args else None
        self.client = bq.Client.from_service_account_json(self.path)
    def meta(self, **_args):
        """
        This function returns metadata for a given table or query, with dataset/table properly formatted.
        :param table:   name of the table WITHOUT the dataset prefix
        :param sql:     SQL query whose metadata is to be pulled
        """
        table = _args['table'] if 'table' in _args else self.table
        try:
            if table:
                _dataset = self.dataset if 'dataset' not in _args else _args['dataset']
                sql = f"""SELECT column_name AS name, data_type AS type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}'"""
                _info = {'credentials': self.credentials, 'dialect': 'standard'}
                return pd_gbq.read_gbq(sql, **_info).to_dict(orient='records')
                # return self.read(sql=sql).to_dict(orient='records')
                # ref = self.client.dataset(self.dataset).table(table)
                # _schema = self.client.get_table(ref).schema
                # return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema]
            else:
                return []
        except Exception:
            return []
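    # A minimal usage sketch (the key path, dataset, and table names below are
    # hypothetical placeholders, not part of this module):
    #
    #   _store = BigQuery(service_key='/path/to/key.json', dataset='demo')
    #   _store.meta(table='users')  # -> [{'name': 'id', 'type': 'INT64'}, ...]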
    def has(self, **_args):
        found = False
        try:
            _has = self.meta(**_args)
            found = _has is not None and len(_has) > 0
        except Exception:
            pass
        return found
class Reader(BigQuery):
    """
    Implements support for reading from BigQuery. This class acts as a wrapper around Google's API.
    """
    def __init__(self, **_args):
        super().__init__(**_args)

    def apply(self, sql):
        return self.read(sql=sql)
    def read(self, **_args):
        SQL = None
        table = self.table if 'table' not in _args else _args['table']
        if 'sql' in _args:
            SQL = _args['sql']
        elif table:
            # Quote the table name; a bare name is prefixed with the :dataset placeholder
            table = "".join(["`", table, "`"]) if '.' in table else "".join(["`:dataset.", table, "`"])
            SQL = "SELECT * FROM :table".replace(":table", table)
        if not SQL:
            return None
        if SQL and 'limit' in _args:
            SQL += " LIMIT " + str(_args['limit'])
        if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
            SQL = SQL.replace(':dataset', self.dataset).replace(':DATASET', self.dataset)
        _info = {'credentials': self.credentials, 'dialect': 'standard'}
        return pd_gbq.read_gbq(SQL, **_info) if SQL else None
        # return self.client.query(SQL).to_dataframe() if SQL else None
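
# A minimal sketch of how Reader might be used; the credentials path and the
# dataset/table names below are assumptions, not part of this module:
#
#   _reader = Reader(service_key='/path/to/key.json', dataset='demo', table='users')
#   _df = _reader.read(limit=10)   # runs: SELECT * FROM `demo.users` LIMIT 10
#   _df = _reader.read(sql='SELECT COUNT(*) AS n FROM :dataset.users')
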
class Writer(BigQuery):
    """
    This class implements support for writing against BigQuery.
    """
    lock = RLock()

    def __init__(self, **_args):
        super().__init__(**_args)
        self.parallel = False if 'lock' not in _args else _args['lock']
        self.table = _args['table'] if 'table' in _args else None
        self.mode = {'if_exists': 'append', 'chunksize': 900000, 'destination_table': self.table, 'credentials': self.credentials}
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
        self._location = 'US' if 'location' not in _args else _args['location']
    def write(self, _data, **_args):
        """
        This function will perform a write to BigQuery.
        :param _data: data-frame to be written to BigQuery
        """
        _locked = False
        try:
            if self.parallel or 'lock' in _args:
                Writer.lock.acquire()
                _locked = True
            _args['table'] = self.table if 'table' not in _args else _args['table']
            self._write(_data, **_args)
        finally:
            # Release only if this call actually acquired the lock
            if _locked:
                Writer.lock.release()
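    # Illustrative call, assuming _writer is a configured Writer and the frame
    # below is made-up data:
    #
    #   _writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}]), table='users', if_exists='append')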
    def submit(self, _sql):
        """
        Write the output of a massive query to a given table; BigQuery will handle this as a job.
        This function returns the job identifier.
        """
        _config = bq.QueryJobConfig()
        _config.destination = self.client.dataset(self.dataset).table(self.table)
        _config.allow_large_results = True
        # _config.write_disposition = bq.bq_consts.WRITE_APPEND
        _config.dry_run = False
        # _config.priority = 'BATCH'
        _resp = self.client.query(_sql, location=self._location, job_config=_config)
        return _resp.job_id

    def status(self, _id):
        return self.client.get_job(_id, location=self._location)
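    # A sketch of submitting a long-running query as a job and polling it;
    # the SQL and the 5-second interval are illustrative assumptions:
    #
    #   _job_id = _writer.submit('SELECT * FROM demo.events')
    #   while _writer.status(_job_id).state != 'DONE':
    #       time.sleep(5)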
    def _write(self, _info, **_args):
        _df = None
        if type(_info) in [list, pd.DataFrame]:
            if type(_info) == list:
                _df = pd.DataFrame(_info)
            elif type(_info) == pd.DataFrame:
                _df = _info
            if '.' not in _args['table']:
                self.mode['destination_table'] = '.'.join([self.dataset, _args['table']])
            else:
                self.mode['destination_table'] = _args['table'].strip()
            if 'schema' in _args:
                self.mode['table_schema'] = _args['schema']
            #
            # Let us ensure that the types are somewhat compatible ...
            # _map = {'INTEGER':np.int64,'DATETIME':'datetime64[ns]','TIMESTAMP':'datetime64[ns]','FLOAT':np.float64,'DOUBLE':np.float64,'STRING':str}
            # _mode = copy.deepcopy(self.mode)
            # _mode = self.mode
            # _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
            #
            # Let us adjust the chunking here
            if 'if_exists' in _args:
                self.mode['if_exists'] = _args['if_exists']
            self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks
            _indexes = np.array_split(np.arange(_df.shape[0]), self._chunks)
            for i in _indexes:
                # _df.iloc[i].to_gbq(**self.mode)
                pd_gbq.to_gbq(_df.iloc[i], **self.mode)
                # After the first chunk, subsequent chunks must append; otherwise a
                # 'replace' disposition would drop the rows written so far
                self.mode['if_exists'] = 'append'
                time.sleep(1)
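
if __name__ == '__main__':
    #
    # A minimal smoke test, assuming a valid service-account key and a writable
    # dataset; every path and name below is a placeholder assumption.
    #
    _key = '/path/to/service-account.json'
    _writer = Writer(service_key=_key, dataset='demo', table='users', lock=True)
    _writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}]), if_exists='append')
    _reader = Reader(service_key=_key, dataset='demo')
    print(_reader.read(table='users', limit=5))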