#!/usr/bin/env python3
"""
This file performs basic tasks to finalize the GAN process:
    - basic stats & analytics
    - rebuild io to another dataset
"""
import json

import pandas as pd
import numpy as np
from multiprocessing import Process, Lock
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import transport
from data.params import SYS_ARGS

# path = '../curation-prod.json'
# credentials = service_account.Credentials.from_service_account_file(path)
# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')

#
# Load the pipeline configuration (defaults to ./config.json unless --config is provided)
#
filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
with open(filename) as f:
    config = json.loads(f.read())
args = config['pipeline']

def _formatSQL(**_args):
    """
    Build the SQL that derives the synthetic-to-original _map for a given segment
    (a batch of synthetic person_ids).
    :param table    name of the table holding person records
    :param schema   dict with the 'original' and 'synthetic' dataset/schema names
    :param ids      list of synthetic person_ids that make up the segment
    """
    sql = """
    SELECT DISTINCT x.person_id synthetic, y.person_id original
    FROM :synthetic.:table x
    INNER JOIN :original.:table y
        ON x.person_id IN (:ids)
        AND x.person_id <> y.person_id
        AND x.gender_source_value = y.gender_source_value
        AND x.year_of_birth = y.year_of_birth
    ORDER BY 1
    """
    table = _args['table']
    original, synthetic = _args['schema']['original'], _args['schema']['synthetic']
    _ids = np.array(_args['ids']).astype(str)
    return sql.replace(":ids", ",".join(_ids)).replace(":synthetic", synthetic).replace(":original", original).replace(":table", table)

def _addCounts(**_args):
    """
    Run a segment's _map query, pick one original at random for every synthetic id,
    and post the result (with match counts) to the _map table of the target store.
    """
    store = _args['store']
    sql = _args['sql']
    reader = transport.factory.instance(**store['source'])
    _df = reader.read(sql=sql)
    #
    # For every synthetic id, count how many originals it matched and draw one of them at random
    #
    _ids = _df.synthetic.unique()
    _counts = [np.sum(_df.synthetic == value) for value in _ids]
    original = [_df[_df.synthetic == value].iloc[np.random.choice(np.arange(_counts[_ids.tolist().index(value)]), 1), :].original.values[0] for value in _ids]
    _df = pd.DataFrame({"synthetic": _ids, "original": original, "counts": _counts})
    #
    # We can post this to the backend ...
    #
    table = '_map'  #-- Yes this is hard-coded
    writer = transport.factory.instance(**dict(store['target'], **{"parallel": True, "table": table}))
    # if writer.has(table=table) is False:
    #     writer.write(_df)
    # else:
    _schema = [{"name": name, "type": "INTEGER"} for name in _df.columns]
    writer.write(_df, schema=_schema)
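#
# Illustrative sketch (not part of the original pipeline): one way _formatSQL might be
# exercised for a single segment. The dataset names ('prod_io', 'gan_io'), the table
# name ('person') and the ids below are hypothetical placeholders, not values read
# from any real configuration.
#
def _exampleSegmentSQL():
    _sql = _formatSQL(
        table='person',
        schema={'original': 'prod_io', 'synthetic': 'gan_io'},
        ids=[101, 102, 103]
    )
    # The :synthetic, :original, :table and :ids placeholders are substituted, yielding
    # a query that pairs each listed synthetic person_id with every original person
    # sharing the same gender_source_value and year_of_birth.
    return _sql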
def Init(**_args) :
    """
    This function will build a map of the synthetic to real individuals.
    The assumption is that the synthesized data is stored in the same data-store as the original data.
    The parameters provided are :
    :param store    object from the configuration file with source,target entries
    :param schema   dict with the original and synthetic dataset/schema names
    :param table    name of the original/synthetic tables (they should be the same)
    :param sql      SQL template using the :synthetic, :original and :table placeholders
    :param k        minimum number of original matches required per synthetic id (default 2)
    :param feat.    features/attributes ... demographics to account for
    """
    store = _args['store']
    reader = transport.factory.instance(**store['source'])
    original,synthetic = _args['schema']['original'],_args['schema']['synthetic']
    table = _args['table']
    sql = _args['sql'].replace(':synthetic',synthetic).replace(':original',original).replace(':table',table)
    _map = reader.read(sql=sql)
    k = _args['k'] if 'k' in _args else 2
    # _iodf = reader.read(table=table)
    # _ids = _iodf['person_id'].unique().tolist()
    # x_ = np.array_split(_ids,1000)
    # jobs = []
    # for _items in x_ :
    #     _p = {"ids":_items,"schema":_args['schema'],'store':store,'table':table}
    #     sql = _formatSQL(**_p)
    #     _p['sql'] = sql
    #     _apply = lambda params: _addCounts(**params)
    #     thread = Process(target=_apply,args=(_p,))
    #     thread.start()
    #     jobs.append(thread)
    # return jobs
    #
    # We have built a many-to-many (m:m) relationship between original and synthetic participants.
    # The goal is to reduce it to a single map against which records will be migrated.
    #
    print ('... computing counts (k)')
    _ids = _map.synthetic.unique()
    _counts = [ np.sum(_map.synthetic == value) for value in _ids]
    original = [_map[_map.synthetic == value].iloc[np.random.choice(np.arange(_counts[_ids.tolist().index(value)]),1),:].original.values[0] for value in _ids]
    print ('Building k-classes/groups')
    _mdf = pd.DataFrame({"synthetic":_ids,"original":original,"counts":_counts})
    #
    # Keep only the synthetic ids that matched at least k originals (the k-classes/groups)
    #
    _mdf = _mdf[_mdf.counts >= k]
    #
    # Log what just happened here so we know about the equivalence classes,
    # {"module":"binder","action":"map-generation","input":{"k":k,"rows":{"synthetic":_mdf.shape[0],"original":len(_counts)}}}
    return _mdf
    #
    # now we are posting this to target storage ...
    #

def ApplyOn (**_args):
    """
    This function will rewrite the SQL of a pipeline entry so that the synthetic identifier
    replaces the original identifier. We assume that the _map table has two attributes
    (synthetic and original).
    :param store            source/target data-store configuration
    :param config           pipeline entry; its 'from' attribute names the table to rewrite
    :param original_key     name of the original identifier column (e.g. person_id)
    :param schema           dict with the original and synthetic dataset/schema names
    """
    store_args = _args['store']
    _config = _args['config']

    table = _config['from']
    reader = transport.factory.instance(**dict(store_args['source'],**{"table":table}))
    attr = reader.read(limit=1).columns.tolist()
    original_key = _args['original_key'] #-- assuming referential integrity
    # synthetic_key = columns['synthetic']
    # mapped_original = columns['original']
    fields = list(set(attr) - set([original_key]))
    sql = "SELECT _map.synthetic AS :original_key,:fields FROM :original_schema.:table INNER JOIN :synthetic_schema._map ON _map.original = :table.:original_key"
    sql = sql.replace(":table",table).replace(":fields",",".join(fields))
    sql = sql.replace(":original_key",original_key)
    _schema = _args['schema']
    sql = sql.replace(":original_schema",_schema['original']).replace(":synthetic_schema",_schema['synthetic'])

    return reader.read (sql=sql)
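#
# Illustrative sketch (not part of the original pipeline): how Init and ApplyOn could be
# wired together. The store layout, schema/table names and the 'observation' pipeline
# entry are hypothetical placeholders; the 'sql' argument is assumed to be a template
# using the same :synthetic/:original/:table placeholders as _formatSQL above.
#
def _exampleBind():
    _store = config.get('store', {'source': {}, 'target': {}})    # assumed layout; not guaranteed by this file
    _schema = {'original': 'prod_io', 'synthetic': 'gan_io'}      # hypothetical dataset names
    _map = Init(
        store=_store,
        schema=_schema,
        table='person',
        sql="SELECT x.person_id synthetic, y.person_id original FROM :synthetic.:table x INNER JOIN :original.:table y ON x.person_id <> y.person_id AND x.gender_source_value = y.gender_source_value AND x.year_of_birth = y.year_of_birth",
        k=2
    )
    # Once the _map table exists in the target store, each pipeline entry can be rewritten
    # so that its rows carry the synthetic identifier instead of the original one.
    _df = ApplyOn(
        store=_store,
        config={'from': 'observation'},     # hypothetical pipeline entry
        original_key='person_id',
        schema=_schema
    )
    return _map, _df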
if __name__ == '__main__' :
    pass

# class Analytics :
#     """
#     This class will compile basic analytics about a given dataset i.e compare original/synthetic
#     """
#     @staticmethod
#     def distribution(**args):
#         context = args['context']
#         df = args['data']
#         #
#         #-- This data frame counts unique values for each feature (space)
#         df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
#         #
#         #-- Get the distributions for common values
#         #
#         names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
#         ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
#         ddf[context] = ddf.index
#         pass
#     def distance(**args):
#         """
#         This function will measure the distance between
#         """
#         pass

# class Utils :
#     @staticmethod
#     def log(**args):
#         logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
#         logger.write(args)
#         logger.close()
#     class get :
#         @staticmethod
#         def pipeline(table,path) :
#             # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
#             config = json.loads((open(path)).read())
#             pipeline = config['pipeline']
#             # return [ item for item in pipeline if item['context'] in contexts]
#             pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
#             Utils.log(module=table,action='init',input={"pipeline":pipeline})
#             return pipeline
#         @staticmethod
#         def sql(**args) :
#             """
#             This function is intended to build SQL query for the remainder of the table that was not synthesized
#             :config     configuration entries
#             :from       source of the table name
#             :dataset    name of the source dataset
#             """
#             SQL = ["SELECT * FROM :from "]
#             SQL_FILTER = []
#             NO_FILTERS_FOUND = True
#             # pipeline = Utils.get.config(**args)
#             pipeline = args['pipeline']
#             REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
#             for item in pipeline :
#                 if 'filter' in item :
#                     if NO_FILTERS_FOUND :
#                         NO_FILTERS_FOUND = False
#                         SQL += ['WHERE']
#                     #
#                     # Let us load the filter in the SQL Query
#                     FILTER = item['filter']
#                     QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
#                     SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])]
#             src = ".".join([args['dataset'],args['from']])
#             SQL += [" AND ".join(SQL_FILTER)]
#             #
#             # let's pull the field schemas out of the table definition
#             #
#             Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
#             return " ".join(SQL).replace(":from",src)

# def mk(**args) :
#     dataset = args['dataset']
#     client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
#     #
#     # let us see if we have a dataset handy here
#     #
#     datasets = list(client.list_datasets())
#     found = [item for item in datasets if item.dataset_id == dataset]
#     if not found :
#         return client.create_dataset(dataset)
#     return found[0]
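#
# Illustrative sketch (not part of the original code): the qualifier-reversal idea used by
# the legacy Utils.get.sql helper above. Reversing each pipeline filter's qualifier selects
# the remainder of the table that was NOT synthesized. The filter below is a hypothetical
# example, not one taken from a real pipeline configuration.
#
def _exampleReverseFilter():
    REVERSE_QUALIFIER = {'IN': 'NOT IN', 'NOT IN': 'IN', '=': '<>', '<>': '='}
    _filter = {'field': 'condition_concept_id', 'qualifier': 'IN', 'value': 'SELECT concept_id FROM :dataset.concept'}
    # 'IN' becomes 'NOT IN', so the generated clause keeps only the rows that the
    # synthesized contexts did not cover.
    _clause = " ".join([_filter['field'], REVERSE_QUALIFIER[_filter['qualifier'].upper()], '(', _filter['value'], ')'])
    return "SELECT * FROM :from WHERE " + _clause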
# def move (args):
#     """
#     This function will move a table from the synthetic dataset into a designated location
#     This is the simplest case for finalizing a synthetic data set
#     :private_key
#     """
#     pipeline = Utils.get.pipeline(args['from'],args['config'])
#     _args = json.loads((open(args['config'])).read())
#     _args['pipeline'] = pipeline
#     # del _args['pipeline']
#     args = dict(args,**_args)
#     # del args['pipeline']
#     # private_key = args['private_key']
#     client = bq.Client.from_service_account_json(args['private_key'])
#     dataset = args['dataset']
#     if pipeline :
#         SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
#         SQL += [Utils.get.sql(**args)]
#         SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
#     else:
#         #
#         # moving a table to a designated location
#         tablename = args['from']
#         if 'sql' not in args :
#             SQL = "SELECT * FROM :dataset.:table"
#         else:
#             SQL = args['sql']
#         SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
#     Utils.log(module=args['from'],action='sql',input={'sql':SQL})
#     #
#     # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
#     #
#     odataset = mk(dataset=dataset+'_io',client=client)
#     # SQL = "SELECT * FROM io.:context_full_io".replace(':context',context)
#     config = bq.QueryJobConfig()
#     config.destination = client.dataset(odataset.dataset_id).table(args['from'])
#     config.use_query_cache = True
#     config.allow_large_results = True
#     config.priority = 'INTERACTIVE'
#     #
#     #
#     schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
#     fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
#     SQL = SQL.replace("*"," , ".join(fields))
#     # print (SQL)
#     out = client.query(SQL,location='US',job_config=config)
#     Utils.log(module=args['from'],action='move',input={'job':out.job_id})
#     return (out.job_id)

# if __name__ == '__main__' :
#     """
#     Usage :
#     finalize -- --contexts --from
#     """
#     if 'move' in SYS_ARGS :
#         if 'init' in SYS_ARGS :
#             dep = config['dep'] if 'dep' in config else {}
#             info = []
#             if 'queries' in dep :
#                 info += dep['queries']
#             print ('________')
#             if 'tables' in dep :
#                 info += dep['tables']
#             args = {}
#             jobs = []
#             for item in info :
#                 args = {}
#                 if type(item) == str :
#                     args['from'] = item
#                     name = item
#                 else:
#                     args = item
#                     name = item['from']
#                 args['config'] = SYS_ARGS['config']
#                 # args['pipeline'] = []
#                 job = Process(target=move,args=(args,))
#                 job.name = name
#                 jobs.append(job)
#                 job.start()
#             # while len(jobs) > 0 :
#             #     jobs = [job for job in jobs if job.is_alive()]
#             #     time.sleep(1)
#         else:
#             move(SYS_ARGS)
#             # table = SYS_ARGS['from']
#             # args = dict(config,**{"private_key":"../curation-prod.json"})
#             # args = dict(args,**SYS_ARGS)
#             # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
#             # log = []
#             # if contexts :
#             #     args['contexts'] = contexts
#             #     log = move(**args)
#             # else:
#             #     tables = args['from'].split(',')
#             #     for name in tables :
#             #         name = name.strip()
#             #         args['from'] = name
#             #         log += [move(**args)]
#             # print ("\n".join(log))
#     else:
#         print ("NOT YET READY !")
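#
# Illustrative sketch (not part of the original code): a minimal shape for the config.json
# this module reads. Every value below is a hypothetical placeholder; only the 'pipeline'
# key (and, for the legacy --move --init path above, 'dep') is actually read by this file,
# and the 'from', 'context' and 'filter' fields follow the usage in the legacy helpers.
#
_EXAMPLE_CONFIG = {
    "pipeline": [
        {
            "context": "measurement",               # hypothetical context name
            "from": "measurement",                  # source table the context was synthesized from
            "filter": {
                "field": "measurement_concept_id",  # hypothetical filter
                "qualifier": "IN",
                "value": "SELECT concept_id FROM :dataset.concept"
            }
        }
    ],
    "dep": {                                        # consumed by the legacy --move --init path
        "tables": ["person"],
        "queries": []
    }
}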