212 lines
7.4 KiB
Python
212 lines
7.4 KiB
Python
"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0

This package serves as a proxy to the overall usage of the framework.
This package is designed to generate synthetic data from an original dataset using deep learning techniques

@TODO:
    - Make configurable GPU, EPOCHS
"""
|
|
import pandas as pd
|
|
import numpy as np
|
|
import data.gan as gan
|
|
from transport import factory
|
|
from data.bridge import Binary
|
|
import threading as thread
|
|
class ContinuousToDiscrete :
    """
    Helpers that bin a continuous variable into equal-width intervals (pandas.cut),
    one-hot encode the bin each value falls in, and draw approximate values back
    out of those bins.
    """
    # Number of decimals values are rounded to before binning and after sampling
    ROUND_UP = 2

    @staticmethod
    def _cut(x,n):
        """
        Round x to ROUND_UP decimals and cut it into n equal-width bins.

        :x  array-like of numbers (list, numpy array or pandas Series)
        :n  number of bins
        @return pandas.Categorical, .categories holds the n intervals and
                .codes the bin index of every value (-1 for missing values)
        """
        #
        # The input is coerced to a plain float32 array so that the bins are the
        # same whichever container the caller used. pd.cut applied to a Series
        # returns a Series, which has no .categories attribute
        values = np.round(np.asarray(x,dtype=np.float32),ContinuousToDiscrete.ROUND_UP)
        return pd.cut(values,n)

    @staticmethod
    def binary(X,n=4) :
        """
        This function will convert a continuous stream of information into a one-hot matrix of bins

        :X  array-like of numbers
        :n  number of bins
        @return numpy array of shape (len(X),n); every row holds a single 1 in the
                column of the bin the value belongs to, rows of missing values
                are left at zero. An empty input yields an array of shape (0,n)
        """
        _matrix = np.zeros((len(X),n))
        if len(X) == 0 :
            return _matrix
        #
        # The bin of each value is taken from the same cut that defines the bounds,
        # comparing raw values against bounds built from rounded values left
        # values like 1.004 (rounded to 1.0) without any bin
        codes = np.asarray(ContinuousToDiscrete._cut(X,n).codes)
        rows = np.flatnonzero(codes >= 0)
        _matrix[rows,codes[rows]] = 1
        return _matrix

    @staticmethod
    def bounds(x,n):
        """
        This function returns the boundaries of the bins used to discretize x

        :x  array-like of numbers
        :n  number of bins
        @return list of n pandas.Interval objects (open on the left, closed on the right)
        """
        return list(ContinuousToDiscrete._cut(x,n).categories)

    @staticmethod
    def continuous(X,BIN_SIZE=4) :
        """
        This function will approximate every value of X by a value drawn uniformly
        from the bin the original value falls in

        :X          array-like of numbers (continuous values, not a binary matrix)
        :BIN_SIZE   number of bins
        @return list of floats rounded to ROUND_UP decimals, one per value of X
                (missing values stay NaN, an empty input yields an empty list)
        """
        if len(X) == 0 :
            return []
        bins = ContinuousToDiscrete._cut(X,BIN_SIZE)
        codes = np.asarray(bins.codes)
        lbounds = np.asarray(bins.categories.left,dtype=float)
        ubounds = np.asarray(bins.categories.right,dtype=float)

        values = np.full(codes.shape,np.nan)
        known = codes >= 0
        #
        # NOTE: pandas.cut widens the range slightly so the minimum is included, a value
        # drawn from the first bin can thus fall marginally below the observed minimum
        values[known] = np.round(
            np.random.uniform(lbounds[codes[known]],ubounds[codes[known]]),
            ContinuousToDiscrete.ROUND_UP)
        return list(values)
|
|
|
|
|
|
|
|
|
|
def train (**args) :
    """
    This function is intended to train the GAN in order to learn about the distribution of the features
    Every column is one-hot encoded (missing values dropped) and trained separately, one after the other

    :column     column or list of columns that need to be synthesized (discrete)
    :data       data-frame to be synthesized, or the path of a csv file to load it from
    :context    label of what we are synthesizing, used as the 'doc' of the log store
    :store      (optional) arguments of transport.factory.instance, when present a
                logger is created and handed to the trainer as 'logger'
    :partition  (optional) only reported in the data-prep log record

    Any other argument (e.g. logs) is forwarded untouched to gan.Train
    """
    column = args['column'] if isinstance(args['column'],list) else [args['column']]
    df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
    #
    # NOTE: when a data-frame is passed its columns are renamed in place, this is
    # visible to the caller and is kept as-is because callers may rely on it
    df.columns = [name.lower() for name in df.columns]
    #
    # @TODO:
    # Consider sequential training of sub population for extremely large datasets
    # Consider performing this task on several threads/GPUs simultaneously
    #
    for col in column :
        #
        # The arguments of each column live in their own dictionary, writing the column
        # name into args['context'] made the log store of the next column use the
        # previous column name instead of the context provided by the caller
        params = dict(args)
        params['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values

        context = args['context']
        if 'store' in args :
            args['store']['args']['doc'] = context
            logger = factory.instance(**args['store'])
            params['logger'] = logger
            rows,cols = params['real'].shape[0],params['real'].shape[1]
            # partition is only informative here, its absence should not abort the training
            info = {"rows":rows,"cols":cols,"name":col,"partition":args.get('partition')}
            logger.write({"module":"gan-train","action":"data-prep","input":info})

        params['column'] = col
        params['context'] = col
        trainer = gan.Train(**params)
        trainer.apply()
|
|
def post(**args):
    """
    Placeholder, not implemented yet : accepts any keyword argument and does nothing.
    It is meant to upload the tensorflow checkpoint to a data-store (mongodb, bigquery, s3)
    """
    return None
|
|
def get(**args):
    """
    Placeholder, not implemented yet : accepts any keyword argument and does nothing.
    It is meant to restore a checkpoint from a persistent storage on to disk
    """
    return None
|
|
def generate(**args):
    """
    This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset
    The input data-frame is copied and every requested column of the copy is replaced by
    what gan.Predict produces for that column
    @return pandas.DataFrame

    :data       data-frame to be synthesized, or the path of a csv file to load it from
    :column     column or list of columns that need to be synthesized (discrete)
    :continuous (optional) columns whose predicted values go through ContinuousToDiscrete.continuous
    :bin_size   (optional) number of bins used for the continuous columns, 4 by default

    Any other argument (e.g. logs, id) is forwarded untouched to gan.Predict
    """
    source = args['data']
    df = pd.read_csv(source) if isinstance(source,str) else source
    continuous_columns = args.get('continuous',[])
    requested = args['column']
    columns = requested if isinstance(requested,list) else [requested]
    #
    #@TODO:
    # If the identifier is not present, we should find a way to determine or make one
    #
    bin_size = int(args.get('bin_size',4))
    synthetic = df.copy()
    for name in columns :
        args.update(context=name,column=name)
        #
        # NOTE(review): the values are listed in order of appearance while train encodes
        # with pd.get_dummies (sorted order) -- confirm gan.Predict does not depend on the order
        args['values'] = df[name].dropna().unique().tolist()
        args['row_count'] = df.shape[0]
        #
        # we can determine the cardinalities here so we know what to allow or disallow
        predictor = gan.Predict(**args)
        predictor.load_meta(name)
        produced = predictor.apply()   # presumably indexable by column name -- verify against gan.Predict

        if name in continuous_columns :
            synthetic[name] = ContinuousToDiscrete.continuous(produced[name],bin_size)
        else:
            synthetic[name] = produced[name]
        #
        # @TODO: log basic stats about the synthetic attribute
        #
    return synthetic