bug fix with minor corrections
This commit is contained in:
parent
c75bb54d2b
commit
32a5e19060
20
pipeline.py
20
pipeline.py
|
@ -92,7 +92,7 @@ class Components :
|
||||||
_df = pd.DataFrame(_df,columns=columns)
|
_df = pd.DataFrame(_df,columns=columns)
|
||||||
# print (columns)
|
# print (columns)
|
||||||
|
|
||||||
info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":2,"part_size":PART_SIZE}
|
info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE}
|
||||||
p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info}
|
p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info}
|
||||||
part_index += 1
|
part_index += 1
|
||||||
qwriter.write(p)
|
qwriter.write(p)
|
||||||
|
@ -134,7 +134,7 @@ class Components :
|
||||||
partition = args['partition'] if 'partition' in args else ''
|
partition = args['partition'] if 'partition' in args else ''
|
||||||
log_folder = os.sep.join([log_folder,args['context'],str(partition)])
|
log_folder = os.sep.join([log_folder,args['context'],str(partition)])
|
||||||
|
|
||||||
_args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
|
_args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
|
||||||
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
|
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
|
||||||
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
|
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
|
||||||
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
|
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
|
||||||
|
@ -147,15 +147,18 @@ class Components :
|
||||||
reader = args['reader']
|
reader = args['reader']
|
||||||
df = reader()
|
df = reader()
|
||||||
# bounds = Components.split(df,MAX_ROWS,PART_SIZE)
|
# bounds = Components.split(df,MAX_ROWS,PART_SIZE)
|
||||||
if partition != '' and os.path.exists(log_folder):
|
if partition != '' :
|
||||||
columns = args['columns']
|
columns = args['columns']
|
||||||
df = np.array_split(df[columns].values,PART_SIZE)
|
df = np.array_split(df[columns].values,PART_SIZE)
|
||||||
df = pd.DataFrame(df[ int (partition) ],columns = columns)
|
df = pd.DataFrame(df[ int (partition) ],columns = columns)
|
||||||
|
info = {"parition":int(partition),"rows":df.shape[0],"cols":df.shape[0],"part_size":PART_SIZE}
|
||||||
|
logger.write({"module":"generate","action":"partition","input":info})
|
||||||
|
|
||||||
_args['data'] = df
|
_args['data'] = df
|
||||||
# _args['data'] = reader()
|
# _args['data'] = reader()
|
||||||
#_args['data'] = _args['data'].astype(object)
|
#_args['data'] = _args['data'].astype(object)
|
||||||
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
|
_args['num_gpu'] = 1
|
||||||
|
_args['gpu'] = partition
|
||||||
_dc = data.maker.generate(**_args)
|
_dc = data.maker.generate(**_args)
|
||||||
#
|
#
|
||||||
# We need to post the generate the data in order to :
|
# We need to post the generate the data in order to :
|
||||||
|
@ -205,7 +208,9 @@ class Components :
|
||||||
logger.write({"module":"generate","action":"write","info":info} )
|
logger.write({"module":"generate","action":"write","info":info} )
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def callback(channel,method,header,stream):
|
def callback(channel,method,header,stream):
|
||||||
|
if stream.decode('utf8') in ['QUIT','EXIT','END'] :
|
||||||
|
channel.close()
|
||||||
|
channel.connection.close()
|
||||||
info = json.loads(stream)
|
info = json.loads(stream)
|
||||||
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
|
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
|
||||||
|
|
||||||
|
@ -214,7 +219,7 @@ class Components :
|
||||||
args = info['args']
|
args = info['args']
|
||||||
if args['num_gpu'] > 1 :
|
if args['num_gpu'] > 1 :
|
||||||
args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2
|
args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2
|
||||||
args['num_gpu'] = 2
|
|
||||||
else:
|
else:
|
||||||
args['gpu'] = 0
|
args['gpu'] = 0
|
||||||
args['num_gpu'] = 1
|
args['num_gpu'] = 1
|
||||||
|
@ -242,8 +247,7 @@ if __name__ == '__main__' :
|
||||||
args = (PIPELINE[index])
|
args = (PIPELINE[index])
|
||||||
|
|
||||||
args = dict(args,**SYS_ARGS)
|
args = dict(args,**SYS_ARGS)
|
||||||
args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3
|
|
||||||
args['part_size'] = int(args['part_size']) if 'part_size' in args else 4
|
|
||||||
args['logs'] = args['logs'] if 'logs' in args else 'logs'
|
args['logs'] = args['logs'] if 'logs' in args else 'logs'
|
||||||
if 'dataset' not in args :
|
if 'dataset' not in args :
|
||||||
args['dataset'] = 'combined20191004v2_deid'
|
args['dataset'] = 'combined20191004v2_deid'
|
||||||
|
|
Loading…
Reference in New Issue