490 lines
15 KiB
Python
490 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
from transport import factory
|
|
import numpy as np
|
|
import time
|
|
import os
|
|
from multiprocessing import Process, Lock
|
|
import pandas as pd
|
|
from google.oauth2 import service_account
|
|
from google.cloud import bigquery as bq
|
|
import data.maker
|
|
import copy
|
|
from data.params import SYS_ARGS
|
|
|
|
#
|
|
# The configuration array is now loaded and we will execute the pipeline as follows
|
|
|
|
class Components :
|
|
lock = Lock()
|
|
class KEYS:
    # Name of the argument that carries the optional SQL filter(s);
    # Components.get looks this key up before building a WHERE clause.
    SQL_FILTER = 'filter'
    # Name of the pipeline entry. Not referenced by the visible code; the
    # script section reads the same literal ('pipeline') from the configuration.
    PIPELINE_KEY = 'pipeline'
|
|
@staticmethod
def get_filter(**args):
    """
    Build a single SQL condition as a space separated string.

    :field      left hand side of the condition
    :qualifier  operator; when it is exactly 'IN' the value is wrapped in parentheses
    :value      right hand side, already rendered as a string
    """
    qualifier = args['qualifier']
    field = args['field']
    value = args['value']
    # Only the IN operator needs its operand enclosed in parentheses
    tokens = [field, qualifier, '(', value, ')'] if qualifier == 'IN' else [field, qualifier, value]
    return ' '.join(tokens)
|
|
@staticmethod
def get_logger(**args):
    """
    Return the writer produced by factory.instance for type 'mongo.MongoWriter',
    configured with database 'aou' and the document named by args['context'].

    :context    name handed to the writer as 'doc'
    """
    writer_args = {'dbname': 'aou', 'doc': args['context']}
    return factory.instance(type='mongo.MongoWriter', args=writer_args)
|
|
@staticmethod
def get(args):
    """
    Run a BigQuery SQL statement and return the result as a data-frame.

    The statement is assembled from the arguments, logged through a
    'mongo.MongoWriter' instance and executed with pandas.read_gbq.

    :sql         basic sql statement; every ':dataset' token is replaced by args['dataset']
    :dataset     dataset name substituted into the statement
    :context     document name handed to the logger
    :filter      optional condition or list of conditions (see get_filter), joined with AND
    :limit       optional row limit (string or integer), useful for testing
    :private_key optional path to the service account key file; defaults to the
                 path this function has always used
    :return      the data-frame returned by pandas.read_gbq
    """
    SQL = args['sql']
    filter_key = Components.KEYS.SQL_FILTER
    if filter_key in args:
        filters = args[filter_key]
        if not isinstance(filters, list):
            # A single condition is accepted as a convenience
            filters = [filters]
        condition = ' AND '.join([Components.get_filter(**item) for item in filters])
        SQL = " ".join([SQL, 'WHERE', condition])

    SQL = SQL.replace(':dataset', args['dataset'])

    if 'limit' in args:
        # The limit may come from a JSON configuration as a number, so it is
        # rendered explicitly instead of being concatenated as is.
        SQL = SQL + ' LIMIT ' + str(args['limit'])
    #
    # NOTE(review): the statement is built by string concatenation; the filter,
    # dataset and limit values must come from a trusted configuration.
    #
    # let's log the sql query that has been performed here
    logger = factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})
    logger.write({"module": "bigquery", "action": "read", "input": {"sql": SQL}})

    key_path = args.get('private_key', '/home/steve/dev/aou/accounts/curation-prod.json')
    credentials = service_account.Credentials.from_service_account_file(key_path)
    return pd.read_gbq(SQL, credentials=credentials, dialect='standard')
