data-maker/pipeline.py

512 lines
16 KiB
Python

#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import time
import os
from multiprocessing import Process, Lock
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import data.maker
import copy
from data.params import SYS_ARGS
#
# The configuration array is now loaded and we will execute the pipeline as follows
class Components:
    """
    Training and generation steps of the pipeline.

    The methods read a data-frame (from a csv file, from a reader built by
    ``factory.instance`` or handed in by the caller), remove the columns that
    must not be synthesized, delegate the actual work to ``data.maker`` and,
    for generation, write the merged result through a writer built by
    ``factory.instance``.
    """
    lock = Lock()

    class KEYS:
        PIPELINE_KEY = 'pipeline'
        SQL_FILTER = 'filter'

    # Service-account key used by get() when the caller does not provide 'private_key'
    DEFAULT_PRIVATE_KEY = '/home/steve/dev/aou/accounts/curation-prod.json'

    @staticmethod
    def get_filter(**args):
        """
        Render a single SQL condition as text.

        :field      name of the field the condition applies to
        :qualifier  operator; 'IN' wraps the value in parentheses
        :value      right-hand side, converted to text before joining
        :return     the condition, tokens separated by single spaces
        """
        field, qualifier = args['field'], args['qualifier']
        # str() lets numeric values through; string values are left untouched
        value = str(args['value'])
        if qualifier == 'IN':
            return ' '.join([field, qualifier, '(', value, ')'])
        return ' '.join([field, qualifier, value])

    @staticmethod
    def get_logger(**args):
        """
        Return a 'mongo.MongoWriter' instance built by the factory.

        :context    name stored under the 'doc' argument of the writer
        """
        return factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})

    @staticmethod
    def get(args):
        """
        This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)

        :sql            basic sql statement, ':dataset' is replaced by args['dataset']
        :dataset        value substituted for ':dataset'
        :filter         optional condition or list of conditions (see get_filter), joined with AND
        :limit          optional row limit appended to the statement
        :context        name handed to the logger
        :private_key    optional path of the service-account key (defaults to DEFAULT_PRIVATE_KEY)
        :return         the data-frame produced by pandas.read_gbq
        """
        SQL = args['sql']
        FILTER_KEY = Components.KEYS.SQL_FILTER
        if FILTER_KEY in args:
            filters = args[FILTER_KEY]
            if not isinstance(filters, list):
                filters = [filters]
            # An empty list of filters must not leave a dangling WHERE
            if filters:
                condition = ' AND '.join(Components.get_filter(**item) for item in filters)
                SQL = " ".join([SQL, 'WHERE', condition])
        SQL = SQL.replace(':dataset', args['dataset'])
        if 'limit' in args:
            # the limit may be an integer (configuration) or text (command line)
            SQL = SQL + ' LIMIT ' + str(args['limit'])
        #
        # let's log the sql query that has been performed here
        logger = Components.get_logger(context=args['context'])
        logger.write({"module": "bigquery", "action": "read", "input": {"sql": SQL}})
        key_path = args['private_key'] if 'private_key' in args else Components.DEFAULT_PRIVATE_KEY
        credentials = service_account.Credentials.from_service_account_file(key_path)
        return pd.read_gbq(SQL, credentials=credentials, dialect='standard')

    @staticmethod
    def split(X, MAX_ROWS=3, PART_SIZE=3):
        """
        Cut the range [0, number of rows of X] into PART_SIZE bins.

        :X          object exposing a shape attribute (only shape[0] is read)
        :MAX_ROWS   unused, kept so existing callers keep working
        :PART_SIZE  number of bins
        :return     list of the categories produced by pandas.cut
        """
        return list(pd.cut(np.arange(X.shape[0] + 1), PART_SIZE).categories)

    @staticmethod
    def _cast_for(field_type):
        """Return the (python type, fill value) pair associated with a schema field type."""
        if field_type == 'FLOAT':
            return float, 0
        if field_type == 'INTEGER':
            return int, 0
        return str, ''

    def format_schema(self, schema):
        """
        Map every field of a schema to a python type.

        :schema     iterable of objects exposing name and field_type
        :return     dictionary {name: float|int|str}; anything that is neither FLOAT nor INTEGER maps to str
        """
        return {_item.name: Components._cast_for(_item.field_type)[0] for _item in schema}

    def get_ignore(self, **_args):
        """
        Return the columns of a data-frame whose name contains at least one of the given terms.

        :data       data-frame whose columns are inspected
        :columns    list of terms (substrings) to look for
        :return     matching column names in data-frame order, [] if an argument is missing
        """
        if 'columns' in _args and 'data' in _args:
            terms = _args['columns']
            return [name for name in _args['data'].columns if any(field in name for field in terms)]
        return []

    def set_gpu(self, **_args):
        """
        Expose the first requested gpu through CUDA_VISIBLE_DEVICES.

        :gpu        a single identifier (text or number) or a list/tuple of identifiers
        :return     the identifiers as a sequence (a scalar is wrapped in a list), None if nothing was requested
        """
        if 'gpu' not in _args:
            return None
        gpu = _args['gpu']
        if not isinstance(gpu, (list, tuple)):
            # a scalar (e.g. gpu=0) can not be indexed, normalize it first
            gpu = [gpu]
        if not gpu:
            return None
        os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu[0])
        return gpu

    def train(self, **args):
        """
        This function will perform training on the basis of a given pointer that reads data

        The data comes from (in that order) a csv file, a reader built from args['store']['source']
        or the data-frame found in args['data'].

        :file       optional path of a csv file
        :data       optional data-frame
        :store      stores, 'source' is used when neither file nor data is given
        :sql        statement handed to the reader
        :row_limit  optional limit handed to the reader
        :from       optional table name used to fetch the schema (reader.meta)
        :continuous optional list of columns that are not handed to data.maker.train
        :ignore     optional {'columns': [...]} terms of the columns to be left out (see get_ignore)
        :gpu        optional gpu identifier(s) (see set_gpu)
        :autopilot  when present, generate() is called right after training
        """
        schema = None
        if 'file' in args:
            df = pd.read_csv(args['file'])
            del args['file']
        elif 'data' not in args:
            reader = factory.instance(**args['store']['source'])
            if 'row_limit' in args:
                df = reader.read(sql=args['sql'], limit=args['row_limit'])
            else:
                df = reader.read(sql=args['sql'])
            schema = reader.meta(table=args['from']) if hasattr(reader, 'meta') and 'from' in args else None
        else:
            df = args['data']
        if schema:
            #
            # The columns are cast according to the schema, missing values are filled beforehand
            _schema = []
            for _item in schema:
                _type, _value = Components._cast_for(_item.field_type)
                _schema.append({"name": _item.name, "type": _item.field_type})
                df[_item.name] = df[_item.name].fillna(_value).astype(_type)
            args['schema'] = _schema
        #
        # The data-frame is replaced right after the copy, there is no need to deep-copy it
        _args = copy.deepcopy({key: args[key] for key in args if key != 'data'})
        _args['data'] = df
        #
        # The columns that are continuous should also be skipped because they don't need to be synthesized (like-that)
        x_cols = args['continuous'] if 'continuous' in args else []
        if 'ignore' in args and 'columns' in args['ignore']:
            _cols = self.get_ignore(data=df, columns=args['ignore']['columns'])
            _args['data'] = df[list(set(df.columns) - set(_cols))]
        #
        # We need to make sure that continuous columns are removed
        if x_cols:
            _args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))]
        if 'gpu' in args:
            _args['gpu'] = self.set_gpu(gpu=args['gpu'])
        if df.shape[0] and df.shape[1]:
            #
            # We have a full blown matrix (rows and columns) to be processed
            data.maker.train(**_args)
        else:
            print("... skipping training !!")
        if 'autopilot' in args:
            args['data'] = df
            print(['autopilot mode enabled ....', args['context']])
            self.generate(args)

    def approximate(self, values):
        """
        Perturb numeric values: each value is increased or decreased (coin flip) by a
        fraction of itself drawn from a dirichlet distribution, then the result is shuffled.

        :param values   array of values to be approximated
        :return         numpy array of perturbed values, or the input untouched if it is not int/float
        """
        if values.dtype not in [int, float]:
            return values
        # NOTE(review): numpy documents that dirichlet expects positive concentration
        # parameters while generate() passes values filled with -1 -- TODO confirm
        r = np.random.dirichlet(values)
        _type = values.dtype
        x = []
        for index in np.arange(values.size):
            if np.random.choice([0, 1], 1)[0]:
                value = values[index] + (values[index] * r[index])
            else:
                value = values[index] - (values[index] * r[index])
            value = int(value) if _type == int else np.round(value, 2)
            x.append(value)
        np.random.shuffle(x)
        return np.array(x)

    def generate(self, args):
        """
        This function will generate data and store it to a given target store.

        Every candidate returned by data.maker.generate is stripped of the ignored columns,
        completed with approximated continuous columns, joined with the columns of the
        original data that were not synthesized and written to the target (only when the
        target type contains 'sql.BQWriter').

        :store      stores: 'logs', 'target' and (when no data is given) 'source'
        :context    name stored in the 'doc' entry of the log store
        :data       optional data-frame, otherwise it is read through the source store
        :schema     optional list of {"name":..,"type":..}
        :from       table name handed to the writer (and to reader.meta)
        :continuous optional list of columns to approximate instead of synthesize
        :ignore     optional {'columns': [...]} terms of the columns to be left out (see get_ignore)
        :candidates optional number of candidates (defaults to 1)
        :gpu        optional gpu identifier(s) (see set_gpu)
        """
        store = args['store']['logs']
        store['doc'] = args['context']
        # NOTE(review): the logger is instantiated but nothing is written to it in this function
        logger = factory.instance(**store)
        ostore = args['store']['target']
        writer = factory.instance(**ostore)
        schema = args['schema'] if 'schema' in args else None
        if 'data' in args:
            df = args['data']
        else:
            reader = factory.instance(**args['store']['source'])
            if 'row_limit' in args:
                df = reader.read(sql=args['sql'], limit=args['row_limit'])
            else:
                df = reader.read(sql=args['sql'])
            if 'schema' not in args and hasattr(reader, 'meta'):
                schema = reader.meta(table=args['from'])
                schema = [{"name": _item.name, "type": _item.field_type} for _item in schema]
        args['data'] = df.copy()
        #
        # The columns that are continuous should also be skipped because they don't need to be synthesized (like-that)
        x_cols = args['continuous'] if 'continuous' in args else []
        ignore_terms = args['ignore']['columns'] if 'ignore' in args and 'columns' in args['ignore'] else None
        if ignore_terms is not None:
            _cols = self.get_ignore(data=df, columns=ignore_terms)
            args['data'] = args['data'][list(set(df.columns) - set(_cols))]
        #
        # We need to remove the continuous columns from the data-frame
        # @TODO: Abstract this !!
        #
        real_df = pd.DataFrame()
        if x_cols:
            args['data'] = args['data'][list(set(args['data'].columns) - set(x_cols))]
            # df is the data-frame itself: the continuous columns are read straight from it
            real_df = df[x_cols].copy()
        args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
        if 'gpu' in args:
            args['gpu'] = self.set_gpu(gpu=args['gpu'])
        candidates = data.maker.generate(**args)
        if 'sql.BQWriter' not in ostore['type']:
            return
        cols = [_item['name'] for _item in schema] if schema else list(df.columns)
        skip_columns = []
        for _df in candidates:
            #
            # we need to format the fields here to make sure we have something cohesive
            #
            if not skip_columns and ignore_terms is not None:
                skip_columns = self.get_ignore(data=_df, columns=ignore_terms)
            #
            # We perform a series of set operations to insure that the following conditions are met:
            #   - the synthetic dataset only has fields that need to be synthesized
            #   - The original dataset has all the fields except those that need to be synthesized
            #
            _df = _df[[name for name in _df.columns if name not in skip_columns]].copy()
            for _col in x_cols:
                # real_df keeps the original continuous values for every candidate
                if real_df[_col].unique().size > 0:
                    _df[_col] = self.approximate(real_df[_col].fillna(-1))
                else:
                    _df[_col] = -1
            #
            # Let us merge the dataset here and have a comprehensive dataset
            # df is left untouched (and indexed with a list) so every candidate is merged against the same columns
            base = df[[name for name in df.columns if name not in _df.columns]]
            _df = base.join(_df)
            if schema:
                for _item in schema:
                    if _item['type'] in ['DATE', 'TIMESTAMP', 'DATETIME']:
                        _df[_item['name']] = _df[_item['name']].astype(str)
                writer.write(_df[cols], schema=schema, table=args['from'])
            else:
                writer.write(_df[cols], table=args['from'])
