#!/usr/bin/env python3 import json from transport import factory import numpy as np import time import os from multiprocessing import Process import pandas as pd from google.oauth2 import service_account import data.maker from data.params import SYS_ARGS # # The configuration array is now loaded and we will execute the pipe line as follows DATASET='combined20191004v2_deid' class Components : @staticmethod def get(args): """ This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes) The function must be wrapped around a lambda this makes testing easier and changing data stores transparent to the rest of the code. (Vital when testing) :sql basic sql statement :condition optional condition and filters """ SQL = args['sql'] if 'condition' in args : condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')']) SQL = " ".join([SQL,'WHERE',condition]) SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 " if 'limit' in args : SQL = SQL + 'LIMIT ' + args['limit'] credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') df = pd.read_gbq(SQL,credentials=credentials,dialect='standard') return df # return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna() @staticmethod def split(X,MAX_ROWS=3,PART_SIZE=3): return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories) def train(self,**args): """ This function will perform training on the basis of a given pointer that reads data """ # # @TODO: we need to log something here about the parameters being passed pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args) df = pointer() if df.shape[0] == 0 : print ("CAN NOT TRAIN EMPTY DATASET ") return # # Now we can parse the arguments and submit the entire thing to training # logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) log_folder = args['logs'] if 'logs' in args else 'logs' _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 _args['gpu'] = args['gpu'] if 'gpu' in args else 0 # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 if 'partition' not in args: lbound = 0 # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories) # bounds = Components.split(df,MAX_ROWS,PART_SIZE) columns = args['columns'] df = np.array_split(df[columns].values,PART_SIZE) qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'}) part_index = 0 # # let's start n processes to listen & train this mother ... # #-- hopefully they learn as daemons for _df in df: # _args['logs'] = os.sep.join([log_folder,str(part_index)]) _args['partition'] = str(part_index) _args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'} # # We should post the the partitions to a queue server (at least the instructions on ): # - where to get the data # - and athe arguments to use (partition #,columns,gpu,epochs) # _df = pd.DataFrame(_df,columns=columns) # print (columns) info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE} p = {"args":_args,"data":_df.to_dict(orient="records"),"input":info} part_index += 1 qwriter.write(p) # # @TODO: # - Notify that information was just posted to the queue # In case we want slow-mode, we can store the partitions in mongodb and process (Yes|No)? # logger.write({"module":"train","action":"setup-partition","input":info}) pass else: partition = args['partition'] if 'partition' in args else '' log_folder = os.sep.join([log_folder,args['context'],partition]) _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' _args['data'] = df # # @log : # Logging information about the training process for this partition (or not) # info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']} logger.write({"module":"train","action":"train","input":info}) data.maker.train(**_args) pass # @staticmethod def generate(self,args): """ This function will generate data and store it to a given, """ logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) log_folder = args['logs'] if 'logs' in args else 'logs' partition = args['partition'] if 'partition' in args else '' log_folder = os.sep.join([log_folder,args['context'],str(partition)]) _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 if args['num_gpu'] > 1 : _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0] else: _args['gpu'] = 0 _args['num_gpu'] = 1 _args['no_value']= args['no_value'] # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna() # reader = args['reader'] # df = reader() df = args['reader']() if 'reader' in args else args['data'] # bounds = Components.split(df,MAX_ROWS,PART_SIZE) # if partition != '' : # columns = args['columns'] # df = np.array_split(df[columns].values,PART_SIZE) # df = pd.DataFrame(df[ int (partition) ],columns = columns) info = {"parition":int(partition),"rows":df.shape[0],"cols":df.shape[0],"part_size":PART_SIZE} logger.write({"module":"generate","action":"partition","input":info}) _args['data'] = df # _args['data'] = reader() #_args['data'] = _args['data'].astype(object) # _args['num_gpu'] = 1 _dc = data.maker.generate(**_args) # # We need to post the generate the data in order to : # 1. compare immediately # 2. synthetic copy # cols = _dc.columns.tolist() data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) for name in cols : _args['data'][name] = _dc[name] info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}} if partition != '' : info['partition'] = int(partition) logger.write(info) # filename = os.sep.join([log_folder,'output',name+'.csv']) # data_comp[[name]].to_csv(filename,index=False) # #-- Let us store all of this into bigquery prefix = args['notify']+'.'+_args['context'] table = '_'.join([prefix,partition,'io']).replace('__','_') folder = os.sep.join([args['logs'],args['context'],partition,'output']) if 'file' in args : _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')]) _pname = os.sep.join([folder,table])+'.csv' data_comp.to_csv( _pname,index=False) _args['data'].to_csv(_fname,index=False) else: credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') _pname = os.sep.join([folder,table+'.csv']) _fname = table.replace('_io','_full_io') data_comp.to_gbq(if_exists='replace',destination_table=_pname,credentials='credentials',chunk_size=50000) data_comp.to_csv(_pname,index=False) INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append' _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials='credentials',chunk_size=50000) info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} } if partition : info ['partition'] = int(partition) logger.write({"module":"generate","action":"write","input":info} ) @staticmethod def callback(channel,method,header,stream): if stream.decode('utf8') in ['QUIT','EXIT','END'] : channel.close() channel.connection.close() info = json.loads(stream) logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']}) logger.write({'module':'process','action':'read-partition','input':info['input']}) df = pd.DataFrame(info['data']) args = info['args'] if args['num_gpu'] > 1 : args['gpu'] = int(info['input']['partition']) if info['input']['partition'] < 8 else np.random.choice(np.arange(8),1).astype(int)[0] else: args['gpu'] = 0 args['num_gpu'] = 1 # if int(args['num_gpu']) > 1 and args['gpu'] > 0: # args['gpu'] = args['gpu'] + args['num_gpu'] if args['gpu'] + args['num_gpu'] < 8 else args['gpu'] #-- 8 max gpus args['reader'] = lambda: df # # @TODO: Fix # There is an inconsistency in column/columns ... fix this shit! # channel.close() channel.connection.close() args['columns'] = args['column'] (Components()).train(**args) logger.write({"module":"process","action":"exit","input":info["input"]}) pass if __name__ == '__main__' : filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json' f = open (filename) PIPELINE = json.loads(f.read()) f.close() index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else 0 args = (PIPELINE[index]) args = dict(args,**SYS_ARGS) args['logs'] = args['logs'] if 'logs' in args else 'logs' if 'dataset' not in args : args['dataset'] = 'combined20191004v2_deid' # # @TODO: # Log what was initiated so we have context of this processing ... # if 'listen' not in SYS_ARGS : if 'file' in args : reader = lambda: pd.read_csv(args['file']) ; else: DATA = Components().get(args) reader = lambda: DATA args['reader'] = reader if 'generate' in SYS_ARGS : # # Let us see if we have partitions given the log folder content = os.listdir( os.sep.join([args['logs'],args['context']])) generator = Components() DATA = reader() if ''.join(content).isnumeric() : # # we have partitions we are working with make = lambda _args: (Components()).generate(_args) jobs = [] del args['reader'] columns = DATA.columns.tolist() DATA = np.array_split(DATA[args['columns']],len(content)) for id in ''.join(content) : args['partition'] = id args['data'] = pd.DataFrame(DATA[(int(id))],columns=args['columns']) if args['num_gpu'] > 0 : args['gpu'] = id else: args['gpu']=0 args['num_gpu']=1 job = Process(target=make,args=(args,)) job.name = 'generator # '+str(id) job.start() jobs.append(job) print (["Started ",len(jobs),"generator"+"s" if len(jobs)>1 else "" ]) while len(jobs)> 0 : jobs = [job for job in jobs if job.is_alive()] time.sleep(2) # generator.generate(args) else: generator.generate(args) # Components.generate(args) elif 'listen' in args : # # This will start a worker just in case to listen to a queue SYS_ARGS = dict(args) #-- things get lost in context if 'read' in SYS_ARGS : QUEUE_TYPE = 'queue.QueueReader' pointer = lambda qreader: qreader.read() else: QUEUE_TYPE = 'queue.QueueListener' pointer = lambda qlistener: qlistener.listen() N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1 qhandlers = [factory.instance(type=QUEUE_TYPE,args={'queue':'aou.io'}) for i in np.arange(N)] jobs = [] for qhandler in qhandlers : qhandler.callback = Components.callback job = Process(target=pointer,args=(qhandler,)) job.start() jobs.append(job) # # let us wait for the jobs print (["Started ",len(jobs)," trainers"]) while len(jobs) > 0 : jobs = [job for job in jobs if job.is_alive()] time.sleep(2) # pointer(qhandler) # qreader.read(1) pass else: trainer = Components() trainer.train(**args) # Components.train(**args) #for args in PIPELINE : #args['dataset'] = 'combined20190510' #process = Process(target=Components.train,args=(args,)) #process.name = args['context'] #process.start() # Components.train(args)