2020-01-01 05:27:53 +00:00
"""
( c ) 2019 Data Maker , hiplab . mc . vanderbilt . edu
version 1.0 .0
This package serves as a proxy to the overall usage of the framework .
This package is designed to generate synthetic data from a dataset from an original dataset using deep learning techniques
@TODO :
- Make configurable GPU , EPOCHS
"""
import pandas as pd
import numpy as np
2020-01-05 05:02:15 +00:00
import data . gan as gan
2020-01-04 03:47:05 +00:00
from transport import factory
2020-02-18 08:59:39 +00:00
from data . bridge import Binary
2020-02-11 18:00:16 +00:00
import threading as thread
2020-02-29 03:37:26 +00:00
class ContinuousToDiscrete :
2020-03-04 17:49:18 +00:00
ROUND_UP = 2
2020-02-29 03:37:26 +00:00
@staticmethod
def binary ( X , n = 4 ) :
"""
This function will convert a continous stream of information into a variety a bit stream of bins
"""
# BOUNDS = np.repeat(np.divide(X.max(),n),n).cumsum().tolist()
2020-03-12 14:41:54 +00:00
# print ( X.values.astype(np.float32))
# print ("___________________________")
2020-03-12 19:37:01 +00:00
values = np . array ( X ) . astype ( np . float32 )
2020-03-12 14:41:54 +00:00
BOUNDS = ContinuousToDiscrete . bounds ( values , n )
2020-02-29 03:37:26 +00:00
# _map = [{"index":BOUNDS.index(i),"ubound":i} for i in BOUNDS]
2020-04-01 05:21:51 +00:00
# _matrix = []
# m = []
# for value in X :
# x_ = np.zeros(n)
2020-03-12 19:37:01 +00:00
2020-04-01 05:21:51 +00:00
# for row in BOUNDS :
2020-02-29 03:37:26 +00:00
2020-04-01 05:21:51 +00:00
# if value>= row.left and value <= row.right :
# index = BOUNDS.index(row)
# x_[index] = 1
# break
# _matrix += x_.tolist()
# #
# # for items in BOUNDS :
# # index = BOUNDS.index(items)
# return np.array(_matrix).reshape(len(X),n)
matrix = np . repeat ( np . zeros ( n ) , len ( X ) ) . reshape ( len ( X ) , n )
2020-02-29 03:37:26 +00:00
@staticmethod
def bounds ( x , n ) :
2020-03-07 15:16:17 +00:00
# return np.array_split(x,n)
2020-03-12 14:41:54 +00:00
values = np . round ( x , ContinuousToDiscrete . ROUND_UP )
return list ( pd . cut ( values , n ) . categories )
2020-02-29 03:37:26 +00:00
@staticmethod
def continuous ( X , BIN_SIZE = 4 ) :
"""
This function will approximate a binary vector given boundary information
: X binary matrix
: BIN_SIZE
"""
BOUNDS = ContinuousToDiscrete . bounds ( X , BIN_SIZE )
values = [ ]
2020-03-25 22:43:23 +00:00
# _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE)
# # # print (BOUNDS)
l = { }
2020-04-01 05:21:51 +00:00
for i in np . arange ( len ( X ) ) : #value in X :
value = X [ i ]
2020-03-25 22:43:23 +00:00
2020-04-01 05:21:51 +00:00
for item in BOUNDS :
if value > = item . left and value < = item . right :
values + = [ np . round ( np . random . uniform ( item . left , item . right ) , ContinuousToDiscrete . ROUND_UP ) ]
break
# values += [ np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP) for item in BOUNDS if value >= item.left and value <= item.right ]
2020-02-29 03:37:26 +00:00
2020-03-25 22:43:23 +00:00
# # values = []
# for row in _BINARY :
# # ubound = BOUNDS[row.index(1)]
# index = np.where(row == 1)[0][0]
# ubound = BOUNDS[ index ].right
# lbound = BOUNDS[ index ].left
2020-02-29 03:37:26 +00:00
2020-03-25 22:43:23 +00:00
# x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float)
# values.append(x_)
2020-02-29 03:37:26 +00:00
2020-03-25 22:43:23 +00:00
# lbound = ubound
# values = [np.random.uniform() for item in BOUNDS]
2020-02-29 03:37:26 +00:00
return values
2020-01-01 05:27:53 +00:00
def train ( * * args ) :
"""
This function is intended to train the GAN in order to learn about the distribution of the features
: column columns that need to be synthesized ( discrete )
: logs where the output of the ( location on disk )
: id identifier of the dataset
: data data - frame to be synthesized
: context label of what we are synthesizing
"""
2020-02-11 18:00:16 +00:00
column = args [ ' column ' ] if ( isinstance ( args [ ' column ' ] , list ) ) else [ args [ ' column ' ] ]
2020-03-12 19:37:01 +00:00
# CONTINUOUS = args['continuous'] if 'continuous' in args else []
2020-02-18 18:25:47 +00:00
# column_id = args['id']
2020-01-10 19:12:58 +00:00
df = args [ ' data ' ] if not isinstance ( args [ ' data ' ] , str ) else pd . read_csv ( args [ ' data ' ] )
2020-02-11 18:00:16 +00:00
df . columns = [ name . lower ( ) for name in df . columns ]
2020-02-29 03:37:26 +00:00
#
# @TODO:
# Consider sequential training of sub population for extremely large datasets
#
2020-02-11 18:00:16 +00:00
#
# If we have several columns we will proceed one at a time (it could be done in separate threads)
# @TODO : Consider performing this task on several threads/GPUs simulataneously
#
2020-02-29 03:37:26 +00:00
for col in column :
# args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
# if 'float' not in df[col].dtypes.name :
# args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
2020-03-12 19:37:01 +00:00
# if col in CONTINUOUS:
# BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
# args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32)
# # args['real'] = args['real'].reshape(df.shape[0],BIN_SIZE)
# else:
2020-03-09 00:33:08 +00:00
# df.to_csv('tmp-'+args['logs'].replace('/','_')+'-'+col+'.csv',index=False)
2020-03-08 13:48:38 +00:00
# print (df[col].dtypes)
# print (df[col].dropna/(axis=1).unique())
2020-03-12 19:37:01 +00:00
args [ ' real ' ] = pd . get_dummies ( df [ col ] . dropna ( ) ) . astype ( np . float32 ) . values
2020-03-08 13:48:38 +00:00
2020-02-29 03:37:26 +00:00
2020-02-11 18:00:16 +00:00
context = args [ ' context ' ]
if ' store ' in args :
args [ ' store ' ] [ ' args ' ] [ ' doc ' ] = context
logger = factory . instance ( * * args [ ' store ' ] )
args [ ' logger ' ] = logger
2020-03-09 00:33:08 +00:00
info = { " rows " : args [ ' real ' ] . shape [ 0 ] , " cols " : args [ ' real ' ] . shape [ 1 ] , " name " : col , " partition " : args [ ' partition ' ] }
2020-03-08 13:48:38 +00:00
logger . write ( { " module " : " gan-train " , " action " : " data-prep " , " input " : info } )
2020-02-11 18:00:16 +00:00
else :
logger = None
2020-03-08 13:48:38 +00:00
args [ ' column ' ] = col
args [ ' context ' ] = col
#
# If the s
2020-02-11 18:00:16 +00:00
trainer = gan . Train ( * * args )
trainer . apply ( )
def post ( * * args ) :
"""
This uploads the tensorflow checkpoint to a data - store ( mongodb , biguqery , s3 )
"""
pass
2020-02-12 18:43:30 +00:00
def get ( * * args ) :
2020-02-11 18:00:16 +00:00
"""
This function will restore a checkpoint from a persistant storage on to disk
"""
pass
2020-01-01 05:27:53 +00:00
def generate ( * * args ) :
"""
This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset
@return pandas . DataFrame
: data data - frame to be synthesized
: column columns that need to be synthesized ( discrete )
: id column identifying an entity
: logs location on disk where the learnt knowledge of the dataset is
"""
2020-01-10 19:12:58 +00:00
# df = args['data']
df = args [ ' data ' ] if not isinstance ( args [ ' data ' ] , str ) else pd . read_csv ( args [ ' data ' ] )
2020-03-08 13:48:38 +00:00
2020-03-09 01:27:27 +00:00
CONTINUOUS = args [ ' continuous ' ] if ' continuous ' in args else [ ]
2020-02-11 18:00:16 +00:00
column = args [ ' column ' ] if ( isinstance ( args [ ' column ' ] , list ) ) else [ args [ ' column ' ] ]
2020-02-25 17:41:40 +00:00
# column_id = args['id']
2020-01-01 05:27:53 +00:00
#
#@TODO:
# If the identifier is not present, we should fine a way to determine or make one
#
2020-03-12 19:37:01 +00:00
BIN_SIZE = 4 if ' bin_size ' not in args else int ( args [ ' bin_size ' ] )
2020-03-25 22:43:23 +00:00
NO_VALUE = dict ( args [ ' no_value ' ] ) if type ( args [ ' no_value ' ] ) == dict else args [ ' no_value ' ]
2020-02-11 18:00:16 +00:00
_df = df . copy ( )
for col in column :
args [ ' context ' ] = col
args [ ' column ' ] = col
2020-02-29 03:37:26 +00:00
2020-03-04 17:49:18 +00:00
# if 'float' in df[col].dtypes.name or col in CONTINUOUS :
# #
# # We should create the bins for the values we are observing here
# BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
# values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
# # values = np.unique(values).tolist()
2020-03-12 19:37:01 +00:00
# else:
# if col in CONTINUOUS :
# values = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32).T
2020-03-04 17:49:18 +00:00
# else:
2020-03-08 13:48:38 +00:00
values = df [ col ] . dropna ( ) . unique ( ) . tolist ( )
2020-02-29 03:37:26 +00:00
2020-03-12 19:37:01 +00:00
2020-02-29 03:37:26 +00:00
args [ ' values ' ] = values
args [ ' row_count ' ] = df . shape [ 0 ]
2020-03-25 22:43:23 +00:00
if col in NO_VALUE :
args [ ' no_value ' ] = NO_VALUE [ col ]
else :
args [ ' no_value ' ] = NO_VALUE
2020-02-11 18:00:16 +00:00
#
# we can determine the cardinalities here so we know what to allow or disallow
2020-03-04 17:49:18 +00:00
handler = gan . Predict ( * * args )
2020-02-11 18:00:16 +00:00
handler . load_meta ( col )
2020-03-04 17:49:18 +00:00
r = handler . apply ( )
2020-03-25 22:43:23 +00:00
if col in CONTINUOUS :
r [ col ] = np . array ( r [ col ] )
MISSING = np . nan if args [ ' no_value ' ] in [ ' na ' , ' ' , ' NA ' ] else args [ ' no_value ' ]
if np . isnan ( MISSING ) :
i = np . isnan ( r [ col ] )
i = np . where ( i == False ) [ 0 ]
else :
i = np . where ( r [ col ] != None ) [ 0 ]
2020-04-01 05:21:51 +00:00
_approx = ContinuousToDiscrete . continuous ( r [ col ] [ i ] , BIN_SIZE ) #-- approximating based on arbitrary bins
2020-03-25 22:43:23 +00:00
r [ col ] [ i ] = _approx
2020-04-01 05:21:51 +00:00
_df [ col ] = r [ col ]
2020-02-29 03:37:26 +00:00
#
# @TODO: log basic stats about the synthetic attribute
#
2020-03-04 17:49:18 +00:00
# print (r)s
2020-02-11 18:00:16 +00:00
# break
2020-03-14 16:12:13 +00:00
2020-01-10 15:53:23 +00:00
return _df