bug fix with queue connection dropping out

This commit is contained in:
Steve Nyemba 2020-03-06 13:00:32 -06:00
parent 57e32261c6
commit 872744c682
1 changed files with 18 additions and 15 deletions

View File

@ -99,7 +99,7 @@ class Components :
# print (columns) # print (columns)
info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE} info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE}
p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info} p = {"args":_args,"data":_df.to_dict(orient="records"),"input":info}
part_index += 1 part_index += 1
qwriter.write(p) qwriter.write(p)
# #
@ -124,7 +124,8 @@ class Components :
# @log : # @log :
# Logging information about the training process for this partition (or not) # Logging information about the training process for this partition (or not)
# #
info = {"rows":df.shape[0],"cols":df.shape[1], "partition":partition,"logs":_args['logs']} info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
logger.write({"module":"train","action":"train","input":info}) logger.write({"module":"train","action":"train","input":info})
data.maker.train(**_args) data.maker.train(**_args)
@ -211,7 +212,7 @@ class Components :
info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} } info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
if partition : if partition :
info ['partition'] = int(partition) info ['partition'] = int(partition)
logger.write({"module":"generate","action":"write","info":info} ) logger.write({"module":"generate","action":"write","input":info} )
@staticmethod @staticmethod
def callback(channel,method,header,stream): def callback(channel,method,header,stream):
@ -221,11 +222,11 @@ class Components :
info = json.loads(stream) info = json.loads(stream)
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']}) logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
logger.write({'module':'process','action':'read-partition','input':info['info']}) logger.write({'module':'process','action':'read-partition','input':info['input']})
df = pd.DataFrame(info['data']) df = pd.DataFrame(info['data'])
args = info['args'] args = info['args']
if args['num_gpu'] > 1 : if args['num_gpu'] > 1 :
args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2 args['gpu'] = int(info['info']['partition']) if info['input']['partition'] == 0 else info['input']['partition'] + 2
else: else:
args['gpu'] = 0 args['gpu'] = 0
@ -237,11 +238,12 @@ class Components :
# @TODO: Fix # @TODO: Fix
# There is an inconsistency in column/columns ... fix this shit! # There is an inconsistency in column/columns ... fix this shit!
# #
args['columns'] = args['column']
(Components()).train(**args)
logger.write({"module":"process","action":"exit","info":info["info"]})
channel.close() channel.close()
channel.connection.close() channel.connection.close()
args['columns'] = args['column']
(Components()).train(**args)
logger.write({"module":"process","action":"exit","input":info["input"]})
pass pass
if __name__ == '__main__' : if __name__ == '__main__' :
@ -280,18 +282,19 @@ if __name__ == '__main__' :
if ''.join(content).isnumeric() : if ''.join(content).isnumeric() :
# #
# we have partitions we are working with # we have partitions we are working with
make = lambda args: (Components()).generate(args) make = lambda _args: (Components()).generate(_args)
jobs = [] jobs = []
print (["Started ",len(jobs),"generators"])
for id in ''.join(content) : for id in ''.join(content) :
args['partition'] = id args['partition'] = id
job = Process(target=make,args=(args,args)) job = Process(target=make,args=(args,))
job.name = 'generator # '+str(id)
job.start() job.start()
jobs.append(job) jobs.append(job)
while (len(jobs)> 0) : print (["Started ",len(jobs),"generator"+"s" if len(jobs)>1 else "" ])
jobs = [jobs for job in jobs if job.is_alive()] while len(jobs)> 0 :
jobs = [job for job in jobs if job.is_alive()]
time.sleep(2) time.sleep(2)
# generator.generate(args) # generator.generate(args)