|
|
|
|
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
|
|
@staticmethod
def split(X, MAX_ROWS=3, PART_SIZE=3):
    """
    Cut the positions 0..X.shape[0] (inclusive) into PART_SIZE bins and
    return the bins (the categories produced by pandas.cut) as a list.

    :X          object exposing a shape whose first entry is the row count
    :MAX_ROWS   accepted but not used by this function
    :PART_SIZE  number of bins requested from pandas.cut
    """
    row_count = X.shape[0]
    positions = np.arange(row_count + 1)
    binned = pd.cut(positions, PART_SIZE)
    return list(binned.categories)
|
|
def format_schema(self, schema):
    """
    Map every field of a schema to the python type used to hold it.

    :schema     iterable of items exposing 'name' and 'field_type'
    :return     dictionary {name: type}; 'FLOAT' maps to float, 'INTEGER' to int
                and every other field type to str
    """
    type_by_field = {'FLOAT': float, 'INTEGER': int}
    return {entry.name: type_by_field.get(entry.field_type, str) for entry in schema}
|
|
def get_ignore(self, **_args):
    """
    List the columns of a data-frame whose name contains at least one of the given terms.

    :data       data-frame whose column names are inspected
    :columns    terms looked for inside each column name (using the 'in' operator)
    :return     matching column names in data-frame order; an empty list when
                either 'data' or 'columns' is not provided
    """
    if 'columns' not in _args or 'data' not in _args:
        return []
    frame = _args['data']
    terms = _args['columns']
    matches = []
    for column in frame.columns:
        # Every term is tested against the column name before deciding
        hits = [term in column for term in terms]
        if any(hits):
            matches.append(column)
    return matches
|
|
|
|
def train(self, **args):
    """
    Train the synthesizer (data.maker.train) on a data-frame.

    The data-frame is taken from the first available source:
      1. args['file']  : csv file read with pandas (the key is removed afterwards)
      2. a reader built from args['store']['source'] when 'data' is not given;
         args['sql'] is read, limited to args['row_limit'] rows when provided
      3. args['data']  : data-frame supplied by the caller

    :continuous continuous columns; they are withheld from the training data
    :ignore     {'columns': [...]} terms identifying columns to withhold (see get_ignore)
    :from       table name used to look up the schema when the reader exposes 'meta'
    :autopilot  when present, generate() is invoked right after training
    """
    schema = None
    if 'file' in args:
        df = pd.read_csv(args['file'])
        del args['file']
    elif 'data' not in args:
        reader = factory.instance(**args['store']['source'])
        if 'row_limit' in args:
            df = reader.read(sql=args['sql'], limit=args['row_limit'])
        else:
            df = reader.read(sql=args['sql'])
        if hasattr(reader, 'meta') and 'from' in args:
            schema = reader.meta(table=args['from'])
    else:
        df = args['data']

    if schema:
        #
        # Cast every column to the type announced by the schema; missing
        # values become 0 for numeric fields and '' for everything else.
        _schema = []
        for _item in schema:
            if _item.field_type == 'FLOAT':
                _type, _value = float, 0
            elif _item.field_type == 'INTEGER':
                _type, _value = int, 0
            else:
                _type, _value = str, ''
            _schema.append({"name": _item.name, "type": _item.field_type})
            df[_item.name] = df[_item.name].fillna(_value).astype(_type)
        args['schema'] = _schema

    #
    # The arguments are copied so the trainer can not alter the caller's
    # settings; the data-frame is left out of the copy because it is
    # (re)assigned right below.
    _args = copy.deepcopy({key: value for key, value in args.items() if key != 'data'})
    _args['data'] = df
    #
    # The columns that are continuous should also be skipped because they don't need to be synthesized (like that)
    x_cols = args['continuous'] if 'continuous' in args else []

    if 'ignore' in args and 'columns' in args['ignore']:
        _cols = set(self.get_ignore(data=df, columns=args['ignore']['columns']))
        # Columns are kept in data-frame order so the selection is reproducible
        _args['data'] = df[[name for name in dict.fromkeys(df.columns) if name not in _cols]]
    #
    # We need to make sure that continuous columns are removed
    # NOTE(review): this selection starts again from the full data-frame and
    # therefore replaces the 'ignore' selection above; generate() does the
    # same, so both are kept aligned here. Confirm whether both exclusions
    # were meant to apply together.
    if x_cols:
        _continuous = set(x_cols)
        _args['data'] = df[[name for name in dict.fromkeys(df.columns) if name not in _continuous]]

    data.maker.train(**_args)

    if 'autopilot' in args:
        args['data'] = df
        print (['autopilot mode enabled ....',args['context']])
        self.generate(args)
