data-maker/binder.py

377 lines
14 KiB
Python

#!/usr/bin/env python3
"""
This file will perform basic tasks to finalize the GAN process by performing the following :
- basic stats & analytics
- rebuild io to another dataset
"""
import pandas as pd
import numpy as np
from multiprocessing import Process, Lock
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import transport
from data.params import SYS_ARGS
import json
import pandas as pd
import numpy as np
from google.oauth2 import service_account
import json
# path = '../curation-prod.json'
# credentials = service_account.Credentials.from_service_account_file(path)
# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
# Load the pipeline configuration. The path defaults to config.json and is overridden by
# SYS_ARGS['config'] when present (SYS_ARGS presumably holds parsed command-line arguments).
# A `with` block guarantees the file is closed even if parsing or the 'pipeline' lookup fails.
filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
with open(filename) as f:
    config = json.loads(f.read())
args = config['pipeline']
def _formatSQL(**_args):
"""
This function will build the _map for a given segment
"""
sql = """
select DISTINCT x.person_id synthetic,y.person_id original
FROM :synthetic.:table x
INNER JOIN :original.:table y on x.person_id in (:ids)
AND x.person_id <> y.person_id AND x.gender_source_value = y.gender_source_value
AND x.year_of_birth = y.year_of_birth
ORDER BY 1
"""
table= _args['table']
original,synthetic = _args['schema']['original'],_args['schema']['synthetic']
_ids = np.array(_args['ids']).astype(str)
return sql.replace(":ids",",".join(_ids)).replace(":synthetic",synthetic).replace(":original",original).replace(":table",table)
def _addCounts(**_args) :
    """
    Collapse a many-to-many synthetic/original candidate set into one row per synthetic id
    and write it to the target store as the `_map` table.

    :param store  configuration object with 'source' and 'target' transport entries
    :param sql    query (e.g. built by _formatSQL) returning `synthetic` and `original` columns
    """
    store = _args['store']
    reader = transport.factory.instance(**store['source'])
    _df = reader.read(sql=_args['sql'])
    #
    # Single pass over the candidates. With sort=False, groups come out in first-appearance
    # order, like Series.unique(). Each synthetic id gets its candidate count and one original
    # chosen uniformly at random among its candidates. Rows with a missing synthetic id are
    # dropped by groupby instead of crashing on an empty random choice.
    #
    grouped = _df.groupby('synthetic', sort=False)['original']
    _counts = grouped.size()
    picked = grouped.agg(lambda candidates: candidates.iloc[np.random.randint(len(candidates))])
    _df = pd.DataFrame({
        "synthetic": _counts.index.values,
        "original": picked.values,
        "counts": _counts.values,
    })
    #
    # Post the map to the backend
    #
    table = '_map' #-- Yes this is hard-coded
    writer = transport.factory.instance(**dict(store['target'], **{"parallel": True, "table": table}))
    _schema = [{"name": name, "type": "INTEGER"} for name in _df.columns]
    writer.write(_df, schema=_schema)
def Init(**_args) :
    """
    Build a map of synthetic to real (original) individuals.

    The synthesized data is assumed to be stored in the same data-store as the original.
    `sql` must return a many-to-many candidate set with `synthetic` and `original` columns.
    Each synthetic id is paired with one of its candidates chosen at random, and is kept
    only when it has at least `k` candidates.

    :param store  object from the configuration file with source,target entries
    :param schema dict with 'original' and 'synthetic' schema/dataset names
    :param table  name of the original/synthetic tables (they should be the same)
    :param sql    query template; :synthetic, :original and :table are substituted
    :param k      minimum number of candidate originals per synthetic id (default 2)
    :param feat   features/attributes (demographics) to account for; currently not read here
    :returns      pandas.DataFrame with columns synthetic, original, counts
    """
    store = _args['store']
    reader = transport.factory.instance(**store['source'])
    original, synthetic = _args['schema']['original'], _args['schema']['synthetic']
    table = _args['table']
    sql = _args['sql'].replace(':synthetic', synthetic).replace(':original', original).replace(':table', table)
    _map = reader.read(sql=sql)
    k = _args.get('k', 2)
    #
    # _map is a m:m (many-to-many) relationship between original and synthetic participants.
    # The goal is a single map against which records will be migrated: one random original
    # per synthetic id, computed in one groupby pass (first-appearance order, as unique()).
    #
    print (['... computing counts (k)'])
    grouped = _map.groupby('synthetic', sort=False)['original']
    _counts = grouped.size()
    picked = grouped.agg(lambda candidates: candidates.iloc[np.random.randint(len(candidates))])
    print (['Building k-classes/groups'])
    _mdf = pd.DataFrame({
        "synthetic": _counts.index.values,
        "original": picked.values,
        "counts": _counts.values,
    })
    # Keep only equivalence classes of size >= k (vectorized; no row-wise apply needed)
    _mdf = _mdf[_mdf.counts >= k]
    #
    # TODO: log what just happened so we know about the equivalence classes, e.g.
    # {"module":"binder","action":"map-generation","input":{"k":k,"rows":{"synthetic":_mdf.shape[0],"original":len(_counts)}}}
    return _mdf
