Bug fixes and upgrades to base functionality

This commit is contained in:
Steve Nyemba 2020-03-04 11:49:18 -06:00
parent a2988a5972
commit 8e722d5bf1
3 changed files with 313 additions and 125 deletions

View File

@@ -431,9 +431,9 @@ class Train (GNet):
    def network(self,**args):
        stage = args['stage']
        opt = args['opt']
        tower_grads = []
        per_gpu_w = []
        iterator, features_placeholder, labels_placeholder = self.input_fn()
        with tf.compat.v1.variable_scope(tf.compat.v1.get_variable_scope()):
            for i in range(self.NUM_GPUS):
@@ -550,6 +550,7 @@ class Predict(GNet):
                label = y[:, 1] * len(ma) + tf.squeeze(tf.matmul(y[:, 2:], tf.constant(ma, dtype=tf.int32)))
            else:
                label = None
            fake = self.generator.network(inputs=z, label=label)
            init = tf.compat.v1.global_variables_initializer()
            saver = tf.compat.v1.train.Saver()
@@ -577,11 +578,13 @@ class Predict(GNet):
            # if we are dealing with numeric values only we can perform a simple marginal sum against the indexes
            # The code below will insure we have some acceptable cardinal relationships between id and synthetic values
            #
-           df = ( pd.DataFrame(np.round(f).astype(np.int32)))
+           df = pd.DataFrame(np.round(f).astype(np.int32))
            p = 0 not in df.sum(axis=1).values
            x = df.sum(axis=1).values
-           if np.divide( np.sum(x), x.size) > .9 or p:
+           if np.divide( np.sum(x), x.size) > .9 or p and np.sum(x) == x.size:
                ratio.append(np.divide( np.sum(x), x.size))
                found.append(df)
                if i == CANDIDATE_COUNT:
@@ -597,11 +600,13 @@ class Predict(GNet):
            INDEX = np.random.choice(np.arange(len(found)),1)[0]
            INDEX = ratio.index(np.max(ratio))
            df = found[INDEX]
            columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list) else [self.ATTRIBUTES['synthetic']]
            # r = np.zeros((self.ROW_COUNT,len(columns)))
-           r = np.zeros(self.ROW_COUNT)
+           # r = np.zeros(self.ROW_COUNT)
            df.columns = self.values
            if len(found):
                # print (len(found),NTH_VALID_CANDIDATE)
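
Note on the two hunks above: the tightened condition keeps a generated candidate only when its row-wise marginal sums look like valid one-hot rows (roughly one active bit per row, or no empty rows), and the block at @@ -597 then picks the candidate with the highest ratio. A minimal, self-contained sketch of that filter-and-pick logic on made-up candidate matrices (none of these variables are project code) might look like this:

import numpy as np
import pandas as pd

CANDIDATE_COUNT = 5
found, ratio = [], []
rng = np.random.default_rng(0)

for i in range(CANDIDATE_COUNT):
    # fake generator output: mostly one-hot rows, occasionally zero or multiple bits
    f = rng.random((10, 4))
    df = pd.DataFrame((f > 0.7).astype(np.int32))
    x = df.sum(axis=1).values               # marginal sum per row
    p = 0 not in x                          # no empty (all-zero) rows
    if np.divide(np.sum(x), x.size) > .9 or p and np.sum(x) == x.size:
        ratio.append(np.divide(np.sum(x), x.size))
        found.append(df)

if found:
    best = found[ratio.index(np.max(ratio))]    # candidate closest to one bit per row
    print(len(found), np.max(ratio), best.shape)

Since `and` binds tighter than `or`, the new size check only constrains the `p` branch; the ratio test on its own can still admit a candidate, which matches the committed line.
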
@@ -618,6 +623,10 @@ class Predict(GNet):
                    missing = np.repeat(0, np.where(ii==1)[0].size)
                else:
                    missing = []
+               #
+               # @TODO:
+               # Log the findings here in terms of ratio, missing, candidate count
+               # print ([np.max(ratio),len(missing),len(found),i])
                i = np.where(ii == 0)[0]
                df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1))
                df.columns = columns
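
The added @TODO aside, the decode step in this hunk maps each surviving one-hot row back to a value by sampling one of its non-zero positions. A standalone sketch of that idea, with hypothetical labels and rows that all have at least one bit set (the all-zero case is what the `missing` branch above covers):

import numpy as np
import pandas as pd

values = ['female', 'male', 'other']            # hypothetical label set
df = pd.DataFrame([[0, 1, 0],
                   [1, 0, 1],                   # ambiguous row: two bits set
                   [0, 0, 1]])

decoded = df.apply(
    lambda row: values[np.random.choice(np.where(row != 0)[0], 1)[0]],
    axis=1)
print(decoded.tolist())                         # e.g. ['male', 'female', 'other']
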

View File

@@ -15,6 +15,7 @@ from transport import factory
from data.bridge import Binary
import threading as thread
class ContinuousToDiscrete :
+   ROUND_UP = 2
    @staticmethod
    def binary(X,n=4) :
        """
@@ -22,7 +23,7 @@ class ContinuousToDiscrete :
        """
        # BOUNDS = np.repeat(np.divide(X.max(),n),n).cumsum().tolist()
-       BOUNDS = ContinuousToDiscrete.bounds(X,n)
+       BOUNDS = ContinuousToDiscrete.bounds(np.round(X,ContinuousToDiscrete.ROUND_UP),n)
        # _map = [{"index":BOUNDS.index(i),"ubound":i} for i in BOUNDS]
        _matrix = []
@@ -41,7 +42,7 @@ class ContinuousToDiscrete :
    @staticmethod
    def bounds(x,n):
-       return list(pd.cut(np.array(x),n).categories)
+       return list(pd.cut(np.array( np.round(x,ContinuousToDiscrete.ROUND_UP) ),n).categories)
@@ -66,7 +67,7 @@ class ContinuousToDiscrete :
            ubound = BOUNDS[ index ].right
            lbound = BOUNDS[ index ].left
-           x_ = np.round(np.random.uniform(lbound,ubound),3).astype(float)
+           x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float)
            values.append(x_)
            lbound = ubound
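
These three hunks round the input to `ROUND_UP` decimals before computing bin bounds with `pd.cut`, and reuse the same precision when re-sampling a surrogate value inside a bin. A small sketch of that binning round-trip on made-up data, using only pandas/numpy (no project code):

import numpy as np
import pandas as pd

ROUND_UP = 2                                   # mirrors the new class constant
x = np.random.uniform(0, 100, 50)              # made-up continuous column
bounds = list(pd.cut(np.array(np.round(x, ROUND_UP)), 4).categories)

index = 0                                      # pretend a value fell in the first bin
lbound, ubound = bounds[index].left, bounds[index].right
surrogate = np.round(np.random.uniform(lbound, ubound), ROUND_UP)
print(bounds[index], surrogate)
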
@@ -104,10 +105,10 @@ def train (**args) :
        # if 'float' not in df[col].dtypes.name :
        #   args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
        if 'float' in df[col].dtypes.name and col in CONTINUOUS:
-           BIN_SIZE = 10 if 'bin_size' not in args else int(args['bin_size'])
+           BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
            args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32)
        else:
-           args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
+           args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
        args['column'] = col
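
The switch from `fillna('')` to `dropna()` changes what the one-hot encoding sees: blank-imputed rows used to create an artificial empty-string category, while dropping them now removes those rows entirely. A quick illustration with a made-up categorical column:

import numpy as np
import pandas as pd

col = pd.Series(['A', 'B', None, 'A'])          # made-up categorical column

with_blank = pd.get_dummies(col.fillna('')).astype(np.float32).values
without_na = pd.get_dummies(col.dropna()).astype(np.float32).values

print(with_blank.shape)                         # (4, 3): '', 'A', 'B' -> artificial '' category
print(without_na.shape)                         # (3, 2): the NaN row is gone instead

Continuous columns take the other branch and still go through ContinuousToDiscrete.binary, now with a default of 4 bins instead of 10.
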
@@ -157,25 +158,27 @@ def generate(**args):
        args['context'] = col
        args['column'] = col
-       if 'float' in df[col].dtypes.name or col in CONTINUOUS :
-           #
-           # We should create the bins for the values we are observing here
-           BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
-           values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
-       else:
-           values = df[col].unique().tolist()
+       # if 'float' in df[col].dtypes.name or col in CONTINUOUS :
+       #   #
+       #   # We should create the bins for the values we are observing here
+       #   BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
+       #   values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
+       #   # values = np.unique(values).tolist()
+       # else:
+       values = df[col].unique().tolist()
        args['values'] = values
        args['row_count'] = df.shape[0]
        #
        # we can determine the cardinalities here so we know what to allow or disallow
        handler = gan.Predict (**args)
        handler.load_meta(col)
        r = handler.apply()
-       _df[col] = r[col]
+       BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
+       _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if 'float' in df[col].dtypes.name or col in CONTINUOUS else r[col]
        #
        # @TODO: log basic stats about the synthetic attribute
        #
        # print (r)s
        # break
    return _df
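
With the binning removed from the value list, the de-binning now happens once on the generator's output: continuous columns are routed through ContinuousToDiscrete.continuous, everything else is passed through unchanged. A standalone sketch of that routing; `debin` here is a hypothetical stand-in for the project's de-binning helper, and the data is made up:

import pandas as pd

CONTINUOUS = ['age']                            # hypothetical continuous-column list

def debin(values, bin_size):
    # placeholder for ContinuousToDiscrete.continuous: map binned output back to floats
    return [round(float(v), 2) for v in values]

df = pd.DataFrame({'age': [33.0, 47.0], 'gender': ['F', 'M']})      # original data
r = {'age': [35.1234, 44.9876], 'gender': ['F', 'M']}               # pretend synthetic output
_df = pd.DataFrame(index=df.index)

for col in df.columns:
    BIN_SIZE = 4
    _df[col] = debin(r[col], BIN_SIZE) if 'float' in df[col].dtypes.name or col in CONTINUOUS else r[col]
print(_df)
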

View File

@@ -1,5 +1,6 @@
import json
from transport import factory
+import numpy as np
import os
from multiprocessing import Process
import pandas as pd
@@ -8,119 +9,294 @@ import data.maker
from data.params import SYS_ARGS
-f = open ('config.json')
-PIPELINE = json.loads(f.read())
-f.close()
#
# The configuration array is now loaded and we will execute the pipe line as follows
-DATASET='combined20190510_deid'
+DATASET='combined20190510'
class Components :
-    @staticmethod
-    def get(args):
-        SQL = args['sql']
-        if 'condition' in args :
-            condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
-            SQL = " ".join([SQL,'WHERE',condition])
-        SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
-        return SQL  #+ " LIMIT 10000 "
-    @staticmethod
-    def train(args):
-        """
-        This function will instanciate a worker that will train given a message that is provided to it
-        This is/will be a separate process that will
-        """
-        print (['starting .... ',args['notify'],args['context']] )
-        #SQL = args['sql']
-        #if 'condition' in args :
-        #   condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
-        #   SQL = " ".join([SQL,'WHERE',condition])
-        print ( args['context'])
-        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
-        log_folder = os.sep.join(["logs",args['context']])
-        _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":250,"num_gpus":2,"column":args['columns'],"id":"person_id","logger":logger}
-        os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']
-        #SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
-        SQL = Components.get(args)
-        if 'limit' in args :
-            SQL = ' '.join([SQL,'limit',args['limit'] ])
-        _args['max_epochs'] = 250 if 'max_epochs' not in args else args['max_epochs']
-        credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
-        _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
-        #_args['data'] = _args['data'].astype(object)
-        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
-        data.maker.train(**_args)
-    @staticmethod
-    def generate(args):
-        """
-        This function will generate data and store it to a given,
-        """
-        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
-        log_folder = os.sep.join(["logs",args['context']])
-        _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":250,"num_gpus":2,"column":args['columns'],"id":"person_id","logger":logger}
-        os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']
-        SQL = Components.get(args)
-        if 'limit' in args :
-            SQL = " ".join([SQL ,'limit', args['limit'] ])
-        credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
-        _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').fillna('')
-        #_args['data'] = _args['data'].astype(object)
-        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
-        _args['max_epochs'] = 250 if 'max_epochs' not in args else args['max_epochs']
-        _args['no_value'] = args['no_value'] if 'no_value' in args else ''
-        #credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
-        #_args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
-        #_args['data'] = _args['data'].astype(object)
-        _dc = data.maker.generate(**_args)
-        #
-        # We need to post the generate the data in order to :
-        #   1. compare immediately
-        #   2. synthetic copy
-        #
-        cols = _dc.columns.tolist()
-        print (args['columns'])
-        data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
-        base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
-        print (_args['data'].shape)
-        print (_args['data'].shape)
-        for name in cols :
-            _args['data'][name] = _dc[name]
-            # filename = os.sep.join([log_folder,'output',name+'.csv'])
-            # data_comp[[name]].to_csv(filename,index=False)
-        # """
-        #-- Let us store all of this into bigquery
-        prefix = args['notify']+'.'+_args['context']
-        table = '_'.join([prefix,'compare','io'])
-        data_comp.to_gbq(if_exists='replace',destination_table=table,credentials=credentials,chunksize=50000)
-        _args['data'].to_gbq(if_exists='replace',destination_table=table.replace('compare','full'),credentials=credentials,chunksize=50000)
-        data_comp.to_csv(os.sep.join([log_folder,table+'.csv']),index=False)
+    @staticmethod
+    def get(args):
+        """
+        This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)
+        The function must be wrapped around a lambda this makes testing easier and changing data stores transparent to the rest of the code. (Vital when testing)
+        :sql    basic sql statement
+        :condition  optional condition and filters
+        """
+        SQL = args['sql']
+        if 'condition' in args :
+            condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
+            SQL = " ".join([SQL,'WHERE',condition])
+        SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
+        if 'limit' in args :
+            SQL = SQL + 'LIMIT ' + args['limit']
+        credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
+        df = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
+        return df
+        # return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
+    @staticmethod
+    def split(X,MAX_ROWS=3,PART_SIZE=3):
+        return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
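
The new `split` helper (and the equivalent inline `pd.cut` in the new `train` below) carves the row index into PART_SIZE contiguous ranges whose right edges become slice bounds. A standalone sketch of how those bounds drive the partition slices, on a made-up 10-row frame:

import numpy as np
import pandas as pd

df = pd.DataFrame({'x': np.arange(10)})         # made-up frame with 10 rows
PART_SIZE = 3

bounds = list(pd.cut(np.arange(df.shape[0] + 1), PART_SIZE).categories)
lbound = 0
for part_index, b in enumerate(bounds):
    ubound = int(b.right)
    part = df.iloc[lbound:ubound]
    print(part_index, (lbound, ubound), part.shape[0])
    lbound = ubound
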
+    def train(self,**args):
+        """
+        This function will perform training on the basis of a given pointer that reads data
+        """
+        #
+        # @TODO: we need to log something here about the parameters being passed
+        pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args)
+        df = pointer()
+        #
+        # Now we can parse the arguments and submit the entire thing to training
+        #
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
+        log_folder = args['logs'] if 'logs' in args else 'logs'
+        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = args['part_size'] if 'part_size' in args else 0
+        if df.shape[0] > MAX_ROWS and 'partition' not in args:
+            lbound = 0
+            bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
+            # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
+            qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
+            for b in bounds :
+                part_index = bounds.index(b)
+                ubound = int(b.right)
+                _data = df.iloc[lbound:ubound][args['columns']]
+                lbound = ubound
+                # _args['logs'] = os.sep.join([log_folder,str(part_index)])
+                _args['partition'] = str(part_index)
+                _args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'}
+                #
+                # We should post the the partitions to a queue server (at least the instructions on ):
+                #   - where to get the data
+                #   - and athe arguments to use (partition #,columns,gpu,epochs)
+                #
+                info = {"rows":_data.shape[0],"cols":_data.shape[1], "paritition":part_index,"logs":_args['logs']}
+                p = {"args":_args,"data":_data.to_dict(orient="records"),"info":info}
+                qwriter.write(p)
+                #
+                # @TODO:
+                #   - Notify that information was just posted to the queue
+                info['max_rows'] = MAX_ROWS
+                info['part_size'] = PART_SIZE
+                logger.write({"module":"train","action":"setup-partition","input":info})
+            pass
+        else:
+            partition = args['partition'] if 'partition' in args else ''
+            log_folder = os.sep.join([log_folder,args['context'],partition])
+            _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+            _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+            _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+            os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
+            _args['data'] = df
+            #
+            # @log :
+            # Logging information about the training process for this partition (or not)
+            #
+            info = {"rows":df.shape[0],"cols":df.shape[1], "partition":partition,"logs":_args['logs']}
+            logger.write({"module":"train","action":"train","input":info})
+            data.maker.train(**_args)
+        pass
+    # @staticmethod
+    def generate(self,args):
+        """
+        This function will generate data and store it to a given,
+        """
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
+        log_folder = args['logs'] if 'logs' in args else 'logs'
+        partition = args['partition'] if 'partition' in args else ''
+        log_folder = os.sep.join([log_folder,args['context'],partition])
+        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
+        _args['no_value']= args['no_value']
+        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = args['part_size'] if 'part_size' in args else 0
+        # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
+        # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
+        reader = args['reader']
+        df = reader()
+        if 'partition' in args :
+            bounds = Components.split(df,MAX_ROWS,PART_SIZE)
+            # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
+            lbound = int(bounds[int(partition)].left)
+            ubound = int(bounds[int(partition)].right)
+            df = df.iloc[lbound:ubound]
+        _args['data'] = df
+        # _args['data'] = reader()
+        #_args['data'] = _args['data'].astype(object)
+        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+        _dc = data.maker.generate(**_args)
+        #
+        # We need to post the generate the data in order to :
+        #   1. compare immediately
+        #   2. synthetic copy
+        #
+        cols = _dc.columns.tolist()
+        data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
+        base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
+        for name in cols :
+            _args['data'][name] = _dc[name]
+            info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
+            if partition != '' :
+                info['partition'] = partition
+            logger.write(info)
+            # filename = os.sep.join([log_folder,'output',name+'.csv'])
+            # data_comp[[name]].to_csv(filename,index=False)
+        #
+        #-- Let us store all of this into bigquery
+        prefix = args['notify']+'.'+_args['context']
+        table = '_'.join([prefix,partition,'io']).replace('__','_')
+        folder = os.sep.join([args['logs'],args['context'],partition,'output'])
+        if 'file' in args :
+            _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
+            _pname = os.sep.join([folder,table])+'.csv'
+            data_comp.to_csv( _pname,index=False)
+            _args['data'].to_csv(_fname,index=False)
+        else:
+            credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
+            _pname = os.sep.join([folder,table+'.csv'])
+            _fname = table.replace('_io','_full_io')
+            data_comp.to_gbq(if_exists='replace',destination_table=_pname,credentials='credentials',chunk_size=50000)
+            data_comp.to_csv(_pname,index=False)
+            INSERT_FLAG = 'replace' if 'partition' not in args else 'append'
+            _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials='credentials',chunk_size=50000)
+        info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
+        if partition :
+            info ['partition'] = partition
+        logger.write({"module":"generate","action":"write","info":info} )
+    @staticmethod
+    def callback(channel,method,header,stream):
+        info = json.loads(stream)
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
+        logger.write({'module':'process','action':'read-partition','input':info['info']})
+        df = pd.DataFrame(info['data'])
+        args = info['args']
+        if int(args['num_gpu']) > 1 and args['gpu'] > 0:
+            args['gpu'] = args['gpu'] + args['num_gpu']
+        args['reader'] = lambda: df
+        #
+        # @TODO: Fix
+        # There is an inconsistency in column/columns ... fix this shit!
+        #
+        args['columns'] = args['column']
+        (Components()).train(**args)
+        logger.write({"module":"process","action":"exit","info":info["info"]})
+        channel.close()
+        channel.connection.close()
+        pass
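
Between `qwriter.write(p)` in `train` and `callback` on the worker side, each partition travels as plain record dictionaries plus its arguments and is rebuilt into a DataFrame. A minimal sketch of that payload round-trip (string-typed ids only so the JSON step stays version-agnostic; the queue transport itself is out of scope here):

import json
import pandas as pd

_data = pd.DataFrame({'person_id': ['1', '2'], 'gender': ['F', 'M']})   # made-up partition
p = {"args": {"partition": "0"},
     "data": _data.to_dict(orient="records"),
     "info": {"rows": _data.shape[0], "cols": _data.shape[1]}}

stream = json.dumps(p)                  # what would be pushed to the queue
info = json.loads(stream)
df = pd.DataFrame(info['data'])         # worker-side reconstruction, as in callback
print(df.equals(_data))                 # True
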
if __name__ == '__main__' :
-    index = int(SYS_ARGS['index'])
-    args = (PIPELINE[index])
-    #if 'limit' in SYS_ARGS :
-    #   args['limit'] = SYS_ARGS['limit']
-    #args['dataset'] = 'combined20190510'
-    SYS_ARGS['dataset'] = 'combined20190510_deid' if 'dataset' not in SYS_ARGS else SYS_ARGS['dataset']
-    #if 'max_epochs' in SYS_ARGS :
-    #   args['max_epochs'] = SYS_ARGS['max_epochs']
-    args = dict(args,**SYS_ARGS)
-    if 'generate' in SYS_ARGS :
-        Components.generate(args)
-    else:
-        Components.train(args)
+    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
+    f = open (filename)
+    PIPELINE = json.loads(f.read())
+    f.close()
+    index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else 0
+    args = (PIPELINE[index])
+    args['dataset'] = 'combined20190510'
+    args = dict(args,**SYS_ARGS)
+    args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3
+    args['part_size']= int(args['part_size']) if 'part_size' in args else 3
+    #
+    # @TODO:
+    # Log what was initiated so we have context of this processing ...
+    #
+    if 'listen' not in SYS_ARGS :
+        if 'file' in args :
+            reader = lambda: pd.read_csv(args['file']) ;
+        else:
+            reader = lambda: Components().get(args)
+        args['reader'] = reader
+    if 'generate' in SYS_ARGS :
+        #
+        # Let us see if we have partitions given the log folder
+        content = os.listdir( os.sep.join([args['logs'],args['context']]))
+        generator = Components()
+        if ''.join(content).isnumeric() :
+            #
+            # we have partitions we are working with
+            for id in ''.join(content) :
+                args['partition'] = id
+                generator.generate(args)
+        else:
+            generator.generate(args)
+        # Components.generate(args)
+    elif 'listen' in args :
+        #
+        # This will start a worker just in case to listen to a queue
+        if 'read' in SYS_ARGS :
+            QUEUE_TYPE = 'queue.QueueReader'
+            pointer = lambda qreader: qreader.read(1)
+        else:
+            QUEUE_TYPE = 'queue.QueueListener'
+            pointer = lambda qlistener: qlistener.listen()
+        N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1
+        qhandlers = [factory.instance(type=QUEUE_TYPE,args={'queue':'aou.io'}) for i in np.arange(N)]
+        jobs = []
+        for qhandler in qhandlers :
+            qhandler.callback = Components.callback
+            job = Process(target=pointer,args=(qhandler,))
+            job.start()
+            jobs.append(job)
+        #
+        # let us wait for the jobs
+        print (["Started ",len(jobs)," trainers"])
+        while len(jobs) > 0 :
+            jobs = [job for job in jobs if job.is_alive()]
+        # pointer(qhandler)
+        # qreader.read(1)
+        pass
+    else:
+        trainer = Components()
+        trainer.train(**args)
+        # Components.train(**args)
#for args in PIPELINE :
    #args['dataset'] = 'combined20190510'
    #process = Process(target=Components.train,args=(args,))
    #process.name = args['context']
    #process.start()
#   Components.train(args)
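
A closing note on the `__main__` changes: everything downstream only ever calls `args['reader']()`, so a local CSV and a warehouse query are interchangeable data sources. A small sketch of that pattern with stand-in readers (the CSV path is hypothetical; the records branch is exercised so the snippet actually runs):

import pandas as pd

def from_csv(path):
    # hypothetical CSV-backed reader, mirroring lambda: pd.read_csv(args['file'])
    return lambda: pd.read_csv(path)

def from_records(records):
    # stand-in for the warehouse-backed reader used when no 'file' key is given
    return lambda: pd.DataFrame(records)

args = {}                                   # no 'file' key -> fall back to records
reader = from_csv(args['file']) if 'file' in args else from_records([{'x': 1}])
df = reader()                               # the rest of the pipeline only sees this call
print(df.shape)
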