bug fixes and simplified interface

This commit is contained in:
Steve Nyemba 2022-04-11 18:33:07 -05:00
parent 964ddb06ab
commit 0384a2e96f
4 changed files with 177 additions and 246 deletions

View File

@@ -11,13 +11,15 @@ This package is designed to generate synthetic data from a dataset from an origi
import pandas as pd
import numpy as np
import data.gan as gan
from transport import factory
import transport
from data.bridge import Binary
import threading as thread
from data.maker import prepare
import copy
import os
import json
from multiprocessing import Process, RLock
class ContinuousToDiscrete :
ROUND_UP = 2
@@ -101,7 +103,7 @@ def train (**_args):
else:
args['store']['doc'] = _args['context']
logger = factory.instance(**args['store'])
logger = transport.factory.instance(**args['store'])
args['logger'] = logger
for key in _inputhandler._map :
@@ -193,4 +195,173 @@ def generate(**_args):
candidates = handler.apply(candidates=args['candidates'])
return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates]
class Learner(Process):
def __init__(self,**_args):
super(Learner, self).__init__()
if 'gpu' in _args :
print (_args['gpu'])
os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
self.gpu = int(_args['gpu'])
else:
self.gpu = None
self.info = _args['info']
self.columns = self.info['columns'] if 'columns' in self.info else None
self.store = _args['store']
if 'network_args' not in _args :
self.network_args ={
'context':_args['context'] if 'context' in _args else 'GENERAL',
'logs':_args['logpath'] if 'logpath' in _args else 'logs',
'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
'batch_size':int (_args['batch']) if 'batch' in _args else 2000
}
else:
self.network_args = _args['network_args']
self._encoder = None
self._map = None
self._df = _args['data'] if 'data' in _args else None
#
# @TODO: allow for verbose mode so we have a sense of what is going on within the network
#
# self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
# sel.max_epoc
def get_schema(self):
return [{'name':_name,'type':str(_type)} for _name,_type in self._df.dtypes.items()]
def initalize(self):
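#
# Load the data (unless a dataframe was passed in) and build the binary encoder over the
# requested columns; the encoder's column map is what Trainer.run later writes to map.json
#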
reader = transport.factory.instance(**self.store['source'])
_read_args= self.info
if self._df is None :
self._df = reader.read(**_read_args)
columns = self.columns if self.columns else self._df.columns
#
# convert the data to binary here ...
_args = {"schema":self.get_schema(),"data":self._df,"columns":columns}
if self._map :
_args['map'] = self._map
self._encoder = prepare.Input(**_args)
class Trainer(Learner):
"""
This will perform training using a GAN
"""
def __init__(self,**_args):
super().__init__(**_args)
# self.info = _args['info']
self.limit = int(_args['limit']) if 'limit' in _args else None
self.name = _args['name']
self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
self.generate = None
self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
def run(self):
self.initalize()
_space,_matrix = self._encoder.convert()
_args = self.network_args
if self.gpu is not None :
_args['gpu'] = self.gpu
_args['real'] = _matrix
_args['candidates'] = self.candidates
#
# At this point we have the binary matrix, we can initiate training
#
gTrain = gan.Train(**_args)
gTrain.apply()
writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
writer.write(self._encoder._map,overwrite=True)
writer.close()
#
# @TODO: At this point we need to generate some other objects
#
_args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
if self.gpu is not None :
_args['gpu'] = self.gpu
g = Generator(**_args)
# g.run()
self.generate = g
if self.autopilot :
self.generate.run()
def generate (self):
if self.autopilot :
print( "Autopilot is set ... No need to call this function")
else:
raise Exception( "Autopilot has not been, Wait till training is finished. Use is_alive function on process object")
class Generator (Learner):
def __init__(self,**_args):
super().__init__(**_args)
#
# We need to load the mapping information for the space we are working with ...
#
self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
file = open(filename)
self._map = json.loads(file.read())
file.close()
def run(self):
self.initalize()
#
# Because the _map was provided in the constructor, convert() returns the encoded values
#
values,_matrix = self._encoder.convert()
_args = self.network_args
_args['map'] = self._map
_args['values'] = np.array(values)
_args['row_count'] = self._df.shape[0]
gHandler = gan.Predict(**_args)
gHandler.load_meta(columns=None)
_iomatrix = gHandler.apply()
_candidates= [ self._encoder.revert(matrix=_item) for _item in _iomatrix]
self.post(_candidates)
def appriximate(self,_df):
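#
# Perturb the columns listed under info['approximate']: noise drawn from a Dirichlet
# distribution (parameterized by the batch values) is added or subtracted at random,
# then the result is cast back to the column's original type
#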
_columns = self.info['approximate']
_schema = {}
for _info in self.get_schema() :
_schema[_info['name']] = _info['type']
for name in _columns :
batches = np.array_split(_df[name].values,10)
x = []
for values in batches :
_values = np.random.dirichlet(values)
x += list(values + _values )if np.random.randint(0,2) else list(values - _values)
_df[name] = np.int64(x) if 'int' in _schema[name] else np.float64(x)
return _df
def format(self,_df):
pass
def post(self,_candidates):
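#
# Write each candidate out: copy the original frame, overwrite the synthesized columns,
# approximate the designated columns if requested, and hand the result to the target
# store (or the console when no target is configured)
#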
_store = self.store['target'] if 'target' in self.store else {'provider':'console'}
_store['lock'] = True
writer = transport.factory.instance(**_store)
for _iodf in _candidates :
_df = self._df.copy()
_df[self.columns] = _iodf[self.columns]
if 'approximate' in self.info :
_df = self.appriximate(_df)
writer.write(_df,schema=self.get_schema())
pass
class factory :
_infocache = {}
@staticmethod
def instance(**_args):
"""
An instance of an object that trains and generates candidate datasets
:param gpu (optional) index of the gpu to be used if using one
:param store {source,target} if no target is provided console will be output
:param epochs (default 2) number of epochs to train
:param candidates(default 1) number of candidates to generate
:param info {columns,sql,from}
:param autopilot will generate output automatically
:param batch (default 2k) size of the batch
"""
return Trainer(**_args)
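
For orientation, here is a minimal usage sketch of the factory added above, assuming this module is importable as data.maker. The keyword names follow the docstring and the constructors in this diff (Trainer also requires a name); the provider settings, file names, and column names are illustrative assumptions, not part of this commit.

import pandas as pd
import data.maker as maker

_handler = maker.factory.instance(
    name='demo-run',                    # Trainer requires a name
    context='DEMO',
    store={'source':{'provider':'sqlite','path':'sample.db3','table':'observation'},
           'target':{'provider':'console'}},   # hypothetical transport settings
    info={'columns':['age','gender']},  # columns to synthesize
    data=pd.read_csv('sample.csv'),     # optional: initalize() then skips reader.read()
    epochs=2, batch=2000, candidates=1,
    autopilot=True)                     # generate as soon as training completes
_handler.start()                        # Trainer is a multiprocessing.Process
_handler.join()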

View File

@@ -128,7 +128,7 @@ class Input :
cols, _matrix = self.tobinary(_df[name],values)
_beg,_end = i,i+len(cols)
if name not in self._map :
self._map[name] = {"beg":_beg,"end":_end ,"values":cols}
self._map[name] = {"beg":_beg,"end":_end ,"values":cols.tolist()}
i += len(cols)
if not _m.shape[0]:
_m = _matrix ;
@@ -196,7 +196,7 @@ class Input :
# In the event the sample rows do NOT have the values of the
cols = rows.unique()
cols = np.array(cols)
row_count = len(rows)
row_count = np.int64(len(rows))
# if 'GPU' not in os.environ :
# _matrix = np.zeros([row_count,cols.size],dtype=int)
#
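
The switch from cols to cols.tolist() above is presumably what lets the column map round-trip through map.json (Trainer.run writes it, Generator reads it back with json.loads), since the standard json module cannot serialize numpy arrays. A minimal illustration with made-up values:

import json
import numpy as np

cols = np.array(['alive','deceased'])
_map = {'status':{'beg':0,'end':2,'values':cols}}
try:
    json.dumps(_map)
except TypeError as e:
    print(e)                      # ndarray is not JSON serializable
_map['status']['values'] = cols.tolist()
print(json.dumps(_map))           # works once the values are plain lists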

View File

@@ -1,240 +0,0 @@
#!/usr/bin/env python3
"""
This file will perform basic tasks to finalize the GAN process by performing the following :
- basic stats & analytics
- rebuild io to another dataset
"""
import pandas as pd
import numpy as np
from multiprocessing import Process, Lock
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import transport
from data.params import SYS_ARGS
import json
class Analytics :
"""
This class will compile basic analytics about a given dataset, i.e. compare original/synthetic
"""
@staticmethod
def distribution(**args):
context = args['context']
df = args['data']
#
#-- This data frame counts unique values for each feature (space)
df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
#
#-- Get the distributions for common values
#
names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
ddf[context] = ddf.index
pass
def distance(**args):
"""
This function will measure the distance between the original and synthetic datasets
"""
pass
class Utils :
@staticmethod
def log(**args):
logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
logger.write(args)
logger.close()
class get :
@staticmethod
def pipeline(table,path) :
# contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
config = json.loads((open(path)).read())
pipeline = config['pipeline']
# return [ item for item in pipeline if item['context'] in contexts]
pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
Utils.log(module=table,action='init',input={"pipeline":pipeline})
return pipeline
@staticmethod
def sql(**args) :
"""
This function is intended to build the SQL query for the remainder of the table that was not synthesized
:config configuration entries
:from source of the table name
:dataset name of the source dataset
"""
SQL = ["SELECT * FROM :from "]
SQL_FILTER = []
NO_FILTERS_FOUND = True
# pipeline = Utils.get.config(**args)
pipeline = args['pipeline']
REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
for item in pipeline :
if 'filter' in item :
if NO_FILTERS_FOUND :
NO_FILTERS_FOUND = False
SQL += ['WHERE']
#
# Let us load the filter in the SQL Query
FILTER = item['filter']
QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])]
src = ".".join([args['dataset'],args['from']])
SQL += [" AND ".join(SQL_FILTER)]
#
# let's pull the field schemas out of the table definition
#
Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
return " ".join(SQL).replace(":from",src)
def mk(**args) :
dataset = args['dataset']
client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
#
# let us see if we have a dataset handy here
#
datasets = list(client.list_datasets())
found = [item for item in datasets if item.dataset_id == dataset]
if not found :
return client.create_dataset(dataset)
return found[0]
def move (args):
"""
This function will move a table from the synthetic dataset into a designated location
This is the simplest case for finalizing a synthetic data set
:private_key
"""
pipeline = Utils.get.pipeline(args['from'],args['config'])
_args = json.loads((open(args['config'])).read())
_args['pipeline'] = pipeline
# del _args['pipeline']
args = dict(args,**_args)
# del args['pipeline']
# private_key = args['private_key']
client = bq.Client.from_service_account_json(args['private_key'])
dataset = args['dataset']
if pipeline :
SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
SQL += [Utils.get.sql(**args)]
SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
else:
#
# moving a table to a designated location
tablename = args['from']
if 'sql' not in args :
SQL = "SELECT * FROM :dataset.:table"
else:
SQL = args['sql']
SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
Utils.log(module=args['from'],action='sql',input={'sql':SQL})
#
# At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
#
odataset = mk(dataset=dataset+'_io',client=client)
# SQL = "SELECT * FROM io.:context_full_io".replace(':context',context)
config = bq.QueryJobConfig()
config.destination = client.dataset(odataset.dataset_id).table(args['from'])
config.use_query_cache = True
config.allow_large_results = True
config.priority = 'INTERACTIVE'
#
#
schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
SQL = SQL.replace("*"," , ".join(fields))
# print (SQL)
out = client.query(SQL,location='US',job_config=config)
Utils.log(module=args['from'],action='move',input={'job':out.job_id})
return (out.job_id)
import pandas as pd
import numpy as np
from google.oauth2 import service_account
import json
# path = '../curation-prod.json'
# credentials = service_account.Credentials.from_service_account_file(path)
# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
f = open(filename)
config = json.loads(f.read())
args = config['pipeline']
f.close()
if __name__ == '__main__' :
"""
Usage :
finalize --<move|stats> --contexts <c1,c2,...c3> --from <table>
"""
if 'move' in SYS_ARGS :
if 'init' in SYS_ARGS :
dep = config['dep'] if 'dep' in config else {}
info = []
if 'queries' in dep :
info += dep['queries']
print ('________')
if 'tables' in dep :
info += dep['tables']
args = {}
jobs = []
for item in info :
args = {}
if type(item) == str :
args['from'] = item
name = item
else:
args = item
name = item['from']
args['config'] = SYS_ARGS['config']
# args['pipeline'] = []
job = Process(target=move,args=(args,))
job.name = name
jobs.append(job)
job.start()
# while len(jobs) > 0 :
# jobs = [job for job in jobs if job.is_alive()]
# time.sleep(1)
else:
move(SYS_ARGS)
# # table = SYS_ARGS['from']
# # args = dict(config,**{"private_key":"../curation-prod.json"})
# args = dict(args,**SYS_ARGS)
# contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
# log = []
# if contexts :
# args['contexts'] = contexts
# log = move(**args)
# else:
# tables = args['from'].split(',')
# for name in tables :
# name = name.strip()
# args['from'] = name
# log += [move(**args)]
# print ("\n".join(log))
else:
print ("NOT YET READY !")

View File

@@ -486,7 +486,7 @@ class Components :
# Let us merge the dataset here and have a comprehensive dataset
_df = pd.DataFrame.join(df,_df)
_params = {'data':_df,'store' : ostore}
_params = {'data':_df,'store' : ostore,'from':args['from']}
if _schema :
_params ['schema'] = _schema
_info = {"module":"gan-prep","action":"write","input":{"rows":_df.shape[0],"cols":_df.shape[1]}}