Merge branch 'dev' of aou/data-maker into master

This commit is contained in:
Steve L. Nyemba 2022-11-21 21:04:47 +00:00 committed by Gogs
commit 59d6cc50c0
17 changed files with 3436 additions and 656 deletions

View File

@ -13,17 +13,19 @@ This package is designed to generate synthetic data from a dataset from an origi
After installing, the easiest way to get started is as follows (using pandas):
Read about [data-transport on github](https://github.com/lnyemba/data-transport) or on [healthcareio.the-phi.com/git/code/transport](https://healthcareio.the-phi.com/git/code/transport.git)
**Train the GAN on the original/raw dataset**
1. We define the data sources
The sources consist of a source, a target and a logger.
import pandas as pd
import data.maker
import transport
from transport import providers
df = pd.read_csv('sample.csv')
column = 'gender'
id = 'id'
context = 'demo'
data.maker.train(context=context,data=df,column=column,id=id,logs='logs')
The trainer stores its output on disk (for now) in a structured folder holding the training models that are later used to generate the synthetic data.
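For reference, the source/target/logger definitions mentioned in step 1 could look like the following sketch; the provider names and connection details here are assumptions to be adapted to your environment (see the data-transport documentation):
from transport import providers
store = {
"source" : {"provider":providers.FILE,"path":"sample.csv"},
"target" : {"provider":providers.CONSOLE,"context":"write"},
"logger" : {"provider":providers.CONSOLE,"context":"write"}
}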

1
bin/data-maker Symbolic link
View File

@ -0,0 +1 @@
pipeline.py

377
binder.py Normal file
View File

@ -0,0 +1,377 @@
#!/usr/bin/env python3
"""
This file performs basic tasks to finalize the GAN process :
- basic stats & analytics
- rebuild io to another dataset
"""
import pandas as pd
import numpy as np
from multiprocessing import Process, Lock
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import transport
from data.params import SYS_ARGS
import json
# path = '../curation-prod.json'
# credentials = service_account.Credentials.from_service_account_file(path)
# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
f = open(filename)
config = json.loads(f.read())
args = config['pipeline']
f.close()
def _formatSQL(**_args):
"""
This function will build the _map for a given segment
"""
sql = """
select DISTINCT x.person_id synthetic,y.person_id original
FROM :synthetic.:table x
INNER JOIN :original.:table y on x.person_id in (:ids)
AND x.person_id <> y.person_id AND x.gender_source_value = y.gender_source_value
AND x.year_of_birth = y.year_of_birth
ORDER BY 1
"""
table= _args['table']
original,synthetic = _args['schema']['original'],_args['schema']['synthetic']
_ids = np.array(_args['ids']).astype(str)
return sql.replace(":ids",",".join(_ids)).replace(":synthetic",synthetic).replace(":original",original).replace(":table",table)
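#
# A hedged illustration of the query built above (schema and table names are hypothetical):
# _formatSQL(table='person',schema={'original':'raw','synthetic':'io'},ids=[1,2,3]) yields
#   select DISTINCT x.person_id synthetic,y.person_id original
#   FROM io.person x
#   INNER JOIN raw.person y on x.person_id in (1,2,3) ...
#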
def _addCounts(**_args) :
store = _args['store']
sql = _args['sql']
reader = transport.factory.instance(**store['source'])
_df = reader.read(sql=sql)
_ids = _df.synthetic.unique()
_counts = [ np.sum(_df.synthetic == value) for value in _ids]
original = [_df[_df.synthetic == value].iloc[np.random.choice(np.arange(_counts[_ids.tolist().index(value)]),1),:].original.values[0] for value in _ids]
_df = pd.DataFrame({"synthetic":_ids,"original":original,"counts":_counts})
#
# We can post this to the backend ...
#
table = '_map' #-- Yes this is hard-coded
writer = transport.factory.instance(**dict(store['target'],**{"parallel":True,"table":table}))
# if writer.has(table=table) is False:
# writer.write(_df)
# else:
_schema = [{"name":name,"type":"INTEGER"} for name in _df.columns]
writer.write(_df,schema=_schema)
def Init(**_args) :
"""
This function will build a map of the synthetic to real individuals.
The assumption is that the synthesized data is stored in the same data-store as the original. The parameters provided are :
:param store object from the configuration file with source,target entries
:param table name of the original/synthetic tables (they should be the same)
:param feat features/attributes ... demographics to account for
"""
store = _args['store']
reader = transport.factory.instance(**store['source'])
original,synthetic = _args['schema']['original'],_args['schema']['synthetic']
table = _args['table']
sql = _args['sql'].replace(':synthetic',synthetic).replace(':original',original).replace(':table',table)
_map = reader.read(sql=sql)
k = _args['k'] if 'k' in _args else 2
# _iodf = reader.read(table=table)
# _ids = _iodf['person_id'].unique().tolist()
# x_ = np.array_split(_ids,1000)
jobs = []
# for _items in x_ :
# _p = {"ids":_items,"schema":_args['schema'],'store':store,'table':table}
# sql = _formatSQL(**_p)
# _p['sql'] = sql
# _apply = lambda params: _addCounts(**params)
# thread = Process(target=_apply,args=(_p,))
# thread.start()
# jobs.append(thread)
# return jobs
#
# We have performed a m:m (many-to-many) relationship with original participants and synthetic participants
# The goal is to obtain a singular map against which records will be migrated
#
print (['... computing counts (k)'])
_ids = _map.synthetic.unique()
_counts = [ np.sum(_map.synthetic == value) for value in _ids]
original = [_map[_map.synthetic == value].iloc[np.random.choice(np.arange(_counts[_ids.tolist().index(value)]),1),:].original.values[0] for value in _ids]
print (['Building k-classes/groups'])
_mdf = pd.DataFrame({"synthetic":_ids,"original":original,"counts":_counts})
i = _mdf.apply(lambda row: row.counts >= k,axis=1)
_mdf = _mdf[i]
#
# Log what just happened here so we know about the equivalence classes,
# {"module":"binder","action":"map-generation","input":{"k":k,"rows":{"synthetic":_mdf.shape[0],"original":len(_counts)}}}
return _mdf
#
# now we are posting this to target storage ...
#
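#
# A rough illustration of the reduction above (the numbers are made up):
# a synthetic person matched to 3 originals with k=2 keeps one randomly chosen original and counts=3,
# while synthetic persons matched to fewer than k originals are dropped from the map.
#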
def ApplyOn (**_args):
"""
This function will rewrite SQL that applies the synthetic identifier to the entries of the pipeline
We assume that the _map has two attributes (synthetic and original)
:param store
:param _config
"""
store_args = _args['store']
_config = _args['config']
table = _config['from']
reader = transport.factory.instance(**dict(store_args['source'],**{"table":table}))
attr = reader.read(limit=1).columns.tolist()
original_key = _args['original_key'] #-- assuming referential integrity
# synthetic_key= columns['synthetic']
# mapped_original=columns['original']
fields = list(set(attr) - set([original_key]))
sql = "select _map.synthetic as :original_key,:fields from :original_schema.:table inner join :synthetic_schema._map on _map.original = :table.:original_key"
sql = sql.replace(":table",table).replace(":fields",",".join(fields))
sql = sql.replace(":original_key",original_key)
_schema = _args['schema']
sql = sql.replace(":original_schema",_schema['original']).replace(":synthetic_schema",_schema['synthetic'])
return reader.read (sql=sql)
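#
# A hedged example of the rewrite above (names are hypothetical): with table='condition_occurrence',
# original_key='person_id' and schema={'original':'raw','synthetic':'io'} the generated query reads
#   select _map.synthetic as person_id,<fields> from raw.condition_occurrence
#   inner join io._map on _map.original = condition_occurrence.person_id
#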
if __name__ == '__main__' :
pass
# class Analytics :
# """
# This class will compile basic analytics about a given dataset i.e compare original/synthetic
# """
# @staticmethod
# def distribution(**args):
# context = args['context']
# df = args['data']
# #
# #-- This data frame counts unique values for each feature (space)
# df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
# #
# #-- Get the distributions for common values
# #
# names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
# ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
# ddf[context] = ddf.index
# pass
# def distance(**args):
# """
# This function will measure the distance between
# """
# pass
# class Utils :
# @staticmethod
# def log(**args):
# logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
# logger.write(args)
# logger.close()
# class get :
# @staticmethod
# def pipeline(table,path) :
# # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
# config = json.loads((open(path)).read())
# pipeline = config['pipeline']
# # return [ item for item in pipeline if item['context'] in contexts]
# pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
# Utils.log(module=table,action='init',input={"pipeline":pipeline})
# return pipeline
# @staticmethod
# def sql(**args) :
# """
# This function is intended to build SQL query for the remainder of the table that was not synthesized
# :config configuration entries
# :from source of the table name
# :dataset name of the source dataset
# """
# SQL = ["SELECT * FROM :from "]
# SQL_FILTER = []
# NO_FILTERS_FOUND = True
# # pipeline = Utils.get.config(**args)
# pipeline = args['pipeline']
# REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
# for item in pipeline :
# if 'filter' in item :
# if NO_FILTERS_FOUND :
# NO_FILTERS_FOUND = False
# SQL += ['WHERE']
# #
# # Let us load the filter in the SQL Query
# FILTER = item['filter']
# QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
# SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])]
# src = ".".join([args['dataset'],args['from']])
# SQL += [" AND ".join(SQL_FILTER)]
# #
# # let's pull the field schemas out of the table definition
# #
# Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
# return " ".join(SQL).replace(":from",src)
# def mk(**args) :
# dataset = args['dataset']
# client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
# #
# # let us see if we have a dataset handy here
# #
# datasets = list(client.list_datasets())
# found = [item for item in datasets if item.dataset_id == dataset]
# if not found :
# return client.create_dataset(dataset)
# return found[0]
# def move (args):
# """
# This function will move a table from the synthetic dataset into a designated location
# This is the simplest case for finalizing a synthetic data set
# :private_key
# """
# pipeline = Utils.get.pipeline(args['from'],args['config'])
# _args = json.loads((open(args['config'])).read())
# _args['pipeline'] = pipeline
# # del _args['pipeline']
# args = dict(args,**_args)
# # del args['pipeline']
# # private_key = args['private_key']
# client = bq.Client.from_service_account_json(args['private_key'])
# dataset = args['dataset']
# if pipeline :
# SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
# SQL += [Utils.get.sql(**args)]
# SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
# else:
# #
# # moving a table to a designated location
# tablename = args['from']
# if 'sql' not in args :
# SQL = "SELECT * FROM :dataset.:table"
# else:
# SQL = args['sql']
# SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
# Utils.log(module=args['from'],action='sql',input={'sql':SQL})
# #
# # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
# #
# odataset = mk(dataset=dataset+'_io',client=client)
# # SQL = "SELECT * FROM io.:context_full_io".replace(':context',context)
# config = bq.QueryJobConfig()
# config.destination = client.dataset(odataset.dataset_id).table(args['from'])
# config.use_query_cache = True
# config.allow_large_results = True
# config.priority = 'INTERACTIVE'
# #
# #
# schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
# fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
# SQL = SQL.replace("*"," , ".join(fields))
# # print (SQL)
# out = client.query(SQL,location='US',job_config=config)
# Utils.log(module=args['from'],action='move',input={'job':out.job_id})
# return (out.job_id)
# import pandas as pd
# import numpy as np
# from google.oauth2 import service_account
# import json
# # path = '../curation-prod.json'
# # credentials = service_account.Credentials.from_service_account_file(path)
# # df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
# filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
# f = open(filename)
# config = json.loads(f.read())
# args = config['pipeline']
# f.close()
# if __name__ == '__main__' :
# """
# Usage :
# finalize --<move|stats> --contexts <c1,c2,...c3> --from <table>
# """
# if 'move' in SYS_ARGS :
# if 'init' in SYS_ARGS :
# dep = config['dep'] if 'dep' in config else {}
# info = []
# if 'queries' in dep :
# info += dep['queries']
# print ('________')
# if 'tables' in dep :
# info += dep['tables']
# args = {}
# jobs = []
# for item in info :
# args = {}
# if type(item) == str :
# args['from'] = item
# name = item
# else:
# args = item
# name = item['from']
# args['config'] = SYS_ARGS['config']
# # args['pipeline'] = []
# job = Process(target=move,args=(args,))
# job.name = name
# jobs.append(job)
# job.start()
# # while len(jobs) > 0 :
# # jobs = [job for job in jobs if job.is_alive()]
# # time.sleep(1)
# else:
# move(SYS_ARGS)
# # # table = SYS_ARGS['from']
# # # args = dict(config,**{"private_key":"../curation-prod.json"})
# # args = dict(args,**SYS_ARGS)
# # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
# # log = []
# # if contexts :
# # args['contexts'] = contexts
# # log = move(**args)
# # else:
# # tables = args['from'].split(',')
# # for name in tables :
# # name = name.strip()
# # args['from'] = name
# # log += [move(**args)]
# # print ("\n".join(log))
# else:
# print ("NOT YET READY !")

View File

@ -1,2 +1,6 @@
import data.params as params
# import data.params as params
from data.params import SYS_ARGS
import transport
from multiprocessing import Process, Queue
from data.maker import prepare
from data.maker import state

View File

@ -153,21 +153,28 @@ class Binary :
"""
This is a utility class to import and export a data to/from a binary matrix
"""
def __stream(self,column) :
def __stream(self,column,size=-1) :
"""
This function will convert a column into a binary matrix with the value-space representing each column of the resulting matrix
:column a column vector i.e every item is a row
"""
# values = np.unique(column)
values = column.dropna().unique()
values.sort()
# values = column.dropna().unique()
# values.sort()
# column = column.values
values = self.get_column(column,size)
column = column.values
#
# Let's treat the case of missing values i.e nulls
#
row_count,col_count = column.size,values.size
# if row_count * col_count > size and row_count < size:
matrix = [ np.zeros(col_count) for i in np.arange(row_count)]
matrix = [ np.zeros(col_count,dtype=np.float32) for i in np.arange(row_count)]
#
# let's create a binary matrix of the feature that was passed in
# The indices of the matrix are inspired by classical x,y axis
@ -176,14 +183,52 @@ class Binary :
for yi in np.arange(row_count) :
value = column[yi]
if value not in values :
continue
xi = np.where(values == value)
xi = xi[0][0] #-- column index
matrix[yi][xi] = 1
# if value not in values :
# continue
xi = np.where(values == value)
if xi and xi[0].size > 0:
xi = xi[0][0] #-- column index
matrix[yi][xi] = 1
return matrix
def Export(self,df) :
return pd.DataFrame(matrix,columns=values)
def apply(self,column,size):
return self.__stream(column,size)
def get_column(self,column,size=-1):
"""
This function will return the columns that are available for processing ...
"""
values = column.dropna().value_counts().index.values
if size > 0 and column.size > size:
values = values[:size]
values.sort()
return values
def get_missing(self,column,size=-1):
values = column.dropna().value_counts().index.values
if size > 0 and column.size > size :
values = values[size:]
else:
values = np.array([])
values.sort()
return values.tolist()
def _get_column_values(self,column,size=-1):
values = column.dropna().unique()
values.sort()
#
# Let's treat the case of missing values i.e nulls
#
row_count,col_count = column.size,values.size
if col_count > size and size > 0:
# N = np.divide(size,row_count).astype(int)
# N =
i = np.random.choice(col_count,size)
values = values[-i]
col_count = size
return values
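#
# A hedged sketch of the helpers above (the sample values are made up):
# Binary().apply(pd.Series(['M','F','F','M','F']),2) one-hot encodes against the 2 most
# frequent values and returns a frame with columns ['F','M'], e.g. the first row ('M') becomes [0,1]
#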
def _Export(self,df) :
"""
This function will convert a data-frame to a binary matrix
:return _map,matrix
@ -191,12 +236,14 @@ class Binary :
#
# This will give us a map of how each column was mapped to a bitstream
_map = df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0)
# _map = df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0)
# _map = df.fillna(np.nan).apply(lambda column: column,axis=0)
print (df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0))
#
# We will merge this to have a healthy matrix
_matrix = _map.apply(lambda row: list(list(itertools.chain(*row.values.tolist()))),axis=1)
_matrix = np.matrix([list(item) for item in _matrix])
_matrix = np.matrix([list(item) for item in _matrix]).astype(np.float32)
#
# let's format the map so we don't have an unreasonable amount of data
#
@ -210,7 +257,8 @@ class Binary :
_m[name] = {"start":beg,"end":end}
beg = end
return _m,_matrix.astype(np.float32)
# return _m,_matrix.astype(np.float32)
return _matrix
def Import(self,df,values,_map):
"""
@ -237,37 +285,41 @@ if __name__ == '__main__' :
--pseudo will create pseudonyms for a given
--export will export data to a specified location
"""
has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
if has_basic and has_action :
builder = Builder()
if 'export' in SYS_ARGS :
print ()
print ("exporting ....")
if not os.path.exists(SYS_ARGS['export']) :
os.mkdir(SYS_ARGS['export'])
SQL = builder.encode(**SYS_ARGS)
#
# Assuming the user wants to filter the records returned :
#
df = pd.read_csv('sample.csv')
print ( df.race.value_counts())
print ( (Binary()).apply(df['race'], 3))
# has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
# has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
# if has_basic and has_action :
# builder = Builder()
# if 'export' in SYS_ARGS :
# print ()
# print ("exporting ....")
# if not os.path.exists(SYS_ARGS['export']) :
# os.mkdir(SYS_ARGS['export'])
# SQL = builder.encode(**SYS_ARGS)
# #
# # Assuming the user wants to filter the records returned :
# #
credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
df = pd.read_gbq(SQL,credentials =credentials,dialect='standard')
FILENAME = os.sep.join([SYS_ARGS['export'],SYS_ARGS['table']+'.csv'])
#
# This would allow us to export it to wherever we see fit
print (FILENAME)
df.to_csv(FILENAME,index=False)
f = open(FILENAME.replace('.csv','.sql'),'w+')
f.write(SQL)
f.close()
elif 'pseudo' in SYS_ARGS :
builder.process(**SYS_ARGS)
else:
print ("")
print (SYS_ARGS.keys())
print ("has basic ",has_basic)
print ("has action ",has_action)
# credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
# df = pd.read_gbq(SQL,credentials =credentials,dialect='standard')
# FILENAME = os.sep.join([SYS_ARGS['export'],SYS_ARGS['table']+'.csv'])
# #
# # This would allow us to export it to wherever we see fit
# print (FILENAME)
# df.to_csv(FILENAME,index=False)
# f = open(FILENAME.replace('.csv','.sql'),'w+')
# f.write(SQL)
# f.close()
# elif 'pseudo' in SYS_ARGS :
# builder.process(**SYS_ARGS)
# else:
# print ("")
# print (SYS_ARGS.keys())
# print ("has basic ",has_basic)
# print ("has action ",has_action)
# pseudonym.apply(table='person',dataset='wgan_original',key='./curation-test-2.json')
# args = {"dataset":"wgan_original","table":"observation","key":"./curation-test-2.json"}
# builder = Builder()

File diff suppressed because it is too large

View File

@ -11,68 +11,592 @@ This package is designed to generate synthetic data from a dataset from an origi
import pandas as pd
import numpy as np
import data.gan as gan
from transport import factory
def train (**args) :
"""
This function is intended to train the GAN in order to learn about the distribution of the features
:column columns that need to be synthesized (discrete)
:logs where the output of the (location on disk)
:id identifier of the dataset
:data data-frame to be synthesized
:context label of what we are synthesizing
"""
column = args['column']
column_id = args['id']
df = args['data']
logs = args['logs']
real = pd.get_dummies(df[column]).astype(np.float32).values
labels = pd.get_dummies(df[column_id]).astype(np.float32).values
num_gpu = 1 if 'num_gpu' not in args else args['num_gpu']
max_epochs = 10 if 'max_epochs' not in args else args['max_epochs']
context = args['context']
if 'store' in args :
args['store']['args']['doc'] = context
logger = factory.instance(**args['store'])
import transport
# from data.bridge import Binary
import threading
from data.maker import prepare
from data.maker.state import State
import copy
import os
import nujson as json
from multiprocessing import Process, RLock
from datetime import datetime, timedelta
from multiprocessing import Queue
import time
class Learner(Process):
def __init__(self,**_args):
else:
logger = None
trainer = gan.Train(context=context,max_epochs=max_epochs,num_gpu=num_gpu,real=real,label=labels,column=column,column_id=column_id,logger = logger,logs=logs)
return trainer.apply()
super(Learner, self).__init__()
self.ndx = 0
self._queue = Queue()
self.lock = RLock()
if 'gpu' in _args :
os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
self.gpu = int(_args['gpu'])
else:
self.gpu = None
self.info = _args['info']
self.columns = self.info['columns'] if 'columns' in self.info else None
self.store = _args['store']
if 'network_args' not in _args :
self.network_args ={
'context':self.info['context'] ,
'logs':_args['logs'] if 'logs' in _args else 'logs',
'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
'batch_size':int (_args['batch']) if 'batch' in _args else 2000
}
else:
self.network_args = _args['network_args']
self._encoder = None
self._map = None
self._df = _args['data'] if 'data' in _args else None
self.name = self.__class__.__name__
#
# @TODO: allow for verbose mode so we have a sense of what is going on within the network
#
_log = {'action':'init','gpu':(self.gpu if self.gpu is not None else -1)}
self.log(**_log)
self.cache = []
# self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
# sel.max_epoc
self.logger = None
if 'logger' in self.store :
self.logger = transport.factory.instance(**self.store['logger'])
self.autopilot = False #-- to be set by caller
self._initStateSpace()
def _initStateSpace(self):
"""
Initialize the state-space for the data-maker. The state-space functions are pre/post-processing functions applied to the data accordingly, i.e.
- Trainer -> pre-processing
- Generation -> post-processing
The specification of a state space in the configuration file is as follows :
state:{pre:{path,pipeline:[]}, post:{path,pipeline:[]}}
"""
self._states = None
if 'state' in self.info :
try:
_config = self.info ['state']
self._states = State.instance(_config)
except Exception as e:
print (e)
pass
finally:
# __info = (pd.DataFrame(self._states)[['name','path','args']]).to_dict(orient='records')
if self._states :
__info = {}
for key in self._states :
__info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key]]
self.log(object='state-space',action='load',input=__info)
def generate(**args):
def log(self,**_args):
try:
_context = self.info['context']
_label = self.info['info'] if 'info' in self.info else _context
# logger =
_args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
if 'logger' in self.store :
logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider=transport.providers.CONSOLE,context='write',lock=True)
logger.write(_args)
self.ndx += 1
# if hasattr(logger,'close') :
# logger.close()
pass
except Exception as e:
# print ()
# print (_args)
# print (e)
pass
finally:
pass
def get_schema(self):
# if self.store['source']['provider'] != 'bigquery' :
# return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])]
# else:
# reader = transport.factory.instance(**self.store['source'])
# return reader.meta(table=self.info['from'])
reader = transport.factory.instance(**self.store['source'])
return reader.meta(table=self.info['from'])
def initalize(self):
reader = transport.factory.instance(**self.store['source'])
_read_args= self.info
if self._df is None :
self._df = reader.read(**_read_args)
#
# NOTE : PRE
# At this point we apply pre-processing of the data if there were ever a need for it
#
_log = {}
HAS_STATES = self._states is not None and 'pre' in self._states
NOT_GENERATING = self.name in ['Trainer','Shuffle']
IS_AUTOPILOT = self.autopilot
#
# allow calling pre-conditions if either of the conditions is true
# 1. states and not generating
# 2. IS_GENERATING and states and not autopilot
_ALLOW_PRE_CALL = (HAS_STATES and NOT_GENERATING) or (NOT_GENERATING is False and HAS_STATES and IS_AUTOPILOT is False)
if _ALLOW_PRE_CALL :
# if HAS_STATES and NOT_GENERATING or (HAS_STATES and IS_AUTOPILOT is False and NOT_GENERATING is False):
_logs = {'action':'status','input':{'pre':self._states['pre']}}
_beg = list(self._df.shape)
self._df = State.apply(self._df,self._states['pre'])
_end = list(self._df.shape)
_logs['input']['size'] = _beg,_end
self.log(**_logs)
#
#
columns = self.columns if self.columns else self._df.columns
#
# Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases
# - The code below tries to address the issue (Perhaps better suited for the reading components)
for name in columns :
#
# randomly sampling 5 elements to make sense of data-types
if self._df[name].size < 5 :
continue
_index = np.random.choice(np.arange(self._df[name].size),5,False)
no_value = [type(value) in [int,float,np.int64,np.int32,np.float32,np.float64] for value in self._df[name].values[_index]]
no_value = 0 if np.sum(no_value) > 0 else ''
try:
self._df[name] = self._df[name].fillna(no_value)
finally:
pass
_log[name] = self._df[name].dtypes.name
_log = {'action':'structure','input':_log}
self.log(**_log)
#
# convert the data to binary here ...
_schema = self.get_schema()
_args = {"schema":_schema,"data":self._df,"columns":columns}
if self._map :
_args['map'] = self._map
self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None
_log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } }
self.log(**_log)
def get(self):
if self.cache :
return self.cache if len(self.cache) > 0 else(self.cache if not self.cache else self.cache[0])
else:
return self._queue.get() if self._queue.qsize() > 0 else []
def listen(self):
while True :
_info = self._queue.get()
self.cache.append(_info)
self._queue.task_done()
def publish(self,caller):
if hasattr(caller,'_queue') :
_queue = caller._queue
_queue.put(self.cache)
# _queue.join()
pass
class Trainer(Learner):
"""
This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset
@return pandas.DataFrame
:data data-frame to be synthesized
:column columns that need to be synthesized (discrete)
:id column identifying an entity
:logs location on disk where the learnt knowledge of the dataset is
This will perform training using a GAN
"""
df = args['data']
column = args['column']
column_id = args['id']
logs = args['logs']
context = args['context']
num_gpu = 1 if 'num_gpu' not in args else args['num_gpu']
max_epochs = 10 if 'max_epochs' not in args else args['max_epochs']
def __init__(self,**_args):
super().__init__(**_args)
# self.info = _args['info']
self.limit = int(_args['limit']) if 'limit' in _args else None
self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
self.generate = None
self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
self.checkpoint_skips = _args['checkpoint_skips'] if 'checkpoint_skips' in _args else None
def run(self):
self.initalize()
if self._encoder is None :
#
# @TODO Log that the dataset was empty or not statistically relevant
return
_space,_matrix = self._encoder.convert()
_args = self.network_args
if self.gpu :
_args['gpu'] = self.gpu
_args['real'] = _matrix
_args['candidates'] = self.candidates
if 'logger' in self.store :
_args['logger'] = transport.factory.instance(**self.store['logger'])
if self.checkpoint_skips :
_args['checkpoint_skips'] = self.checkpoint_skips
#
# At this point we have the binary matrix, we can initiate training
#
beg = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S')
gTrain = gan.Train(**_args)
gTrain.apply()
writer = transport.factory.instance(provider=transport.providers.FILE,context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
writer.write(self._encoder._map,overwrite=True)
writer.close()
#
# @TODO: At this point we need to generate another some other objects
#
_args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
_args['logs'] = self.network_args['logs']
_args['autopilot'] = self.autopilot
if self.gpu :
_args['gpu'] = self.gpu
#
# Let us find the smallest, the item is sorted by loss on disk
#
_epochs = [_e for _e in gTrain.logs['epochs'] if _e['path'] != '']
_epochs.sort(key=lambda _item: _item['loss'],reverse=False)
_args['network_args']['max_epochs'] = _epochs[0]['epochs']
self.log(action='autopilot',input={'epoch':_epochs[0]})
g = Generator(**_args)
# g.run()
end = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S')
_min = float((end-beg).seconds/ 60)
_logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}}
self.log(**_logs)
self._g = g
if self.autopilot :
self._g.run()
#
#@TODO Find a way to have the data in the object ....
def generate (self):
if self.autopilot :
print( "Autopilot is set ... No need to call this function")
else:
raise Exception( "Autopilot has not been, Wait till training is finished. Use is_alive function on process object")
class Generator (Learner):
def __init__(self,**_args):
super().__init__(**_args)
#
# We need to load the mapping information for the space we are working with ...
#
self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
self.log(**{'action':'init-map','input':{'filename':filename,'exists':os.path.exists(filename)}})
if os.path.exists(filename):
file = open(filename)
self._map = json.loads(file.read())
file.close()
else:
self._map = {}
self.autopilot = False if 'autopilot' not in _args else _args['autopilot']
def run(self):
self.initalize()
if self._encoder is None :
#
# @TODO Log that the dataset was empty or not statistically relevant
return
#
# The values will be returned because we have provided _map information from the constructor
#
values,_matrix = self._encoder.convert()
_args = self.network_args
_args['map'] = self._map
_args['values'] = np.array(values)
_args['row_count'] = self._df.shape[0]
if self.gpu :
_args['gpu'] = self.gpu
if 'logger' in self.store :
_args['logger'] = transport.factory.instance(**self.store['logger'])
gHandler = gan.Predict(**_args)
gHandler.load_meta(columns=None)
_iomatrix = gHandler.apply()
_candidates= [ self._encoder.revert(matrix=_item) for _item in _iomatrix]
_size = np.sum([len(_item) for _item in _iomatrix])
_log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}}
self.log(**_log)
# self.cache = _candidates
self.post(_candidates)
def approximate(self,_df):
_columns = self.info['approximate']
for name in _columns :
if _df[name].size > 100 :
BATCH_SIZE = 10
else:
BATCH_SIZE = 1
batches = np.array_split(_df[name].fillna(np.nan).values,BATCH_SIZE)
_type = np.int64 if 'int' in self.info['approximate'][name]else np.float64
x = []
_log = {'action':'approximate','input':{'batch':BATCH_SIZE,'col':name}}
for values in batches :
index = [ _x not in ['',None,np.nan] for _x in values]
if np.sum(index) == 0:
#
# Sometimes messy data has unpleasant surprises
continue
_values = np.random.rand( len(values[index]))
_values += np.std(values[index]) / 4
values[index] = list(values[index] + _values )if np.random.randint(0,2) else list(values[index] - _values)
values[index] = values[index].astype(_type)
x += values.tolist()
if x :
_log['input']['identical_percentage'] = 100 * (np.divide( (_df[name].dropna() == x).sum(),_df[name].dropna().size))
_df[name] = x #np.array(x,dtype=np.int64) if 'int' in _type else np.arry(x,dtype=np.float64)
self.log(**_log)
return _df
def make_date(self,**_args) :
"""
:param year initial value
"""
if _args['year'] in ['',None,np.nan] :
return None
year = int(_args['year'])
offset = _args['offset'] if 'offset' in _args else 0
month = np.random.randint(1,13)
if month == 2:
_end = 28 if year % 4 != 0 else 29
else:
_end = 31 if month in [1,3,5,7,8,10,12] else 30
day = np.random.randint(1,_end)
#-- synthetic date
_date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0)
FORMAT = '%Y-%m-%d'
_name = _args['field'] if 'field' in _args else None
if 'format' in self.info and _name in self.info['format']:
# _name = _args['field']
FORMAT = self.info['format'][_name]
# print ([_name,FORMAT, _date.strftime(FORMAT)])
r = []
if offset :
r = [_date.strftime(FORMAT)]
for _delta in offset :
_date = _date + timedelta(_delta)
r.append(_date.strftime(FORMAT))
return r
else:
return _date.strftime(FORMAT)
pass
def format(self,_df,_schema):
r = {}
for _item in _schema :
name = _item['name']
if _item['type'].upper() in ['DATE','DATETIME','TIMESTAMP'] :
FORMAT = '%Y-%m-%d'
try:
#
#-- Sometimes data isn't all it's meant to be
SIZE = -1
if 'format' in self.info and name in self.info['format'] :
FORMAT = self.info['format'][name]
SIZE = 10
elif _item['type'] in ['DATETIME','TIMESTAMP'] :
FORMAT = '%Y-%m-%d %H:%M:%S'
SIZE = 19
if SIZE > 0 :
values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64)
# _df[name] = [_date[:SIZE].strip() for _date in values]
# _df[name] = _df[name].astype(str)
r[name] = FORMAT
# _df[name] = pd.to_datetime(_df[name], format=FORMAT) #.astype('datetime64[ns]')
if _item['type'] in ['DATETIME','TIMESTAMP']:
pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
except Exception as e:
pass
finally:
pass
else:
#
# Because types are inferred on the basis of the sample being processed they can sometimes be wrong
# To help disambiguate we add the schema information
_type = None
if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower():
_type = np.int64
elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower():
_type = np.float64
if _type :
_df[name] = _df[name].fillna(0).replace(' ',0).replace('',0).replace('NA',0).replace('nan',0).astype(_type)
# else:
# _df[name] = _df[name].astype(str)
# _df = _df.replace('NaT','').replace('NA','')
if r :
self.log(**{'action':'format','input':r})
return _df
pass
def post(self,_candidates):
if 'target' in self.store :
_store = self.store['target'] if 'target' in self.store else {'provider':'console'}
_store['lock'] = True
_store['context'] = 'write' #-- Just in case
if 'table' not in _store :
_store['table'] = self.info['from']
else:
_store = None
N = 0
for _iodf in _candidates :
_df = self._df.copy()
_df[self.columns] = _iodf[self.columns]
N += _df.shape[0]
if self._states and 'post' in self._states:
_df = State.apply(_df,self._states['post'])
# #
# #@TODO:
# # Improve formatting with better post-processing pipeline
# if 'approximate' in self.info :
# _df = self.approximate(_df)
# if 'make_date' in self.info :
# for name in self.info['make_date'] :
# # iname = self.info['make_date']['init_field']
# iname = self.info['make_date'][name]
# years = _df[iname]
# _dates = [self.make_date(year=_year,field=name) for _year in years]
# if _dates :
# _df[name] = _dates
_schema = self.get_schema()
_df = self.format(_df,_schema)
_log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
self.log(**{"action":"consolidate","input":_log})
if _store :
writer = transport.factory.instance(**_store)
if _store['provider'] == 'bigquery':
writer.write(_df,schema=[],table=self.info['from'])
else:
writer.write(_df,table=self.info['from'])
else:
self.cache.append(_df)
self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}})
class Shuffle(Generator):
"""
This is a method that will yield data with low utility
"""
def __init__(self,**_args):
super().__init__(**_args)
def run(self):
np.random.seed(1)
self.initalize()
_index = np.arange(self._df.shape[0])
np.random.shuffle(_index)
np.random.shuffle(_index)
_iocolumns = self.info['columns']
_ocolumns = list(set(self._df.columns) - set(_iocolumns) )
# _iodf = pd.DataFrame(self._df[_ocolumns],self._df.loc[_index][_iocolumns],index=np.arange(_index.size))
_iodf = pd.DataFrame(self._df[_iocolumns].copy(),index = np.arange(_index.size))
# self._df = self._df.loc[_index][_ocolumns].join(_iodf)
self._df = self._df.loc[_index][_ocolumns]
self._df.index = np.arange(self._df.shape[0])
self._df = self._df.join(_iodf)
#
# The following is a full shuffle
self._df = self._df.loc[_index]
self._df.index = np.arange(self._df.shape[0])
_log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
self.log(**_log)
try:
self.post([self._df])
self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
except Exception as e :
# print (e)
self.log(**{'action':'failed','input':{'msg':e,'info':self.info}})
class apply :
TRAIN,GENERATE,RANDOM = 'train','generate','random'
class factory :
_infocache = {}
@staticmethod
def instance(**_args):
"""
An instance of an object that trains and generates candidate datasets
:param gpu (optional) index of the gpu to be used if using one
:param store {source,target} if no target is provided console will be output
:param epochs (default 2) number of epochs to train
:param candidates(default 1) number of candidates to generate
:param info {columns,sql,from}
:param autopilot will generate output automatically
:param batch (default 2k) size of the batch
"""
if _args['apply'] in [apply.RANDOM] :
pthread = Shuffle(**_args)
elif _args['apply'] == apply.GENERATE :
pthread = Generator(**_args)
else:
pthread= Trainer(**_args)
if 'start' in _args and _args['start'] == True :
pthread.start()
return pthread
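#
# A hedged sketch of driving the factory above (store/info values are hypothetical):
# _args = {'apply':apply.TRAIN,
#          'store':{'source':{'provider':'bigquery'},'target':{'provider':'bigquery'}},
#          'info':{'context':'demo','from':'person','columns':['gender']},
#          'epochs':2,'candidates':1,'autopilot':True,'start':True}
# pthread = factory.instance(**_args) #-- returns a started Trainer process
# pthread.join()
#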
class plugins:
@staticmethod
def load(_config):
"""
This function attempts to load the plugins to ensure they are valid
_config configuration for plugin specifications {pre:{pipeline,path},post:{pipeline,path}}
"""
#
#@TODO:
# If the identifier is not present, we should find a way to determine or make one
#
#ocolumns= list(set(df.columns.tolist())- set(columns))
values = df[column].unique().tolist()
values.sort()
labels = pd.get_dummies(df[column_id]).astype(np.float32).values
handler = gan.Predict (context=context,label=labels,max_epochs=max_epochs,num_gpu=num_gpu,values=values,column=column,logs=logs)
handler.load_meta(column)
r = handler.apply()
_df = df.copy()
_df[column] = r[column]
return _df

View File

@ -1,10 +0,0 @@
import pandas as pd
import data.maker
df = pd.read_csv('sample.csv')
column = 'gender'
id = 'id'
context = 'demo'
store = {"type":"mongo.MongoWriter","args":{"host":"localhost:27017","dbname":"GAN"}}
max_epochs = 11
data.maker.train(store=store,max_epochs=max_epochs,context=context,data=df,column=column,id=id,logs='foo')

76
data/maker/apply.py Normal file
View File

@ -0,0 +1,76 @@
"""
This file is designed to specify the application of pre/post-processing code.
The pre-processing code gets applied after the data has been loaded
The post-processing code gets applied after the data has been generated, for instance:
-approximation code/logic; date shifting; suppression; adding noise
"""
import numpy as np
from datetime import datetime, timedelta
import time
class Phase:
def __init__(self,**_args):
self._df = _args['data']
self.callback = _args['callback']
def apply(self,**_args):
"""
:param data data-frame
:param _info arguments needed to be applied
:param callback callback function once done
"""
raise Exception ("Function needs to be Implemented")
class Pre(Phase):
pass
class Post(Phase):
def __init__(self,**_args):
super().__init__(**_args)
pass
class Date(Post):
def __init__(self,**_args):
super().__init__(**_args)
def make(self,**_args):
"""
This function generates a random date given a year and optionally a set of days from the randomly generated date
:param year initial value of a year
:param offset list of days between initial date
"""
if _args['year'] in ['',None,np.nan] :
return None
year = int(_args['year'])
offset = _args['offset'] if 'offset' in _args else 0
month = np.random.randint(1,13)
if month == 2:
_end = 28 if year % 4 != 0 else 29
else:
_end = 31 if month in [1,3,5,7,8,10,12] else 30
day = np.random.randint(1,_end)
#-- synthetic date
_date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0)
FORMAT = '%Y-%m-%d' if 'format' not in _args else _args['format']
# print ([_name,FORMAT, _date.strftime(FORMAT)])
r = []
if offset :
r = [_date.strftime(FORMAT)]
for _delta in offset :
_date = _date + timedelta(_delta)
r.append(_date.strftime(FORMAT))
return r
else:
return _date.strftime(FORMAT)
def apply(self,**_args):
"""
"""
pass
class Approximate(Post):
def apply(**_args):
pass
def applyWithRange(**_args):
pass
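#
# A hedged usage sketch (the inputs are made up): given a data-frame _df,
# Date(data=_df,callback=None).make(year=1980) returns a random date within 1980,
# and passing offset=[30,60] returns the initial date plus dates shifted by those day offsets.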

View File

@ -0,0 +1,284 @@
"""
(c) 2018 - 2021, Vanderbilt University Medical Center
Steve L. Nyemba, steve.l.nyemba@vumc.org
This file is designed to handle preconditions for a generative adversarial network:
- The file will read/get data from a source specified by transport (or data-frame)
- The class will convert the data to a binary vector
- The class will also help rebuild the data from a binary matrix.
Usage :
"""
import transport
import json
import pandas as pd
import numpy as np
# import cupy as cp
import sys
import os
#
# The following is to address the issue over creating a large matrix ...
#
# from multiprocessing import Process, Queue
# if 'GPU' in os.environ :
# import cupy as np
# else:
# import numpy as np
class void:
pass
class Hardware :
"""
This class is intended to allow the use of hardware i.e GPU, index or CPU
"""
pass
class Input :
class NOVALUES :
RANDOM,IGNORE,ALWAYS = ['random','ignore','always']
"""
This class is designed to read data from a source and perform a variety of operations :
- provide a feature space, and rows (matrix profile)
- a data index map
"""
def __init__(self,**_args):
"""
:param data
:param store data-store parameters/configuration
:param sql sql query that pulls a representative sample of the data
"""
self._schema = _args['schema'] if 'schema' in _args else {}
#
# schema data should be in a hash map for these purposes
#
# if self._schema :
# r = {}
# for _item in self._schema :
# r[_item['name']] = r[_item['type']]
# self._schema = r
self.df = _args['data']
if 'sql' not in _args :
self._initdata(**_args)
#
pass
else:
self._initsql(**_args)
#
# We need to have a means to map of values,columns and vector positions in order
# to perform convert and revert to and from binary
#
self._map = {} if 'map' not in _args else _args['map']
def _initsql(self,**_args):
"""
This function will initialize the class on the basis of a data-store and optionally pre-defined columns to be synthesized
:param store data-store configuration
:param columns list of columns to be synthesized
"""
if 'columns' not in _args :
self._initcols(data=self.df)
else:
self._initcols(data=self.df,columns=_args['columns'])
pass
def _init_map(self,values):
self._map = dict(zip(np.arange(len(values)),values))
for key in self._map :
self._map[key] = self._map[key].tolist()
def _initcols (self,**_args) :
"""
This function will initialize the columns to be synthesized and/or determine which ones can be synthesized
:param data data-frame that holds the data (matrix)
:param columns optional columns to be synthesized
"""
# df = _args['data'].copy()
row_count = self.df.shape[0]
cols = None if 'columns' not in _args else _args['columns']
self.columns = self.df.columns.tolist()
self._io = []
if 'columns' in _args :
self._columns = _args['columns']
# else:
#
# We will look into the count and make a judgment call
try:
# _df = pd.DataFrame(self.df.apply(lambda col: col.dropna().unique().size )).T
# MIN_SPACE_SIZE = 2
# self._columns = cols if cols else _df.apply(lambda col:None if col[0] == row_count or col[0] < MIN_SPACE_SIZE else col.name).dropna().tolist()
# self._io = _df.to_dict(orient='records')
_df = pd.DataFrame(self.df.nunique().T / self.df.shape[0]).T
self._io = (_df.to_dict(orient='records'))
except Exception as e:
print (e)
self._io = []
def _initdata(self,**_args):
"""
This function will initialize the class with a data-frame and columns of interest (if any)
:param data data-frame that holds the data
:param columns columns that need to be synthesized if any
"""
self._initcols(**_args)
def _convert(self,**_args):
"""
This function will convert a data-frame into a binary matrix and provide a map to be able to map the values back to the matrix
:param columns in case we specify the columns to account for (just in case the original assumptions don't hold)
"""
if 'columns' in _args or 'column' in _args :
columns = _args['columns'] if 'columns' in _args else [_args['column']]
else:
columns = self._columns
_df = self.df if 'data' not in _args else _args['data']
#
# At this point we have the list of features we want to use
i = 0
_m = np.array([])
_values = []
for name in columns :
#
# In case we have a dataset with an incomplete value space, we should still be able to generate something meaningful
#
values = None if name not in self._map else list(self._map[name]['values'])
_type = self._schema[name] if name in self._schema else _df[name].dtype
cols, _matrix = self.tobinary(_df[name],values)
_beg,_end = i,i+len(cols)
if name not in self._map :
self._map[name] = {"beg":_beg,"end":_end ,"values":cols.tolist()}
i += len(cols)
if not _m.shape[0]:
_m = _matrix
else:
_m = np.concatenate((_m,_matrix),axis=1)
if values :
_values += list(values)
#
# @NOTE:
# The map should allow us to be able to convert or reconvert the binary matrix to whatever we want ...
#
# self._matrix = _m
return _values,_m
def _revert(self,**_args) :
"""
This function will take in a binary matrix and based on the map of values it will repopulate it with values
:param _matrix binary matrix
:param column|columns column name or columns if the column is specified
"""
_column = _args['column'] if 'column' in _args else None
matrix = _args['matrix']
row_count = matrix.shape[0]
r = {}
for key in self._map :
if _column and key != _column :
continue
_item = self._map[key]
_beg = _item['beg']
_end = _item['end']
columns = np.array(_item['values'])
#
# @NOTE: We are accessing matrices in terms of [row,col],
# The beg,end variables are for the columns in the matrix (mini matrix)
#
# if not _column :
# _matrix = matrix[:,_beg:_end] #-- The understanding is that _end is not included
# else:
# _matrix = matrix
_matrix = matrix[:,_beg:_end]
#
# vectorize the matrix to replace the bits by their actual values (accounting for the data-types)
# @TODO: Find ways to do this on a GPU (for big data) or across threads
#
row_count = _matrix.shape[0]
# r[key] = [columns[np.where(row == 1) [0][0] ] for row in _matrix[:,_beg:_end]]
r[key] = [columns[np.where(row==1)[0][0]] if np.where(row==1)[0].size > 0 else '' for row in _matrix]
#
# we should consider decoding the matrix if possible
#
return pd.DataFrame(r)
def tobinary(self,rows,cols=None) :
"""
This function will compile a binary matrix from a row of values. Hopefully this can be done in parallel; the function can be vectorized and processed
:param rows np.array or list of vector of values
:param cols a space of values if it were to be different from the current sample.
"""
if not cols:
#
# In the event the sample rows do NOT cover the full value space
cols = rows.unique()
cols = np.array(cols)
row_count = np.int64(len(rows))
# if 'GPU' not in os.environ :
# _matrix = np.zeros([row_count,cols.size],dtype=int)
#
# @NOTE: For some reason, there is an out of memory error created here, this seems to fix it (go figure)
#
_matrix = np.array([np.repeat(0,cols.size) for i in range(0,row_count)])
[np.put(_matrix[i], np.where(cols == rows[i]) ,1)for i in np.arange(row_count) if np.where(cols == rows[i])[0].size > 0]
# else:
# _matrix = cp.zeros([row_count,cols.size])
# [cp.put(_matrix[i], cp.where(cols == rows[i]),1)for i in cp.arange(row_count) ]
# _matrix = _matrix.asnumpy()
return cols,_matrix
def convert(self,**_args):
if 'columns' in _args or 'column' in _args :
columns = _args['columns'] if 'columns' in _args else [_args['column']]
else:
columns = self._columns
_df = self.df if 'data' not in _args else _args['data']
_values,_matrix = self.encode(_df,columns)
_, _matrix = self.tobinary(_matrix)
self._init_map(_values)
return _values,_matrix #-- matrix has been updated !
def revert(self,**_args):
# _columns = _args['column'] if 'column' in _args else None
_matrix = _args['matrix']
# print (_matrix)
return self.decode(_matrix,columns=self._columns)
pass
def encode(self,df,columns) :
_df = df[columns].drop_duplicates()
_values = _df.values.tolist()
_encoded = df[columns].apply(lambda row: _values.index( list(row)) ,axis=1)
return np.array(_values),_encoded
def decode (self,_matrix,**_args):
#
# _matrix binary matrix
#
columns = _args['columns']
_values = np.array( list(self._map.values()))
_matrix = pd.DataFrame(_matrix) #if type(_matrix) != pd.DataFrame else _matrix
# x = _matrix.apply(lambda row: _values[row.values == 1 ].tolist()[0] if row.values.sum() > 0 else None, axis=1).tolist()
#@TODO: Provide random values for things that are missing
# x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0] if (row.values == 1).sum() > 0 else np.repeat(None,len(self._columns)) ,axis=1).tolist()
#
# @TODO: Provide a parameter to either:
# - missing = {outlier,random,none}
# - outlier: select an outlier, random: randomly select a value, none: do nothing ...
#
if np.random.choice([0,1],1)[0] :
novalues = _values[np.random.choice( len(_values),1)[0]].tolist()
else:
novalues = np.repeat(None,len(self._columns))
x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0] if (row.values == 1).sum() > 0 else novalues ,axis=1).tolist()
return pd.DataFrame(x,columns=columns)
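#
# A hedged round-trip sketch of the class above (the sample data is made up):
# _input = Input(data=pd.DataFrame({'gender':['M','F','F','M']}),columns=['gender'])
# _values,_matrix = _input.convert() #-- binary (one-hot) encoding of the encoded rows
# _df = _input.revert(matrix=_matrix) #-- rebuilds a data-frame with the original values
#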

View File

@ -0,0 +1 @@
__init__.py

View File

@ -0,0 +1,105 @@
"""
This file handles state-space of the data training/generation process i.e Upon specification of the pre/post conditiions
"""
"""
This file handles state-space of the data training/generation process i.e Upon specification of the pre/post conditions,
The specifications for this are as follows (within an entry of the configuration)
{
"state":{
"pre":[{"approximate":{"field":"int"}},{"newdate":{"field":"format"}}],"post":[{"limit":10}]
}
}
"""
import importlib
import importlib.util
import sys
from datetime import datetime
from data.maker.state.default import *
import os
class State :
@staticmethod
def apply(_data,lpointers):
"""
This function applies a pipeline against a given data-frame, the calling code must decide whether it is a pre/post
:_data data-frame
:_lpointers functions modules returned by instance (module,_args)
"""
for _item in lpointers :
if _item is None :
continue
pointer = _item['module']
_args = _item['args']
_data = pointer(_data,_args)
return _data
@staticmethod
def instance(_args):
pre = []
post=[]
out = {}
for key in _args :
#
# If the item has a path property of its own, the entry-level path should be ignored
path = _args[key]['path'] if 'path' in _args[key] else ''
out[key] = [ State._build(dict(_item,**{'path':path})) if 'path' not in _item else State._build(_item) for _item in _args[key]['pipeline']]
return out
# if 'pre' in _args:
# path = _args['pre']['path'] if 'path' in _args['pre'] else ''
# pre = [ State._build(dict(_item,**{'path':path})) for _item in _args['pre']['pipeline']]
# else:
# path = _args['post']['path'] if 'path' in _args['post'] else ''
# post = [ State._build(dict(_item,**{'path':path})) for _item in _args['post']['pipeline']]
# return {'pre':pre,'post':post}
@staticmethod
def _extract(_entry):
_name = list(set(_entry.keys()) - set(['path']) )
_name = _name[0]
path = _entry['path'] if 'path' in _entry and os.path.exists(_entry['path']) else ''
return {"module": _name,"args": _entry[_name],'name':_name,'path':path}
pass
@staticmethod
def _build(_args):
_info = State._extract(_args)
# _info = dict(_args,**_info)
_info['module'] = State._instance(_info)
return _info if _info['module'] is not None else None
@staticmethod
def _instance(_args):
"""
:path optional path of the file on disk
:module name of the function
"""
_name = _args['module']
if 'path' in _args and os.path.exists(_args['path']):
path= _args['path']
spec = importlib.util.spec_from_file_location(_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
else:
#
# Probably calling a built-in module (should be in this file)
module = sys.modules['data.maker.state.default']
return getattr(module,_name) if hasattr(module,_name) else None
#
# Adding a few custom functions that should be able to help ....
# These functions can be called without specifying a path
#
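#
# A hedged sketch of how a state entry is consumed (the field name is hypothetical):
# _config = {"pre":{"pipeline":[{"approximate":{"year_of_birth":"int"}}]}}
# _states = State.instance(_config) #-- resolves to functions in data.maker.state.default
# _df = State.apply(_df,_states['pre']) #-- runs the pre-processing pipeline against the frame
#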

116
data/maker/state/default.py Normal file
View File

@ -0,0 +1,116 @@
"""
This file contains default functions applied to a data-frame/dataset as pre/post processing jobs.
The functions are organized in a pipeline i.e each function is applied to the data in turn
Custom functions :
functions must take 2 arguments (_data,_args) : where _data is a data frame and _args is an object describing the input parameters
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def limit(_data,size):
"""
...,{limit:size}
"""
# size = int(_args['limit'])
return _data.iloc[:size]
def format(_data,_schema):
"""
This function enforces a schema against a data-frame, this may or may not work depending on the persistence storage
:_data data-frame containing all data
:_args schema to enforce the data, we are expecting the format as a list of {name,type,description}
"""
return _data
def approximate(_data,_args):
"""
:_args Object of {field:type}
This function will approximate n-fields in the data given its distribution
"""
_m = {'int':int,'float':float,'integer':int,'double':float}
columns = list(_args.keys())
for _name in columns :
if _name not in _data :
continue
otype = _args[_name]
otype = str if otype not in _m else _m[otype]
_data.loc[:,_name] = np.random.uniform(_data[_name].values).astype(otype)
return _data
def split_date(_data,_args):
"""
This function takes a field and applies the format from other fields
:_data data-frame
:_config configuration entry {column:{format,column:format,type}}
"""
_columns = list(_args.keys())
_m = {'int':int,'float':float,'integer':int,'double':float}
for _name in _columns :
_iname = _args[_name]['column']
_iformat = _args[_name]['format']['in']
_oformat = _args[_name]['format']['out']
_otype = str if 'type' not in _args[_name] else _args[_name]['type']
_data.loc[:,_name] = _data[_iname].apply(lambda _date: datetime.strftime(datetime.strptime(str(_date),_iformat),_oformat)).astype(_otype)
return _data
def newdate(_data,_args):
"""
This function creates a new date in a given column from another
:_data data frame
:_args configuration column:{format,column}
"""
_columns = list(_args.keys())
for _name in _columns :
format = _args[_name]['format']
ROW_COUNT = _data[_name].size
if 'column' in _args[_name] :
srcName = _args[_name]['column']
years = _data[srcName].values
else:
years = np.random.choice(np.arange(datetime.now().year- 90,datetime.now().year),ROW_COUNT)
_data.loc[:,_name] = [ _makedate(year = years[_index],format = format) for _index in np.arange(ROW_COUNT)]
return _data
def _makedate(**_args):
"""
This function creates a new date and applies it to a column
:_data data-frame with columns
:_args arguments for col1:format
"""
_columns = list(_args.keys())
# if _args['year'] in ['',None,np.nan] :
# year = np.random.choice(np.arange(1920,222),1)
# else:
# year = int(_args['year'])
year = int(_args['year'])
offset = _args['offset'] if 'offset' in _args else 0
month = np.random.randint(1,13)
if month == 2:
_end = 28 if year % 4 != 0 else 29
else:
_end = 31 if month in [1,3,5,7,8,10,12] else 30
day = np.random.randint(1,_end)
#-- synthetic date
_date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0)
FORMAT = '%Y-%m-%d'
if 'format' in _args:
FORMAT = _args['format']
# print ([_name,FORMAT, _date.strftime(FORMAT)])
r = []
if offset :
r = [_date.strftime(FORMAT)]
for _delta in offset :
_date = _date + timedelta(_delta)
r.append(_date.strftime(FORMAT))
return r
else:
return _date.strftime(FORMAT)
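#
# Below is a minimal usage sketch (not part of the package): the column names,
# formats and values are assumptions chosen purely to illustrate how these
# default functions can be chained on a data-frame.
#
if __name__ == '__main__' :
    _df = pd.DataFrame({
        "id":[1,2,3,4],
        "age":[34,51,28,60],
        "year_of_birth":[1988,1971,1994,1962],
        "birth_datetime":["1988-01-02","1971-03-04","1994-05-06","1962-07-08"],
        "observation_date":["","","",""]            #-- newdate overwrites an existing column
    })
    _df = limit(_df,3)                              #-- keep the first 3 rows
    _df = approximate(_df,{"age":"int"})            #-- jitter the continuous field
    _df = split_date(_df,{"birth_date":{"column":"birth_datetime","format":{"in":"%Y-%m-%d","out":"%Y/%m/%d"}}})
    _df = newdate(_df,{"observation_date":{"format":"%Y-%m-%d","column":"year_of_birth"}})
    print (_df)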

View File

@@ -1,6 +1,6 @@
import sys
SYS_ARGS = {'context':''}
SYS_ARGS = {}
if len(sys.argv) > 1:
N = len(sys.argv)
@@ -9,8 +9,10 @@ if len(sys.argv) > 1:
if sys.argv[i].startswith('--'):
key = sys.argv[i][2:] #.replace('-','')
SYS_ARGS[key] = 1
if i + 1 < N:
if i + 1 < N and not sys.argv[i + 1].startswith('--'):
value = sys.argv[i + 1] = sys.argv[i+1].strip()
else:
value = None
if key and value:
SYS_ARGS[key] = value
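#
# Illustration (hypothetical command line, values are assumptions): the call
#   python3 pipeline.py --config config.json --index demo --train --gpu 0,1
# yields
#   SYS_ARGS == {'config':'config.json','index':'demo','train':1,'gpu':'0,1'}
# i.e. flags without a value default to 1, flags followed by a value keep the value as a string
#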

303
drive/pipeline.py Normal file
View File

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import os
from multiprocessing import Process
import pandas as pd
from google.oauth2 import service_account
import data.maker
from data.params import SYS_ARGS
#
# The configuration array is now loaded and we will execute the pipeline as follows
DATASET='combined20190510'
class Components :
@staticmethod
def get(args):
"""
This function returns a data-frame given a bigquery sql statement with conditions (and limits for testing purposes)
The function must be wrapped in a lambda; this makes testing easier and keeps the choice of data store transparent to the rest of the code (vital when testing)
:sql basic sql statement
:condition optional condition and filters
"""
SQL = args['sql']
if 'condition' in args :
condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
SQL = " ".join([SQL,'WHERE',condition])
SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
if 'limit' in args :
SQL = SQL + ' LIMIT ' + args['limit']
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
return df
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
@staticmethod
def split(X,MAX_ROWS=3,PART_SIZE=3):
return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
def train(self,**args):
"""
This function will perform training on the basis of a given pointer that reads data
"""
#
# @TODO: we need to log something here about the parameters being passed
pointer = args['reader'] if 'reader' in args else lambda: Components.get(args)
df = pointer()
#
# Now we can parse the arguments and submit the entire thing to training
#
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
log_folder = args['logs'] if 'logs' in args else 'logs'
_args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
PART_SIZE = args['part_size'] if 'part_size' in args else 0
if df.shape[0] > MAX_ROWS and 'partition' not in args:
lbound = 0
bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
# bounds = Components.split(df,MAX_ROWS,PART_SIZE)
qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
for b in bounds :
part_index = bounds.index(b)
ubound = int(b.right)
_data = df.iloc[lbound:ubound][args['columns']]
lbound = ubound
# _args['logs'] = os.sep.join([log_folder,str(part_index)])
_args['partition'] = str(part_index)
_args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'}
#
# We should post the partitions to a queue server (or at least the instructions):
# - where to get the data
# - and the arguments to use (partition #,columns,gpu,epochs)
#
info = {"rows":_data.shape[0],"cols":_data.shape[1], "paritition":part_index,"logs":_args['logs']}
p = {"args":_args,"data":_data.to_dict(orient="records"),"info":info}
qwriter.write(p)
#
# @TODO:
# - Notify that information was just posted to the queue
info['max_rows'] = MAX_ROWS
info['part_size'] = PART_SIZE
logger.write({"module":"train","action":"setup-partition","input":info})
pass
else:
partition = args['partition'] if 'partition' in args else ''
log_folder = os.sep.join([log_folder,args['context'],partition])
_args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
_args['data'] = df
#
# @log :
# Logging information about the training process for this partition (or not)
#
info = {"rows":df.shape[0],"cols":df.shape[1], "partition":partition,"logs":_args['logs']}
logger.write({"module":"train","action":"train","input":info})
data.maker.train(**_args)
pass
# @staticmethod
def generate(self,args):
"""
This function will generate data and store it to a given data store
"""
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
log_folder = args['logs'] if 'logs' in args else 'logs'
partition = args['partition'] if 'partition' in args else ''
log_folder = os.sep.join([log_folder,args['context'],partition])
_args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
_args['no_value']= args['no_value']
MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
PART_SIZE = args['part_size'] if 'part_size' in args else 0
# credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
# _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
reader = args['reader']
df = reader()
if 'partition' in args :
bounds = Components.split(df,MAX_ROWS,PART_SIZE)
# bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
lbound = int(bounds[int(partition)].left)
ubound = int(bounds[int(partition)].right)
df = df.iloc[lbound:ubound]
_args['data'] = df
# _args['data'] = reader()
#_args['data'] = _args['data'].astype(object)
_args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
_dc = data.maker.generate(**_args)
#
# We need to post the generated data in order to :
# 1. compare it immediately
# 2. rebuild the synthetic copy
#
cols = _dc.columns.tolist()
data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
for name in cols :
_args['data'][name] = _dc[name]
info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
if partition != '' :
info['partition'] = partition
logger.write(info)
# filename = os.sep.join([log_folder,'output',name+'.csv'])
# data_comp[[name]].to_csv(filename,index=False)
#
#-- Let us store all of this into bigquery
prefix = args['notify']+'.'+_args['context']
table = '_'.join([prefix,partition,'io']).replace('__','_')
folder = os.sep.join([args['logs'],args['context'],partition,'output'])
if 'file' in args :
_fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
_pname = os.sep.join([folder,table])+'.csv'
data_comp.to_csv( _pname,index=False)
_args['data'].to_csv(_fname,index=False)
else:
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
_pname = os.sep.join([folder,table+'.csv'])
_fname = table.replace('_io','_full_io')
data_comp.to_gbq(if_exists='replace',destination_table=table,credentials=credentials,chunksize=50000)
data_comp.to_csv(_pname,index=False)
INSERT_FLAG = 'replace' if 'partition' not in args else 'append'
_args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials=credentials,chunksize=50000)
info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
if partition :
info ['partition'] = partition
logger.write({"module":"generate","action":"write","info":info} )
@staticmethod
def callback(channel,method,header,stream):
info = json.loads(stream)
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
logger.write({'module':'process','action':'read-partition','input':info['info']})
df = pd.DataFrame(info['data'])
args = info['args']
if int(args['num_gpu']) > 1 and args['gpu'] > 0:
args['gpu'] = args['gpu'] + args['num_gpu']
args['reader'] = lambda: df
#
# @TODO: Fix
# There is an inconsistency between 'column' and 'columns' that needs to be resolved
#
args['columns'] = args['column']
(Components()).train(**args)
logger.write({"module":"process","action":"exit","info":info["info"]})
channel.close()
channel.connection.close()
pass
if __name__ == '__main__' :
filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
f = open (filename)
PIPELINE = json.loads(f.read())
f.close()
index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else 0
args = (PIPELINE[index])
args['dataset'] = 'combined20190510'
args = dict(args,**SYS_ARGS)
args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3
args['part_size']= int(args['part_size']) if 'part_size' in args else 3
#
# @TODO:
# Log what was initiated so we have context of this processing ...
#
if 'listen' not in SYS_ARGS :
if 'file' in args :
reader = lambda: pd.read_csv(args['file']) ;
else:
reader = lambda: Components().get(args)
args['reader'] = reader
if 'generate' in SYS_ARGS :
#
# Let us see if we have partitions given the log folder
content = os.listdir( os.sep.join([args['logs'],args['context']]))
generator = Components()
if ''.join(content).isnumeric() :
#
# we have partitions we are working with
for id in ''.join(content) :
args['partition'] = id
generator.generate(args)
else:
generator.generate(args)
# Components.generate(args)
elif 'listen' in args :
#
# This will start a worker just in case to listen to a queue
if 'read' in SYS_ARGS :
QUEUE_TYPE = 'queue.QueueReader'
pointer = lambda qreader: qreader.read(1)
else:
QUEUE_TYPE = 'queue.QueueListener'
pointer = lambda qlistener: qlistener.listen()
N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1
qhandlers = [factory.instance(type=QUEUE_TYPE,args={'queue':'aou.io'}) for i in np.arange(N)]
jobs = []
for qhandler in qhandlers :
qhandler.callback = Components.callback
job = Process(target=pointer,args=(qhandler,))
job.start()
jobs.append(job)
#
# let us wait for the jobs
print (["Started ",len(jobs)," trainers"])
while len(jobs) > 0 :
jobs = [job for job in jobs if job.is_alive()]
# pointer(qhandler)
# qreader.read(1)
pass
else:
trainer = Components()
trainer.train(**args)
# Components.train(**args)
#for args in PIPELINE :
#args['dataset'] = 'combined20190510'
#process = Process(target=Components.train,args=(args,))
#process.name = args['context']
#process.start()
# Components.train(args)

692
pipeline.py Normal file
View File

@@ -0,0 +1,692 @@
#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import time
import os
from multiprocessing import Process, Lock
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import data.maker
import copy
from data.params import SYS_ARGS
#
# The configuration array is now loaded and we will execute the pipeline as follows
class Components :
lock = Lock()
class KEYS :
PIPELINE_KEY = 'pipeline'
SQL_FILTER = 'filter'
@staticmethod
def get_filter (**args):
if args['qualifier'] == 'IN' :
return ' '.join([args['field'],args['qualifier'],'(',args['value'],')'])
else:
return ' '.join([args['field'],args['qualifier'],args['value']])
@staticmethod
def get_logger(**args) :
return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
@staticmethod
def get(args):
"""
This function returns a data-frame given a bigquery sql statement with conditions (and limits for testing purposes)
The function must be wrapped in a lambda; this makes testing easier and keeps the choice of data store transparent to the rest of the code (vital when testing)
:sql basic sql statement
:condition optional condition and filters
"""
SQL = args['sql']
if Components.KEYS.SQL_FILTER in args :
FILTER_KEY = Components.KEYS.SQL_FILTER
SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]]
# condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')'])
condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER])
SQL = " ".join([SQL,'WHERE',condition])
SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
if 'limit' in args :
SQL = SQL + ' LIMIT ' + args['limit']
#
# let's log the sql query that has been performed here
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}})
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
return df
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
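#
# Illustration (hypothetical pipeline entry; the table, context and ids are assumptions):
#   {"sql":"SELECT * FROM :dataset.person","dataset":"combined20191004v2_deid",
#    "context":"demo","filter":{"field":"person_id","qualifier":"IN","value":"1,2,3"}}
# renders the statement
#   SELECT * FROM combined20191004v2_deid.person WHERE person_id IN ( 1,2,3 )
#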
@staticmethod
def split(X,MAX_ROWS=3,PART_SIZE=3):
return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
def format_schema(self,schema):
_schema = {}
for _item in schema :
_type = int
_value = 0
if _item.field_type == 'FLOAT' :
_type =float
elif _item.field_type != 'INTEGER' :
_type = str
_value = ''
_schema[_item.name] = _type
return _schema
def get_ignore(self,**_args) :
if 'columns' in _args and 'data' in _args :
_df = _args['data']
terms = _args['columns']
return [name for name in _df.columns if np.sum( [int(field in name )for field in terms ]) ]
return []
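#
# Illustration (hypothetical column names):
#   self.get_ignore(data=pd.DataFrame(columns=['person_id','gender','value_as_number']),columns=['_id'])
# returns ['person_id'], i.e. every column whose name contains one of the given terms
#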
def set_gpu(self,**_args) :
if 'gpu' in _args :
gpu = _args['gpu'] if type(_args['gpu']) != str else [_args['gpu']]
_index = str(gpu[0])
os.environ['CUDA_VISIBLE_DEVICES'] = _index
return gpu
else :
return None
def train(self,**args):
"""
This function will perform training on the basis of a given pointer that reads data
"""
schema = None
if 'file' in args :
df = pd.read_csv(args['file'])
del args['file']
elif 'data' not in args :
reader = factory.instance(**args['store']['source'])
if 'row_limit' in args :
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None
else:
df = args['data']
#
#
# df = df.fillna('')
if schema :
_schema = []
for _item in schema :
_type = int
_value = 0
if _item.field_type == 'FLOAT' :
_type =float
elif _item.field_type != 'INTEGER' :
_type = str
_value = ''
_schema += [{"name":_item.name,"type":_item.field_type}]
df[_item.name] = df[_item.name].fillna(_value).astype(_type)
args['schema'] = _schema
# df[_item.name] = df[_item.name].astype(_type)
_args = copy.deepcopy(args)
# _args['store'] = args['store']['source']
_args['data'] = df
#
# The columns that are continuous should also be skipped because they don't need to be synthesized in the same way
if 'continuous' in args :
x_cols = args['continuous']
else:
x_cols = []
if 'ignore' in args and 'columns' in args['ignore'] :
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
_args['data'] = df[ list(set(df.columns)- set(_cols))]
#
# We need to make sure that continuous columns are removed
if x_cols :
_args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))]
if 'gpu' in args :
_args['gpu'] = self.set_gpu(gpu=args['gpu'])
if 'partition' in args :
_args['partition'] = args['partition']
if df.shape[0] and df.shape[1] :
#
# We have a full blown matrix to be processed
print ('-- Training --')
data.maker.train(**_args)
else:
print ("... skipping training !!")
if 'autopilot' in ( list(args.keys())) :
args['data'] = df
print (['autopilot mode enabled ....',args['context']])
self.generate(args)
pass
def approximate(self,values):
"""
:param values array of values to be approximated
"""
if values.dtype in [int,float] :
#
# @TODO: create bins?
r = np.random.dirichlet(values+.001) #-- dirichlet doesn't work on values with zeros
_sd = values[values > 0].std()
_me = values[values > 0].mean()
_mi = values.min()
x = []
_type = values.dtype
for index in np.arange(values.size) :
if np.random.choice([0,1],1)[0] :
value = values[index] + (values[index] * r[index])
else :
value = values[index] - (values[index] * r[index])
#
# randomly shifting the measurements
if np.random.choice([0,1],1)[0] and _me > _sd :
if np.random.choice([0,1],1)[0] :
value = value * np.divide(_me,_sd)
else:
value = value + (np.divide(_me,_sd))
value = int(value) if _type == int else np.round(value,2)
x.append( value)
np.random.shuffle(x)
return np.array(x)
else:
return values
pass
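#
# Illustration (hypothetical values): given values = np.array([100,250,400]),
# a Dirichlet draw r (with r.sum() == 1) perturbs each entry by +/- value*r[i];
# entries may additionally be scaled or shifted by mean/sd before the result is
# shuffled, so the output keeps the rough magnitude of the input without reproducing it.
#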
def shuffle(self,_args):
if 'data' in _args :
df = _args['data']
else:
reader = factory.instance(**args['store']['source'])
if 'file' in args :
df = pd.read_csv(args['file'])
elif 'data' in _args :
df = _args['data']
else:
if 'row_limit' in args and 'sql' in args:
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
schema = None
if 'schema' not in args and hasattr(reader,'meta') and 'file' not in args:
schema = reader.meta(table=args['from'])
schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
#
# We are shuffling designated columns and approximating the others
#
x_cols = [] #-- columns to be approximated.
_cols = [] #-- columns to be ignored
if 'continuous' in args :
x_cols = args['continuous']
if 'ignore' in args and 'columns' in args['ignore'] :
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
columns = args['columns'] if 'columns' in args else df.columns
columns = list(set(columns) - set(_cols))
for name in columns:
i = np.arange(df.shape[0])
np.random.shuffle(i)
if name in x_cols :
if df[name].unique().size > 0 :
df[name] = self.approximate(df.iloc[i][name].fillna(0).values)
# df[name] = df[name].astype(str)
# pass
df.index = np.arange(df.shape[0])
self.post(data=df,schema=schema,store=args['store']['target'])
def post(self,**_args) :
table = _args['from'] if 'from' in _args else _args['store']['table']
_schema = _args['schema'] if 'schema' in _args else None
writer = factory.instance(**_args['store'])
_df = _args['data']
if _schema :
columns = []
for _item in _schema :
name = _item['name']
_type = str
_value = 0
if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] :
if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
#
# There is an issue with missing dates that needs to be resolved.
# A missing date/time here causes the column type to be coerced to timestamp (a problem).
# The following is a workaround that assumes 10-character dates (YYYY-MM-DD) and that pandas renders missing date values as 'NaT'
#
_df[name] = _df[name].apply(lambda value: None if str(value) == 'NaT' else (str(value)[:10]) if _item['type'] in ['DATE','DATETIME'] else str(value))
#_df[name] = _df[name].dt.date
# _df[name] = pd.to_datetime(_df[name].fillna(''),errors='coerce')
else:
pass
_df[name] = pd.to_datetime(_df[name])
else:
value = 0
if _item['type'] == 'INTEGER' :
_type = np.int64
elif _item['type'] in ['FLOAT','NUMERIC']:
_type = np.float64
else:
_value = ''
_df[name] = _df[name].fillna(_value) #.astype(_type)
columns.append(name)
fields = _df.columns.tolist()
if not writer.has(table=table) and _args['store']['provider'] != 'bigquery':
_map = {'STRING':'VARCHAR(256)','INTEGER':'BIGINT'} if 'provider' in _args['store'] and _args['store']['provider'] != 'bigquery' else {}
_params = {'map':_map,'table':table}
if _schema :
_params['schema'] = _schema
else:
_params['fields'] = fields
writer.make(**_params)
fields = _df.columns.tolist()
_df = _df[fields]
# writer.fields = fields
if _args['store']['provider'] == 'bigquery' :
print (['_______ POSTING ______________ ',table])
print (['_______________ ',_df.shape[0],' ___________________'])
writer.write(_df.astype(object),schema=_schema,table=table)
else:
writer.table = table
writer.write(_df)
# else:
# writer.write(_df,table=args['from'])
def finalize(self,args):
"""
This function performs post-processing operations on a synthetic table, i.e :
- remove duplicate keys
- remove orphaned keys
"""
reader = factory.instance(**args['store']['source'])
logger = factory.instance(**args['store']['logs'])
target = args['store']['target']['args']['dataset']
source = args['store']['source']['args']['dataset']
table = args['from']
schema = reader.meta(table=args['from'])
#
# keys :
unique_field = "_".join([args['from'],'id']) if 'unique_fields' not in args else args['unique_fields']
fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema]
SQL = [
"SELECT :fields FROM ",
"(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN",
"(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table ORDER BY RAND()) y",
"ON y.row_number = x.row_number"
]
SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target)
SQL = SQL.replace(":unique_field",unique_field)
#
# Use a native job to get this done ...
#
client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"])
job = bq.QueryJobConfig()
job.destination = client.dataset(target).table(table)
job.use_query_cache = True
job.allow_large_results = True
# job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
job.write_disposition = "WRITE_TRUNCATE"
job.priority = 'BATCH'
r = client.query(SQL,location='US',job_config=job)
logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":"".join([source,table]),"destimation":".".join([target,table])}})
#
# Keep a log of what just happened...
#
otable = ".".join([args['store']['source']['args']['dataset'],args['from']])
dtable = ".".join([args['store']['target']['args']['dataset'],args['from']])
def generate(self,args):
"""
This function will generate data and store it to a given data store
"""
store = args['store']['logs']
if 'args' in store :
store['args']['doc'] = args['context']
else:
store['doc'] = args['context']
logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
ostore = args['store']['target']
writer = factory.instance(**ostore)
schema = args['schema'] if 'schema' in args else None
if 'data' in args :
df = args['data']
else:
reader = factory.instance(**args['store']['source'])
if 'row_limit' in args :
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
if 'schema' not in args and hasattr(reader,'meta'):
schema = reader.meta(table=args['from'])
schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
# else:
# #
# # This will account for autopilot mode ...
# df = args['data']
_cast = {}
if schema :
for _item in schema :
dtype = str
name = _item['name']
novalue = 0
if _item['type'] in ['INTEGER','NUMERIC']:
dtype = np.int64
elif _item['type'] == 'FLOAT' :
dtype = np.float64
else:
novalue = ''
# _cast[schema['name']] = dtype
df[name] = df[name].fillna(novalue).astype(dtype)
_info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[1]},"schema":schema}
logger.write(_info)
_dc = pd.DataFrame()
# for mdf in df :
args['data'] = df.copy()
#
# The columns that are continuous should also be skipped because they don't need to be synthesized in the same way
if 'continuous' in args :
x_cols = args['continuous']
else:
x_cols = []
if 'ignore' in args and 'columns' in args['ignore'] :
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
args['data'] = args['data'][ list(set(df.columns)- set(_cols))]
#
# We need to remove the continuous columns from the data-frame
# @TODO: Abstract this !!
#
real_df = pd.DataFrame()
if x_cols :
args['data'] = args['data'][list(set(args['data'].columns) - set(x_cols))]
real_df = df[x_cols].copy()
args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
if 'gpu' in args :
args['gpu'] = self.set_gpu(gpu=args['gpu'])
# if 'partition' in args :
# args['logs'] = os.sep.join([args['logs'],str(args['partition'])])
_info = {"module":"gan-prep","action":"prune","shape":{"rows":args['data'].shape[0],"columns":args['data'].shape[1]}}
logger.write(_info)
if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 :
candidates = (data.maker.generate(**args))
else:
candidates = [df]
# if 'sql.BQWriter' in ostore['type'] :
_columns = None
skip_columns = []
_schema = schema
if schema :
cols = [_item['name'] for _item in _schema]
else:
cols = df.columns.tolist()
_info = {"module":"gan-prep","action":"selection","input":{"candidates":len(candidates),"features":cols}}
logger.write(_info)
for _df in candidates :
#
# we need to format the fields here to make sure we have something cohesive
#
if not skip_columns :
if 'ignore' in args and 'columns' in args['ignore'] :
skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns'])
#
# We perform a series of set operations to ensure that the following conditions are met:
# - the synthetic dataset only has fields that need to be synthesized
# - The original dataset has all the fields except those that need to be synthesized
#
_df = _df[list(set(_df.columns) - set(skip_columns))].copy()
if x_cols :
_approx = {}
for _col in x_cols :
if real_df[_col].unique().size > 0 :
_df[_col] = self.approximate(real_df[_col].values)
_approx[_col] = {
"io":{"min":_df[_col].min().astype(float),"max":_df[_col].max().astype(float),"mean":_df[_col].mean().astype(float),"sd":_df[_col].values.std().astype(float),"missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":_df[_col].where(_df[_col] == 0).dropna().count().astype(float)},
"real":{"min":real_df[_col].min().astype(float),"max":real_df[_col].max().astype(float),"mean":real_df[_col].mean().astype(float),"sd":real_df[_col].values.std().astype(float),"missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}
}
else:
_df[_col] = -1
logger.write({"module":"gan-generate","action":"approximate","status":_approx})
if set(df.columns) & set(_df.columns) :
_columns = list(set(df.columns) - set(_df.columns))
df = df[_columns]
#
# Let us merge the datasets here and have a comprehensive dataset
_df = pd.DataFrame.join(df,_df)
_params = {'data':_df,'store' : ostore,'from':args['from']}
if _schema :
_params ['schema'] = _schema
_info = {"module":"gan-prep","action":"write","input":{"rows":_df.shape[0],"cols":_df.shape[1]}}
logger.write(_info)
self.post(**_params)
# print (['_______ posting _________________',_df.shape])
break
pass
# else:
# pass
def bind(self,**_args):
print (_args)
if __name__ == '__main__' :
filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
f = open (filename)
_config = json.loads(f.read())
f.close()
PIPELINE = _config['pipeline']
index = SYS_ARGS['index']
if index.isnumeric() :
index = int(SYS_ARGS['index'])
else:
#
# The index provided is a key to a pipeline entry, namely the context
#
N = len(PIPELINE)
f = [i for i in range(0,N) if PIPELINE[i]['context'] == index]
index = f[0] if f else 0
#
print ("..::: ",PIPELINE[index]['context'],':::..')
args = (PIPELINE[index])
for key in _config :
if key == 'pipeline' or key in args:
#
# skip in case of pipeline or if key exists in the selected pipeline (provided by index)
#
continue
args[key] = _config[key]
args = dict(args,**SYS_ARGS)
if 'matrix_size' in args :
args['matrix_size'] = int(args['matrix_size'])
if 'batch_size' not in args :
args['batch_size'] = 2000 #if 'batch_size' not in args else int(args['batch_size'])
if 'dataset' not in args :
args['dataset'] = 'combined20191004v2_deid'
args['logs'] = args['logs'] if 'logs' in args else 'logs'
PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
#
# @TODO:
# Log what was initiated so we have context of this processing ...
#
GPU_CHIPS = args['gpu'] if 'gpu' in args else None
if GPU_CHIPS and type(GPU_CHIPS) != list :
GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS]
if 'gpu' in SYS_ARGS :
args['gpu'] = GPU_CHIPS
jobs = []
if 'generate' in SYS_ARGS :
#
# Let us see if we have partitions given the log folder
content = os.listdir( os.sep.join([args['logs'],'train',args['context']]))
if 'all-chips' in SYS_ARGS and GPU_CHIPS:
index = 0
jobs = []
for _gpu in GPU_CHIPS :
_args = copy.deepcopy(args)
_args['gpu'] = [int(_gpu)]
_args['partition'] = int(_gpu) #index
index += 1
make = lambda _params: (Components()).generate(_params)
job = Process(target=make,args=( dict(_args),))
job.name = 'Generator # ' + str(index)
job.start()
jobs.append(job)
pass
else:
generator = Components()
generator.generate(args)
elif 'bind' in SYS_ARGS :
import binder
_args = _config['_map']
_args['store'] = copy.deepcopy(_config['store'])
if 'init' in SYS_ARGS :
#
# Creating and persisting the map ...
print (['.... Binding Initialization'])
# jobs = binder.Init(**_args)
_mapped = binder.Init(**_args)
_schema = [{"name":_name,"type":"INTEGER"} for _name in _mapped.columns.tolist()]
publisher = lambda _params: (Components()).post(**_params)
_args = {'data':_mapped,'store':_config['store']['target']}
_args['store']['table'] = '_map'
if _args['store']['provider'] =='bigquery' :
_args['schema'] = _schema
job = Process (target = publisher,args=(_args,))
job.start()
jobs = [job]
else:
#
# Applying the map of k on a particular dataset
#
index = int(SYS_ARGS['index'])
_args['config'] = _config['pipeline'][index]
_args['original_key'] = _config['original_key'] if 'original_key' in _config else 'person_id'
table = _config['pipeline'][index]['from']
_df = binder.ApplyOn(**_args)
_df = np.array_split(_df,PART_SIZE)
jobs = []
print (['Publishing ',PART_SIZE,' PARTITION'])
for data in _df :
publisher = lambda _params: ( Components() ).post(**_params)
_args = {'data':data,'store':_config['store']['target']}
_args['store']['table'] = table
print (_args['store'])
job = Process(target = publisher,args=(_args,))
job.name = "Publisher "+str(len(jobs)+1)
job.start()
jobs.append(job)
elif 'shuffle' in SYS_ARGS :
index = 0
if GPU_CHIPS and 'all-chips' in SYS_ARGS:
for index in GPU_CHIPS :
publisher = lambda _params: ( Components() ).shuffle(_params)
job = Process (target = publisher,args=( args,))
job.name = 'Shuffler #' + str(index)
job.start()
jobs.append(job)
else:
shuffler = Components()
shuffler.shuffle(args)
pass
elif 'train' in SYS_ARGS:
# DATA = np.array_split(DATA,PART_SIZE)
#
# Let us create n jobs across n GPUs. The assumption here is that the data produced by each job constitutes a partition
# @TODO: Find a better name for partition
#
if GPU_CHIPS and 'all-chips' in SYS_ARGS:
index = 0
print (['... launching ',len(GPU_CHIPS),' jobs',args['context']])
for _gpu in GPU_CHIPS :
_args = copy.deepcopy(args)
_args['gpu'] = [int(_gpu)]
_args['partition'] = int(_gpu) #index
index += 1
make = lambda _params: (Components()).train(**_params)
job = Process(target=make,args=( _args,))
job.name = 'Trainer # ' + str(index)
job.start()
jobs.append(job)
else:
#
# The choice of the chip will be made internally
agent = Components()
agent.train(**args)
#
# If we have any jobs we should wait until they finish
#
DIRTY = 0
if (len(jobs)) :
print (['.... waiting on ',len(jobs),' jobs'])
while len(jobs)> 0 :
DIRTY =1
jobs = [job for job in jobs if job.is_alive()]
time.sleep(2)
if DIRTY:
print (["..:: jobs finished "])
#
# We need to harmonize the keys (if any); we do this for shuffle or generate operations
# This holds true for bigquery-to-bigquery only
IS_BIGQUERY = _config['store']['source']['provider'] == _config['store']['target']['provider'] and _config['store']['source']['provider'] == 'bigquery'
# if 'bind' not in SYS_ARGS and IS_BIGQUERY and ('autopilot' in SYS_ARGS or 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS)) :
# #
# # We should pull all the primary keys and regenerate them in order to insure some form of consistency
# #
# #
# #
# print (["..:: Finalizing process"])
# (Components()).finalize(args)
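#
# Illustrative invocations (the config file, the context 'demo' and the GPU ids are
# assumptions for the example; the flags map to SYS_ARGS as parsed in data.params):
#
#   python3 pipeline.py --config config.json --index demo --train --gpu 0,1 --all-chips
#   python3 pipeline.py --config config.json --index demo --generate --gpu 0
#   python3 pipeline.py --config config.json --index demo --shuffle
#   python3 pipeline.py --config config.json --index demo --bind --init
#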

View File

@@ -4,9 +4,10 @@ import sys
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {"name":"data-maker","version":"1.0.5","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
args = {"name":"data-maker","version":"1.6.4",
"author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vumc.org","license":"MIT",
"packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']
args["install_requires"] = ['data-transport@git+https://github.com/lnyemba/data-transport.git','tensorflow']
args['url'] = 'https://hiplab.mc.vanderbilt.edu/aou/data-maker.git'
if sys.version_info[0] == 2 :