#!/usr/bin/env python3 import json from transport import factory import numpy as np import time import os from multiprocessing import Process, Lock import pandas as pd from google.oauth2 import service_account from google.cloud import bigquery as bq import data.maker import copy from data.params import SYS_ARGS # # The configuration array is now loaded and we will execute the pipe line as follows class Components : lock = Lock() class KEYS : PIPELINE_KEY = 'pipeline' SQL_FILTER = 'filter' @staticmethod def get_filter (**args): if args['qualifier'] == 'IN' : return ' '.join([args['field'],args['qualifier'],'(',args['value'],')']) else: return ' '.join([args['field'],args['qualifier'],args['value']]) @staticmethod def get_logger(**args) : return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) @staticmethod def get(args): """ This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes) The function must be wrapped around a lambda this makes testing easier and changing data stores transparent to the rest of the code. (Vital when testing) :sql basic sql statement :condition optional condition and filters """ SQL = args['sql'] if Components.KEYS.SQL_FILTER in args : FILTER_KEY = Components.KEYS.SQL_FILTER SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]] # condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')']) condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER]) SQL = " ".join([SQL,'WHERE',condition]) SQL = SQL.replace(':dataset',args['dataset']) #+ " LI " if 'limit' in args : SQL = SQL + ' LIMIT ' + args['limit'] # # let's log the sql query that has been performed here logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}}) credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') df = pd.read_gbq(SQL,credentials=credentials,dialect='standard') return df # return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna() @staticmethod def split(X,MAX_ROWS=3,PART_SIZE=3): return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories) def format_schema(self,schema): _schema = {} for _item in schema : _type = int _value = 0 if _item.field_type == 'FLOAT' : _type =float elif _item.field_type != 'INTEGER' : _type = str _value = '' _schema[_item.name] = _type return _schema def get_ignore(self,**_args) : if 'columns' in _args and 'data' in _args : _df = _args['data'] terms = _args['columns'] return [name for name in _df.columns if np.sum( [int(field in name )for field in terms ]) ] return [] def set_gpu(self,**_args) : if 'gpu' in _args : gpu = _args['gpu'] if type(_args['gpu']) != str else [_args['gpu']] _index = str(gpu[0]) os.environ['CUDA_VISIBLE_DEVICES'] = _index return gpu else : return None def train(self,**args): """ This function will perform training on the basis of a given pointer that reads data """ schema = None if 'file' in args : df = pd.read_csv(args['file']) del args['file'] elif 'data' not in args : reader = factory.instance(**args['store']['source']) if 'row_limit' in args : df = reader.read(sql=args['sql'],limit=args['row_limit']) else: df = reader.read(sql=args['sql']) schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None else: df = args['data'] # # # df = df.fillna('') if schema : _schema = [] for _item in schema : _type = int _value = 0 if _item.field_type == 'FLOAT' : _type =float elif _item.field_type != 'INTEGER' : _type = str _value = '' _schema += [{"name":_item.name,"type":_item.field_type}] df[_item.name] = df[_item.name].fillna(_value).astype(_type) args['schema'] = _schema # df[_item.name] = df[_item.name].astype(_type) _args = copy.deepcopy(args) # _args['store'] = args['store']['source'] _args['data'] = df # # The columns that are continuous should also be skipped because they don't need to be synthesied (like-that) if 'continuous' in args : x_cols = args['continuous'] else: x_cols = [] if 'ignore' in args and 'columns' in args['ignore'] : _cols = self.get_ignore(data=df,columns=args['ignore']['columns']) _args['data'] = df[ list(set(df.columns)- set(_cols))] # # We need to make sure that continuous columns are removed if x_cols : _args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))] if 'gpu' in args : _args['gpu'] = self.set_gpu(gpu=args['gpu']) if 'partition' in args : _args['partition'] = args['partition'] if df.shape[0] and df.shape[0] : # # We have a full blown matrix to be processed data.maker.train(**_args) else: print ("... skipping training !!") if 'autopilot' in ( list(args.keys())) : args['data'] = df print (['autopilot mode enabled ....',args['context']]) self.generate(args) pass def approximate(self,values): """ :param values array of values to be approximated """ if values.dtype in [int,float] : # # @TODO: create bins? r = np.random.dirichlet(values+.001) #-- dirichlet doesn't work on values with zeros _sd = values[values > 0].std() _me = values[values > 0].mean() _mi = values.min() x = [] _type = values.dtype for index in np.arange(values.size) : if np.random.choice([0,1],1)[0] : value = values[index] + (values[index] * r[index]) else : value = values[index] - (values[index] * r[index]) # # randomly shifting the measurements if np.random.choice([0,1],1)[0] and _me > _sd : if np.random.choice([0,1],1)[0] : value = value * np.divide(_me,_sd) else: value = value + (np.divide(_me,_sd)) value = int(value) if _type == int else np.round(value,2) x.append( value) np.random.shuffle(x) return np.array(x) else: return values pass def shuffle(self,_args): if 'data' in args : df = data['data'] else: reader = factory.instance(**args['store']['source']) if 'file' in args : df = pd.read_csv(args['file']) elif 'data' in _args : df = _args['data'] else: if 'row_limit' in args and 'sql' in args: df = reader.read(sql=args['sql'],limit=args['row_limit']) else: df = reader.read(sql=args['sql']) schema = None if 'schema' not in args and hasattr(reader,'meta') and 'file' not in args: schema = reader.meta(table=args['from']) schema = [{"name":_item.name,"type":_item.field_type} for _item in schema] # # We are shufling designated colmns and will be approximating the others # x_cols = [] #-- coumns tobe approximated. _cols = [] #-- columns to be ignored if 'continuous' in args : x_cols = args['continuous'] if 'ignore' in args and 'columns' in args['ignore'] : _cols = self.get_ignore(data=df,columns=args['ignore']['columns']) columns = args['columns'] if 'columns' in args else df.columns columns = list(set(columns) - set(_cols)) for name in columns: i = np.arange(df.shape[0]) np.random.shuffle(i) if name in x_cols : if df[name].unique().size > 0 : df[name] = self.approximate(df.iloc[i][name].fillna(0).values) # df[name] = df[name].astype(str) # pass df.index = np.arange(df.shape[0]) self.post(data=df,schema=schema,store=args['store']['target']) def post(self,**_args) : _schema = _args['schema'] if 'schema' in _args else None writer = factory.instance(**_args['store']) _df = _args['data'] if _schema : columns = [] for _item in _schema : name = _item['name'] _type = str _value = 0 if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] : if _item['type'] == 'DATE' : _df[name] = _df[name].dt.date _df[name] = pd.to_datetime(_df[name],errors='coerce') else: if _item['type'] == 'INTEGER' : _type = np.int64 elif _item['type'] in ['FLOAT','NUMERIC']: _type = np.float64 else: _value = '' _df[name] = _df[name].fillna(_value).astype(_type) columns.append(name) writer.write(_df,schema=_schema,table=args['from']) else: writer.write(_df,table=args['from']) def finalize(self,args): """ This function performs post-processing opertions on a synthetic table i.e : - remove duplicate keys - remove orphaned keys i.e """ reader = factory.instance(**args['store']['source']) logger = factory.instance(**args['store']['logs']) target = args['store']['target']['args']['dataset'] source = args['store']['source']['args']['dataset'] table = args['from'] schema = reader.meta(table=args['from']) # # keys : unique_field = "_".join([args['from'],'id']) if 'unique_fields' not in args else args['unique_fields'] fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema] SQL = [ "SELECT :fields FROM ", "(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN", "(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table ORDER BY RAND()) y", "ON y.row_number = x.row_number" ] SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target) SQL = SQL.replace(":unique_field",unique_field) # # Use a native job to get this done ... # client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"]) job = bq.QueryJobConfig() job.destination = client.dataset(target).table(table) job.use_query_cache = True job.allow_large_results = True # job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY) job.write_disposition = "WRITE_TRUNCATE" job.priority = 'BATCH' r = client.query(SQL,location='US',job_config=job) logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":"".join([source,table]),"destimation":".".join([target,table])}}) # # Keep a log of what just happened... # otable = ".".join([args['store']['source']['args']['dataset'],args['from']]) dtable = ".".join([args['store']['target']['args']['dataset'],args['from']]) def generate(self,args): """ This function will generate data and store it to a given, """ store = args['store']['logs'] store['args']['doc'] = args['context'] logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) ostore = args['store']['target'] writer = factory.instance(**ostore) schema = args['schema'] if 'schema' in args else None if 'data' in args : df = args['data'] else: reader = factory.instance(**args['store']['source']) if 'row_limit' in args : df = reader.read(sql=args['sql'],limit=args['row_limit']) else: df = reader.read(sql=args['sql']) if 'schema' not in args and hasattr(reader,'meta'): schema = reader.meta(table=args['from']) schema = [{"name":_item.name,"type":_item.field_type} for _item in schema] # else: # # # # This will account for autopilot mode ... # df = args['data'] _cast = {} if schema : for _item in schema : dtype = str name = _item['name'] novalue = -1 if _item['type'] in ['INTEGER','NUMERIC']: dtype = np.int64 elif _item['type'] == 'FLOAT' : dtype = np.float64 else: novalue = '' # _cast[schema['name']] = dtype df[name] = df[name].fillna(novalue).astype(dtype) _info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[1]},"schema":schema} logger.write(_info) _dc = pd.DataFrame() # for mdf in df : args['data'] = df.copy() # # The columns that are continuous should also be skipped because they don't need to be synthesied (like-that) if 'continuous' in args : x_cols = args['continuous'] else: x_cols = [] if 'ignore' in args and 'columns' in args['ignore'] : _cols = self.get_ignore(data=df,columns=args['ignore']['columns']) args['data'] = args['data'][ list(set(df.columns)- set(_cols))] # # We need to remove the continuous columns from the data-frame # @TODO: Abstract this !! # real_df = pd.DataFrame() if x_cols : args['data'] = args['data'][list(set(args['data'].columns) - set(x_cols))] real_df = df[x_cols].copy() args['candidates'] = 1 if 'candidates' not in args else int(args['candidates']) if 'gpu' in args : args['gpu'] = self.set_gpu(gpu=args['gpu']) # if 'partition' in args : # args['logs'] = os.sep.join([args['logs'],str(args['partition'])]) _info = {"module":"gan-prep","action":"prune","shape":{"rows":args['data'].shape[0],"columns":args['data'].shape[1]}} logger.write(_info) if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 : candidates = (data.maker.generate(**args)) else: candidates = [df] if 'sql.BQWriter' in ostore['type'] : #table = ".".join([ostore['['dataset'],args['context']]) # writer = factory.instance(**ostore) _columns = None skip_columns = [] _schema = schema if schema : cols = [_item['name'] for _item in _schema] else: cols = df.columns for _df in candidates : # # we need to format the fields here to make sure we have something cohesive # if not skip_columns : # _columns = set(df.columns) - set(_df.columns) if 'ignore' in args and 'columns' in args['ignore'] : skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns']) # for name in args['ignore']['columns'] : # for _name in _df.columns: # if _name in name: # skip_columns.append(_name) # # We perform a series of set operations to insure that the following conditions are met: # - the synthetic dataset only has fields that need to be synthesized # - The original dataset has all the fields except those that need to be synthesized # _df = _df[list(set(_df.columns) - set(skip_columns))].copy() if x_cols : _approx = {} for _col in x_cols : if real_df[_col].unique().size > 0 : _df[_col] = self.approximate(real_df[_col].values) _approx[_col] = { "io":{"min":_df[_col].min().astype(float),"max":_df[_col].max().astype(float),"mean":_df[_col].mean().astype(float),"sd":_df[_col].values.std().astype(float),"missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}, "real":{"min":real_df[_col].min().astype(float),"max":real_df[_col].max().astype(float),"mean":real_df[_col].mean().astype(float),"sd":real_df[_col].values.std().astype(float),"missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)} } else: _df[_col] = -1 logger.write({"module":"gan-generate","action":"approximate","status":_approx}) if set(df.columns) & set(_df.columns) : _columns = set(df.columns) - set(_df.columns) df = df[_columns] # # Let us merge the dataset here and and have a comprehensive dataset _df = pd.DataFrame.join(df,_df) # if _schema : # for _item in _schema : # if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] : # _df[_item['name']] = _df[_item['name']].astype(str) # pass _params = {'data':_df,'store' : ostore} if _schema : _params ['schema'] = _schema self.post(**_params) # if _schema : # writer.write(_df[cols],schema=_schema,table=args['from']) # self.post(data=_df,schema=) # else: # writer.write(_df[cols],table=args['from']) pass # else: # pass # # # # We need to post the generate the data in order to : # # 1. compare immediately # # 2. synthetic copy # # # cols = _dc.columns.tolist() # data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) # # # # performing basic analytics on the synthetic data generated (easy to quickly asses) # # # info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}} # # # # @TODO: Send data over to a process for analytics # base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) # cols = _dc.columns.tolist() # for name in cols : # _args['data'][name] = _dc[name] # # # #-- Let us store all of this into bigquery # prefix = args['notify']+'.'+_args['context'] # partition = str(partition) # table = '_'.join([prefix,partition,'io']).replace('__','_') # folder = os.sep.join([args['logs'],args['context'],partition,'output']) # if 'file' in args : # _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')]) # _pname = os.sep.join([folder,table])+'.csv' # data_comp.to_csv( _pname,index=False) # _args['data'].to_csv(_fname,index=False) # _id = 'path' # else: # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # _pname = os.sep.join([folder,table+'.csv']) # _fname = table.replace('_io','_full_io') # partial = '.'.join(['io',args['context']+'_partial_io']) # complete= '.'.join(['io',args['context']+'_full_io']) # data_comp.to_csv(_pname,index=False) # if 'dump' in args : # print (_args['data'].head()) # else: # Components.lock.acquire() # data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) # _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) # Components.lock.release() # _id = 'dataset' # info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } # if partition : # info ['partition'] = int(partition) # logger.write({"module":"generate","action":"write","input":info} ) if __name__ == '__main__' : filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json' f = open (filename) _config = json.loads(f.read()) f.close() PIPELINE = _config['pipeline'] index = SYS_ARGS['index'] if index.isnumeric() : index = int(SYS_ARGS['index']) else: # # The index provided is a key to a pipeline entry mainly the context # N = len(PIPELINE) f = [i for i in range(0,N) if PIPELINE[i]['context'] == index] index = f[0] if f else 0 # print ("..::: ",PIPELINE[index]['context']) args = (PIPELINE[index]) for key in _config : if key == 'pipeline' or key in args: # # skip in case of pipeline or if key exists in the selected pipeline (provided by index) # continue args[key] = _config[key] args = dict(args,**SYS_ARGS) if 'matrix_size' in args : args['matrix_size'] = int(args['matrix_size']) if 'batch_size' not in args : args['batch_size'] = 2000 #if 'batch_size' not in args else int(args['batch_size']) if 'dataset' not in args : args['dataset'] = 'combined20191004v2_deid' PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 # # @TODO: # Log what was initiated so we have context of this processing ... # GPU_CHIPS = args['gpu'] if 'gpu' in args else None if GPU_CHIPS and type(GPU_CHIPS) != list : GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS] if 'gpu' in SYS_ARGS : args['gpu'] = GPU_CHIPS jobs = [] if 'generate' in SYS_ARGS : # # Let us see if we have partitions given the log folder content = os.listdir( os.sep.join([args['logs'],'train',args['context']])) if 'all-chips' in SYS_ARGS and GPU_CHIPS: index = 0 jobs = [] for _gpu in GPU_CHIPS : _args = copy.deepcopy(args) _args['gpu'] = [int(_gpu)] _args['partition'] = int(_gpu) #index index += 1 make = lambda _params: (Components()).generate(_params) job = Process(target=make,args=( dict(_args),)) job.name = 'Trainer # ' + str(index) job.start() jobs.append(job) pass else: generator = Components() generator.generate(args) elif 'shuffle' in SYS_ARGS : index = 0 if GPU_CHIPS and 'all-chips' in SYS_ARGS: for index in GPU_CHIPS : publisher = lambda _params: ( Components() ).shuffle(_params) job = Process (target = publisher,args=( args,)) job.name = 'Shuffler #' + str(index) job.start() jobs.append(job) else: shuffler = Components() shuffler.shuffle(args) pass elif 'train' in SYS_ARGS: # DATA = np.array_split(DATA,PART_SIZE) # # Let us create n-jobs across n-gpus, The assumption here is the data that is produced will be a partition # @TODO: Find better name for partition # if GPU_CHIPS and 'all-chips' in SYS_ARGS: index = 0 print (['... launching ',len(GPU_CHIPS),' jobs',args['context']]) for _gpu in GPU_CHIPS : _args = copy.deepcopy(args) _args['gpu'] = [int(_gpu)] _args['partition'] = int(_gpu) #index index += 1 make = lambda _params: (Components()).train(**_params) job = Process(target=make,args=( _args,)) job.name = 'Trainer # ' + str(index) job.start() jobs.append(job) else: # # The choice of the chip will be made internally agent = Components() agent.train(**args) # # If we have any obs we should wait till they finish # DIRTY = 0 while len(jobs)> 0 : DIRTY =1 jobs = [job for job in jobs if job.is_alive()] time.sleep(2) if DIRTY: print (["..:: jobs finished "]) # # We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations # print (['finalize' in SYS_ARGS, ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) ]) if 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) : # # We should pull all the primary keys and regenerate them in order to insure some form of consistency # (Components()).finalize(args) # finalize(args) pass # jobs = [] # for index in range(0,PART_SIZE) : # if 'focus' in args and int(args['focus']) != index : # continue # args['part_size'] = PART_SIZE # args['partition'] = index # args['data'] = DATA[index] # if int(args['num_gpu']) > 1 : # args['gpu'] = index # else: # args['gpu']=0 # make = lambda _args: (Components()).train(**_args) # job = Process(target=make,args=( dict(args),)) # job.name = 'Trainer # ' + str(index) # job.start() # jobs.append(job) # # args['gpu'] # print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ]) # while len(jobs)> 0 : # jobs = [job for job in jobs if job.is_alive()] # time.sleep(2) # trainer = Components() # trainer.train(**args) # Components.train(**args) #for args in PIPELINE : #args['dataset'] = 'combined20190510' #process = Process(target=Components.train,args=(args,)) #process.name = args['context'] #process.start() # Components.train(args)