240 lines
8.5 KiB
Python
240 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
This file will perform basic tasks to finalize the GAN process by performing the following :
|
|
- basic stats & analytics
|
|
- rebuild io to another dataset
|
|
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
from multiprocessing import Process, Lock
|
|
from google.oauth2 import service_account
|
|
from google.cloud import bigquery as bq
|
|
import transport
|
|
from data.params import SYS_ARGS
|
|
import json
|
|
|
|
class Analytics :
|
|
"""
|
|
This class will compile basic analytics about a given dataset i.e compare original/synthetic
|
|
"""
|
|
@staticmethod
|
|
def distribution(**args):
|
|
context = args['context']
|
|
df = args['data']
|
|
#
|
|
#-- This data frame counts unique values for each feature (space)
|
|
df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
|
|
#
|
|
#-- Get the distributions for common values
|
|
#
|
|
names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
|
|
ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
|
|
ddf[context] = ddf.index
|
|
|
|
pass
|
|
def distance(**args):
|
|
"""
|
|
This function will measure the distance between
|
|
"""
|
|
pass
|
|
class Utils :
|
|
@staticmethod
|
|
def log(**args):
|
|
logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
|
|
logger.write(args)
|
|
logger.close()
|
|
class get :
|
|
@staticmethod
|
|
def pipeline(table,path) :
|
|
# contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
|
|
config = json.loads((open(path)).read())
|
|
pipeline = config['pipeline']
|
|
# return [ item for item in pipeline if item['context'] in contexts]
|
|
pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
|
|
Utils.log(module=table,action='init',input={"pipeline":pipeline})
|
|
return pipeline
|
|
@staticmethod
|
|
def sql(**args) :
|
|
"""
|
|
This function is intended to build SQL query for the remainder of the table that was not synthesized
|
|
:config configuration entries
|
|
:from source of the table name
|
|
:dataset name of the source dataset
|
|
|
|
"""
|
|
SQL = ["SELECT * FROM :from "]
|
|
SQL_FILTER = []
|
|
NO_FILTERS_FOUND = True
|
|
# pipeline = Utils.get.config(**args)
|
|
pipeline = args['pipeline']
|
|
REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
|
|
for item in pipeline :
|
|
|
|
|
|
if 'filter' in item :
|
|
if NO_FILTERS_FOUND :
|
|
NO_FILTERS_FOUND = False
|
|
SQL += ['WHERE']
|
|
#
|
|
# Let us load the filter in the SQL Query
|
|
FILTER = item['filter']
|
|
QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
|
|
SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])]
|
|
src = ".".join([args['dataset'],args['from']])
|
|
SQL += [" AND ".join(SQL_FILTER)]
|
|
#
|
|
# let's pull the field schemas out of the table definition
|
|
#
|
|
Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
|
|
return " ".join(SQL).replace(":from",src)
|
|
|
|
|
|
def mk(**args) :
|
|
dataset = args['dataset']
|
|
client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
|
|
#
|
|
# let us see if we have a dataset handy here
|
|
#
|
|
datasets = list(client.list_datasets())
|
|
found = [item for item in datasets if item.dataset_id == dataset]
|
|
|
|
if not found :
|
|
|
|
return client.create_dataset(dataset)
|
|
return found[0]
|
|
|
|
def move (args):
|
|
"""
|
|
This function will move a table from the synthetic dataset into a designated location
|
|
This is the simplest case for finalizing a synthetic data set
|
|
:private_key
|
|
"""
|
|
pipeline = Utils.get.pipeline(args['from'],args['config'])
|
|
_args = json.loads((open(args['config'])).read())
|
|
_args['pipeline'] = pipeline
|
|
# del _args['pipeline']
|
|
args = dict(args,**_args)
|
|
# del args['pipeline']
|
|
# private_key = args['private_key']
|
|
client = bq.Client.from_service_account_json(args['private_key'])
|
|
|
|
dataset = args['dataset']
|
|
if pipeline :
|
|
SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
|
|
SQL += [Utils.get.sql(**args)]
|
|
SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
|
|
else:
|
|
#
|
|
# moving a table to a designated location
|
|
tablename = args['from']
|
|
if 'sql' not in args :
|
|
SQL = "SELECT * FROM :dataset.:table"
|
|
else:
|
|
SQL = args['sql']
|
|
SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
|
|
Utils.log(module=args['from'],action='sql',input={'sql':SQL})
|
|
#
|
|
# At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
|
|
#
|
|
|
|
|
|
|
|
odataset = mk(dataset=dataset+'_io',client=client)
|
|
# SQL = "SELECT * FROM io.:context_full_io".replace(':context',context)
|
|
config = bq.QueryJobConfig()
|
|
config.destination = client.dataset(odataset.dataset_id).table(args['from'])
|
|
config.use_query_cache = True
|
|
config.allow_large_results = True
|
|
config.priority = 'INTERACTIVE'
|
|
#
|
|
#
|
|
|
|
schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
|
|
fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
|
|
SQL = SQL.replace("*"," , ".join(fields))
|
|
# print (SQL)
|
|
out = client.query(SQL,location='US',job_config=config)
|
|
Utils.log(module=args['from'],action='move',input={'job':out.job_id})
|
|
return (out.job_id)
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from google.oauth2 import service_account
|
|
import json
|
|
|
|
# path = '../curation-prod.json'
|
|
# credentials = service_account.Credentials.from_service_account_file(path)
|
|
# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
|
|
filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
|
|
f = open(filename)
|
|
config = json.loads(f.read())
|
|
args = config['pipeline']
|
|
f.close()
|
|
|
|
|
|
if __name__ == '__main__' :
|
|
"""
|
|
Usage :
|
|
finalize --<move|stats> --contexts <c1,c2,...c3> --from <table>
|
|
"""
|
|
|
|
if 'move' in SYS_ARGS :
|
|
|
|
if 'init' in SYS_ARGS :
|
|
dep = config['dep'] if 'dep' in config else {}
|
|
info = []
|
|
|
|
if 'queries' in dep :
|
|
info += dep['queries']
|
|
print ('________')
|
|
if 'tables' in dep :
|
|
info += dep['tables']
|
|
args = {}
|
|
jobs = []
|
|
for item in info :
|
|
args = {}
|
|
if type(item) == str :
|
|
args['from'] = item
|
|
name = item
|
|
else:
|
|
args = item
|
|
name = item['from']
|
|
args['config'] = SYS_ARGS['config']
|
|
# args['pipeline'] = []
|
|
job = Process(target=move,args=(args,))
|
|
job.name = name
|
|
jobs.append(job)
|
|
job.start()
|
|
|
|
|
|
# while len(jobs) > 0 :
|
|
# jobs = [job for job in jobs if job.is_alive()]
|
|
# time.sleep(1)
|
|
|
|
|
|
else:
|
|
move(SYS_ARGS)
|
|
# # table = SYS_ARGS['from']
|
|
# # args = dict(config,**{"private_key":"../curation-prod.json"})
|
|
# args = dict(args,**SYS_ARGS)
|
|
# contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
|
|
# log = []
|
|
# if contexts :
|
|
# args['contexts'] = contexts
|
|
# log = move(**args)
|
|
|
|
# else:
|
|
# tables = args['from'].split(',')
|
|
# for name in tables :
|
|
# name = name.strip()
|
|
# args['from'] = name
|
|
# log += [move(**args)]
|
|
# print ("\n".join(log))
|
|
|
|
|
|
|
|
else:
|
|
print ("NOT YET READY !") |