|
|
|
|
def approximate(self, values):
    """
    Return a randomly perturbed and shuffled copy of numeric values.

    Every value v becomes v + v*r or v - v*r (picked at random), where r is
    the matching entry of a Dirichlet sample weighted by the magnitude of the
    values. Integer input is truncated back to integers, other numeric input
    is rounded to 2 decimals, and the result is shuffled.

    :param values   array (or pandas series) of values to be approximated
    :return         numpy array of the same length; 'values' itself when it is
                    not an integer/float numpy typed sequence
    """
    dtype = getattr(values, 'dtype', None)
    if not isinstance(dtype, np.dtype) or dtype.kind not in 'iuf':
        return values

    # Working on a plain array makes the access positional; a series with a
    # non default index would otherwise be addressed by label.
    base = np.asarray(values, dtype=float)
    if base.size == 0:
        return np.array([])
    #
    # The Dirichlet concentration must be strictly positive, callers may hand
    # over zeros or negative values (e.g. -1 used as a filler), so the
    # magnitude is used and zeros are replaced by 1.
    magnitude = np.abs(base)
    alpha = np.where(magnitude > 0, magnitude, 1.0)
    r = np.random.dirichlet(alpha)

    signs = np.where(np.random.randint(0, 2, size=base.size) == 1, 1.0, -1.0)
    x = base + signs * base * r
    if dtype.kind in 'iu':
        # astype truncates toward zero, like int() does
        x = x.astype(int)
    else:
        x = np.round(x, 2)
    np.random.shuffle(x)
    return x
