#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import time
import os
from multiprocessing import Process, Lock
import pandas as pd
from google.oauth2 import service_account
import data.maker

from data.params import SYS_ARGS

#
# The configuration array is now loaded and we will execute the pipeline as follows
DATASET = 'combined20191004v2_deid'


class Components:
    """
    Pipeline steps (read, train, generate) driven by a configuration dictionary.

    Every step logs to the mongo writer returned by :meth:`get_logger`, and the
    heavy lifting is delegated to ``data.maker.train`` / ``data.maker.generate``.
    """
    # Serializes the BigQuery uploads performed by generate().
    # NOTE(review): this only coordinates separate processes if they inherit this
    # very object (e.g. the fork start method) -- confirm for spawn/forkserver.
    lock = Lock()

    # Service-account key used for every BigQuery read/write in this class.
    CREDENTIALS_FILE = '/home/steve/dev/aou/accounts/curation-prod.json'

    class KEYS:
        PIPELINE_KEY = 'pipeline'
        SQL_FILTER = 'filter'

    @staticmethod
    def get_filter(**args):
        """
        Build one SQL condition from a filter specification.

        :field      left-hand side of the condition
        :qualifier  operator; 'IN' wraps the value in parentheses
        :value      right-hand side, already formatted as SQL text
        """
        if args['qualifier'] == 'IN':
            return ' '.join([args['field'], args['qualifier'], '(', args['value'], ')'])
        return ' '.join([args['field'], args['qualifier'], args['value']])

    @staticmethod
    def get_logger(**args):
        """
        Return a mongo writer bound to database 'aou' and the document named by args['context'].
        """
        return factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})

    @staticmethod
    def _get_credentials():
        """Load the service-account credentials from CREDENTIALS_FILE."""
        return service_account.Credentials.from_service_account_file(Components.CREDENTIALS_FILE)

    @staticmethod
    def get(args):
        """
        Return a data-frame for a BigQuery SQL statement, with optional conditions
        (and a limit, for testing purposes).

        :sql        basic sql statement; ':dataset' is replaced by args['dataset']
        :filter     optional filter specification (or list of them), see get_filter
        :limit      optional maximum number of rows
        :context    name of the document the query is logged under
        """
        SQL = args['sql']
        if Components.KEYS.SQL_FILTER in args:
            filters = args[Components.KEYS.SQL_FILTER]
            if not isinstance(filters, list):
                filters = [filters]
            condition = ' AND '.join([Components.get_filter(**item) for item in filters])
            SQL = " ".join([SQL, 'WHERE', condition])
        SQL = SQL.replace(':dataset', args['dataset'])
        if 'limit' in args:
            # str() so a numeric limit coming from a JSON configuration works too
            SQL = SQL + ' LIMIT ' + str(args['limit'])
        #
        # let's log the sql query that has been performed here
        logger = Components.get_logger(context=args['context'])
        logger.write({"module": "bigquery", "action": "read", "input": {"sql": SQL}})
        return pd.read_gbq(SQL, credentials=Components._get_credentials(), dialect='standard')

    @staticmethod
    def split(X, MAX_ROWS=3, PART_SIZE=3):
        """
        Return the PART_SIZE intervals produced by pd.cut over 0..X.shape[0].

        MAX_ROWS is accepted for backward compatibility and is not used.
        """
        return list(pd.cut(np.arange(X.shape[0] + 1), PART_SIZE).categories)

    @staticmethod
    def _base_args(args, logger, log_folder):
        """
        Build the arguments shared by data.maker.train and data.maker.generate.

        Also sets the CUDA_VISIBLE_DEVICES environment variable from args['gpu'].
        """
        _args = {
            "batch_size": int(args.get('batch_size', 2000)),
            "logs": log_folder,
            "context": args['context'],
            "max_epochs": int(args.get('max_epochs', 150)),
            "column": args['columns'],
            "id": "person_id",
            "logger": logger,
            "matrix_size": args.get('matrix_size', 128),
            "continuous": args.get('continuous', []),
        }
        #
        # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
        #
        if int(args['num_gpu']) > 1:
            _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)
        else:
            _args['gpu'] = 0
        _args['num_gpu'] = 1
        # NOTE(review): the environment uses the raw args['gpu'], not the value
        # selected above -- confirm this is what data.maker expects.
        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
        return _args

    def train(self, **args):
        """
        Train on one partition of the data.

        :data       data-frame holding the partition to train on
        :partition  partition number (used for the log folder and passed to data.maker)
        :context    name of the context, used for logging and the log folder
        :columns    columns handed to data.maker as 'column'
        :num_gpu    number of GPUs on the system; :gpu is required alongside it
        :autopilot  when present, generate() is run right after training
        """
        #
        # @TODO: we need to log something here about the parameters being passed
        df = args['data']
        logger = Components.get_logger(context=args['context'])
        partition = args['partition']
        log_folder = os.sep.join([args.get('logs', 'logs'), args['context'], str(partition)])

        _args = Components._base_args(args, logger, log_folder)
        _args['partition'] = int(partition)
        _args['store'] = {'type': 'mongo.MongoWriter', 'args': {'dbname': 'aou', 'doc': args['context']}}
        _args['data'] = df
        #
        # @log :
        #   Logging information about the training process for this partition (or not)
        #
        info = {"rows": df.shape[0], "cols": df.shape[1], "partition": int(partition), "logs": _args['logs']}
        logger.write({"module": "train", "action": "train", "input": info})
        data.maker.train(**_args)
        if 'autopilot' in args:
            print(['drone mode enabled ....'])
            self.generate(args)

    def generate(self, args):
        """
        Generate synthetic data for one partition and store it.

        The generated columns are written next to the original ones (the
        "partial" comparison set) and substituted into the original frame (the
        "full" set). Both go to CSV when 'file' is in args; otherwise the
        partial set goes to CSV and both sets are appended to BigQuery, unless
        'dump' is in args, in which case the full set is only previewed.

        :data | reader  data-frame, or a callable returning one
        :partition      partition number
        :columns        columns to synthesize
        :no_value       passed through to data.maker.generate
        :notify         prefix of the output table/file names
        """
        logger = Components.get_logger(context=args['context'])
        logs_root = args.get('logs', 'logs')
        partition = args.get('partition', '')
        log_folder = os.sep.join([logs_root, args['context'], str(partition)])

        _args = Components._base_args(args, logger, log_folder)
        _args['no_value'] = args['no_value']
        PART_SIZE = int(args.get('part_size', 8))
        df = args['reader']() if 'reader' in args else args['data']

        # NOTE(review): int('') raises ValueError, so a partition is effectively
        # required here even though the lookup above has a default.
        # NOTE(review): "parition" is misspelled; kept as-is because it is the
        # key written to the log store.
        info = {"parition": int(partition), "gpu": _args["gpu"], "rows": int(df.shape[0]), "cols": int(df.shape[1]), "space": df[args['columns'][0]].unique().size, "part_size": int(PART_SIZE)}
        logger.write({"module": "generate", "action": "partition", "input": info})
        _args['partition'] = int(partition)
        #
        # let us fix the data types here: every _id field becomes an integer, with
        # missing values replaced by 0. The result is assigned back explicitly
        # because an in-place fillna on a column selection is not guaranteed to
        # update the frame.
        #
        for name in df.columns.tolist():
            if name.endswith('_id'):
                df[name] = df[name].fillna(0).astype(int)

        _args['data'] = df
        generated = data.maker.generate(**_args)
        # DataFrame.append no longer exists in pandas 2.x; pd.concat is the replacement
        frames = list(generated) if isinstance(generated, (list, tuple)) else [generated]
        _dc = pd.concat(frames, sort=False)
        #
        # We need to post the generated data in order to :
        #   1. compare immediately
        #   2. synthetic copy
        #
        data_comp = _args['data'][args['columns']].join(_dc[args['columns']], rsuffix='_io')  #-- will be used for comparison (store this in big query)
        #
        #-- rebuild the dataset (and store it)
        for name in _dc.columns.tolist():
            _args['data'][name] = _dc[name]
        #
        #-- Let us store all of this into bigquery
        prefix = args['notify'] + '.' + _args['context']
        partition = str(partition)
        table = '_'.join([prefix, partition, 'io']).replace('__', '_')
        folder = os.sep.join([logs_root, args['context'], partition, 'output'])
        os.makedirs(folder, exist_ok=True)

        _pname = os.sep.join([folder, table]) + '.csv'
        data_comp.to_csv(_pname, index=False)
        if 'file' in args:
            _fname = os.sep.join([folder, table.replace('_io', '_full_io.csv')])
            _args['data'].to_csv(_fname, index=False)
            _id = 'path'
        else:
            credentials = Components._get_credentials()
            _fname = table.replace('_io', '_full_io')
            partial = '.'.join(['io', args['context'] + '_partial_io'])
            complete = '.'.join(['io', args['context'] + '_full_io'])
            if 'dump' in args:
                print(_args['data'].head())
            else:
                # the context manager releases the lock even if an upload fails
                with Components.lock:
                    data_comp.to_gbq(if_exists='append', destination_table=partial, credentials=credentials, chunksize=90000)
                    _args['data'].to_gbq(if_exists='append', destination_table=complete, credentials=credentials, chunksize=90000)
            _id = 'dataset'

        info = {"full": {_id: _fname, "rows": _args['data'].shape[0]}, "partial": {"path": _pname, "rows": data_comp.shape[0]}}
        if partition:
            info['partition'] = int(partition)
        logger.write({"module": "generate", "action": "write", "input": info})


