From 8c5193cb6d4293682f838597bcca7e8287e37d6d Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 6 Mar 2020 11:40:47 -0600 Subject: [PATCH] bug fix ... (hopfully makes a difference) --- pipeline.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pipeline.py b/pipeline.py index c5a16d8..df92427 100644 --- a/pipeline.py +++ b/pipeline.py @@ -2,6 +2,7 @@ import json from transport import factory import numpy as np +import time import os from multiprocessing import Process import pandas as pd @@ -76,7 +77,12 @@ class Components : columns = args['columns'] df = np.array_split(df[columns].values,PART_SIZE) qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'}) - part_index = 0 + part_index = 0 + # + # let's start n processes to listen & train this mother ... + # + #-- hopefully they learn as daemons + for _df in df: # _args['logs'] = os.sep.join([log_folder,str(part_index)]) @@ -206,6 +212,7 @@ class Components : if partition : info ['partition'] = int(partition) logger.write({"module":"generate","action":"write","info":info} ) + @staticmethod def callback(channel,method,header,stream): if stream.decode('utf8') in ['QUIT','EXIT','END'] : @@ -306,6 +313,7 @@ if __name__ == '__main__' : while len(jobs) > 0 : jobs = [job for job in jobs if job.is_alive()] + time.sleep(2) # pointer(qhandler)