#
# now we are posting this to target storage ...
#
def ApplyOn (**_args):
    """
    Build and run SQL that applies the synthetic identifier to the entries of a pipeline table.

    Rows of `<original schema>.<table>` are inner-joined to `<synthetic schema>._map` on
    `_map.original = <table>.<original_key>`. The key column is replaced by `_map.synthetic`
    (aliased back to `original_key`) and every other column of the table is kept.
    The _map is assumed to expose `synthetic` and `original` attributes.

    :param store        object from the configuration file with a 'source' transport entry
    :param config       pipeline entry whose 'from' value names the table to rewrite
    :param original_key name of the identifier column (referential integrity is assumed)
    :param schema       dict with 'original' and 'synthetic' schema/dataset names
    :returns            whatever the source reader returns for the query
    """
    store_args = _args['store']
    _config = _args['config']
    table = _config['from']
    reader = transport.factory.instance(**dict(store_args['source'], **{"table": table}))
    # Fetch a single row just to learn the table's column names
    attr = reader.read(limit=1).columns.tolist()
    original_key = _args['original_key'] #-- assuming referential integrity
    #
    # Preserve the table's own column order (a set difference scrambles it from run to run)
    # and qualify each column with the table name so names that also exist in _map
    # (e.g. synthetic, original, counts) are not ambiguous in the join.
    #
    fields = [table + "." + name for name in attr if name != original_key]
    sql = "select _map.synthetic as :original_key,:fields from :original_schema.:table inner join :synthetic_schema._map on _map.original = :table.:original_key"
    sql = sql.replace(":table", table).replace(":fields", ",".join(fields))
    sql = sql.replace(":original_key", original_key)
    _schema = _args['schema']
    sql = sql.replace(":original_schema", _schema['original']).replace(":synthetic_schema", _schema['synthetic'])
    return reader.read (sql=sql)
if __name__ == '__main__':
    # Running this module directly does nothing; its functions (Init, ApplyOn, ...)
    # are presumably invoked from other code.
    pass