# #
# # We need to post the generate the data in order to :
# # 1. compare immediately
# # 2. synthetic copy
# #
# cols = _dc.columns.tolist()
# data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
# #
# # performing basic analytics on the synthetic data generated (easy to quickly asses)
# #
# info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
# #
# # @TODO: Send data over to a process for analytics
# base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
# cols = _dc.columns.tolist()
# for name in cols :
# _args['data'][name] = _dc[name]
# #
# #-- Let us store all of this into bigquery
# prefix = args['notify']+'.'+_args['context']
# partition = str(partition)
# table = '_'.join([prefix,partition,'io']).replace('__','_')
# folder = os.sep.join([args['logs'],args['context'],partition,'output'])
# if 'file' in args :
# _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
# _pname = os.sep.join([folder,table])+'.csv'
# data_comp.to_csv( _pname,index=False)
# _args['data'].to_csv(_fname,index=False)
# _id = 'path'
# else:
# credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
# _pname = os.sep.join([folder,table+'.csv'])
# _fname = table.replace('_io','_full_io')
# partial = '.'.join(['io',args['context']+'_partial_io'])
# complete= '.'.join(['io',args['context']+'_full_io'])
# data_comp.to_csv(_pname,index=False)
# if 'dump' in args :
# print (_args['data'].head())
# else:
# Components.lock.acquire()
# data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
# _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
# Components.lock.release()
# _id = 'dataset'
# info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
# if partition :
# info ['partition'] = int(partition)
# logger.write({"module":"generate","action":"write","input":info} )
if __name__ == '__main__':
    #
    # Entry point: load the configuration, select a pipeline entry (by position or by context),
    # complete it with the global configuration and the command line, then train or generate
    #
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    with open(filename) as f:
        _config = json.load(f)
    PIPELINE = _config['pipeline']
    if 'index' not in SYS_ARGS:
        raise SystemExit("Missing 'index' argument: provide the position or the context of a pipeline entry")
    index = str(SYS_ARGS['index'])
    if index.isnumeric():
        index = int(index)
    else:
        #
        # The index provided is a key to a pipeline entry mainly the context
        # An unknown context falls back to the first entry
        #
        matches = [i for i, entry in enumerate(PIPELINE) if entry['context'] == index]
        index = matches[0] if matches else 0
    #
    print("..::: ", PIPELINE[index]['context'])
    args = PIPELINE[index]
    for key in _config:
        if key == 'pipeline' or key in args:
            #
            # skip in case of pipeline or if key exists in the selected pipeline (provided by index)
            #
            continue
        args[key] = _config[key]
    # The command line has the last word over the configuration file
    args = dict(args, **SYS_ARGS)
    if 'matrix_size' in args:
        args['matrix_size'] = int(args['matrix_size'])
    # A batch size given on the command line is cast like matrix_size
    args['batch_size'] = int(args['batch_size']) if 'batch_size' in args else 2000
    if 'dataset' not in args:
        args['dataset'] = 'combined20191004v2_deid'
    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
    #
    # @TODO:
    #   Log what was initiated so we have context of this processing ...
    #
    if 'generate' in SYS_ARGS:
        generator = Components()
        generator.generate(args)
    else:
        agent = Components()
        agent.train(**args)
# jobs = []
# for index in range(0,PART_SIZE) :
# if 'focus' in args and int(args['focus']) != index :
# continue
# args['part_size'] = PART_SIZE
# args['partition'] = index
# args['data'] = DATA[index]
# if int(args['num_gpu']) > 1 :
# args['gpu'] = index
# else:
# args['gpu']=0
# make = lambda _args: (Components()).train(**_args)
# job = Process(target=make,args=( dict(args),))
# job.name = 'Trainer # ' + str(index)
# job.start()
# jobs.append(job)
# # args['gpu']
# print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ])
# while len(jobs)> 0 :
# jobs = [job for job in jobs if job.is_alive()]
# time.sleep(2)
# trainer = Components()
# trainer.train(**args)
# Components.train(**args)
#for args in PIPELINE :
#args['dataset'] = 'combined20190510'
#process = Process(target=Components.train,args=(args,))
#process.name = args['context']
#process.start()
# Components.train(args)