bigquery: job submission and status of said jobs
This commit is contained in:
parent
98964a623d
commit
cedab73e19
|
@ -25,6 +25,8 @@ else:
|
||||||
import json
|
import json
|
||||||
from google.oauth2 import service_account
|
from google.oauth2 import service_account
|
||||||
from google.cloud import bigquery as bq
|
from google.cloud import bigquery as bq
|
||||||
|
# import constants.bq_utils as bq_consts
|
||||||
|
|
||||||
from multiprocessing import Lock, RLock
|
from multiprocessing import Lock, RLock
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -462,7 +464,7 @@ class BQWriter(BigQuery,Writer):
|
||||||
self.table = _args['table'] if 'table' in _args else None
|
self.table = _args['table'] if 'table' in _args else None
|
||||||
self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
|
self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
|
||||||
self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
|
self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
|
||||||
|
self._location = 'US' if 'location' not in _args else _args['location']
|
||||||
def write(self,_info,**_args) :
|
def write(self,_info,**_args) :
|
||||||
try:
|
try:
|
||||||
if self.parallel or 'lock' in _args :
|
if self.parallel or 'lock' in _args :
|
||||||
|
@ -472,6 +474,21 @@ class BQWriter(BigQuery,Writer):
|
||||||
finally:
|
finally:
|
||||||
if self.parallel:
|
if self.parallel:
|
||||||
BQWriter.lock.release()
|
BQWriter.lock.release()
|
||||||
|
def submit(self,_sql):
|
||||||
|
"""
|
||||||
|
Write the output of a massive query to a given table, biquery will handle this as a job
|
||||||
|
This function will return the job identifier
|
||||||
|
"""
|
||||||
|
_config = bq.QueryJobConfig()
|
||||||
|
_config.destination = self.client.dataset(self.dataset).table(self.table)
|
||||||
|
_config.allow_large_results = True
|
||||||
|
# _config.write_disposition = bq.bq_consts.WRITE_APPEND
|
||||||
|
_config.dry_run = False
|
||||||
|
# _config.priority = 'BATCH'
|
||||||
|
_resp = self.client.query(_sql,location=self._location,job_config=_config)
|
||||||
|
return _resp.job_id
|
||||||
|
def status (self,_id):
|
||||||
|
return self.client.get_job(_id,location=self._location)
|
||||||
def _write(self,_info,**_args) :
|
def _write(self,_info,**_args) :
|
||||||
_df = None
|
_df = None
|
||||||
if type(_info) in [list,pd.DataFrame] :
|
if type(_info) in [list,pd.DataFrame] :
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
__author__ = 'The Phi Technology'
|
__author__ = 'The Phi Technology'
|
||||||
__version__= '1.9.3'
|
__version__= '1.9.4'
|
||||||
|
|
Loading…
Reference in New Issue