# class Analytics :
# """
# This class will compile basic analytics about a given dataset i.e compare original/synthetic
# """
# @staticmethod
# def distribution(**args):
# context = args['context']
# df = args['data']
# #
# #-- This data frame counts unique values for each feature (space)
# df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
# #
# #-- Get the distributions for common values
# #
# names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
# ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
# ddf[context] = ddf.index
# pass
# def distance(**args):
# """
# This function will measure the distance between
# """
# pass
# class Utils :
# @staticmethod
# def log(**args):
# logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
# logger.write(args)
# logger.close()
# class get :
# @staticmethod
# def pipeline(table,path) :
# # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
# config = json.loads((open(path)).read())
# pipeline = config['pipeline']
# # return [ item for item in pipeline if item['context'] in contexts]
# pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
# Utils.log(module=table,action='init',input={"pipeline":pipeline})
# return pipeline
# @staticmethod
# def sql(**args) :
# """
# This function is intended to build SQL query for the remainder of the table that was not synthesized
# :config configuration entries
# :from source of the table name
# :dataset name of the source dataset
# """
# SQL = ["SELECT * FROM :from "]
# SQL_FILTER = []
# NO_FILTERS_FOUND = True
# # pipeline = Utils.get.config(**args)
# pipeline = args['pipeline']
# REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
# for item in pipeline :
# if 'filter' in item :
# if NO_FILTERS_FOUND :
# NO_FILTERS_FOUND = False
# SQL += ['WHERE']
# #
# # Let us load the filter in the SQL Query
# FILTER = item['filter']
# QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
# SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])]
# src = ".".join([args['dataset'],args['from']])
# SQL += [" AND ".join(SQL_FILTER)]
# #
# # let's pull the field schemas out of the table definition
# #
# Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
# return " ".join(SQL).replace(":from",src)
# def mk(**args) :
# dataset = args['dataset']
# client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
# #
# # let us see if we have a dataset handy here
# #
# datasets = list(client.list_datasets())
# found = [item for item in datasets if item.dataset_id == dataset]
# if not found :
# return client.create_dataset(dataset)
# return found[0]
# def move (args):
# """
# This function will move a table from the synthetic dataset into a designated location
# This is the simplest case for finalizing a synthetic data set
# :private_key
# """
# pipeline = Utils.get.pipeline(args['from'],args['config'])
# _args = json.loads((open(args['config'])).read())
# _args['pipeline'] = pipeline
# # del _args['pipeline']
# args = dict(args,**_args)
# # del args['pipeline']
# # private_key = args['private_key']
# client = bq.Client.from_service_account_json(args['private_key'])
# dataset = args['dataset']
# if pipeline :
# SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
# SQL += [Utils.get.sql(**args)]
# SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
# else:
# #
# # moving a table to a designated location
# tablename = args['from']
# if 'sql' not in args :
# SQL = "SELECT * FROM :dataset.:table"
# else:
# SQL = args['sql']
# SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
# Utils.log(module=args['from'],action='sql',input={'sql':SQL})
# #
# # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
# #
# odataset = mk(dataset=dataset+'_io',client=client)
# # SQL = "SELECT * FROM io.:context_full_io".replace(':context',context)
# config = bq.QueryJobConfig()
# config.destination = client.dataset(odataset.dataset_id).table(args['from'])
# config.use_query_cache = True
# config.allow_large_results = True
# config.priority = 'INTERACTIVE'
# #
# #
# schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
# fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
# SQL = SQL.replace("*"," , ".join(fields))
# # print (SQL)
# out = client.query(SQL,location='US',job_config=config)
# Utils.log(module=args['from'],action='move',input={'job':out.job_id})
# return (out.job_id)
# import pandas as pd
# import numpy as np
# from google.oauth2 import service_account
# import json
# # path = '../curation-prod.json'
# # credentials = service_account.Credentials.from_service_account_file(path)
# # df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
# filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
# f = open(filename)
# config = json.loads(f.read())
# args = config['pipeline']
# f.close()
# if __name__ == '__main__' :
# """
# Usage :
# finalize --<move|stats> --contexts <c1,c2,...c3> --from <table>
# """
# if 'move' in SYS_ARGS :
# if 'init' in SYS_ARGS :
# dep = config['dep'] if 'dep' in config else {}
# info = []
# if 'queries' in dep :
# info += dep['queries']
# print ('________')
# if 'tables' in dep :
# info += dep['tables']
# args = {}
# jobs = []
# for item in info :
# args = {}
# if type(item) == str :
# args['from'] = item
# name = item
# else:
# args = item
# name = item['from']
# args['config'] = SYS_ARGS['config']
# # args['pipeline'] = []
# job = Process(target=move,args=(args,))
# job.name = name
# jobs.append(job)
# job.start()
# # while len(jobs) > 0 :
# # jobs = [job for job in jobs if job.is_alive()]
# # time.sleep(1)
# else:
# move(SYS_ARGS)
# # # table = SYS_ARGS['from']
# # # args = dict(config,**{"private_key":"../curation-prod.json"})
# # args = dict(args,**SYS_ARGS)
# # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
# # log = []
# # if contexts :
# # args['contexts'] = contexts
# # log = move(**args)
# # else:
# # tables = args['from'].split(',')
# # for name in tables :
# # name = name.strip()
# # args['from'] = name
# # log += [move(**args)]
# # print ("\n".join(log))
# else:
# print ("NOT YET READY !")