377 lines
13 KiB
Python
377 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
from transport import factory
|
|
import numpy as np
|
|
import time
|
|
import os
|
|
from multiprocessing import Process, Lock
|
|
import pandas as pd
|
|
from google.oauth2 import service_account
|
|
import data.maker
|
|
|
|
from data.params import SYS_ARGS
|
|
|
|
#
|
|
# The configuration array is now loaded and we will execute the pipe line as follows
|
|
DATASET='combined20191004v2_deid'
|
|
|
|
class Components :
|
|
lock = Lock()
|
|
class KEYS :
|
|
PIPELINE_KEY = 'pipeline'
|
|
SQL_FILTER = 'filter'
|
|
@staticmethod
|
|
def get_filter (**args):
|
|
if args['qualifier'] == 'IN' :
|
|
return ' '.join([args['field'],args['qualifier'],'(',args['value'],')'])
|
|
else:
|
|
return ' '.join([args['field'],args['qualifier'],args['value']])
|
|
@staticmethod
|
|
def get_logger(**args) :
|
|
return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
@staticmethod
|
|
def get(args):
|
|
"""
|
|
This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)
|
|
The function must be wrapped around a lambda this makes testing easier and changing data stores transparent to the rest of the code. (Vital when testing)
|
|
:sql basic sql statement
|
|
:condition optional condition and filters
|
|
"""
|
|
SQL = args['sql']
|
|
if Components.KEYS.SQL_FILTER in args :
|
|
FILTER_KEY = Components.KEYS.SQL_FILTER
|
|
SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]]
|
|
# condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')'])
|
|
|
|
condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER])
|
|
SQL = " ".join([SQL,'WHERE',condition])
|
|
|
|
SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
|
|
|
|
if 'limit' in args :
|
|
SQL = SQL + ' LIMIT ' + args['limit']
|
|
#
|
|
# let's log the sql query that has been performed here
|
|
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}})
|
|
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
|
|
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
|
|
return df
|
|
|
|
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
|
|
@staticmethod
|
|
def split(X,MAX_ROWS=3,PART_SIZE=3):
|
|
|
|
return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
|
|
|
|
def train(self,**args):
|
|
"""
|
|
This function will perform training on the basis of a given pointer that reads data
|
|
|
|
"""
|
|
#
|
|
# @TODO: we need to log something here about the parameters being passed
|
|
# pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args)
|
|
df = args['data']
|
|
|
|
|
|
# if df.shape[0] == 0 :
|
|
# print ("CAN NOT TRAIN EMPTY DATASET ")
|
|
# return
|
|
#
|
|
# Now we can parse the arguments and submit the entire thing to training
|
|
#
|
|
|
|
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
log_folder = args['logs'] if 'logs' in args else 'logs'
|
|
PART_SIZE = int(args['part_size'])
|
|
|
|
partition = args['partition']
|
|
log_folder = os.sep.join([log_folder,args['context'],str(partition)])
|
|
_args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
|
|
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
|
|
if 'batch_size' in args :
|
|
_args['batch_size'] = int(args['batch_size'])
|
|
|
|
#
|
|
# We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
|
|
#
|
|
if int(args['num_gpu']) > 1 :
|
|
_args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)
|
|
else:
|
|
_args['gpu'] = 0
|
|
_args['num_gpu'] = 1
|
|
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
|
|
_args['partition'] = int(partition)
|
|
_args['continuous']= args['continuous'] if 'continuous' in args else []
|
|
_args['store'] = {'type':'mongo.MongoWriter','args':{'dbname':'aou','doc':args['context']}}
|
|
_args['data'] = args['data']
|
|
|
|
# print (['partition ',partition,df.value_source_concept_id.unique()])
|
|
#
|
|
# @log :
|
|
# Logging information about the training process for this partition (or not)
|
|
#
|
|
|
|
info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
|
|
|
|
logger.write({"module":"train","action":"train","input":info})
|
|
data.maker.train(**_args)
|
|
|
|
pass
|
|
|
|
# @staticmethod
|
|
def generate(self,args):
|
|
"""
|
|
This function will generate data and store it to a given,
|
|
"""
|
|
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
log_folder = args['logs'] if 'logs' in args else 'logs'
|
|
partition = args['partition'] if 'partition' in args else ''
|
|
log_folder = os.sep.join([log_folder,args['context'],str(partition)])
|
|
|
|
_args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
|
|
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
|
|
# _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
|
|
if 'batch_size' in args :
|
|
_args['batch_size'] = int(args['batch_size'])
|
|
|
|
if int(args['num_gpu']) > 1 :
|
|
_args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)
|
|
else:
|
|
_args['gpu'] = 0
|
|
_args['num_gpu'] = 1
|
|
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
|
|
_args['no_value']= args['no_value']
|
|
|
|
# MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
|
|
PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
|
|
|
|
# credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
|
|
# _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
|
|
# reader = args['reader']
|
|
# df = reader()
|
|
df = args['reader']() if 'reader' in args else args['data']
|
|
# bounds = Components.split(df,MAX_ROWS,PART_SIZE)
|
|
# if partition != '' :
|
|
# columns = args['columns']
|
|
# df = np.array_split(df[columns].values,PART_SIZE)
|
|
# df = pd.DataFrame(df[ int (partition) ],columns = columns)
|
|
# max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000
|
|
# N = np.divide(df.shape[0],max_rows).astype(int) + 1
|
|
info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"part_size":int(PART_SIZE),"partition-info":{"count":int(N),"max_rows":max_rows}}
|
|
logger.write({"module":"generate","action":"partition","input":info})
|
|
_args['partition'] = int(partition)
|
|
_args['continuous']= args['continuous'] if 'continuous' in args else []
|
|
#
|
|
# How many rows sub-partition must we divide this into ?
|
|
# -- Let us tray assessing
|
|
|
|
|
|
df = np.array_split(df,N)
|
|
_dc = pd.DataFrame()
|
|
# for mdf in df :
|
|
_args['data'] = df
|
|
_dc = _dc.append(data.maker.generate(**_args))
|
|
#
|
|
# We need to post the generate the data in order to :
|
|
# 1. compare immediately
|
|
# 2. synthetic copy
|
|
#
|
|
|
|
cols = _dc.columns.tolist()
|
|
|
|
data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
|
|
#
|
|
# performing basic analytics on the synthetic data generated (easy to quickly asses)
|
|
#
|
|
info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
|
|
x = {}
|
|
#
|
|
# @TODO: Send data over to a process for analytics
|
|
|
|
base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
|
|
cols = _dc.columns.tolist()
|
|
for name in cols :
|
|
_args['data'][name] = _dc[name]
|
|
|
|
#
|
|
#-- Let us store all of this into bigquery
|
|
prefix = args['notify']+'.'+_args['context']
|
|
partition = str(partition)
|
|
table = '_'.join([prefix,partition,'io']).replace('__','_')
|
|
folder = os.sep.join([args['logs'],args['context'],partition,'output'])
|
|
if 'file' in args :
|
|
|
|
_fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
|
|
_pname = os.sep.join([folder,table])+'.csv'
|
|
data_comp.to_csv( _pname,index=False)
|
|
_args['data'].to_csv(_fname,index=False)
|
|
|
|
_id = 'path'
|
|
else:
|
|
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
|
|
_pname = os.sep.join([folder,table+'.csv'])
|
|
_fname = table.replace('_io','_full_io')
|
|
partial = '.'.join(['io',args['context']+'_partial_io'])
|
|
complete= '.'.join(['io',args['context']+'_full_io'])
|
|
data_comp.to_csv(_pname,index=False)
|
|
if 'dump' in args :
|
|
print (_args['data'].head())
|
|
else:
|
|
Components.lock.acquire()
|
|
data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
|
|
|
|
INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
|
|
_args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
|
|
Components.lock.release()
|
|
_id = 'dataset'
|
|
info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
|
|
if partition :
|
|
info ['partition'] = int(partition)
|
|
logger.write({"module":"generate","action":"write","input":info} )
|
|
|
|
|
|
|
|
if __name__ == '__main__' :
|
|
filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
|
|
f = open (filename)
|
|
_config = json.loads(f.read())
|
|
f.close()
|
|
PIPELINE = _config['pipeline']
|
|
index = SYS_ARGS['index']
|
|
if index.isnumeric() :
|
|
index = int(SYS_ARGS['index'])
|
|
else:
|
|
#
|
|
# The index provided is a key to a pipeline entry mainly the context
|
|
#
|
|
N = len(PIPELINE)
|
|
f = [i for i in range(0,N) if PIPELINE[i]['context'] == index]
|
|
index = f[0] if f else 0
|
|
#
|
|
|
|
print ("..::: ",PIPELINE[index]['context'])
|
|
args = (PIPELINE[index])
|
|
for key in _config :
|
|
if key == 'pipeline' or key in args:
|
|
#
|
|
# skip in case of pipeline or if key exists in the selected pipeline (provided by index)
|
|
#
|
|
continue
|
|
args[key] = _config[key]
|
|
|
|
args = dict(args,**SYS_ARGS)
|
|
if 'batch_size' not in args :
|
|
args['batch_size'] = 2000 #if 'batch_size' not in args else int(args['batch_size'])
|
|
if 'dataset' not in args :
|
|
args['dataset'] = 'combined20191004v2_deid'
|
|
PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
|
|
#
|
|
# @TODO:
|
|
# Log what was initiated so we have context of this processing ...
|
|
#
|
|
# if 'listen' not in SYS_ARGS :
|
|
if 'file' in args :
|
|
DATA = pd.read_csv(args['file']) ;
|
|
else:
|
|
DATA = Components().get(args)
|
|
COLUMNS = DATA.columns
|
|
DATA = np.array_split(DATA,PART_SIZE)
|
|
|
|
if 'generate' in SYS_ARGS :
|
|
#
|
|
# Let us see if we have partitions given the log folder
|
|
|
|
content = os.listdir( os.sep.join([args['logs'],args['context']]))
|
|
generator = Components()
|
|
|
|
if ''.join(content).isnumeric() :
|
|
#
|
|
# we have partitions we are working with
|
|
|
|
jobs = []
|
|
|
|
# columns = DATA.columns.tolist()
|
|
|
|
# DATA = np.array_split(DATA,PART_SIZE)
|
|
|
|
for index in range(0,PART_SIZE) :
|
|
if 'focus' in args and int(args['focus']) != index :
|
|
#
|
|
# This handles failures/recoveries for whatever reason
|
|
# If we are only interested in generating data for a given partition
|
|
continue
|
|
# index = id.index(id)
|
|
|
|
args['partition'] = index
|
|
args['data'] = DATA[index]
|
|
if int(args['num_gpu']) > 1 :
|
|
args['gpu'] = index
|
|
else:
|
|
args['gpu']=0
|
|
|
|
make = lambda _args: (Components()).generate(_args)
|
|
job = Process(target=make,args=(args,))
|
|
job.name = 'generator # '+str(index)
|
|
job.start()
|
|
jobs.append(job)
|
|
# if len(jobs) == 1 :
|
|
# job.join()
|
|
|
|
print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ])
|
|
while len(jobs)> 0 :
|
|
jobs = [job for job in jobs if job.is_alive()]
|
|
time.sleep(2)
|
|
|
|
# generator.generate(args)
|
|
else:
|
|
generator.generate(args)
|
|
# Components.generate(args)
|
|
elif 'finalize' in args :
|
|
#
|
|
# This will finalize a given set of synthetic operations into a table
|
|
#
|
|
idataset = args['input'] if 'input' in args else 'io' #-- input dataset
|
|
odataset = args['output'] #-- output dataset
|
|
labels = [name.strip() for name in args['labels'].split(',') ]
|
|
|
|
else:
|
|
|
|
# DATA = np.array_split(DATA,PART_SIZE)
|
|
|
|
jobs = []
|
|
for index in range(0,PART_SIZE) :
|
|
if 'focus' in args and int(args['focus']) != index :
|
|
continue
|
|
args['part_size'] = PART_SIZE
|
|
args['partition'] = index
|
|
args['data'] = DATA[index]
|
|
if int(args['num_gpu']) > 1 :
|
|
args['gpu'] = index
|
|
else:
|
|
args['gpu']=0
|
|
|
|
make = lambda _args: (Components()).train(**_args)
|
|
job = Process(target=make,args=( dict(args),))
|
|
job.name = 'Trainer # ' + str(index)
|
|
job.start()
|
|
jobs.append(job)
|
|
# args['gpu']
|
|
print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ])
|
|
while len(jobs)> 0 :
|
|
jobs = [job for job in jobs if job.is_alive()]
|
|
time.sleep(2)
|
|
|
|
# trainer = Components()
|
|
# trainer.train(**args)
|
|
|
|
|
|
# Components.train(**args)
|
|
#for args in PIPELINE :
|
|
#args['dataset'] = 'combined20190510'
|
|
#process = Process(target=Components.train,args=(args,))
|
|
#process.name = args['context']
|
|
#process.start()
|
|
# Components.train(args)
|