693 lines
23 KiB
Python
693 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
from transport import factory
|
|
import numpy as np
|
|
import time
|
|
import os
|
|
from multiprocessing import Process, Lock
|
|
import pandas as pd
|
|
from google.oauth2 import service_account
|
|
from google.cloud import bigquery as bq
|
|
import data.maker
|
|
import copy
|
|
from data.params import SYS_ARGS
|
|
|
|
#
|
|
# The configuration array is now loaded and we will execute the pipeline as follows
|
|
|
|
class Components :
    """
    Steps of the data-maker pipeline: reading the source data, training (data.maker.train),
    generating candidates (data.maker.generate), shuffling/approximating columns and
    posting the resulting data-frame to the target store.

    Readers, writers and loggers are obtained from transport.factory using the
    'store' entry of the arguments i.e {"source":...,"target":...,"logs":...}
    """
    # class-level multiprocessing lock (none of the methods below acquire it)
    lock = Lock()
    # service account file used by get() when the caller does not provide a 'private_key'
    DEFAULT_CREDENTIALS = '/home/steve/dev/aou/accounts/curation-prod.json'

    class KEYS :
        # names of the configuration entries
        PIPELINE_KEY = 'pipeline'
        SQL_FILTER = 'filter'

    @staticmethod
    def get_filter (**args):
        """
        This function builds an SQL condition out of a filter specification
        :field      name of the field
        :qualifier  SQL operator, the value is wrapped in parentheses when the operator is IN
        :value      right hand side of the condition (converted to a string)
        :return     string of the form "<field> <qualifier> <value>"
        """
        _value = str(args['value'])
        if args['qualifier'] == 'IN' :
            return ' '.join([args['field'],args['qualifier'],'(',_value,')'])
        return ' '.join([args['field'],args['qualifier'],_value])

    @staticmethod
    def get_logger(**args) :
        """
        :context    name of the document the log entries are written to
        :return     a mongo.MongoWriter (database 'aou') created by transport.factory
        """
        return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})

    @staticmethod
    def get(args):
        """
        This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)
        The query that is executed is written to the logger before it is run.
        NOTE: the statement is assembled by string concatenation, the arguments must come from a trusted configuration

        :sql            basic sql statement, the token :dataset is replaced by the value of 'dataset'
        :dataset        name of the dataset
        :context        name of the log document (see get_logger)
        :filter         optional filter or list of filters (see get_filter), they are combined with AND
        :limit          optional maximum number of rows
        :private_key    optional path of the service account file, defaults to Components.DEFAULT_CREDENTIALS
        :return         data-frame with the result of the query
        """
        SQL = args['sql']
        if Components.KEYS.SQL_FILTER in args :
            _filters = args[Components.KEYS.SQL_FILTER]
            if not isinstance(_filters,list) :
                _filters = [_filters]
            condition = ' AND '.join([Components.get_filter(**item) for item in _filters])
            SQL = " ".join([SQL,'WHERE',condition])

        SQL = SQL.replace(':dataset',args['dataset'])
        if 'limit' in args :
            # the limit may be provided as an integer
            SQL = SQL + ' LIMIT ' + str(args['limit'])
        #
        # let's log the sql query that has been performed here
        logger = Components.get_logger(context=args['context'])
        logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}})
        _path = args['private_key'] if 'private_key' in args else Components.DEFAULT_CREDENTIALS
        credentials = service_account.Credentials.from_service_account_file(_path)
        return pd.read_gbq(SQL,credentials=credentials,dialect='standard')

    @staticmethod
    def split(X,MAX_ROWS=3,PART_SIZE=3):
        """
        This function cuts the row positions of X (0 .. number of rows) into equal-width bins
        :X          object with a shape attribute (data-frame, matrix)
        :MAX_ROWS   not used (kept for compatibility)
        :PART_SIZE  number of bins
        :return     list of pandas intervals, one per bin
        """
        return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)

    @staticmethod
    def _read(reader,args):
        """
        This function runs args['sql'] against a reader, args['row_limit'] is passed as limit when it is provided
        """
        if 'row_limit' in args :
            return reader.read(sql=args['sql'],limit=args['row_limit'])
        return reader.read(sql=args['sql'])

    @staticmethod
    def _describe(reader,table):
        """
        This function returns the meta data of a table as a list of {"name":...,"type":...}
        """
        return [{"name":_item.name,"type":_item.field_type} for _item in reader.meta(table=table)]

    @staticmethod
    def _summary(column):
        """
        This function returns basic statistics of a column (pandas series) as plain floats,
        'missing' counts the values equal to -1 and 'zeros' the values equal to 0
        """
        return {
            "min":float(column.min()),"max":float(column.max()),"mean":float(column.mean()),
            "sd":float(column.values.std()),
            "missing":float((column == -1).sum()),"zeros":float((column == 0).sum())
        }

    def _prune(self,df,args):
        """
        This function returns df without the columns to be ignored (args['ignore']['columns'])
        and without the continuous columns (args['continuous']).
        The order of the remaining columns is preserved so that every process sees the same layout.
        """
        _drop = set(args['continuous']) if 'continuous' in args and args['continuous'] else set()
        if 'ignore' in args and 'columns' in args['ignore'] :
            _drop |= set(self.get_ignore(data=df,columns=args['ignore']['columns']))
        if not _drop :
            return df
        return df[[name for name in df.columns if name not in _drop]]

    def format_schema(self,schema):
        """
        This function maps the fields of a schema to python types
        :schema     list of objects with attributes name and field_type
        :return     dictionary {name:type}, FLOAT -> float, INTEGER -> int, anything else -> str
        """
        _types = {'FLOAT':float,'INTEGER':int}
        return {_item.name:_types.get(_item.field_type,str) for _item in schema}

    def get_ignore(self,**_args) :
        """
        This function returns the names of the columns that contain at least one of the given terms
        :data       data-frame
        :columns    list of terms (sub-strings of column names)
        :return     list of column names, empty if either argument is missing
        """
        if 'columns' not in _args or 'data' not in _args :
            return []
        terms = _args['columns']
        return [name for name in _args['data'].columns if any(field in name for field in terms)]

    def set_gpu(self,**_args) :
        """
        This function exposes the first of the given chips through CUDA_VISIBLE_DEVICES
        :gpu        identifier or list of identifiers, a single identifier (string or number) is wrapped in a list
        :return     list of identifiers or None if no gpu was provided
        """
        if 'gpu' not in _args or _args['gpu'] is None :
            return None
        gpu = _args['gpu']
        if isinstance(gpu,str) or not hasattr(gpu,'__getitem__') :
            # a scalar like gpu=0 can not be indexed
            gpu = [gpu]
        if len(gpu) > 0 :
            os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu[0])
        return gpu

    def train(self,**args):
        """
        This function will perform training on the basis of a given pointer that reads data.
        The data is taken from (in that order) a csv file, the source store or the 'data' argument

        :file       optional path of a csv file
        :data       optional data-frame, used when no file is given
        :store      stores {"source":...}, the source is read when neither file nor data are given
        :sql        query submitted to the source, 'row_limit' is optional
        :from       optional table name, used to retrieve the schema from the source
        :continuous optional list of columns that are not trained
        :ignore     optional {"columns":[terms]} columns matching the terms are not trained
        :gpu        optional chip(s) see set_gpu
        :autopilot  if present generate() is called once training is done
        The remaining arguments are handed to data.maker.train
        """
        schema = None
        if 'file' in args :
            df = pd.read_csv(args['file'])
            del args['file']
        elif 'data' not in args :
            reader = factory.instance(**args['store']['source'])
            df = self._read(reader,args)
            if hasattr(reader,'meta') and 'from' in args :
                schema = reader.meta(table=args['from'])
        else:
            df = args['data']

        if schema :
            #
            # The columns are cast to the types of the schema, missing values become 0 or ''
            _types = self.format_schema(schema)
            _schema = []
            for _item in schema :
                _type = _types[_item.name]
                _value = '' if _type is str else 0
                _schema.append({"name":_item.name,"type":_item.field_type})
                df[_item.name] = df[_item.name].fillna(_value).astype(_type)
            args['schema'] = _schema
        #
        # The data-frame is set aside before copying, it is replaced right after
        _args = copy.deepcopy({key:args[key] for key in args if key != 'data'})
        #
        # The columns that are continuous or ignored are not handed to the trainer
        _args['data'] = self._prune(df,args)
        if 'gpu' in args :
            _args['gpu'] = self.set_gpu(gpu=args['gpu'])
        if _args['data'].shape[0] and _args['data'].shape[1] :
            #
            # We have a full blown matrix to be processed
            print ('-- Training --')
            data.maker.train(**_args)
        else:
            print ("... skipping training !!")

        if 'autopilot' in args :
            args['data'] = df
            print (['autopilot mode enabled ....',args['context']])
            self.generate(args)

    def approximate(self,values):
        """
        This function returns a randomly perturbed and shuffled copy of a numeric array.
        Every value v is replaced by v +/- v * r where r is drawn from a dirichlet distribution,
        the result may additionally be shifted or scaled by mean/sd of the positive values.
        :param values   array of values to be approximated
        :return         array of the same size, non numeric (or empty) arrays are returned as is
        """
        if values.dtype not in [int,float] or values.size == 0 :
            return values
        #
        # @TODO: create bins?
        # dirichlet requires strictly positive parameters (no zeros, negatives or missing values)
        r = np.random.dirichlet(np.abs(np.nan_to_num(values.astype(float))) + .001)
        #
        # The ratio mean/sd is only usable if the positive values are not constant (sd > 0)
        _ratio = None
        _positive = values[values > 0]
        if _positive.size :
            _sd = _positive.std()
            _me = _positive.mean()
            if _sd > 0 and _me > _sd :
                _ratio = np.divide(_me,_sd)
        _isint = values.dtype == int
        x = []
        for index in np.arange(values.size) :
            _delta = values[index] * r[index]
            if np.random.choice([0,1],1)[0] :
                value = values[index] + _delta
            else :
                value = values[index] - _delta
            #
            # randomly shifting the measurements
            if np.random.choice([0,1],1)[0] and _ratio is not None :
                if np.random.choice([0,1],1)[0] :
                    value = value * _ratio
                else:
                    value = value + _ratio
            x.append(int(value) if _isint else np.round(value,2))
        np.random.shuffle(x)
        return np.array(x)

    def shuffle(self,_args):
        """
        This function approximates the continuous columns of a dataset (on a shuffled copy of the values)
        and posts the resulting data-frame to the target store

        :data       optional data-frame
        :file       optional path of a csv file, used when no data is given
        :store      stores {"source":...,"target":...}, the source is read when neither data nor file are given
        :sql        query submitted to the source, 'row_limit' is optional
        :schema     optional schema [{"name":...,"type":...}], retrieved from the source (table 'from') if absent
        :continuous optional list of columns to be approximated
        :ignore     optional {"columns":[terms]} columns matching the terms are left untouched
        :columns    optional list of candidate columns, defaults to all the columns
        """
        reader = None
        if 'data' in _args :
            df = _args['data']
        elif 'file' in _args :
            df = pd.read_csv(_args['file'])
        else:
            reader = factory.instance(**_args['store']['source'])
            df = self._read(reader,_args)
        schema = _args['schema'] if 'schema' in _args else None
        if schema is None and reader is not None and hasattr(reader,'meta') and 'from' in _args :
            schema = self._describe(reader,_args['from'])
        #
        # Only the continuous columns that are not ignored are approximated
        #
        x_cols = _args['continuous'] if 'continuous' in _args else []   #-- columns to be approximated
        _cols = []                                                      #-- columns to be ignored
        if 'ignore' in _args and 'columns' in _args['ignore'] :
            _cols = self.get_ignore(data=df,columns=_args['ignore']['columns'])

        columns = _args['columns'] if 'columns' in _args else df.columns
        for name in columns :
            if name in _cols or name not in x_cols :
                continue
            if df[name].unique().size > 0 :
                i = np.arange(df.shape[0])
                np.random.shuffle(i)
                df[name] = self.approximate(df.iloc[i][name].fillna(0).values)

        df.index = np.arange(df.shape[0])
        self.post(data=df,schema=schema,store=_args['store']['target'])

    def post(self,**_args) :
        """
        This function writes a data-frame to a store, the table is created if need be (except for bigquery)

        :data       data-frame to be written (columns found in the schema are modified in place)
        :store      arguments of the writer (transport.factory), 'provider' is used to identify bigquery
        :from       optional name of the table, defaults to store['table']
        :schema     optional [{"name":...,"type":...}] used to fill missing values and format dates
        """
        table = _args['from'] if 'from' in _args else _args['store']['table']
        _schema = _args['schema'] if 'schema' in _args else None
        writer = factory.instance(**_args['store'])
        _df = _args['data']
        provider = _args['store'].get('provider')
        if _schema :
            for _item in _schema :
                name = _item['name']
                _kind = _item['type']
                if _kind in ['DATE','DATETIME'] :
                    #
                    # There is an issue with missing dates that needs to be resolved.
                    # for some reason a missing date/time here will cause the types to turn into timestamp (problem)
                    # The following is a hack to address the issue (alas) assuming 10 digit dates and 'NaT' replaces missing date values (pandas specifications)
                    #
                    _df[name] = _df[name].apply(lambda value: None if str(value) == 'NaT' else str(value)[:10])
                elif _kind == 'TIMESTAMP' :
                    _df[name] = _df[name].apply(lambda value: None if str(value) == 'NaT' else str(value))
                elif _kind == 'DATETIMESTAMP' :
                    # NOTE(review): the conversion is assumed to apply to DATETIMESTAMP only - confirm against the upstream file
                    _df[name] = pd.to_datetime(_df[name])
                else:
                    _novalue = 0 if _kind in ['INTEGER','FLOAT','NUMERIC'] else ''
                    _df[name] = _df[name].fillna(_novalue)

        if not writer.has(table=table) and provider != 'bigquery' :
            #
            # The table is created with the name that will be written to
            _map = {'STRING':'VARCHAR(256)','INTEGER':'BIGINT'} if 'provider' in _args['store'] else {}
            _params = {'map':_map,'table':table}
            if _schema :
                _params['schema'] = _schema
            else:
                _params['fields'] = _df.columns.tolist()
            writer.make(**_params)

        if provider == 'bigquery' :
            print (['_______ POSTING ______________ ',table])
            print (['_______________ ',_df.shape[0],' ___________________'])
            writer.write(_df.astype(object),schema=_schema,table=table)
        else:
            writer.table = table
            writer.write(_df)

    def finalize(self,args):
        """
        This function performs post-processing operations on a synthetic table (bigquery):
        every row of the target table is paired (by row number) with a key of the source table
        taken in random order, the result overwrites the target table. The job is logged.

        :store          stores {"source":...,"target":...,"logs":...}, source and target provide args.dataset,
                        the source provides args.private_key (service account file)
        :from           name of the table (same name in both datasets)
        :unique_fields  optional name of the key, defaults to <from>_id
        """
        reader = factory.instance(**args['store']['source'])
        logger = factory.instance(**args['store']['logs'])

        target = args['store']['target']['args']['dataset']
        source = args['store']['source']['args']['dataset']
        table = args['from']
        schema = reader.meta(table=table)
        #
        # The key is read from the source (alias y), all the other fields from the target
        unique_field = "_".join([table,'id']) if 'unique_fields' not in args else args['unique_fields']
        fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema]
        SQL = [
            "SELECT :fields FROM ",
            "(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN",
            "(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table ORDER BY RAND()) y",
            "ON y.row_number = x.row_number"
        ]
        SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target)
        SQL = SQL.replace(":unique_field",unique_field)
        #
        # Use a native job to get this done ...
        #
        client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"])
        job = bq.QueryJobConfig()
        job.destination = client.dataset(target).table(table)
        job.use_query_cache = True
        job.allow_large_results = True
        job.write_disposition = "WRITE_TRUNCATE"
        job.priority = 'BATCH'
        r = client.query(SQL,location='US',job_config=job)
        #
        # Keep a log of what just happened...
        #
        otable = ".".join([source,table])
        dtable = ".".join([target,table])
        logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":otable,"destimation":dtable}})

    def generate(self,args):
        """
        This function will generate data and store it to a given target store.
        The first candidate returned by data.maker.generate is merged with the columns that were
        not synthesized (continuous columns are approximated) and handed to post()

        :store      stores {"source":...,"target":...,"logs":...}
        :context    name of the log document
        :from       name of the table
        :data       optional data-frame, the source is read (sql, row_limit) if absent
        :schema     optional [{"name":...,"type":...}], retrieved from the source if absent
        :continuous optional list of columns that are approximated instead of synthesized
        :ignore     optional {"columns":[terms]} columns matching the terms are not synthesized
        :candidates optional number of candidates (defaults to 1)
        :gpu        optional chip(s) see set_gpu
        NOTE: args is updated in place (data, candidates, gpu) before it is handed to data.maker.generate
        """
        store = args['store']['logs']
        if 'args' in store :
            store['args']['doc'] = args['context']
        else:
            store['doc'] = args['context']
        logger = factory.instance(**store)

        ostore = args['store']['target']
        # NOTE(review): the writer is not used here (post creates its own), the call is kept as it was - confirm it can be dropped
        factory.instance(**ostore)

        schema = args['schema'] if 'schema' in args else None
        if 'data' in args :
            df = args['data']
        else:
            reader = factory.instance(**args['store']['source'])
            df = self._read(reader,args)
            if 'schema' not in args and hasattr(reader,'meta'):
                schema = self._describe(reader,args['from'])

        if schema :
            #
            # The columns are cast to the types of the schema, missing values become 0 or ''
            for _item in schema :
                name = _item['name']
                if _item['type'] in ['INTEGER','NUMERIC'] :
                    dtype,novalue = np.int64,0
                elif _item['type'] == 'FLOAT' :
                    dtype,novalue = np.float64,0
                else:
                    dtype,novalue = str,''
                df[name] = df[name].fillna(novalue).astype(dtype)

        _info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[1]},"schema":schema}
        logger.write(_info)
        #
        # The columns that are continuous or ignored don't need to be synthesized,
        # the continuous columns are set aside to be approximated
        # @TODO: Abstract this !!
        #
        x_cols = args['continuous'] if 'continuous' in args else []
        args['data'] = self._prune(df.copy(),args)
        real_df = df[x_cols].copy() if x_cols else pd.DataFrame()

        args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
        if 'gpu' in args :
            args['gpu'] = self.set_gpu(gpu=args['gpu'])

        _info = {"module":"gan-prep","action":"prune","shape":{"rows":args['data'].shape[0],"columns":args['data'].shape[1]}}
        logger.write(_info)
        if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 :
            candidates = (data.maker.generate(**args))
        else:
            # nothing to synthesize, the original data is the only candidate
            candidates = [df]

        cols = [_item['name'] for _item in schema] if schema else df.columns.tolist()
        _info = {"module":"gan-prep","action":"selection","input":{"candidates":len(candidates),"features":cols}}
        logger.write(_info)
        for _df in candidates :
            #
            # we need to format the fields here to make sure we have something cohesive
            # We perform a series of set operations to insure that the following conditions are met:
            #   - the synthetic dataset only has fields that need to be synthesized
            #   - The original dataset has all the fields except those that need to be synthesized
            #
            skip_columns = []
            if 'ignore' in args and 'columns' in args['ignore'] :
                skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns'])
            _df = _df[[name for name in _df.columns if name not in skip_columns]].copy()
            if x_cols :
                _approx = {}
                for _col in x_cols :
                    if real_df[_col].unique().size > 0 :
                        _df[_col] = self.approximate(real_df[_col].values)
                        # "io" describes the approximated column, "real" the original one
                        _approx[_col] = {"io":self._summary(_df[_col]),"real":self._summary(real_df[_col])}
                    else:
                        _df[_col] = -1
                logger.write({"module":"gan-generate","action":"approximate","status":_approx})
            _synthetic = set(_df.columns)
            df = df[[name for name in df.columns if name not in _synthetic]]
            #
            # Let us merge the dataset here and have a comprehensive dataset
            _df = df.join(_df)
            _params = {'data':_df,'store' : ostore,'from':args['from']}
            if schema :
                _params ['schema'] = schema
            _info = {"module":"gan-prep","action":"write","input":{"rows":_df.shape[0],"cols":_df.shape[1]}}
            logger.write(_info)
            self.post(**_params)
            #
            # Only the first candidate is posted
            break

    def bind(self,**_args):
        """
        This function prints the arguments it was given (placeholder)
        """
        print (_args)
