bug fix: mandatory partitioning while training

parent 27473989f9
commit c75bb54d2b

pipeline.py (81 changes)

@@ -61,27 +61,23 @@ class Components :
         logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
         log_folder = args['logs'] if 'logs' in args else 'logs'
         _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}

         _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
         _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
         _args['gpu'] = args['gpu'] if 'gpu' in args else 0

-        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
-        PART_SIZE = args['part_size'] if 'part_size' in args else 0
+        # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = int(args['part_size']) if 'part_size' in args else 8

-        if df.shape[0] > MAX_ROWS and 'partition' not in args:
+        if 'partition' not in args:
             lbound = 0
-            bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
+            # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
             # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
+            columns = args['columns']
+            df = np.array_split(df[columns].values,PART_SIZE)
             qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
-            for b in bounds :
-                part_index = bounds.index(b)
-                ubound = int(b.right)
-
-                _data = df.iloc[lbound:ubound][args['columns']]
-                lbound = ubound
-
+            part_index = 0
+            for _df in df:
                 # _args['logs'] = os.sep.join([log_folder,str(part_index)])
                 _args['partition'] = str(part_index)
@@ -92,14 +88,20 @@ class Components :
                 # - where to get the data
                 # - and athe arguments to use (partition #,columns,gpu,epochs)
                 #
-                info = {"rows":_data.shape[0],"cols":_data.shape[1], "partition":part_index,"logs":_args['logs']}
-                p = {"args":_args,"data":_data.to_dict(orient="records"),"info":info}
+                _df = pd.DataFrame(_df,columns=columns)
+                # print (columns)
+
+                info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":2,"part_size":PART_SIZE}
+                p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info}
+                part_index += 1
                 qwriter.write(p)
                 #
                 # @TODO:
                 # - Notify that information was just posted to the queue
-                info['max_rows'] = MAX_ROWS
-                info['part_size'] = PART_SIZE
+                # In case we want slow-mode, we can store the partitions in mongodb and process (Yes|No)?
+                #

                 logger.write({"module":"train","action":"setup-partition","input":info})

             pass
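For context, a minimal standalone sketch of the mandatory partitioning introduced in the two hunks above, assuming a plain pandas DataFrame and a stand-in publish callable in place of the project's QueueWriter; the function and argument names are illustrative only, not part of the repository:

import numpy as np
import pandas as pd

def partition_for_training(df, columns, part_size=8, publish=print):
    # Split the selected columns into part_size chunks, mirroring
    # np.array_split(df[columns].values, PART_SIZE) in the hunk above.
    parts = np.array_split(df[columns].values, part_size)
    for part_index, values in enumerate(parts):
        _df = pd.DataFrame(values, columns=columns)
        info = {"rows": _df.shape[0], "cols": _df.shape[1], "partition": part_index, "part_size": part_size}
        # One message per partition, so training is always partitioned
        # regardless of how many rows the input frame has.
        publish({"data": _df.to_dict(orient="records"), "info": info})

# Example: 20 rows split into 4 partitions of 5 rows each.
partition_for_training(pd.DataFrame({"a": range(20), "b": range(20)}), ["a", "b"], part_size=4)

Because np.array_split always yields PART_SIZE chunks, dropping the MAX_ROWS guard makes partitioning unconditional, which is what the commit title refers to.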
@@ -137,37 +139,18 @@ class Components :
         _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
         os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
         _args['no_value']= args['no_value']
-        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
-        PART_SIZE = args['part_size'] if 'part_size' in args else 0
+        # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = int(args['part_size']) if 'part_size' in args else 8

         # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
         # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
         reader = args['reader']
         df = reader()
-        bounds = Components.split(df,MAX_ROWS,PART_SIZE)
+        # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
         if partition != '' and os.path.exists(log_folder):
-            bounds = Components.split(df,MAX_ROWS,PART_SIZE)
-            # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
-            lbound = int(bounds[int(partition)].left)
-            ubound = int(bounds[int(partition)].right)
-            df = df.iloc[lbound:ubound]
-        else:
-            #
-            # We have an implicit partition here
-            # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
-            logger.write({"module":"generate","action":"virtual-parititions","input":{"rows":df.shape[0],"max_rows":MAX_ROWS,"part_size":PART_SIZE}})
-            for item in bounds :
-
-                lbound = int(item.left)
-                ubound = int(item.right)
-                args['reader'] = lambda: df[lbound:ubound]
-                args['partition'] = bounds.index(item)
-
-                self.generate(args)
-            return ;
-        if not os.path.exists(log_folder) :
-            log_folder = log_folder.replace(partition,'')
-            _args['logs'] = log_folder
+            columns = args['columns']
+            df = np.array_split(df[columns].values,PART_SIZE)
+            df = pd.DataFrame(df[ int (partition) ],columns = columns)

         _args['data'] = df
         # _args['data'] = reader()
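A small illustrative sketch of how the generation path above can recover a single partition by index, assuming the same np.array_split layout used for training; the helper name and example frame are assumptions for illustration only:

import numpy as np
import pandas as pd

def select_partition(df, columns, partition, part_size=8):
    # Re-create the same split used for training, then keep only the
    # chunk addressed by the partition index.
    parts = np.array_split(df[columns].values, part_size)
    return pd.DataFrame(parts[int(partition)], columns=columns)

# Example: partition 2 of 4 over a 20-row frame yields rows 10..14.
frame = pd.DataFrame({"a": range(20), "b": range(20)})
print(select_partition(frame, ["a", "b"], partition="2", part_size=4))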
@@ -189,7 +172,7 @@ class Components :
             _args['data'][name] = _dc[name]
             info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
             if partition != '' :
-                info['partition'] = partition
+                info['partition'] = int(partition)
             logger.write(info)
             # filename = os.sep.join([log_folder,'output',name+'.csv'])
             # data_comp[[name]].to_csv(filename,index=False)
@@ -218,7 +201,7 @@ class Components :

         info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
         if partition :
-            info ['partition'] = partition
+            info ['partition'] = int(partition)
         logger.write({"module":"generate","action":"write","info":info} )
     @staticmethod
     def callback(channel,method,header,stream):
@@ -229,8 +212,12 @@ class Components :
         logger.write({'module':'process','action':'read-partition','input':info['info']})
         df = pd.DataFrame(info['data'])
         args = info['args']
-        MAX_GPUS = 8
-        args['gpu'] = int(info['info']['partition']) if info['info']['partition'] < MAX_GPUS else np.random.choice(np.arange(MAX_GPUS),1).astype(int).tolist()[0]
+        if args['num_gpu'] > 1 :
+            args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2
+            args['num_gpu'] = 2
+        else:
+            args['gpu'] = 0
+            args['num_gpu'] = 1
         # if int(args['num_gpu']) > 1 and args['gpu'] > 0:
         #     args['gpu'] = args['gpu'] + args['num_gpu'] if args['gpu'] + args['num_gpu'] < 8 else args['gpu'] #-- 8 max gpus
         args['reader'] = lambda: df
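As a rough sketch of the GPU assignment in the callback hunk above: the partition-0 special case and the +2 offset follow the diff, while the helper name and return shape are illustrative assumptions, not the project's API:

def assign_gpu(partition, num_gpu):
    # Multi-GPU runs pin partition 0 to GPU 0 and shift later partitions
    # up by 2; single-GPU runs fall back to GPU 0, as in the hunk above.
    if num_gpu > 1:
        gpu = 0 if int(partition) == 0 else int(partition) + 2
        return {"gpu": gpu, "num_gpu": 2}
    return {"gpu": 0, "num_gpu": 1}

# assign_gpu(0, 2) -> {'gpu': 0, 'num_gpu': 2}; assign_gpu(3, 2) -> {'gpu': 5, 'num_gpu': 2}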
@@ -296,7 +283,7 @@ if __name__ == '__main__' :
     SYS_ARGS = dict(args) #-- things get lost in context
     if 'read' in SYS_ARGS :
         QUEUE_TYPE = 'queue.QueueReader'
-        pointer = lambda qreader: qreader.read(1)
+        pointer = lambda qreader: qreader.read()
     else:
         QUEUE_TYPE = 'queue.QueueListener'
         pointer = lambda qlistener: qlistener.listen()
setup.py (2 changes)

@@ -4,7 +4,7 @@ import sys

 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
-args = {"name":"data-maker","version":"1.1.8","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
+args = {"name":"data-maker","version":"1.1.9","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
     "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']
 args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git'