commit f9496ed806
parent e81e50c94f

    bug fix with program dying

 pipeline.py | 20
 setup.py    |  2

diff --git a/pipeline.py b/pipeline.py
@@ -76,10 +76,11 @@ class Components :
         partition = args['partition']
         log_folder = os.sep.join([log_folder,args['context'],str(partition)])
-        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
         _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
         if 'batch_size' in args :
             _args['batch_size'] = int(args['batch_size'])

         #
         # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
         #
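Note: the default batch_size drops from 10000 to 2000, which is the most plausible fix for the process dying, since oversized batches can exhaust GPU memory during training. A minimal, hypothetical standalone sketch of the default-plus-override idiom used here (the dict literals are illustrative, not from the commit):

    # default-plus-override: a safer built-in default, still overridable by the caller
    args = {'batch_size': '5000'}                      # as parsed from the command line
    _args = {'batch_size': 2000}                       # lowered default after this commit
    if 'batch_size' in args:
        _args['batch_size'] = int(args['batch_size'])  # caller may still raise it
    print(_args['batch_size'])                         # -> 5000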
@@ -143,7 +144,7 @@ class Components :
         # columns = args['columns']
         # df = np.array_split(df[columns].values,PART_SIZE)
         # df = pd.DataFrame(df[ int (partition) ],columns = columns)
-        info = {"parition":int(partition),"gpu":_args["gpu"],"rows":str(df.shape[0]),"cols":str(df.shape[1]),"part_size":int(PART_SIZE)}
+        info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"part_size":int(PART_SIZE)}
         logger.write({"module":"generate","action":"partition","input":info})
         _args['partition'] = int(partition)
         _args['continuous']= args['continuous'] if 'continuous' in args else []
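Note: rows and cols are now logged as int rather than str, so consumers of the log record see numbers instead of quoted strings; the "parition" key is a pre-existing typo in the code that this commit leaves untouched. A tiny illustration of the difference, assuming the logger ultimately serializes to JSON:

    import json
    print(json.dumps({"rows": str(1000)}))  # {"rows": "1000"} -- a quoted string
    print(json.dumps({"rows": int(1000)}))  # {"rows": 1000}   -- numeric, summable downstream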
@@ -163,7 +164,6 @@ class Components :
-
             data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
             base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)

             for name in cols :
                 _args['data'][name] = _dc[name]
                 info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
@@ -193,10 +193,14 @@ class Components :
             _fname = table.replace('_io','_full_io')
             partial = '.'.join(['io',args['context']+'_partial_io'])
             complete= '.'.join(['io',args['context']+'_full_io'])
-            data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000)
             data_comp.to_csv(_pname,index=False)
-            INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
-            _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=50000)
+            if 'dump' in args :
+                print (_args['data'].head())
+            else:
+                data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000)
+
+                INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
+                _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=50000)
             _id = 'dataset'
             info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
             if partition :
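Note: both BigQuery uploads are now gated behind a new 'dump' flag; when it is present the run only prints a preview of the generated frame, otherwise the partial comparison table and the full dataset are appended as before. A hypothetical, self-contained reduction of the gate (store and upload are illustrative names, not from the commit):

    import pandas as pd

    def store(args, frame, upload):
        # any value for 'dump' turns the run into a dry run
        if 'dump' in args:
            print(frame.head())   # inspect locally, write nothing
        else:
            upload(frame)         # e.g. frame.to_gbq(...) as in the diff

    store({'dump': 'true'}, pd.DataFrame({'x': range(5)}), upload=lambda f: None)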
@@ -247,6 +251,7 @@ if __name__ == '__main__' :
    args = dict(args,**SYS_ARGS)

    args['logs'] = args['logs'] if 'logs' in args else 'logs'
+   args['batch_size'] = 2000 if 'batch_size' not in args else int(args['batch_size'])
    if 'dataset' not in args :
        args['dataset'] = 'combined20191004v2_deid'
    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
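Note: the __main__ block now pins the same 2000 batch_size default before dispatching work, so the lowered default holds whether or not Components is reached with an explicit value. The conditional expression is equivalent to dict.get with a cast; a hypothetical one-liner (not in the commit) showing the equivalence:

    args = {}   # as populated from SYS_ARGS
    a = 2000 if 'batch_size' not in args else int(args['batch_size'])
    b = int(args.get('batch_size', 2000))   # equivalent and slightly more idiomatic
    assert a == b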
@@ -350,10 +355,7 @@ if __name__ == '__main__' :
                continue
            args['part_size'] = PART_SIZE
            args['partition'] = index
-           # _df = pd.DataFrame(DATA[index],columns=args['columns'])
            args['data'] = DATA[index]
-           # args['data'].to_csv('aou-'+str(index)+'csv',index=False)
-           # args['reader'] = lambda: _df
            if int(args['num_gpu']) > 1 :
                args['gpu'] = index
            else:
diff --git a/setup.py b/setup.py
@@ -4,7 +4,7 @@ import sys

 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
-args = {"name":"data-maker","version":"1.2.2","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
+args = {"name":"data-maker","version":"1.2.3","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
     "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']
 args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git'