"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0

This package serves as a proxy to the overall usage of the framework.
It is designed to generate synthetic data from an original dataset using deep learning techniques.

@TODO:
    - Make GPU and EPOCHS configurable
"""
import pandas as pd
import numpy as np
2020-01-05 05:02:15 +00:00
import data . gan as gan
2022-04-11 23:33:07 +00:00
import transport
2022-04-12 04:27:25 +00:00
# from data.bridge import Binary
2020-02-11 18:00:16 +00:00
import threading as thread
2021-03-29 16:10:57 +00:00
from data . maker import prepare
import copy
import os
import json
2022-04-11 23:33:07 +00:00
from multiprocessing import Process , RLock
2022-04-12 04:27:25 +00:00
from datetime import datetime , timedelta
2021-03-29 16:10:57 +00:00
2020-02-29 03:37:26 +00:00
class ContinuousToDiscrete:
    """
    Utilities to map a continuous variable into discrete bins and back.

    Bins are computed with ``pandas.cut`` and values are rounded to
    ``ROUND_UP`` decimal places throughout.
    """
    ROUND_UP = 2  # number of decimal places kept when rounding

    @staticmethod
    def binary(X, n=4):
        """
        Convert a continuous stream of values into a one-hot (bit-stream) matrix.

        :param X: iterable of numeric values
        :param n: number of bins
        :return: numpy array of shape (len(X), n) where row i carries a 1 in
                 the column of the bin containing X[i]
        """
        values = np.array(X).astype(np.float32)
        BOUNDS = ContinuousToDiscrete.bounds(values, n)
        matrix = np.zeros((len(X), n))
        # -- the original implementation built the zero matrix but never
        #    populated or returned it; mark the bin each value falls into.
        for i, value in enumerate(values):
            for j, interval in enumerate(BOUNDS):
                if interval.left <= value <= interval.right:
                    matrix[i, j] = 1
                    break
        return matrix

    @staticmethod
    def bounds(x, n):
        """
        Compute the n bin intervals covering x (after rounding).

        :param x: numeric array-like
        :param n: number of bins
        :return: list of pandas Interval objects
        """
        values = np.round(x, ContinuousToDiscrete.ROUND_UP)
        return list(pd.cut(values, n).categories)

    @staticmethod
    def continuous(X, BIN_SIZE=4):
        """
        Approximate continuous values given boundary information: each value
        is replaced by a uniform random draw from the bin that contains it.

        :param X: numeric array-like
        :param BIN_SIZE: number of bins
        :return: list of rounded random values, one per matched input value
        """
        BOUNDS = ContinuousToDiscrete.bounds(X, BIN_SIZE)
        values = []
        for i in np.arange(len(X)):
            value = X[i]
            for item in BOUNDS:
                if value >= item.left and value <= item.right:
                    values += [np.round(np.random.uniform(item.left, item.right), ContinuousToDiscrete.ROUND_UP)]
                    break
        return values
2021-03-29 16:10:57 +00:00
def train(**_args):
    """
    Prepare a dataset and train a GAN on its binary representation.

    :param context     logical name of the training context
    :param store       (optional) store configuration; its 'logs' entry is
                       used to build a logger
    :param logs        (optional) log folder, defaults to 'logs'
    :param max_epochs  number of training epochs
    :param gpu         (optional) index of the gpu to use
    :param partition   (optional) data partition identifier
    remaining keyword arguments are forwarded to prepare.Input
    """
    _inputhandler = prepare.Input(**_args)
    values, _matrix = _inputhandler.convert()
    args = {"real": _matrix, "context": _args['context']}
    _map = {}
    if 'store' in _args:
        #
        # Wire a logger against the 'logs' store; the document name is the context
        args['store'] = copy.deepcopy(_args['store']['logs'])
        if 'args' in _args['store']:
            args['store']['args']['doc'] = _args['context']
        else:
            args['store']['doc'] = _args['context']
        logger = transport.factory.instance(**args['store'])
        args['logger'] = logger
        # NOTE(review): the value map is only built when a store is provided,
        # so map.json is written empty otherwise — confirm this is intended.
        for key in _inputhandler._map:
            beg = _inputhandler._map[key]['beg']
            end = _inputhandler._map[key]['end']
            values = _inputhandler._map[key]['values'].tolist()
            _map[key] = {"beg": beg, "end": end, "values": np.array(values).astype(str).tolist()}
        logger.write({"module": "gan-train", "action": "data-prep", "context": _args['context'], "input": _inputhandler._io})

    args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    args['max_epochs'] = _args['max_epochs']
    args['matrix_size'] = _matrix.shape[0]
    args['batch_size'] = 2000
    if 'partition' in _args:
        args['partition'] = _args['partition']
    if 'gpu' in _args:
        args['gpu'] = _args['gpu']

    trainer = gan.Train(**args)
    #
    # Persist the value map alongside the training output so that generation
    # can later revert matrices back to actual values.
    with open(os.sep.join([trainer.out_dir, 'map.json']), 'w') as f:
        f.write(json.dumps(_map))
    trainer.apply()
2020-03-08 13:48:38 +00:00
2020-02-12 18:43:30 +00:00
def get(**args):
    """
    Restore a checkpoint from persistent storage onto disk.

    @TODO: not implemented yet — currently a no-op returning None.
    """
    pass
2021-03-29 16:10:57 +00:00
def generate(**_args):
    """
    Generate candidate synthetic datasets; model parameters are loaded from
    the logs folder before sampling.

    :param data        original data frame (used for row count and reverting)
    :param context     logical name of the training context
    :param logs        (optional) log folder, defaults to 'logs'
    :param max_epochs  epoch count used to locate the trained model
    :param candidates  number of candidate datasets to produce
    :param partition   (optional) data partition identifier
    :param gpu         (optional) index of the gpu to use
    :return: list of data frames, one per candidate
    """
    _args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    partition = _args['partition'] if 'partition' in _args else None
    if not partition:
        MAP_FOLDER = os.sep.join([_args['logs'], 'output', _args['context']])
    else:
        MAP_FOLDER = os.sep.join([_args['logs'], 'output', _args['context'], str(partition)])
    #
    # The map file captures how columns were encoded at training time
    with open(os.sep.join([MAP_FOLDER, 'map.json'])) as f:
        _map = json.loads(f.read())

    args = {"context": _args['context'], "max_epochs": _args['max_epochs'], "candidates": _args['candidates']}
    args['logs'] = _args['logs']
    args['batch_size'] = 2000
    args['partition'] = 0 if 'partition' not in _args else _args['partition']
    args['row_count'] = _args['data'].shape[0]
    #
    # @TODO: perhaps get the space of values here ... (not sure it's a good idea)
    #
    _args['map'] = _map
    _inputhandler = prepare.Input(**_args)
    values, _matrix = _inputhandler.convert()
    args['values'] = np.array(values)
    if 'gpu' in _args:
        args['gpu'] = _args['gpu']

    handler = gan.Predict(**args)
    lparams = {'columns': None}
    if partition:
        lparams['partition'] = partition
    handler.load_meta(**lparams)
    #
    # Revert the candidate matrices to data-frames with actual values
    #
    candidates = handler.apply(candidates=args['candidates'])
    return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates]
2022-04-11 23:33:07 +00:00
class Learner(Process):
    """
    Base process that reads a dataset from a configured store and prepares
    its binary encoding; subclasses implement training or generation.
    """
    def __init__(self, **_args):
        super(Learner, self).__init__()
        if 'gpu' in _args:
            print(_args['gpu'])
            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
            self.gpu = int(_args['gpu'])
        else:
            self.gpu = None

        self.info = _args['info']
        self.columns = self.info['columns'] if 'columns' in self.info else None
        self.store = _args['store']
        # NOTE(review): the condition tests self.store but the value is read
        # from _args — confirm whether _args['logger'] is the intended source.
        self.logger = transport.factory.instance(_args['logger']) if 'logger' in self.store else transport.factory.instance(provider='console', context='write', lock=True)
        if 'network_args' not in _args:
            self.network_args = {
                'context': self.info['context'],
                'logs': _args['logpath'] if 'logpath' in _args else 'logs',
                'max_epochs': int(_args['epochs']) if 'epochs' in _args else 2,
                'batch_size': int(_args['batch']) if 'batch' in _args else 2000
            }
        else:
            self.network_args = _args['network_args']
        self._encoder = None
        self._map = None
        self._df = _args['data'] if 'data' in _args else None
        #
        # @TODO: allow for verbose mode so we have a sense of what is going on within the network
        #
    def get_schema(self):
        """Return the schema as [{name, type}, ...] (or the reader's meta for bigquery)."""
        if self.store['source']['provider'] != 'bigquery':
            names = self._df.dtypes.index.tolist()
            types = self._df.dtypes.astype(str).tolist()
            return [{'name': _name, 'type': _type} for _name, _type in zip(names, types)]
        else:
            reader = transport.factory.instance(**self.store['source'])
            return reader.meta(table=self.info['from'])
    def initalize(self):
        """Load the data (if not provided) and build the binary encoder."""
        # (sic) method name kept as-is: subclasses in this file call initalize()
        reader = transport.factory.instance(**self.store['source'])
        _read_args = self.info
        if self._df is None:
            self._df = reader.read(**_read_args)
        columns = self.columns if self.columns else self._df.columns
        #
        # convert the data to binary here ...
        _args = {"schema": self.get_schema(), "data": self._df, "columns": columns}
        if self._map:
            _args['map'] = self._map
        self._encoder = prepare.Input(**_args)
class Trainer(Learner):
    """
    Training process: encodes the data, trains the GAN and hands off a
    Generator instance once training completes.
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        self.limit = int(_args['limit']) if 'limit' in _args else None
        self.name = _args['name']
        self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
        # NOTE(review): this attribute shadows the generate() method defined
        # below (unreachable on instances) — confirm intent before renaming.
        self.generate = None
        self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
    def run(self):
        """Encode the data, train the network and prepare a Generator."""
        self.initalize()
        _space, _matrix = self._encoder.convert()
        _args = self.network_args
        if self.gpu:
            _args['gpu'] = self.gpu
        _args['real'] = _matrix
        _args['candidates'] = self.candidates
        #
        # With the binary matrix in hand we can initiate training
        #
        gTrain = gan.Train(**_args)
        gTrain.apply()
        writer = transport.factory.instance(provider='file', context='write', path=os.sep.join([gTrain.out_dir, 'map.json']))
        writer.write(self._encoder._map, overwrite=True)
        writer.close()
        #
        # @TODO: At this point we need to generate another some other objects
        #
        _args = {"network_args": self.network_args, "store": self.store, "info": self.info, "candidates": self.candidates, "data": self._df}
        if self.gpu:
            _args['gpu'] = self.gpu
        g = Generator(**_args)
        self.generate = g
        if self.autopilot:
            self.generate.run()
    def generate(self):
        if self.autopilot:
            print("Autopilot is set ... No need to call this function")
        else:
            raise Exception("Autopilot has not been, Wait till training is finished. Use is_alive function on process object")
class Generator(Learner):
    """
    Generation process: loads the value map produced at training time,
    encodes the original data, samples candidates from the trained GAN and
    writes post-processed candidates to the target store.
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'], 'output', self.network_args['context'], 'map.json'])
        with open(filename) as file:
            self._map = json.loads(file.read())
    def run(self):
        """Sample candidate matrices from the trained model and post them."""
        self.initalize()
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values, _matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]
        if self.gpu:
            _args['gpu'] = self.gpu
        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates = [self._encoder.revert(matrix=_item) for _item in _iomatrix]
        self.post(_candidates)
    def approximate(self, _df):
        """
        Perturb the columns listed in info['approximate'] with dirichlet
        noise so generated values are close to (but not equal to) originals.

        :param _df: data frame to perturb in place
        :return: the perturbed data frame
        """
        _columns = self.info['approximate']
        for name in _columns:
            batches = np.array_split(_df[name].fillna(np.nan).values, 2)
            _type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
            x = []
            for values in batches:
                # NOTE(review): non-missing entries are detected by comparing
                # against the empty string — confirm the column really arrives
                # as strings at this point.
                index = np.where(values != '')
                _values = np.random.dirichlet(values[index].astype(_type))
                # -- randomly add or subtract the noise term
                values[index] = list(values[index] + _values) if np.random.randint(0, 2) else list(values[index] - _values)
                values[index] = values[index].astype(_type)
                x += values.tolist()
            if x:
                _df[name] = x
        return _df
    def make_date(self, **_args):
        """
        Build a synthetic date (or several, when offsets are given) for a year.

        :param year    initial value (required); '' / None / NaN yields None
        :param offset  (optional) iterable of day deltas; when provided a list
                       of date strings is returned, starting with the base date
        :param field   (optional) field name used to look up a custom format
                       in self.info['format']
        :return: formatted date string, list of strings, or None
        """
        if _args['year'] in ['', None, np.nan]:
            return None
        year = int(_args['year'])
        offset = _args['offset'] if 'offset' in _args else 0
        month = np.random.randint(1, 13)
        if month == 2:
            # -- full Gregorian leap-year rule (original only checked year % 4)
            _end = 29 if (year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)) else 28
        else:
            _end = 31 if month in [1, 3, 5, 7, 8, 10, 12] else 30
        # -- upper bound inclusive: randint's high is exclusive, so the
        #    original randint(1, _end) could never pick the month's last day
        day = np.random.randint(1, _end + 1)
        #-- synthetic date
        _date = datetime(year=year, month=month, day=day)
        FORMAT = '%Y-%m-%d'
        if 'format' in self.info and 'field' in _args and _args['field'] in self.info['format']:
            _name = _args['field']
            FORMAT = self.info['format'][_name]
        if offset:
            r = [_date.strftime(FORMAT)]
            for _delta in offset:
                _date = _date + timedelta(_delta)
                r.append(_date.strftime(FORMAT))
            return r
        else:
            return _date.strftime(FORMAT)
    def format(self, _df):
        # @TODO: not implemented yet
        pass
    def post(self, _candidates):
        """
        Post-process each candidate (approximation, synthetic dates) and
        write it to the target store (console when no target is configured).

        :param _candidates: list of candidate data frames
        """
        _store = self.store['target'] if 'target' in self.store else {'provider': 'console'}
        _store['lock'] = True
        _store['context'] = 'write'  #-- Just in case
        if 'table' not in _store:
            _store['table'] = self.info['from']
        writer = transport.factory.instance(**_store)
        for _iodf in _candidates:
            _df = self._df.copy()
            _df[self.columns] = _iodf[self.columns]
            #
            # @TODO: Improve formatting with better post-processing pipeline
            #
            if 'approximate' in self.info:
                _df = self.approximate(_df)
            if 'make_date' in self.info:
                for name in self.info['make_date']:
                    iname = self.info['make_date'][name]
                    years = _df[iname]
                    _dates = [self.make_date(year=year, field=name) for year in years]
                    if _dates:
                        _df[name] = _dates
            _schema = self.get_schema()
            _schema = [{'name': _item.name, 'type': _item.field_type} for _item in _schema]
            writer.write(_df[self.columns], schema=_schema)
class factory:
    """
    Entry point producing configured Trainer process objects.
    """
    _infocache = {}
    @staticmethod
    def instance(**_args):
        """
        An instance of an object that trains and generates candidate datasets

        :param gpu (optional) index of the gpu to be used if using one
        :param store {source, target} if no target is provided console will be output
        :param epochs (default 2) number of epochs to train
        :param candidates (default 1) number of candidates to generate
        :param info {columns, sql, from}
        :param autopilot will generate output automatically
        :param batch (default 2k) size of the batch
        """
        return Trainer(**_args)