From f9496ed8061cf0f1c452f75ffb3a421af119446d Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sun, 15 Mar 2020 10:25:19 -0500 Subject: [PATCH] bug fix with program dying --- pipeline.py | 20 +++++++++++--------- setup.py | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pipeline.py b/pipeline.py index 9eee8c5..bfdd72e 100644 --- a/pipeline.py +++ b/pipeline.py @@ -76,10 +76,11 @@ class Components : partition = args['partition'] log_folder = os.sep.join([log_folder,args['context'],str(partition)]) - _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} + _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) if 'batch_size' in args : _args['batch_size'] = int(args['batch_size']) + # # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel # @@ -143,7 +144,7 @@ class Components : # columns = args['columns'] # df = np.array_split(df[columns].values,PART_SIZE) # df = pd.DataFrame(df[ int (partition) ],columns = columns) - info = {"parition":int(partition),"gpu":_args["gpu"],"rows":str(df.shape[0]),"cols":str(df.shape[1]),"part_size":int(PART_SIZE)} + info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"part_size":int(PART_SIZE)} logger.write({"module":"generate","action":"partition","input":info}) _args['partition'] = int(partition) _args['continuous']= args['continuous'] if 'continuous' in args else [] @@ -163,7 +164,6 @@ class Components : data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) - for name in cols : _args['data'][name] = _dc[name] info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}} @@ -193,10 +193,14 @@ class Components : _fname = table.replace('_io','_full_io') partial = '.'.join(['io',args['context']+'_partial_io']) complete= '.'.join(['io',args['context']+'_full_io']) - data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000) data_comp.to_csv(_pname,index=False) - INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append' - _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=50000) + if 'dump' in args : + print (_args['data'].head()) + else: + data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000) + + INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append' + _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=50000) _id = 'dataset' info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } if partition : @@ -247,6 +251,7 @@ if __name__ == '__main__' : args = dict(args,**SYS_ARGS) args['logs'] = args['logs'] if 'logs' in args else 'logs' + args['batch_size'] = 2000 if 'batch_size' not in args else int(args['batch_size']) if 'dataset' not in args : args['dataset'] = 'combined20191004v2_deid' PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 @@ -350,10 +355,7 @@ if __name__ == '__main__' : continue args['part_size'] = PART_SIZE args['partition'] = index - # _df = pd.DataFrame(DATA[index],columns=args['columns']) args['data'] = DATA[index] - # args['data'].to_csv('aou-'+str(index)+'csv',index=False) - # args['reader'] = lambda: _df if int(args['num_gpu']) > 1 : args['gpu'] = index else: diff --git a/setup.py b/setup.py index 78c52ea..4a4e87b 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() -args = {"name":"data-maker","version":"1.2.2","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", +args = {"name":"data-maker","version":"1.2.3","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]} args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo'] args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git'