|
|
|
|
|
|
if __name__ == '__main__' :
    #
    # Entry point: the configuration is loaded, a pipeline entry is selected (index) and one of the
    # actions generate | bind | shuffle | train found on the command line (SYS_ARGS) is executed.
    # NOTE: args is kept at module level on purpose (not wrapped in a function)
    #
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    with open(filename) as _stream :
        _config = json.loads(_stream.read())
    PIPELINE = _config['pipeline']
    #
    # The index provided is either a position in the pipeline or a key to a pipeline entry mainly the context
    # (the first entry is used if the context is not found)
    #
    index = SYS_ARGS['index']
    if index.isnumeric() :
        index = int(index)
    else:
        _matches = [i for i,_entry in enumerate(PIPELINE) if _entry['context'] == index]
        index = _matches[0] if _matches else 0

    print ("..::: ",PIPELINE[index]['context'],':::..')
    args = (PIPELINE[index])
    for key in _config :
        if key == 'pipeline' or key in args:
            #
            # skip in case of pipeline or if key exists in the selected pipeline (provided by index)
            #
            continue
        args[key] = _config[key]
    #
    # The command line takes precedence over the configuration
    args = dict(args,**SYS_ARGS)
    if 'matrix_size' in args :
        args['matrix_size'] = int(args['matrix_size'])
    if 'batch_size' not in args :
        args['batch_size'] = 2000
    if 'dataset' not in args :
        args['dataset'] = 'combined20191004v2_deid'
    args['logs'] = args['logs'] if 'logs' in args else 'logs'
    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
    #
    # @TODO:
    #   Log what was initiated so we have context of this processing ...
    #
    # The chips are either a list, a comma separated string (command line) or a single identifier
    GPU_CHIPS = args['gpu'] if 'gpu' in args else None
    if GPU_CHIPS and not isinstance(GPU_CHIPS,list) :
        GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if isinstance(GPU_CHIPS,str) else [GPU_CHIPS]
    if 'gpu' in SYS_ARGS :
        args['gpu'] = GPU_CHIPS
    #
    # The processes are given bound methods (not lambdas) so they can be started regardless of the start method
    jobs = []
    if 'generate' in SYS_ARGS :
        #
        # This raises if the folder <logs>/train/<context> does not exist, the listing itself is not used
        os.listdir( os.sep.join([args['logs'],'train',args['context']]))
        if 'all-chips' in SYS_ARGS and GPU_CHIPS:
            #
            # One job per chip, the chip is also used as partition
            for _count,_gpu in enumerate(GPU_CHIPS,1) :
                _args = copy.deepcopy(args)
                _args['gpu'] = [int(_gpu)]
                _args['partition'] = int(_gpu)
                job = Process(target=Components().generate,args=(_args,))
                job.name = 'Trainer # ' + str(_count)
                job.start()
                jobs.append(job)
        else:
            generator = Components()
            generator.generate(args)
    elif 'bind' in SYS_ARGS :
        import binder
        _args = _config['_map']
        _args['store'] = copy.deepcopy(_config['store'])
        if 'init' in SYS_ARGS :
            #
            # Creating and persisting the map ...
            print (['.... Binding Initialization'])
            _mapped = binder.Init(**_args)
            _schema = [{"name":_name,"type":"INTEGER"} for _name in _mapped.columns.tolist()]
            _args = {'data':_mapped,'store':_config['store']['target']}
            _args['store']['table'] = '_map'
            if _args['store']['provider'] =='bigquery' :
                _args['schema'] = _schema
            job = Process (target = Components().post,kwargs=_args)
            job.start()
            jobs = [job]
        else:
            #
            # Applying the map of k on a particular dataset (the index was resolved above,
            # it may have been given as a context and must not be parsed again)
            #
            _args['config'] = _config['pipeline'][index]
            # the configured key is used when there is one
            _args['original_key'] = _config['original_key'] if 'original_key' in _config else 'person_id'
            table = _config['pipeline'][index]['from']
            _df = binder.ApplyOn(**_args)
            _df = np.array_split(_df,PART_SIZE)
            print (['Publishing ',PART_SIZE,' PARTITION'])
            #
            # NOTE: the loop variable must not be called data, it would hide the data package
            for _part in _df :
                _args = {'data':_part,'store':_config['store']['target']}
                _args['store']['table'] = table
                print (_args['store'])
                job = Process(target = Components().post,kwargs=_args)
                job.name = "Publisher "+str(len(jobs)+1)
                job.start()
                jobs.append(job)
    elif 'shuffle' in SYS_ARGS :
        if GPU_CHIPS and 'all-chips' in SYS_ARGS:
            #
            # One job per chip, every job is given the same arguments
            for _gpu in GPU_CHIPS :
                job = Process (target = Components().shuffle,args=( args,))
                job.name = 'Shuffler #' + str(_gpu)
                job.start()
                jobs.append(job)
        else:
            shuffler = Components()
            shuffler.shuffle(args)
    elif 'train' in SYS_ARGS:
        #
        # Let us create n-jobs across n-gpus, The assumption here is the data that is produced will be a partition
        # @TODO: Find better name for partition
        #
        if GPU_CHIPS and 'all-chips' in SYS_ARGS:
            print (['... launching ',len(GPU_CHIPS),' jobs',args['context']])
            for _count,_gpu in enumerate(GPU_CHIPS,1) :
                _args = copy.deepcopy(args)
                _args['gpu'] = [int(_gpu)]
                _args['partition'] = int(_gpu)
                job = Process(target=Components().train,kwargs=_args)
                job.name = 'Trainer # ' + str(_count)
                job.start()
                jobs.append(job)
        else:
            #
            # The choice of the chip will be made internally
            agent = Components()
            agent.train(**args)
    #
    # If we have any jobs we should wait till they finish
    #
    if jobs :
        print (['.... waiting on ',len(jobs),' jobs'])
        for job in jobs :
            job.join()
        print (["..:: jobs finished "])
    #
    # We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations
    # This holds true for bigquery - bigquery only
    # NOTE: the finalization step, (Components()).finalize(args), is currently disabled
    IS_BIGQUERY = _config['store']['source']['provider'] == 'bigquery' and _config['store']['target']['provider'] == 'bigquery'
|