data-maker/pipeline.py

693 lines
23 KiB
Python

#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import time
import os
from multiprocessing import Process, Lock
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import data.maker
import copy
from data.params import SYS_ARGS
#
# The configuration array is now loaded and we will execute the pipe line as follows
class Components :
lock = Lock()
class KEYS :
PIPELINE_KEY = 'pipeline'
SQL_FILTER = 'filter'
@staticmethod
def get_filter (**args):
if args['qualifier'] == 'IN' :
return ' '.join([args['field'],args['qualifier'],'(',args['value'],')'])
else:
return ' '.join([args['field'],args['qualifier'],args['value']])
@staticmethod
def get_logger(**args) :
return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
@staticmethod
def get(args):
"""
This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)
The function must be wrapped around a lambda this makes testing easier and changing data stores transparent to the rest of the code. (Vital when testing)
:sql basic sql statement
:condition optional condition and filters
"""
SQL = args['sql']
if Components.KEYS.SQL_FILTER in args :
FILTER_KEY = Components.KEYS.SQL_FILTER
SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]]
# condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')'])
condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER])
SQL = " ".join([SQL,'WHERE',condition])
SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
if 'limit' in args :
SQL = SQL + ' LIMIT ' + args['limit']
#
# let's log the sql query that has been performed here
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}})
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
return df
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
@staticmethod
def split(X,MAX_ROWS=3,PART_SIZE=3):
return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
def format_schema(self,schema):
_schema = {}
for _item in schema :
_type = int
_value = 0
if _item.field_type == 'FLOAT' :
_type =float
elif _item.field_type != 'INTEGER' :
_type = str
_value = ''
_schema[_item.name] = _type
return _schema
def get_ignore(self,**_args) :
if 'columns' in _args and 'data' in _args :
_df = _args['data']
terms = _args['columns']
return [name for name in _df.columns if np.sum( [int(field in name )for field in terms ]) ]
return []
def set_gpu(self,**_args) :
if 'gpu' in _args :
gpu = _args['gpu'] if type(_args['gpu']) != str else [_args['gpu']]
_index = str(gpu[0])
os.environ['CUDA_VISIBLE_DEVICES'] = _index
return gpu
else :
return None
def train(self,**args):
"""
This function will perform training on the basis of a given pointer that reads data
"""
schema = None
if 'file' in args :
df = pd.read_csv(args['file'])
del args['file']
elif 'data' not in args :
reader = factory.instance(**args['store']['source'])
if 'row_limit' in args :
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None
else:
df = args['data']
#
#
# df = df.fillna('')
if schema :
_schema = []
for _item in schema :
_type = int
_value = 0
if _item.field_type == 'FLOAT' :
_type =float
elif _item.field_type != 'INTEGER' :
_type = str
_value = ''
_schema += [{"name":_item.name,"type":_item.field_type}]
df[_item.name] = df[_item.name].fillna(_value).astype(_type)
args['schema'] = _schema
# df[_item.name] = df[_item.name].astype(_type)
_args = copy.deepcopy(args)
# _args['store'] = args['store']['source']
_args['data'] = df
#
# The columns that are continuous should also be skipped because they don't need to be synthesied (like-that)
if 'continuous' in args :
x_cols = args['continuous']
else:
x_cols = []
if 'ignore' in args and 'columns' in args['ignore'] :
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
_args['data'] = df[ list(set(df.columns)- set(_cols))]
#
# We need to make sure that continuous columns are removed
if x_cols :
_args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))]
if 'gpu' in args :
_args['gpu'] = self.set_gpu(gpu=args['gpu'])
if 'partition' in args :
_args['partition'] = args['partition']
if df.shape[0] and df.shape[0] :
#
# We have a full blown matrix to be processed
print ('-- Training --')
data.maker.train(**_args)
else:
print ("... skipping training !!")
if 'autopilot' in ( list(args.keys())) :
args['data'] = df
print (['autopilot mode enabled ....',args['context']])
self.generate(args)
pass
def approximate(self,values):
"""
:param values array of values to be approximated
"""
if values.dtype in [int,float] :
#
# @TODO: create bins?
r = np.random.dirichlet(values+.001) #-- dirichlet doesn't work on values with zeros
_sd = values[values > 0].std()
_me = values[values > 0].mean()
_mi = values.min()
x = []
_type = values.dtype
for index in np.arange(values.size) :
if np.random.choice([0,1],1)[0] :
value = values[index] + (values[index] * r[index])
else :
value = values[index] - (values[index] * r[index])
#
# randomly shifting the measurements
if np.random.choice([0,1],1)[0] and _me > _sd :
if np.random.choice([0,1],1)[0] :
value = value * np.divide(_me,_sd)
else:
value = value + (np.divide(_me,_sd))
value = int(value) if _type == int else np.round(value,2)
x.append( value)
np.random.shuffle(x)
return np.array(x)
else:
return values
pass
def shuffle(self,_args):
if 'data' in args :
df = data['data']
else:
reader = factory.instance(**args['store']['source'])
if 'file' in args :
df = pd.read_csv(args['file'])
elif 'data' in _args :
df = _args['data']
else:
if 'row_limit' in args and 'sql' in args:
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
schema = None
if 'schema' not in args and hasattr(reader,'meta') and 'file' not in args:
schema = reader.meta(table=args['from'])
schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
#
# We are shufling designated colmns and will be approximating the others
#
x_cols = [] #-- coumns tobe approximated.
_cols = [] #-- columns to be ignored
if 'continuous' in args :
x_cols = args['continuous']
if 'ignore' in args and 'columns' in args['ignore'] :
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
columns = args['columns'] if 'columns' in args else df.columns
columns = list(set(columns) - set(_cols))
for name in columns:
i = np.arange(df.shape[0])
np.random.shuffle(i)
if name in x_cols :
if df[name].unique().size > 0 :
df[name] = self.approximate(df.iloc[i][name].fillna(0).values)
# df[name] = df[name].astype(str)
# pass
df.index = np.arange(df.shape[0])
self.post(data=df,schema=schema,store=args['store']['target'])
def post(self,**_args) :
table = _args['from'] if 'from' in _args else _args['store']['table']
_schema = _args['schema'] if 'schema' in _args else None
writer = factory.instance(**_args['store'])
_df = _args['data']
if _schema :
columns = []
for _item in _schema :
name = _item['name']
_type = str
_value = 0
if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] :
if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
#
# There is an issue with missing dates that needs to be resolved.
# for some reason a missing date/time here will cause the types to turn into timestamp (problem)
# The following is a hack to address the issue (alas) assuming 10 digit dates and 'NaT' replaces missing date values (pandas specifications)
#
_df[name] = _df[name].apply(lambda value: None if str(value) == 'NaT' else (str(value)[:10]) if _item['type'] in ['DATE','DATETIME'] else str(value))
#_df[name] = _df[name].dt.date
# _df[name] = pd.to_datetime(_df[name].fillna(''),errors='coerce')
else:
pass
_df[name] = pd.to_datetime(_df[name])
else:
value = 0
if _item['type'] == 'INTEGER' :
_type = np.int64
elif _item['type'] in ['FLOAT','NUMERIC']:
_type = np.float64
else:
_value = ''
_df[name] = _df[name].fillna(_value) #.astype(_type)
columns.append(name)
fields = _df.columns.tolist()
if not writer.has(table=table) and _args['store']['provider'] != 'bigquery':
_map = {'STRING':'VARCHAR(256)','INTEGER':'BIGINT'} if 'provider' in _args['store'] and _args['store']['provider'] != 'bigquery' else {}
_params = {'map':_map,'table':args['from']}
if _schema :
_params['schema'] = _schema
else:
_params['fields'] = fields
writer.make(**_params)
fields = _df.columns.tolist()
_df = _df[fields]
# writer.fields = fields
if _args['store']['provider'] == 'bigquery' :
print (['_______ POSTING ______________ ',table])
print (['_______________ ',_df.shape[0],' ___________________'])
writer.write(_df.astype(object),schema=_schema,table=table)
else:
writer.table = table
writer.write(_df)
# else:
# writer.write(_df,table=args['from'])
def finalize(self,args):
"""
This function performs post-processing opertions on a synthetic table i.e :
- remove duplicate keys
- remove orphaned keys i.e
"""
reader = factory.instance(**args['store']['source'])
logger = factory.instance(**args['store']['logs'])
target = args['store']['target']['args']['dataset']
source = args['store']['source']['args']['dataset']
table = args['from']
schema = reader.meta(table=args['from'])
#
# keys :
unique_field = "_".join([args['from'],'id']) if 'unique_fields' not in args else args['unique_fields']
fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema]
SQL = [
"SELECT :fields FROM ",
"(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN",
"(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table ORDER BY RAND()) y",
"ON y.row_number = x.row_number"
]
SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target)
SQL = SQL.replace(":unique_field",unique_field)
#
# Use a native job to get this done ...
#
client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"])
job = bq.QueryJobConfig()
job.destination = client.dataset(target).table(table)
job.use_query_cache = True
job.allow_large_results = True
# job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
job.write_disposition = "WRITE_TRUNCATE"
job.priority = 'BATCH'
r = client.query(SQL,location='US',job_config=job)
logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":"".join([source,table]),"destimation":".".join([target,table])}})
#
# Keep a log of what just happened...
#
otable = ".".join([args['store']['source']['args']['dataset'],args['from']])
dtable = ".".join([args['store']['target']['args']['dataset'],args['from']])
def generate(self,args):
"""
This function will generate data and store it to a given,
"""
store = args['store']['logs']
if 'args' in store :
store['args']['doc'] = args['context']
else:
store['doc'] = args['context']
logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
ostore = args['store']['target']
writer = factory.instance(**ostore)
schema = args['schema'] if 'schema' in args else None
if 'data' in args :
df = args['data']
else:
reader = factory.instance(**args['store']['source'])
if 'row_limit' in args :
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
if 'schema' not in args and hasattr(reader,'meta'):
schema = reader.meta(table=args['from'])
schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
# else:
# #
# # This will account for autopilot mode ...
# df = args['data']
_cast = {}
if schema :
for _item in schema :
dtype = str
name = _item['name']
novalue = 0
if _item['type'] in ['INTEGER','NUMERIC']:
dtype = np.int64
elif _item['type'] == 'FLOAT' :
dtype = np.float64
else:
novalue = ''
# _cast[schema['name']] = dtype
df[name] = df[name].fillna(novalue).astype(dtype)
_info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[1]},"schema":schema}
logger.write(_info)
_dc = pd.DataFrame()
# for mdf in df :
args['data'] = df.copy()
#
# The columns that are continuous should also be skipped because they don't need to be synthesied (like-that)
if 'continuous' in args :
x_cols = args['continuous']
else:
x_cols = []
if 'ignore' in args and 'columns' in args['ignore'] :
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
args['data'] = args['data'][ list(set(df.columns)- set(_cols))]
#
# We need to remove the continuous columns from the data-frame
# @TODO: Abstract this !!
#
real_df = pd.DataFrame()
if x_cols :
args['data'] = args['data'][list(set(args['data'].columns) - set(x_cols))]
real_df = df[x_cols].copy()
args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
if 'gpu' in args :
args['gpu'] = self.set_gpu(gpu=args['gpu'])
# if 'partition' in args :
# args['logs'] = os.sep.join([args['logs'],str(args['partition'])])
_info = {"module":"gan-prep","action":"prune","shape":{"rows":args['data'].shape[0],"columns":args['data'].shape[1]}}
logger.write(_info)
if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 :
candidates = (data.maker.generate(**args))
else:
candidates = [df]
# if 'sql.BQWriter' in ostore['type'] :
_columns = None
skip_columns = []
_schema = schema
if schema :
cols = [_item['name'] for _item in _schema]
else:
cols = df.columns.tolist()
_info = {"module":"gan-prep","action":"selection","input":{"candidates":len(candidates),"features":cols}}
logger.write(_info)
for _df in candidates :
#
# we need to format the fields here to make sure we have something cohesive
#
if not skip_columns :
if 'ignore' in args and 'columns' in args['ignore'] :
skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns'])
#
# We perform a series of set operations to insure that the following conditions are met:
# - the synthetic dataset only has fields that need to be synthesized
# - The original dataset has all the fields except those that need to be synthesized
#
_df = _df[list(set(_df.columns) - set(skip_columns))].copy()
if x_cols :
_approx = {}
for _col in x_cols :
if real_df[_col].unique().size > 0 :
_df[_col] = self.approximate(real_df[_col].values)
_approx[_col] = {
"io":{"min":_df[_col].min().astype(float),"max":_df[_col].max().astype(float),"mean":_df[_col].mean().astype(float),"sd":_df[_col].values.std().astype(float),"missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":_df[_col].where(_df[_col] == 0).dropna().count().astype(float)},
"real":{"min":real_df[_col].min().astype(float),"max":real_df[_col].max().astype(float),"mean":real_df[_col].mean().astype(float),"sd":real_df[_col].values.std().astype(float),"missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}
}
else:
_df[_col] = -1
logger.write({"module":"gan-generate","action":"approximate","status":_approx})
if set(df.columns) & set(_df.columns) :
_columns = list(set(df.columns) - set(_df.columns))
df = df[_columns]
#
# Let us merge the dataset here and and have a comprehensive dataset
_df = pd.DataFrame.join(df,_df)
_params = {'data':_df,'store' : ostore}
if _schema :
_params ['schema'] = _schema
_info = {"module":"gan-prep","action":"write","input":{"rows":_df.shape[0],"cols":_df.shape[1]}}
logger.write(_info)
self.post(**_params)
# print (['_______ posting _________________',_df.shape])
break
pass
# else:
# pass
def bind(self,**_args):
print (_args)
if __name__ == '__main__' :
filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
f = open (filename)
_config = json.loads(f.read())
f.close()
PIPELINE = _config['pipeline']
index = SYS_ARGS['index']
if index.isnumeric() :
index = int(SYS_ARGS['index'])
else:
#
# The index provided is a key to a pipeline entry mainly the context
#
N = len(PIPELINE)
f = [i for i in range(0,N) if PIPELINE[i]['context'] == index]
index = f[0] if f else 0
#
print ("..::: ",PIPELINE[index]['context'],':::..')
args = (PIPELINE[index])
for key in _config :
if key == 'pipeline' or key in args:
#
# skip in case of pipeline or if key exists in the selected pipeline (provided by index)
#
continue
args[key] = _config[key]
args = dict(args,**SYS_ARGS)
if 'matrix_size' in args :
args['matrix_size'] = int(args['matrix_size'])
if 'batch_size' not in args :
args['batch_size'] = 2000 #if 'batch_size' not in args else int(args['batch_size'])
if 'dataset' not in args :
args['dataset'] = 'combined20191004v2_deid'
args['logs'] = args['logs'] if 'logs' in args else 'logs'
PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
#
# @TODO:
# Log what was initiated so we have context of this processing ...
#
GPU_CHIPS = args['gpu'] if 'gpu' in args else None
if GPU_CHIPS and type(GPU_CHIPS) != list :
GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS]
if 'gpu' in SYS_ARGS :
args['gpu'] = GPU_CHIPS
jobs = []
if 'generate' in SYS_ARGS :
#
# Let us see if we have partitions given the log folder
content = os.listdir( os.sep.join([args['logs'],'train',args['context']]))
if 'all-chips' in SYS_ARGS and GPU_CHIPS:
index = 0
jobs = []
for _gpu in GPU_CHIPS :
_args = copy.deepcopy(args)
_args['gpu'] = [int(_gpu)]
_args['partition'] = int(_gpu) #index
index += 1
make = lambda _params: (Components()).generate(_params)
job = Process(target=make,args=( dict(_args),))
job.name = 'Trainer # ' + str(index)
job.start()
jobs.append(job)
pass
else:
generator = Components()
generator.generate(args)
elif 'bind' in SYS_ARGS :
import binder
_args = _config['_map']
_args['store'] = copy.deepcopy(_config['store'])
if 'init' in SYS_ARGS :
#
# Creating and persisting the map ...
print (['.... Binding Initialization'])
# jobs = binder.Init(**_args)
_mapped = binder.Init(**_args)
_schema = [{"name":_name,"type":"INTEGER"} for _name in _mapped.columns.tolist()]
publisher = lambda _params: (Components()).post(**_params)
_args = {'data':_mapped,'store':_config['store']['target']}
_args['store']['table'] = '_map'
if _args['store']['provider'] =='bigquery' :
_args['schema'] = _schema
job = Process (target = publisher,args=(_args,))
job.start()
jobs = [job]
else:
#
# Applying the map of k on a particular dataset
#
index = int(SYS_ARGS['index'])
_args['config'] = _config['pipeline'][index]
_args['original_key'] = 'person_id' if 'original_key' in _config else 'person_id'
table = _config['pipeline'][index]['from']
_df = binder.ApplyOn(**_args)
_df = np.array_split(_df,PART_SIZE)
jobs = []
print (['Publishing ',PART_SIZE,' PARTITION'])
for data in _df :
publisher = lambda _params: ( Components() ).post(**_params)
_args = {'data':data,'store':_config['store']['target']}
_args['store']['table'] = table
print (_args['store'])
job = Process(target = publisher,args=(_args,))
job.name = "Publisher "+str(len(jobs)+1)
job.start()
jobs.append(job)
elif 'shuffle' in SYS_ARGS :
index = 0
if GPU_CHIPS and 'all-chips' in SYS_ARGS:
for index in GPU_CHIPS :
publisher = lambda _params: ( Components() ).shuffle(_params)
job = Process (target = publisher,args=( args,))
job.name = 'Shuffler #' + str(index)
job.start()
jobs.append(job)
else:
shuffler = Components()
shuffler.shuffle(args)
pass
elif 'train' in SYS_ARGS:
# DATA = np.array_split(DATA,PART_SIZE)
#
# Let us create n-jobs across n-gpus, The assumption here is the data that is produced will be a partition
# @TODO: Find better name for partition
#
if GPU_CHIPS and 'all-chips' in SYS_ARGS:
index = 0
print (['... launching ',len(GPU_CHIPS),' jobs',args['context']])
for _gpu in GPU_CHIPS :
_args = copy.deepcopy(args)
_args['gpu'] = [int(_gpu)]
_args['partition'] = int(_gpu) #index
index += 1
make = lambda _params: (Components()).train(**_params)
job = Process(target=make,args=( _args,))
job.name = 'Trainer # ' + str(index)
job.start()
jobs.append(job)
else:
#
# The choice of the chip will be made internally
agent = Components()
agent.train(**args)
#
# If we have any obs we should wait till they finish
#
DIRTY = 0
if (len(jobs)) :
print (['.... waiting on ',len(jobs),' jobs'])
while len(jobs)> 0 :
DIRTY =1
jobs = [job for job in jobs if job.is_alive()]
time.sleep(2)
if DIRTY:
print (["..:: jobs finished "])
#
# We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations
# This holds true for bigquery - bigquery only
IS_BIGQUERY = _config['store']['source']['provider'] == _config['store']['target']['provider'] and _config['store']['source']['provider'] == 'bigquery'
# if 'bind' not in SYS_ARGS and IS_BIGQUERY and ('autopilot' in SYS_ARGS or 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS)) :
# #
# # We should pull all the primary keys and regenerate them in order to insure some form of consistency
# #
# #
# #
# print (["..:: Finalizing process"])
# (Components()).finalize(args)