def _train_job(_args):
    """Process entry point: train on the partition described by _args."""
    Components().train(**_args)


def _generate_job(_args):
    """Process entry point: generate data for the partition described by _args."""
    Components().generate(_args)


def _run_partitions(args, partitions, target, job_label, noun):
    """
    Start one process per partition running target(job_args) and wait for all of them.

    :args        shared arguments; each job receives its own copy
    :partitions  list of data-frames, one per partition
    :target      module-level function (so the job does not depend on a lambda)
    :job_label   prefix of the process name
    :noun        singular word used in the "Started" message
    """
    jobs = []
    for index, part in enumerate(partitions):
        if 'focus' in args and int(args['focus']) != index:
            #
            # This handles failures/recoveries for whatever reason
            # If we are only interested in a given partition
            continue
        job_args = dict(args)
        job_args['part_size'] = len(partitions)
        job_args['partition'] = index
        job_args['data'] = part
        job_args['gpu'] = index if int(args['num_gpu']) > 1 else 0
        job = Process(target=target, args=(job_args,))
        job.name = job_label + ' # ' + str(index)
        job.start()
        jobs.append(job)
    print(["Started ", len(jobs), noun + "s" if len(jobs) > 1 else noun])
    for job in jobs:
        job.join()


if __name__ == '__main__':
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    with open(filename) as f:
        _config = json.loads(f.read())
    PIPELINE = _config[Components.KEYS.PIPELINE_KEY]
    index = SYS_ARGS['index']
    if index.isnumeric():
        index = int(index)
    else:
        #
        # The index provided is a key to a pipeline entry mainly the context
        #
        matches = [i for i, item in enumerate(PIPELINE) if item['context'] == index]
        index = matches[0] if matches else 0
    # print ("..::: ",PIPELINE[index]['context'])
    args = PIPELINE[index]
    for key, value in _config.items():
        #
        # skip the pipeline itself and any key the selected pipeline entry already defines
        #
        if key != Components.KEYS.PIPELINE_KEY:
            args.setdefault(key, value)
    args = dict(args, **SYS_ARGS)
    if 'matrix_size' in args:
        args['matrix_size'] = int(args['matrix_size'])
    if 'batch_size' not in args:
        args['batch_size'] = 2000
    if 'dataset' not in args:
        args['dataset'] = DATASET
    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
    #
    # @TODO:
    #   Log what was initiated so we have context of this processing ...
    #
    # if 'listen' not in SYS_ARGS :
    if 'file' in args:
        DATA = pd.read_csv(args['file'])
    else:
        DATA = Components().get(args)
    COLUMNS = DATA.columns
    DATA = np.array_split(DATA, PART_SIZE)

    if 'generate' in SYS_ARGS:
        #
        # Let us see if we have partitions given the log folder
        content = os.listdir(os.sep.join([args['logs'], args['context']]))
        if ''.join(content).isnumeric():
            #
            # we have partitions we are working with
            _run_partitions(args, DATA, _generate_job, 'generator', 'generator')
        else:
            Components().generate(args)
    else:
        _run_partitions(args, DATA, _train_job, 'Trainer', 'trainer')