|
|
|
|
|
|
# @staticmethod
|
|
def generate(self, args):
    """
    Generate synthetic data (data.maker.generate) and write it to the target store.

    The original data-frame is args['data'] when provided, otherwise it is
    read with a reader built from args['store']['source'] (args['sql'],
    limited to args['row_limit'] rows when provided). Every candidate
    returned by the generator is merged with the original columns that were
    not synthesized and handed to the writer built from
    args['store']['target']. Only targets whose type contains 'sql.BQWriter'
    are written; nothing is persisted for other targets.

    :store      {'logs':..., 'target':..., 'source':...} settings handed to factory.instance
    :context    name stored as 'doc' in the log store settings
    :schema     optional list of {'name','type'}; looked up through reader.meta otherwise
    :from       table name used for the schema lookup and the write
    :continuous continuous columns; not synthesized, approximated instead (see approximate)
    :ignore     {'columns': [...]} terms identifying columns to withhold (see get_ignore)
    :candidates number of candidates requested from the generator (defaults to 1)
    """
    store = args['store']['logs']
    store['doc'] = args['context']
    # The log writer is instantiated as before; nothing is written to it here
    logger = factory.instance(**store)

    ostore = args['store']['target']
    writer = factory.instance(**ostore)

    schema = args['schema'] if 'schema' in args else None
    if 'data' in args:
        df = args['data']
    else:
        reader = factory.instance(**args['store']['source'])
        if 'row_limit' in args:
            df = reader.read(sql=args['sql'], limit=args['row_limit'])
        else:
            df = reader.read(sql=args['sql'])
        if 'schema' not in args and hasattr(reader, 'meta'):
            schema = reader.meta(table=args['from'])
            schema = [{"name": _item.name, "type": _item.field_type} for _item in schema]

    # NOTE(review): this summary is assembled but not written anywhere
    _info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[1]}}

    args['data'] = df
    #
    # The columns that are continuous should also be skipped because they don't need to be synthesized (like that)
    x_cols = args['continuous'] if 'continuous' in args else []

    if 'ignore' in args and 'columns' in args['ignore']:
        _cols = set(self.get_ignore(data=df, columns=args['ignore']['columns']))
        # Columns are kept in data-frame order so the selection is reproducible
        args['data'] = df[[name for name in dict.fromkeys(df.columns) if name not in _cols]]
    #
    # We need to remove the continuous columns from the data-frame
    # @TODO: Abstract this !!
    # NOTE(review): this selection starts again from the full data-frame and
    # therefore replaces the 'ignore' selection above; train() does the same,
    # so both are kept aligned here. Confirm whether both exclusions were
    # meant to apply together.
    if x_cols:
        _continuous = set(x_cols)
        args['data'] = df[[name for name in dict.fromkeys(df.columns) if name not in _continuous]]

    args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])

    candidates = data.maker.generate(**args)
    if 'sql.BQWriter' not in ostore['type']:
        return

    skip_columns = []
    _schema = schema
    cols = [_item['name'] for _item in _schema] if schema else df.columns
    for _df in candidates:
        #
        # we need to format the fields here to make sure we have something cohesive
        #
        if not skip_columns and 'ignore' in args and 'columns' in args['ignore']:
            skip_columns = self.get_ignore(data=_df, columns=args['ignore']['columns'])
        if x_cols:
            # Continuous columns are always derived from the untouched
            # original data-frame, whatever the candidate being processed
            for _col in x_cols:
                _df[_col] = self.approximate(df[_col].fillna(-1))
        #
        # We perform a series of set operations to ensure that the following conditions are met:
        #   - the synthetic dataset only has fields that need to be synthesized
        #   - the original dataset has all the fields except those that need to be synthesized
        #
        _skipped = set(skip_columns)
        _df = _df[[name for name in dict.fromkeys(_df.columns) if name not in _skipped]]

        _synthetic = set(_df.columns)
        base = df
        if _synthetic & set(df.columns):
            # A list (not a set) is used to select the columns: recent pandas
            # releases refuse sets as indexers.
            base = df[[name for name in dict.fromkeys(df.columns) if name not in _synthetic]]
        #
        # Let us merge the datasets here and have a comprehensive dataset
        _df = base.join(_df)
        if _schema:
            for _item in _schema:
                if _item['type'] in ['DATE','TIMESTAMP','DATETIME']:
                    _df[_item['name']] = _df[_item['name']].astype(str)
            writer.write(_df[cols], schema=_schema, table=args['from'])
        else:
            writer.write(_df[cols], table=args['from'])
