bug fix ... (hopfully makes a difference)

This commit is contained in:
Steve Nyemba 2020-03-06 11:40:47 -06:00
parent 32a5e19060
commit 8c5193cb6d
1 changed files with 9 additions and 1 deletions

View File

@ -2,6 +2,7 @@
import json import json
from transport import factory from transport import factory
import numpy as np import numpy as np
import time
import os import os
from multiprocessing import Process from multiprocessing import Process
import pandas as pd import pandas as pd
@ -76,7 +77,12 @@ class Components :
columns = args['columns'] columns = args['columns']
df = np.array_split(df[columns].values,PART_SIZE) df = np.array_split(df[columns].values,PART_SIZE)
qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'}) qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
part_index = 0 part_index = 0
#
# let's start n processes to listen & train this mother ...
#
#-- hopefully they learn as daemons
for _df in df: for _df in df:
# _args['logs'] = os.sep.join([log_folder,str(part_index)]) # _args['logs'] = os.sep.join([log_folder,str(part_index)])
@ -206,6 +212,7 @@ class Components :
if partition : if partition :
info ['partition'] = int(partition) info ['partition'] = int(partition)
logger.write({"module":"generate","action":"write","info":info} ) logger.write({"module":"generate","action":"write","info":info} )
@staticmethod @staticmethod
def callback(channel,method,header,stream): def callback(channel,method,header,stream):
if stream.decode('utf8') in ['QUIT','EXIT','END'] : if stream.decode('utf8') in ['QUIT','EXIT','END'] :
@ -306,6 +313,7 @@ if __name__ == '__main__' :
while len(jobs) > 0 : while len(jobs) > 0 :
jobs = [job for job in jobs if job.is_alive()] jobs = [job for job in jobs if job.is_alive()]
time.sleep(2)
# pointer(qhandler) # pointer(qhandler)