data-maker/data/maker/__init__.py

255 lines
9.3 KiB
Python
Raw Normal View History

"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0
This package serves as a proxy to the overall usage of the framework.
This package is designed to generate synthetic data from an original dataset using deep learning techniques
@TODO:
- Make configurable GPU, EPOCHS
"""
import pandas as pd
import numpy as np
2020-01-05 05:02:15 +00:00
import data.gan as gan
2020-01-04 03:47:05 +00:00
from transport import factory
2020-02-18 08:59:39 +00:00
from data.bridge import Binary
2020-02-11 18:00:16 +00:00
import threading as thread
2020-02-29 03:37:26 +00:00
class ContinuousToDiscrete :
    """
    Helpers to move back and forth between continuous values and a fixed number of bins.
    The bins are computed with pandas.cut over the values rounded to ROUND_UP decimals.
    """
    ROUND_UP = 2    #-- number of decimals kept when rounding values

    @staticmethod
    def _locate(value,BOUNDS) :
        """
        This function returns the index of the first bin that holds a value
        :value      scalar to look up
        :BOUNDS     list of intervals (as returned by the function bounds)
        @return index of the bin, or None if the value can not be compared (e.g NaN)
        """
        for index,item in enumerate(BOUNDS) :
            if value >= item.left and value <= item.right :
                return index
        #
        # The bins are computed on rounded values, so a raw value may sit slightly outside
        # of the overall range: we fall back on the nearest (first or last) bin
        if value < BOUNDS[0].left :
            return 0
        if value > BOUNDS[-1].right :
            return len(BOUNDS) - 1
        return None

    @staticmethod
    def binary(X,n=4) :
        """
        This function will convert a continuous stream of values into a bit stream of bins (one-hot encoding)
        :X  sequence of numeric values (list, numpy array or pandas series)
        :n  number of bins
        @return numpy matrix of shape (len(X),n), each row has a 1 at the index of the bin holding the value
                (a row is left with zeros if the value is NaN)
        """
        values = np.array(X).astype(np.float32)
        matrix = np.zeros((len(values),n))
        if len(values) == 0 :
            #
            # There is nothing to bin and pandas.cut would fail on an empty array
            return matrix
        BOUNDS = ContinuousToDiscrete.bounds(values,n)
        for i,value in enumerate(values) :
            index = ContinuousToDiscrete._locate(value,BOUNDS)
            if index is not None :
                matrix[i,index] = 1
        return matrix

    @staticmethod
    def bounds(x,n):
        """
        This function computes the bins associated with a set of values
        :x  sequence of numeric values
        :n  number of bins
        @return list of n intervals (pandas.Interval with attributes left, right)
        """
        #
        # Working on a numpy array insures pandas.cut returns a Categorical (that has categories)
        values = np.round(np.asarray(x),ContinuousToDiscrete.ROUND_UP)
        return list(pd.cut(values,n).categories)

    @staticmethod
    def continuous(X,BIN_SIZE=4) :
        """
        This function will approximate values given boundary information:
        every value is replaced by a random draw (uniform) from the bin it falls in
        :X          sequence of numeric values
        :BIN_SIZE   number of bins
        @return list aligned with X (same length, same order) of values rounded to ROUND_UP decimals
                (a NaN in X yields a NaN at the same position)
        """
        X = np.asarray(X)
        if len(X) == 0 :
            return []
        BOUNDS = ContinuousToDiscrete.bounds(X,BIN_SIZE)
        values = []
        for value in X :
            index = ContinuousToDiscrete._locate(value,BOUNDS)
            if index is None :
                #
                # We keep a placeholder so the output remains aligned with the input
                values.append(np.nan)
                continue
            item = BOUNDS[index]
            values.append(np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP))
        return values
def train (**args) :
    """
    This function is intended to train the GAN in order to learn about the distribution of the features
    Every column is trained separately (one gan.Train per column, sequentially)
    :column         column or list of columns that need to be synthesized (discrete)
    :data           data-frame to be synthesized or path of a csv file
    :context        label of what we are synthesizing (required)
    :matrix_size    (optional) size handed to Binary.apply, 128 by default
    :store          (optional) arguments of transport.factory.instance used to create a logger
    :partition      (optional) partition reported in the log entry
    :logs           where the output of the training is (location on disk); not read here, passed on to gan.Train
    :id             identifier of the dataset; not read here, passed on to gan.Train

    All the arguments (along with real, column, context and logger) are forwarded to gan.Train
    """
    column = args['column'] if (isinstance(args['column'],list)) else [args['column']]
    df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
    #
    # NOTE: if a data-frame was provided its columns are renamed in place (lower case)
    df.columns = [name.lower() for name in df.columns]
    msize = args['matrix_size'] if 'matrix_size' in args else 128
    #
    # The label is captured once, because args['context'] is overwritten with the column name below
    # (otherwise the store would be labelled after the previous column from the second column on)
    context = args['context']
    #
    # @TODO:
    # Consider sequential training of sub population for extremely large datasets
    #

    #
    # If we have several columns we will proceed one at a time (it could be done in separate threads)
    # @TODO : Consider performing this task on several threads/GPUs simultaneously
    #
    for col in column :
        args['real'] = (Binary()).apply(df[col],msize)
        if 'store' in args :
            args['store']['args']['doc'] = context
            logger = factory.instance(**args['store'])
            args['logger'] = logger
            #
            # The partition is only informative, it should not prevent training if it is missing
            info = {"rows":args['real'].shape[0],"cols":args['real'].shape[1],"name":col,"partition":args.get('partition')}
            logger.write({"module":"gan-train","action":"data-prep","input":info})
        args['column'] = col
        args['context'] = col
        trainer = gan.Train(**args)
        trainer.apply()
def post(**args):
    """
    Placeholder (not implemented yet), intended to upload the tensorflow checkpoint to a data-store (mongodb, bigquery, s3)
    :args   keyword arguments, ignored for now
    @return None
    """
    return None
def get(**args):
    """
    Placeholder (not implemented yet), intended to restore a checkpoint from a persistent storage on to disk
    :args   keyword arguments, ignored for now
    @return None
    """
    return None
def generate(**args):
    """
    This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset
    @return pandas.DataFrame (copy of the data where the columns have been replaced by synthetic values)
    :data       data-frame to be synthesized or path of a csv file
    :column     column or list of columns that need to be synthesized (discrete)
    :no_value   (required) marker of a missing value, or dictionary of markers by column name
    :continuous (optional) list of columns whose synthetic values are approximated using bins
    :bin_size   (optional) number of bins used for the continuous columns, 4 by default
    :id         column identifying an entity; not read here, passed on to gan.Predict
    :logs       location on disk where the learnt knowledge of the dataset is; not read here, passed on to gan.Predict

    All the arguments (along with context, column, values, row_count and no_value) are forwarded to gan.Predict
    """
    df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
    CONTINUOUS = args['continuous'] if 'continuous' in args else []
    column = args['column'] if (isinstance(args['column'],list)) else [args['column']]
    #
    #@TODO:
    # If the identifier is not present, we should find a way to determine or make one
    #
    BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
    #
    # args['no_value'] is overwritten for every column, hence the copy of the original setting
    NO_VALUE = dict(args['no_value']) if isinstance(args['no_value'],dict) else args['no_value']
    bhandler = Binary()
    _df = df.copy()
    for col in column :
        args['context'] = col
        args['column'] = col
        args['values'] = bhandler.get_column_values(df[col])
        args['row_count'] = df.shape[0]
        #
        # The lookup by column only applies to a dictionary: with a string marker the operator "in"
        # would be a sub-string test, and with a number or None it would raise an exception
        if isinstance(NO_VALUE,dict) and col in NO_VALUE :
            args['no_value'] = NO_VALUE[col]
        else:
            args['no_value'] = NO_VALUE
        #
        # we can determine the cardinalities here so we know what to allow or disallow
        handler = gan.Predict (**args)
        handler.load_meta(col)
        r = handler.apply()     #-- presumably a data-frame or dictionary indexed by column name (to be confirmed)
        if col in CONTINUOUS :
            r[col] = np.array(r[col])
            MISSING = np.nan if args['no_value'] in ['na','','NA'] else args['no_value']
            #
            # numpy.isnan does not accept strings or None, so we insure we have a float first
            if isinstance(MISSING,(float,np.floating)) and np.isnan(MISSING) :
                i = np.where(np.isnan(r[col]) == False)[0]
            else:
                #
                # element-wise comparison on purpose (numpy array), "is not None" would not work here
                i = np.where(r[col] != None)[0]
            if len(i) > 0 :
                #
                # approximating based on arbitrary bins (there is nothing to do if no value was selected)
                _approx = ContinuousToDiscrete.continuous(r[col][i],BIN_SIZE)
                r[col][i] = _approx
        _df[col] = r[col]
        #
        # Let's cast the type to the original type (it makes the data more usable)
        #
        otype = df[col].dtype
        _df[col] = _df[col].astype(otype)
        #
        # @TODO: log basic stats about the synthetic attribute
        #
    return _df