From 687ffec215e79b4809dcd1fb89472dcc40fbccbc Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 15 Mar 2021 12:38:54 -0500 Subject: [PATCH] bq support --- setup.py | 4 ++-- transport/sql.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 6355c6f..5f09279 100644 --- a/setup.py +++ b/setup.py @@ -8,12 +8,12 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.3.6", + "version":"1.3.8", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','google-cloud-bigquery','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" if sys.version_info[0] == 2 : diff --git a/transport/sql.py b/transport/sql.py index 6ffad70..5e817ca 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -17,8 +17,8 @@ if sys.version_info[0] > 2 : else: from common import Reader,Writer import json -# from threading import Lock - +from google.oauth2 import service_account +from multiprocessing import Lock import pandas as pd import numpy as np class SQLRW : @@ -175,7 +175,64 @@ class SQLWriter(SQLRW,Writer): self.conn.close() finally: pass +class BigQuery: + def __init__(self,**_args): + path = _args['service_key'] + self.credentials = service_account.Credentials.from_service_account_file(path) + +class BQReader(BigQuery,Reader) : + def __init__(self,**_args): + + super().__init__(**_args) + pass + def read(self,**_args): + SQL = None + if 'sql' in _args : + SQL = _args['sql'] + elif 'table' in _args: + + table = "".join(["`",_args['table'],"`"]) + SQL = "SELECT * FROM :table ".replace(":table",table) + if SQL and 'limit' in _args: + SQL += " LIMIT "+str(_args['limit']) + + return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None +class BQWriter(BigQuery,Writer): + Lock = Lock() + def __init__(self,**_args): + super().__init__(**_args) + + self.parallel = False if 'lock' not in _args else _args['lock'] + self.table = _args['table'] if 'table' in _args else None + self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials} + + def write(self,_info,**_args) : + try: + if self.parallel : + self.lock.acquire() + self._write(_info,**_args) + finally: + if self.parallel: + self.lock.release() + def _write(self,_info,**_args) : + _df = None + if type(_info) in [list,pd.DataFrame] : + if type(_info) == list : + _df = pd.DataFrame(_info) + elif type(_info) == pd.DataFrame : + _df = _info + + self.mode['destination_table'] = _args['table'].strip() + _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) + + pass +# import transport +# reader = transport.factory.instance(type="sql.BQReader",args={"service_key":"/home/steve/dev/google-cloud-sdk/accounts/curation-prod.json"}) +# _df = reader.read(sql="select * from `2019q1r4_combined.person` limit 10") +# writer = transport.factory.instance(type="sql.BQWriter",args={"service_key":"/home/steve/dev/google-cloud-sdk/accounts/curation-prod.json"}) +# writer.write(_df,table='2019q1r4_combined.foo') +# write.write() # _args = {"db":"sample","table":"foo","provider":"postgresql"} # # # w = SQLWriter(**_args) # # # w.write({"name":"kalara.io","email":"ceo@kalara.io","age":10})