|
|
|
|
|
|
# #
|
|
# # We need to post the generate the data in order to :
|
|
# # 1. compare immediately
|
|
# # 2. synthetic copy
|
|
# #
|
|
|
|
# cols = _dc.columns.tolist()
|
|
|
|
# data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
|
|
# #
|
|
# # performing basic analytics on the synthetic data generated (easy to quickly asses)
|
|
# #
|
|
# info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
|
|
|
|
# #
|
|
# # @TODO: Send data over to a process for analytics
|
|
|
|
# base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
|
|
# cols = _dc.columns.tolist()
|
|
# for name in cols :
|
|
# _args['data'][name] = _dc[name]
|
|
|
|
# #
|
|
# #-- Let us store all of this into bigquery
|
|
# prefix = args['notify']+'.'+_args['context']
|
|
# partition = str(partition)
|
|
# table = '_'.join([prefix,partition,'io']).replace('__','_')
|
|
# folder = os.sep.join([args['logs'],args['context'],partition,'output'])
|
|
# if 'file' in args :
|
|
|
|
# _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
|
|
# _pname = os.sep.join([folder,table])+'.csv'
|
|
# data_comp.to_csv( _pname,index=False)
|
|
# _args['data'].to_csv(_fname,index=False)
|
|
|
|
# _id = 'path'
|
|
# else:
|
|
|
|
# credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
|
|
# _pname = os.sep.join([folder,table+'.csv'])
|
|
# _fname = table.replace('_io','_full_io')
|
|
# partial = '.'.join(['io',args['context']+'_partial_io'])
|
|
# complete= '.'.join(['io',args['context']+'_full_io'])
|
|
# data_comp.to_csv(_pname,index=False)
|
|
# if 'dump' in args :
|
|
# print (_args['data'].head())
|
|
# else:
|
|
# Components.lock.acquire()
|
|
# data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
|
|
# _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
|
|
# Components.lock.release()
|
|
# _id = 'dataset'
|
|
# info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
|
|
# if partition :
|
|
# info ['partition'] = int(partition)
|
|
# logger.write({"module":"generate","action":"write","input":info} )
|
|
|
|
|
|
|
|
if __name__ == '__main__' :
    #
    # Entry point: load the configuration, select one pipeline entry and
    # either train on it or generate from it.
    #   config   : path of the json configuration (defaults to config.json)
    #   index    : position of the pipeline entry, or its context name
    #   generate : when present data is generated, otherwise training is run
    #
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    # The context manager closes the file even when the content is not valid json
    with open(filename) as stream:
        _config = json.load(stream)

    PIPELINE = _config['pipeline']
    index = SYS_ARGS['index']
    if index.isnumeric() :
        index = int(index)
    else:
        #
        # The index provided is a key to a pipeline entry mainly the context;
        # the first entry is used when no context matches
        #
        matches = [i for i, entry in enumerate(PIPELINE) if entry['context'] == index]
        index = matches[0] if matches else 0

    print ("..::: ",PIPELINE[index]['context'])
    args = PIPELINE[index]
    for key in _config :
        if key == 'pipeline' or key in args :
            #
            # skip in case of pipeline or if key exists in the selected pipeline (provided by index)
            #
            continue
        args[key] = _config[key]

    # Command line arguments take precedence over the configuration
    args = dict(args,**SYS_ARGS)
    if 'matrix_size' in args :
        args['matrix_size'] = int(args['matrix_size'])
    if 'batch_size' not in args :
        args['batch_size'] = 2000
    if 'dataset' not in args :
        args['dataset'] = 'combined20191004v2_deid'
    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
    #
    # @TODO:
    # Log what was initiated so we have context of this processing ...
    #
    # Multi-process, partitioned execution is currently disabled: training
    # and generation each run in-process through a single Components instance.
    #
    if 'generate' in SYS_ARGS :
        #
        # Listing the training log folder of this context fails early when
        # the folder does not exist; the listing itself is not used.
        #
        content = os.listdir( os.sep.join([args['logs'],'train',args['context']]))
        generator = Components()
        generator.generate(args)
    else:
        agent = Components()
        agent.train(**args)
|
|
# jobs = []
|
|
# for index in range(0,PART_SIZE) :
|
|
# if 'focus' in args and int(args['focus']) != index :
|
|
# continue
|
|
# args['part_size'] = PART_SIZE
|
|
# args['partition'] = index
|
|
# args['data'] = DATA[index]
|
|
# if int(args['num_gpu']) > 1 :
|
|
# args['gpu'] = index
|
|
# else:
|
|
# args['gpu']=0
|
|
|
|
# make = lambda _args: (Components()).train(**_args)
|
|
# job = Process(target=make,args=( dict(args),))
|
|
# job.name = 'Trainer # ' + str(index)
|
|
# job.start()
|
|
# jobs.append(job)
|
|
# # args['gpu']
|
|
# print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ])
|
|
# while len(jobs)> 0 :
|
|
# jobs = [job for job in jobs if job.is_alive()]
|
|
# time.sleep(2)
|
|
|
|
# trainer = Components()
|
|
# trainer.train(**args)
|
|
|
|
|
|
# Components.train(**args)
|
|
#for args in PIPELINE :
|
|
#args['dataset'] = 'combined20190510'
|
|
#process = Process(target=Components.train,args=(args,))
|
|
#process.name = args['context']
|
|
#process.start()
|
|
# Components.train(args)
|