From ee0165de0188faba09c55e518fca6c2e5761f287 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 24 Mar 2022 11:38:52 -0500 Subject: [PATCH] bug fixes: enhancements --- binder.py | 377 +++++++++++++++++++++++++++++++++ data/gan.py | 54 +---- data/maker/__init__.py | 6 +- data/maker/prepare/__init__.py | 58 +---- pipeline.py | 330 ++++++++++++++--------------- 5 files changed, 543 insertions(+), 282 deletions(-) create mode 100644 binder.py diff --git a/binder.py b/binder.py new file mode 100644 index 0000000..5379d62 --- /dev/null +++ b/binder.py @@ -0,0 +1,377 @@ +#!/usr/bin/env python3 +""" +This file will perform basic tasks to finalize the GAN process by performing the following : + - basic stats & analytics + - rebuild io to another dataset +""" +import pandas as pd +import numpy as np +from multiprocessing import Process, Lock +from google.oauth2 import service_account +from google.cloud import bigquery as bq +import transport +from data.params import SYS_ARGS +import json + +import pandas as pd +import numpy as np +from google.oauth2 import service_account +import json + +# path = '../curation-prod.json' +# credentials = service_account.Credentials.from_service_account_file(path) +# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard') +filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config'] +f = open(filename) +config = json.loads(f.read()) +args = config['pipeline'] +f.close() + +def _formatSQL(**_args): + """ + This function will build the _map for a given segment + """ + sql = """ + select DISTINCT x.person_id synthetic,y.person_id original + FROM :synthetic.:table x + INNER JOIN :original.:table y on x.person_id in (:ids) + AND x.person_id <> y.person_id AND x.gender_source_value = y.gender_source_value + AND x.year_of_birth = y.year_of_birth + ORDER BY 1 + """ + table= _args['table'] + original,synthetic = _args['schema']['original'],_args['schema']['synthetic'] + _ids = np.array(_args['ids']).astype(str) + return sql.replace(":ids",",".join(_ids)).replace(":synthetic",synthetic).replace(":original",original).replace(":table",table) +def _addCounts(**_args) : + store = _args['store'] + sql = _args['sql'] + reader = transport.factory.instance(**store['source']) + _df = reader.read(sql=sql) + _ids = _df.synthetic.unique() + _counts = [ np.sum(_df.synthetic == value) for value in _ids] + original = [_df[_df.synthetic == value].iloc[np.random.choice(np.arange(_counts[_ids.tolist().index(value)]),1),:].original.values[0] for value in _ids] + _df = pd.DataFrame({"synthetic":_ids,"original":original,"counts":_counts}) + + # + # We can post this to the backend ... + # + table = '_map' #-- Yes this is hard-coded + writer = transport.factory.instance(**dict(store['target'],**{"parallel":True,"table":table})) + # if writer.has(table=table) is False: + # writer.write(_df) + # else: + _schema = [{"name":name,"type":"INTEGER"} for name in _df.columns] + writer.write(_df,schema=_schema) + + + + + +def Init(**_args) : + """ + This function will build a map of the synthetic to real individuals. + The assumption is that the synthesized data is stored in the same data-store as the original the parameters provided are : + :param store object from the configuration file with source,target entries + :param table name of the original/synthetic tables (they should be the same) + :param feat. featuress/attributes ... 
demographics to account for + """ + store = _args['store'] + reader = transport.factory.instance(**store['source']) + original,synthetic = _args['schema']['original'],_args['schema']['synthetic'] + table = _args['table'] + sql = _args['sql'].replace(':synthetic',synthetic).replace(':original',original).replace(':table',table) + + _map = reader.read(sql=sql) + + + + k = _args['k'] if 'k' in _args else 2 + # _iodf = reader.read(table=table) + # _ids = _iodf['person_id'].unique().tolist() + # x_ = np.array_split(_ids,1000) + jobs = [] + # for _items in x_ : + # _p = {"ids":_items,"schema":_args['schema'],'store':store,'table':table} + # sql = _formatSQL(**_p) + # _p['sql'] = sql + # _apply = lambda params: _addCounts(**params) + # thread = Process(target=_apply,args=(_p,)) + # thread.start() + # jobs.append(thread) + + # return jobs + # + # We have performed a m:m (many-to-many) relationship with original participants and synthetic participants + # The goal is to obtain a singular map against which records will be migrated + # + print (['... computing counts (k)']) + _ids = _map.synthetic.unique() + _counts = [ np.sum(_map.synthetic == value) for value in _ids] + original = [_map[_map.synthetic == value].iloc[np.random.choice(np.arange(_counts[_ids.tolist().index(value)]),1),:].original.values[0] for value in _ids] + print (['Building k-classes/groups']) + _mdf = pd.DataFrame({"synthetic":_ids,"original":original,"counts":_counts}) + i = _mdf.apply(lambda row: row.counts >= k,axis=1) + _mdf = _mdf[i] + # + # Log what just happened here so we know about the equivalence classes, + # {"module":"binder","action":"map-generation","input":{"k":k,"rows":{"synthetic":_mdf.shape[0],"original":len(_counts)}}} + + return _mdf + # + # now we are posting this to target storage ... 
+ # +def ApplyOn (**_args): + """ + This function will rewrite SQL that applies the synthetic identifier to the entries of the pipeline + We assume that the _map has two attributes (synthetic and original) + :param store + :param _config + """ + store_args = _args['store'] + _config = _args['config'] + + table = _config['from'] + reader = transport.factory.instance(**dict(store_args['source'],**{"table":table})) + attr = reader.read(limit=1).columns.tolist() + original_key = _args['original_key'] #-- assuming referential integrity + + # synthetic_key= columns['synthetic'] + # mapped_original=columns['orginal'] + fields = list(set(attr) - set([original_key])) + sql = "select _map.synthetic as :original_key,:fields from :original_schema.:table inner join :synthetic_schema._map on _map.original = :table.:original_key" + sql = sql.replace(":table",table).replace(":fields",",".join(fields)) + sql = sql.replace(":original_key",original_key) + _schema = _args['schema'] + sql = sql.replace(":original_schema",_schema['original']).replace(":synthetic_schema",_schema['synthetic']) + + return reader.read (sql=sql) + +if __name__ == '__main__' : + pass + +# class Analytics : +# """ +# This class will compile basic analytics about a given dataset i.e compare original/synthetic +# """ +# @staticmethod +# def distribution(**args): +# context = args['context'] +# df = args['data'] +# # +# #-- This data frame counts unique values for each feature (space) +# df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts +# # +# #-- Get the distributions for common values +# # +# names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False] +# ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0) +# ddf[context] = ddf.index + +# pass +# def distance(**args): +# """ +# This function will measure the distance between +# """ +# pass +# class Utils : +# @staticmethod +# def log(**args): +# logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"}) +# logger.write(args) +# logger.close() +# class get : +# @staticmethod +# def pipeline(table,path) : +# # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts'] +# config = json.loads((open(path)).read()) +# pipeline = config['pipeline'] +# # return [ item for item in pipeline if item['context'] in contexts] +# pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table] +# Utils.log(module=table,action='init',input={"pipeline":pipeline}) +# return pipeline +# @staticmethod +# def sql(**args) : +# """ +# This function is intended to build SQL query for the remainder of the table that was not synthesized +# :config configuration entries +# :from source of the table name +# :dataset name of the source dataset + +# """ +# SQL = ["SELECT * FROM :from "] +# SQL_FILTER = [] +# NO_FILTERS_FOUND = True +# # pipeline = Utils.get.config(**args) +# pipeline = args['pipeline'] +# REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='} +# for item in pipeline : + + +# if 'filter' in item : +# if NO_FILTERS_FOUND : +# NO_FILTERS_FOUND = False +# SQL += ['WHERE'] +# # +# # Let us load the filter in the SQL Query +# FILTER = item['filter'] +# QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()] +# SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])] +# src = 
".".join([args['dataset'],args['from']]) +# SQL += [" AND ".join(SQL_FILTER)] +# # +# # let's pull the field schemas out of the table definition +# # +# Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) }) +# return " ".join(SQL).replace(":from",src) + + +# def mk(**args) : +# dataset = args['dataset'] +# client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key']) +# # +# # let us see if we have a dataset handy here +# # +# datasets = list(client.list_datasets()) +# found = [item for item in datasets if item.dataset_id == dataset] + +# if not found : + +# return client.create_dataset(dataset) +# return found[0] + +# def move (args): +# """ +# This function will move a table from the synthetic dataset into a designated location +# This is the simplest case for finalizing a synthetic data set +# :private_key +# """ +# pipeline = Utils.get.pipeline(args['from'],args['config']) +# _args = json.loads((open(args['config'])).read()) +# _args['pipeline'] = pipeline +# # del _args['pipeline'] +# args = dict(args,**_args) +# # del args['pipeline'] +# # private_key = args['private_key'] +# client = bq.Client.from_service_account_json(args['private_key']) + +# dataset = args['dataset'] +# if pipeline : +# SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline] +# SQL += [Utils.get.sql(**args)] +# SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io')) +# else: +# # +# # moving a table to a designated location +# tablename = args['from'] +# if 'sql' not in args : +# SQL = "SELECT * FROM :dataset.:table" +# else: +# SQL = args['sql'] +# SQL = SQL.replace(":dataset",dataset).replace(":table",tablename) +# Utils.log(module=args['from'],action='sql',input={'sql':SQL}) +# # +# # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table +# # + + + +# odataset = mk(dataset=dataset+'_io',client=client) +# # SQL = "SELECT * FROM io.:context_full_io".replace(':context',context) +# config = bq.QueryJobConfig() +# config.destination = client.dataset(odataset.dataset_id).table(args['from']) +# config.use_query_cache = True +# config.allow_large_results = True +# config.priority = 'INTERACTIVE' +# # +# # + +# schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema +# fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema] +# SQL = SQL.replace("*"," , ".join(fields)) +# # print (SQL) +# out = client.query(SQL,location='US',job_config=config) +# Utils.log(module=args['from'],action='move',input={'job':out.job_id}) +# return (out.job_id) + + + + +# import pandas as pd +# import numpy as np +# from google.oauth2 import service_account +# import json + +# # path = '../curation-prod.json' +# # credentials = service_account.Credentials.from_service_account_file(path) +# # df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard') +# filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config'] +# f = open(filename) +# config = json.loads(f.read()) +# args = config['pipeline'] +# f.close() + + +# if __name__ == '__main__' : +# """ +# Usage : +# finalize -- --contexts --from +# """ + +# if 'move' in SYS_ARGS : + +# if 'init' in SYS_ARGS : +# dep = config['dep'] if 'dep' in config else {} +# info = [] + +# if 'queries' in dep : +# info += dep['queries'] +# print 
('________') +# if 'tables' in dep : +# info += dep['tables'] +# args = {} +# jobs = [] +# for item in info : +# args = {} +# if type(item) == str : +# args['from'] = item +# name = item +# else: +# args = item +# name = item['from'] +# args['config'] = SYS_ARGS['config'] +# # args['pipeline'] = [] +# job = Process(target=move,args=(args,)) +# job.name = name +# jobs.append(job) +# job.start() + + +# # while len(jobs) > 0 : +# # jobs = [job for job in jobs if job.is_alive()] +# # time.sleep(1) + + +# else: +# move(SYS_ARGS) +# # # table = SYS_ARGS['from'] +# # # args = dict(config,**{"private_key":"../curation-prod.json"}) +# # args = dict(args,**SYS_ARGS) +# # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']] +# # log = [] +# # if contexts : +# # args['contexts'] = contexts +# # log = move(**args) + +# # else: +# # tables = args['from'].split(',') +# # for name in tables : +# # name = name.strip() +# # args['from'] = name +# # log += [move(**args)] +# # print ("\n".join(log)) + + + +# else: +# print ("NOT YET READY !") \ No newline at end of file diff --git a/data/gan.py b/data/gan.py index 0008489..f5705ea 100644 --- a/data/gan.py +++ b/data/gan.py @@ -622,7 +622,7 @@ class Predict(GNet): candidates.append(np.array([np.round(row).astype(int) for row in _matrix])) # return candidates[0] if len(candidates) == 1 else candidates - return candidates + return [candidates [0]] def _apply(self,**args): # print (self.train_dir) @@ -768,55 +768,3 @@ class Predict(GNet): # return df.to_dict(orient='list') return _matrix - -if __name__ == '__main__' : - # - # Now we get things done ... - column = SYS_ARGS['column'] - column_id = SYS_ARGS['id'] if 'id' in SYS_ARGS else 'person_id' - column_id = column_id.split(',') if ',' in column_id else column_id - df = pd.read_csv(SYS_ARGS['raw-data']) - LABEL = pd.get_dummies(df[column_id]).astype(np.float32).values - - context = SYS_ARGS['raw-data'].split(os.sep)[-1:][0][:-4] - if set(['train','learn']) & set(SYS_ARGS.keys()): - - df = pd.read_csv(SYS_ARGS['raw-data']) - - # cols = SYS_ARGS['column'] - # _map,_df = (Binary()).Export(df) - # i = np.arange(_map[column]['start'],_map[column]['end']) - max_epochs = np.int32(SYS_ARGS['max_epochs']) if 'max_epochs' in SYS_ARGS else 10 - # REAL = _df[:,i] - REAL = pd.get_dummies(df[column]).astype(np.float32).values - LABEL = pd.get_dummies(df[column_id]).astype(np.float32).values - trainer = Train(context=context,max_epochs=max_epochs,real=REAL,label=LABEL,column=column,column_id=column_id) - trainer.apply() - - - - - # - # We should train upon this data - # - # -- we need to convert the data-frame to binary matrix, given a column - # - pass - elif 'generate' in SYS_ARGS: - values = df[column].unique().tolist() - values.sort() - - p = Predict(context=context,label=LABEL,values=values,column=column) - p.load_meta(column) - r = p.apply() - # print (df) - # print () - df[column] = r[column] - # print (df) - - - else: - print (SYS_ARGS.keys()) - print (__doc__) - pass - diff --git a/data/maker/__init__.py b/data/maker/__init__.py index 9db2b8d..a7d8d69 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -96,7 +96,11 @@ def train (**_args): # This args['store'] = copy.deepcopy(_args['store']['logs']) - args['store']['args']['doc'] = _args['context'] + if 'args' in _args['store']: + args['store']['args']['doc'] = _args['context'] + else: + + args['store']['doc'] = _args['context'] logger = factory.instance(**args['store']) args['logger'] = logger diff --git 
a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py index 5ace56a..6e67cb2 100644 --- a/data/maker/prepare/__init__.py +++ b/data/maker/prepare/__init__.py @@ -39,26 +39,10 @@ class Input : - provide a feature space, and rows (matrix profile) - a data index map """ - # def learn(self,**_args): - # """ - # This function is designed to learn about, the data and persist - # :param table - # :param store - # """ - # table = _args['table'] - # reader = transport.factory.instance(**_args['store']) - # df = reader.read(table=table,limit=1) - # self.columns = df.columns.tolist() - - # self._metadf = pd.DataFrame(self.df[self._columns].dtypes.values.astype(str)).T #,self._columns] - # self._metadf.columns = self._columns - - # sql = "SELECT :fields from :table".replace(":table",table) - def __init__(self,**_args): """ - :param table + :param data :param store data-store parameters/configuration :param sql sql query that pulls a representative sample of the data """ @@ -70,29 +54,18 @@ class Input : pass else: self._initsql(**_args) + # + # We need to have a means to map of values,columns and vector positions in order + # to perform convert and revert to and from binary + # self._map = {} if 'map' not in _args else _args['map'] - # self._metadf = pd.DataFrame(self.df[self._columns].dtypes.values.astype(str)).T #,self._columns] - # self._metadf.columns = self._columns - # if 'gpu' in _args and 'GPU' in os.environ: - - # np = cp - # index = int(_args['gpu']) - # np.cuda.Device(index).use() - # print(['..:: GPU ',index]) def _initsql(self,**_args): """ This function will initialize the class on the basis of a data-store and optionally pre-defined columns to be used to be synthesized :param store data-store configuration - :param sql sql query to be applied to the transported data :param columns list of columns to be """ - # _store_args = _args['store'] - # reader = transport.factory.instance(**_store_args) - # sql = _args['sql'] - - # self.df = reader.read(sql=_args['sql']) - if 'columns' not in _args : self._initcols(data=self.df) @@ -128,14 +101,6 @@ class Input : :param data data-frame that holds the data :param columns columns that need to be synthesized if any """ - # - # setting class-level variables to be reused across the class - # self.df = _args['data'] - row_count = self.df.shape[0] - # self.columns = self.df.columns - # self._metadf = self.df.apply(lambda col: col.unique().size) - # _df = pd.DataFrame(self.df.apply(lambda col: col.unique().size )).T - # cols = None if 'columns' not in _args else _args['columns'] self._initcols(**_args) def convert(self,**_args): @@ -247,16 +212,3 @@ class Input : return cols,_matrix -if __name__ == '__main__' : - df = pd.read_csv('../../sample.csv') - _input = Input(data=df,columns=['age','race']) - _m = _input.convert(column='age') - print (_m.shape) - print (_input.revert(matrix=_m,column='age')) - print (_input._metadf) - -# _args = {"store":{"type":"sql.BQReader","args":{"service_key":"/home/steve/dev/aou/accounts/curation-prod.json"}}} -# _args['table'] = 'io.observation' -# _i = Input(**_args) -# df = pd.read_csv('../../sample.csv') -# print (Input.ToBinary(df.age)) \ No newline at end of file diff --git a/pipeline.py b/pipeline.py index 296d4d5..5fb62fe 100644 --- a/pipeline.py +++ b/pipeline.py @@ -101,11 +101,14 @@ class Components : df = pd.read_csv(args['file']) del args['file'] elif 'data' not in args : + reader = factory.instance(**args['store']['source']) + + if 'row_limit' in args : df = 
reader.read(sql=args['sql'],limit=args['row_limit']) else: - df = reader.read(sql=args['sql']) + df = reader.read(sql=args['sql']) schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None else: df = args['data'] @@ -241,6 +244,7 @@ class Components : df.index = np.arange(df.shape[0]) self.post(data=df,schema=schema,store=args['store']['target']) def post(self,**_args) : + table = _args['from'] if 'from' in _args else _args['store']['table'] _schema = _args['schema'] if 'schema' in _args else None writer = factory.instance(**_args['store']) _df = _args['data'] @@ -251,13 +255,13 @@ class Components : _type = str _value = 0 if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] : - if _item['type'] == 'DATE' : + if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] : # # There is an issue with missing dates that needs to be resolved. # for some reason a missing date/time here will cause the types to turn into timestamp (problem) # The following is a hack to address the issue (alas) assuming 10 digit dates and 'NaT' replaces missing date values (pandas specifications) # - _df[name] = _df[name].apply(lambda value: '' if str(value) == 'NaT' else str(value)[:10]) + _df[name] = _df[name].apply(lambda value: None if str(value) == 'NaT' else (str(value)[:10]) if _item['type'] in ['DATE','DATETIME'] else str(value)) #_df[name] = _df[name].dt.date # _df[name] = pd.to_datetime(_df[name].fillna(''),errors='coerce') else: @@ -274,11 +278,33 @@ class Components : _value = '' _df[name] = _df[name].fillna(_value) #.astype(_type) columns.append(name) - print () - print (_df) - writer.write(_df.astype(object),schema=_schema,table=args['from']) + + fields = _df.columns.tolist() + if not writer.has(table=table) and _args['store']['provider'] != 'bigquery': + + _map = {'STRING':'VARCHAR(256)','INTEGER':'BIGINT'} if 'provider' in _args['store'] and _args['store']['provider'] != 'bigquery' else {} + _params = {'map':_map,'table':args['from']} + if _schema : + _params['schema'] = _schema + + else: + _params['fields'] = fields + + writer.make(**_params) + + fields = _df.columns.tolist() + _df = _df[fields] + # writer.fields = fields + if _args['store']['provider'] == 'bigquery' : + print (['_______ POSTING ______________ ',table]) + print (['_______________ ',_df.shape[0],' ___________________']) + writer.write(_df.astype(object),schema=_schema,table=table) else: - writer.write(_df,table=args['from']) + writer.table = table + writer.write(_df) + # else: + # writer.write(_df,table=args['from']) + def finalize(self,args): """ @@ -288,8 +314,9 @@ class Components : """ reader = factory.instance(**args['store']['source']) logger = factory.instance(**args['store']['logs']) - target = args['store']['target']['args']['dataset'] - source = args['store']['source']['args']['dataset'] + + target = args['store']['target']['args']['dataset'] + source = args['store']['source']['args']['dataset'] table = args['from'] schema = reader.meta(table=args['from']) # @@ -327,7 +354,10 @@ class Components : This function will generate data and store it to a given, """ store = args['store']['logs'] - store['args']['doc'] = args['context'] + if 'args' in store : + store['args']['doc'] = args['context'] + else: + store['doc'] = args['context'] logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) ostore = args['store']['target'] @@ -348,13 +378,13 @@ class Components : schema = reader.meta(table=args['from']) schema = 
[{"name":_item.name,"type":_item.field_type} for _item in schema] - # else: # # # # This will account for autopilot mode ... # df = args['data'] _cast = {} if schema : + for _item in schema : dtype = str name = _item['name'] @@ -405,139 +435,72 @@ class Components : logger.write(_info) if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 : candidates = (data.maker.generate(**args)) + else: candidates = [df] - if 'sql.BQWriter' in ostore['type'] : - #table = ".".join([ostore['['dataset'],args['context']]) - # writer = factory.instance(**ostore) - _columns = None - skip_columns = [] - _schema = schema - if schema : - cols = [_item['name'] for _item in _schema] - else: - cols = df.columns - for _df in candidates : - # - # we need to format the fields here to make sure we have something cohesive - # + + # if 'sql.BQWriter' in ostore['type'] : + _columns = None + skip_columns = [] + _schema = schema + if schema : + cols = [_item['name'] for _item in _schema] + else: + cols = df.columns.tolist() + _info = {"module":"gan-prep","action":"selection","input":{"candidates":len(candidates),"features":cols}} + logger.write(_info) + for _df in candidates : + # + # we need to format the fields here to make sure we have something cohesive + # - if not skip_columns : - # _columns = set(df.columns) - set(_df.columns) - if 'ignore' in args and 'columns' in args['ignore'] : - skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns']) - # for name in args['ignore']['columns'] : - # for _name in _df.columns: - # if _name in name: - # skip_columns.append(_name) - # - # We perform a series of set operations to insure that the following conditions are met: - # - the synthetic dataset only has fields that need to be synthesized - # - The original dataset has all the fields except those that need to be synthesized - # - - _df = _df[list(set(_df.columns) - set(skip_columns))].copy() - if x_cols : - _approx = {} - for _col in x_cols : - if real_df[_col].unique().size > 0 : - + if not skip_columns : + if 'ignore' in args and 'columns' in args['ignore'] : + skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns']) + # + # We perform a series of set operations to insure that the following conditions are met: + # - the synthetic dataset only has fields that need to be synthesized + # - The original dataset has all the fields except those that need to be synthesized + # + + _df = _df[list(set(_df.columns) - set(skip_columns))].copy() + if x_cols : + _approx = {} + for _col in x_cols : + if real_df[_col].unique().size > 0 : + - _df[_col] = self.approximate(real_df[_col].values) - _approx[_col] = { - "io":{"min":_df[_col].min().astype(float),"max":_df[_col].max().astype(float),"mean":_df[_col].mean().astype(float),"sd":_df[_col].values.std().astype(float),"missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}, - "real":{"min":real_df[_col].min().astype(float),"max":real_df[_col].max().astype(float),"mean":real_df[_col].mean().astype(float),"sd":real_df[_col].values.std().astype(float),"missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)} - } - else: - _df[_col] = -1 - logger.write({"module":"gan-generate","action":"approximate","status":_approx}) - if set(df.columns) & set(_df.columns) : - _columns = set(df.columns) - set(_df.columns) - df = df[_columns] + _df[_col] = 
self.approximate(real_df[_col].values) + _approx[_col] = { + "io":{"min":_df[_col].min().astype(float),"max":_df[_col].max().astype(float),"mean":_df[_col].mean().astype(float),"sd":_df[_col].values.std().astype(float),"missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}, + "real":{"min":real_df[_col].min().astype(float),"max":real_df[_col].max().astype(float),"mean":real_df[_col].mean().astype(float),"sd":real_df[_col].values.std().astype(float),"missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)} + } + else: + _df[_col] = -1 + logger.write({"module":"gan-generate","action":"approximate","status":_approx}) + if set(df.columns) & set(_df.columns) : + _columns = list(set(df.columns) - set(_df.columns)) + df = df[_columns] - # - # Let us merge the dataset here and and have a comprehensive dataset + # + # Let us merge the dataset here and and have a comprehensive dataset - _df = pd.DataFrame.join(df,_df) - - # if _schema : - # for _item in _schema : - # if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] : - # _df[_item['name']] = _df[_item['name']].astype(str) - - # pass - _params = {'data':_df,'store' : ostore} - if _schema : - _params ['schema'] = _schema - self.post(**_params) - # if _schema : - # writer.write(_df[cols],schema=_schema,table=args['from']) - # self.post(data=_df,schema=) - # else: - # writer.write(_df[cols],table=args['from']) + _df = pd.DataFrame.join(df,_df) + _params = {'data':_df,'store' : ostore} + if _schema : + _params ['schema'] = _schema + _info = {"module":"gan-prep","action":"write","input":{"rows":_df.shape[0],"cols":_df.shape[1]}} + logger.write(_info) + self.post(**_params) + # print (['_______ posting _________________',_df.shape]) + break + pass # else: # pass - - - # # - # # We need to post the generate the data in order to : - # # 1. compare immediately - # # 2. 
synthetic copy - # # - - # cols = _dc.columns.tolist() - - # data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) - # # - # # performing basic analytics on the synthetic data generated (easy to quickly asses) - # # - # info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}} - - # # - # # @TODO: Send data over to a process for analytics - - # base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) - # cols = _dc.columns.tolist() - # for name in cols : - # _args['data'][name] = _dc[name] - - # # - # #-- Let us store all of this into bigquery - # prefix = args['notify']+'.'+_args['context'] - # partition = str(partition) - # table = '_'.join([prefix,partition,'io']).replace('__','_') - # folder = os.sep.join([args['logs'],args['context'],partition,'output']) - # if 'file' in args : - - # _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')]) - # _pname = os.sep.join([folder,table])+'.csv' - # data_comp.to_csv( _pname,index=False) - # _args['data'].to_csv(_fname,index=False) - - # _id = 'path' - # else: - - # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') - # _pname = os.sep.join([folder,table+'.csv']) - # _fname = table.replace('_io','_full_io') - # partial = '.'.join(['io',args['context']+'_partial_io']) - # complete= '.'.join(['io',args['context']+'_full_io']) - # data_comp.to_csv(_pname,index=False) - # if 'dump' in args : - # print (_args['data'].head()) - # else: - # Components.lock.acquire() - # data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) - # _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) - # Components.lock.release() - # _id = 'dataset' - # info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } - # if partition : - # info ['partition'] = int(partition) - # logger.write({"module":"generate","action":"write","input":info} ) - + def bind(self,**_args): + print (_args) if __name__ == '__main__' : @@ -611,6 +574,50 @@ if __name__ == '__main__' : generator = Components() generator.generate(args) + elif 'bind' in SYS_ARGS : + import binder + _args = _config['_map'] + _args['store'] = copy.deepcopy(_config['store']) + if 'init' in SYS_ARGS : + # + # Creating and persisting the map ... + print (['.... 
Binding Initialization']) + # jobs = binder.Init(**_args) + _mapped = binder.Init(**_args) + + + _schema = [{"name":_name,"type":"INTEGER"} for _name in _mapped.columns.tolist()] + publisher = lambda _params: (Components()).post(**_params) + _args = {'data':_mapped,'store':_config['store']['target']} + _args['store']['table'] = '_map' + if _args['store']['provider'] =='bigquery' : + _args['schema'] = _schema + + job = Process (target = publisher,args=(_args,)) + job.start() + jobs = [job] + else: + # + # Applying the map of k on a particular dataset + # + index = int(SYS_ARGS['index']) + _args['config'] = _config['pipeline'][index] + _args['original_key'] = 'person_id' if 'original_key' in _config else 'person_id' + table = _config['pipeline'][index]['from'] + _df = binder.ApplyOn(**_args) + _df = np.array_split(_df,PART_SIZE) + jobs = [] + print (['Publishing ',PART_SIZE,' PARTITION']) + for data in _df : + publisher = lambda _params: ( Components() ).post(**_params) + _args = {'data':data,'store':_config['store']['target']} + _args['store']['table'] = table + print (_args['store']) + job = Process(target = publisher,args=(_args,)) + job.name = "Publisher "+str(len(jobs)+1) + job.start() + jobs.append(job) + elif 'shuffle' in SYS_ARGS : index = 0 if GPU_CHIPS and 'all-chips' in SYS_ARGS: @@ -632,6 +639,7 @@ if __name__ == '__main__' : # Let us create n-jobs across n-gpus, The assumption here is the data that is produced will be a partition # @TODO: Find better name for partition # + if GPU_CHIPS and 'all-chips' in SYS_ARGS: index = 0 print (['... launching ',len(GPU_CHIPS),' jobs',args['context']]) @@ -652,12 +660,15 @@ if __name__ == '__main__' : else: # # The choice of the chip will be made internally + agent = Components() agent.train(**args) # # If we have any obs we should wait till they finish # DIRTY = 0 + if (len(jobs)) : + print (['.... 
waiting on ',len(jobs),' jobs']) while len(jobs)> 0 : DIRTY =1 jobs = [job for job in jobs if job.is_alive()] @@ -666,47 +677,16 @@ if __name__ == '__main__' : print (["..:: jobs finished "]) # # We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations - # - - if 'autopilot' in SYS_ARGS or 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) : - # - # We should pull all the primary keys and regenerate them in order to insure some form of consistency - # - print (["..:: Finalizing process"]) - (Components()).finalize(args) - # finalize(args) - pass - # jobs = [] - # for index in range(0,PART_SIZE) : - # if 'focus' in args and int(args['focus']) != index : - # continue - # args['part_size'] = PART_SIZE - # args['partition'] = index - # args['data'] = DATA[index] - # if int(args['num_gpu']) > 1 : - # args['gpu'] = index - # else: - # args['gpu']=0 + # This holds true for bigquery - bigquery only + IS_BIGQUERY = _config['store']['source']['provider'] == _config['store']['target']['provider'] and _config['store']['source']['provider'] == 'bigquery' - # make = lambda _args: (Components()).train(**_args) - # job = Process(target=make,args=( dict(args),)) - # job.name = 'Trainer # ' + str(index) - # job.start() - # jobs.append(job) - # # args['gpu'] - # print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ]) - # while len(jobs)> 0 : - # jobs = [job for job in jobs if job.is_alive()] - # time.sleep(2) + # if 'bind' not in SYS_ARGS and IS_BIGQUERY and ('autopilot' in SYS_ARGS or 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS)) : + # # + # # We should pull all the primary keys and regenerate them in order to insure some form of consistency + # # - # trainer = Components() - # trainer.train(**args) + # # + # # - - # Components.train(**args) -#for args in PIPELINE : - #args['dataset'] = 'combined20190510' - #process = Process(target=Components.train,args=(args,)) - #process.name = args['context'] - #process.start() -# Components.train(args) + # print (["..:: Finalizing process"]) + # (Components()).finalize(args)
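
Editor's note (not part of the patch): the two sketches below are review illustrations of what the new binder.py is doing; names marked as hypothetical are not taken from the patch.

The core of binder.Init is the reduction of a many-to-many synthetic/original join into a one-to-one map restricted to equivalence classes of at least k members. The following standalone sketch (hypothetical toy data, and pandas' DataFrame.sample in place of the np.random.choice indexing used in the patch) shows that step outside of the data-store plumbing:

    import pandas as pd
    import numpy as np

    def build_map(_map, k=2):
        # _map is the m:m join produced by the SQL in Init: one row per
        # (synthetic person_id, matching original person_id) pair
        _ids = _map.synthetic.unique()
        _counts = [np.sum(_map.synthetic == value) for value in _ids]
        # pick one original participant at random for every synthetic id
        original = [_map[_map.synthetic == value].original.sample(1).values[0] for value in _ids]
        _mdf = pd.DataFrame({"synthetic": _ids, "original": original, "counts": _counts})
        # keep only synthetic ids whose equivalence class has at least k members
        return _mdf[_mdf.counts >= k]

    if __name__ == '__main__':
        toy = pd.DataFrame({"synthetic": [1, 1, 1, 2, 3, 3],
                            "original":  [10, 11, 12, 20, 30, 31]})
        print(build_map(toy, k=2))  # ids 1 and 3 survive; id 2 is dropped (count < k)

ApplyOn then consumes the persisted _map table by rewriting the pipeline SQL so that the synthetic identifier replaces the original key. With hypothetical values table='condition_occurrence', original_key='person_id', and schemas original='prod', synthetic='prod_io', the template in ApplyOn would resolve to roughly:

    select _map.synthetic as person_id, <remaining fields>
    from prod.condition_occurrence
    inner join prod_io._map on _map.original = condition_occurrence.person_id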