""" (c) 2019 Data Maker, hiplab.mc.vanderbilt.edu version 1.0.0 This package serves as a proxy to the overall usage of the framework. This package is designed to generate synthetic data from a dataset from an original dataset using deep learning techniques @TODO: - Make configurable GPU, EPOCHS """ import pandas as pd import numpy as np import data.gan as gan import transport from data.bridge import Binary import threading as thread from data.maker import prepare import copy import os import json from multiprocessing import Process, RLock class ContinuousToDiscrete : ROUND_UP = 2 @staticmethod def binary(X,n=4) : """ This function will convert a continous stream of information into a variety a bit stream of bins """ values = np.array(X).astype(np.float32) BOUNDS = ContinuousToDiscrete.bounds(values,n) matrix = np.repeat(np.zeros(n),len(X)).reshape(len(X),n) @staticmethod def bounds(x,n): # return np.array_split(x,n) values = np.round(x,ContinuousToDiscrete.ROUND_UP) return list(pd.cut(values,n).categories) @staticmethod def continuous(X,BIN_SIZE=4) : """ This function will approximate a binary vector given boundary information :X binary matrix :BIN_SIZE """ BOUNDS = ContinuousToDiscrete.bounds(X,BIN_SIZE) values = [] # _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE) # # # print (BOUNDS) l = {} for i in np.arange(len(X)): #value in X : value = X[i] for item in BOUNDS : if value >= item.left and value <= item.right : values += [np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP)] break # values += [ np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP) for item in BOUNDS if value >= item.left and value <= item.right ] # # values = [] # for row in _BINARY : # # ubound = BOUNDS[row.index(1)] # index = np.where(row == 1)[0][0] # ubound = BOUNDS[ index ].right # lbound = BOUNDS[ index ].left # x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float) # values.append(x_) # lbound = ubound # values = [np.random.uniform() for item in BOUNDS] return values def train (**_args): """ :params sql :params store """ _inputhandler = prepare.Input(**_args) values,_matrix = _inputhandler.convert() args = {"real":_matrix,"context":_args['context']} _map = {} if 'store' in _args : # # This args['store'] = copy.deepcopy(_args['store']['logs']) if 'args' in _args['store']: args['store']['args']['doc'] = _args['context'] else: args['store']['doc'] = _args['context'] logger = transport.factory.instance(**args['store']) args['logger'] = logger for key in _inputhandler._map : beg = _inputhandler._map[key]['beg'] end = _inputhandler._map[key]['end'] values = _inputhandler._map[key]['values'].tolist() _map[key] = {"beg":beg,"end":end,"values":np.array(values).astype(str).tolist()} info = {"rows":_matrix.shape[0],"cols":_matrix.shape[1],"map":_map} print() # print ([_args['context'],_inputhandler._io]) logger.write({"module":"gan-train","action":"data-prep","context":_args['context'],"input":_inputhandler._io}) args['logs'] = _args['logs'] if 'logs' in _args else 'logs' args ['max_epochs'] = _args['max_epochs'] args['matrix_size'] = _matrix.shape[0] args['batch_size'] = 2000 if 'partition' in _args : args['partition'] = _args['partition'] if 'gpu' in _args : args['gpu'] = _args['gpu'] # os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' trainer = gan.Train(**args) # # @TODO: Write the map.json in the output directory for the logs # # f = 
def get(**args):
    """
    This function will restore a checkpoint from persistent storage onto disk
    """
    pass

def generate(**_args):
    """
    This function will generate a set of records; before doing so it must load the parameters it needs
    :param data
    :param context
    :param logs
    """
    _args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    partition = _args['partition'] if 'partition' in _args else None
    if not partition :
        MAP_FOLDER = os.sep.join([_args['logs'],'output',_args['context']])
        # f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']))
    else:
        MAP_FOLDER = os.sep.join([_args['logs'],'output',_args['context'],str(partition)])
        # f = open(os.sep.join([_args['logs'],'output',_args['context'],str(partition),'map.json']))
    f = open(os.sep.join([MAP_FOLDER,'map.json']))
    _map = json.loads(f.read())
    f.close()
    #
    # if 'file' in _args :
    #     df = pd.read_csv(_args['file'])
    # else:
    #     df = _args['data'] if not isinstance(_args['data'],str) else pd.read_csv(_args['data'])

    args = {"context":_args['context'],"max_epochs":_args['max_epochs'],"candidates":_args['candidates']}
    args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    args['max_epochs'] = _args['max_epochs']
    # args['matrix_size'] = _matrix.shape[0]
    args['batch_size'] = 2000
    args['partition'] = 0 if 'partition' not in _args else _args['partition']
    args['row_count'] = _args['data'].shape[0]
    #
    # @TODO: perhaps get the space of values here ... (not sure it's a good idea)
    #
    _args['map'] = _map
    _inputhandler = prepare.Input(**_args)
    values,_matrix = _inputhandler.convert()
    args['values'] = np.array(values)
    if 'gpu' in _args :
        args['gpu'] = _args['gpu']
    handler = gan.Predict (**args)
    lparams = {'columns':None}
    if partition :
        lparams['partition'] = partition
    handler.load_meta(**lparams)
    #
    # Let us now format the matrices by reverting them to a data-frame with values
    #
    candidates = handler.apply(candidates=args['candidates'])
    return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates]

class Learner(Process):
    def __init__(self,**_args):
        super(Learner, self).__init__()
        if 'gpu' in _args :
            print (_args['gpu'])
            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
            self.gpu = int(_args['gpu'])
        else:
            self.gpu = None
        self.info = _args['info']
        self.columns = self.info['columns'] if 'columns' in self.info else None
        self.store = _args['store']
        if 'network_args' not in _args :
            self.network_args = {
                'context':_args['context'] if 'context' in _args else 'GENERAL',
                'logs':_args['logpath'] if 'logpath' in _args else 'logs',
                'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
                'batch_size':int(_args['batch']) if 'batch' in _args else 2000
            }
        else:
            self.network_args = _args['network_args']
        self._encoder = None
        self._map = None
        self._df = _args['data'] if 'data' in _args else None
        #
        # @TODO: allow for a verbose mode so we have a sense of what is going on within the network
        #
        # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'

    def get_schema(self):
        return [{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]} for i in range(self._df.dtypes.shape[0])]

    def initalize(self):
        reader = transport.factory.instance(**self.store['source'])
        _read_args = self.info
        if self._df is None :
            self._df = reader.read(**_read_args)
        columns = self.columns if self.columns else self._df.columns
        #
        # convert the data to binary here ...
        #
        _args = {"schema":self.get_schema(),"data":self._df,"columns":columns}
        if self._map :
            _args['map'] = self._map
        self._encoder = prepare.Input(**_args)
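#
# For reference: Learner.get_schema() flattens the frame's dtypes into {'name','type'} records.
# For a frame with columns age (int64) and gender (object) it returns
#
#   [{'name': 'age', 'type': 'int64'}, {'name': 'gender', 'type': 'object'}]
#
# initalize() then hands that schema, the frame and the target columns to prepare.Input,
# which acts as the encoder between the data-frame and the binary matrix the GAN consumes.
#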
class Trainer(Learner):
    """
    This class performs training using a GAN
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        # self.info = _args['info']
        self.limit = int(_args['limit']) if 'limit' in _args else None
        self.name = _args['name']
        self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
        self.generate = None
        self.candidates = int(_args['candidates']) if 'candidates' in _args else 1

    def run(self):
        self.initalize()
        _space,_matrix = self._encoder.convert()
        _args = self.network_args
        if self.gpu :
            _args['gpu'] = self.gpu
        _args['real'] = _matrix
        _args['candidates'] = self.candidates
        #
        # At this point we have the binary matrix, we can initiate training
        #
        gTrain = gan.Train(**_args)
        gTrain.apply()

        writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
        writer.write(self._encoder._map,overwrite=True)
        writer.close()
        #
        # @TODO: At this point we need to generate some other objects
        #
        _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
        if self.gpu :
            _args['gpu'] = self.gpu
        g = Generator(**_args)
        # g.run()
        self.generate = g
        if self.autopilot :
            self.generate.run()

    def generate (self):
        if self.autopilot :
            print("Autopilot is set ... No need to call this function")
        else:
            raise Exception("Autopilot has not been set. Wait until training is finished, then use the is_alive() function on the process object")
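#
# Usage sketch for Trainer. The store/info values are illustrative assumptions; the required
# keys (name, store, info) and the Process-based lifecycle follow from the code above.
#
#   t = Trainer(name='patients', epochs=2, gpu=0, autopilot=True,
#               store={'source':{'provider':'file','path':'patients.csv'},   # assumed reader config
#                      'target':{'provider':'console'}},
#               info={'columns':['age','gender']})
#   t.start()    # Trainer is a multiprocessing.Process; run() trains and, on autopilot, generates
#   t.join()
#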
class Generator (Learner):
    def __init__(self,**_args):
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
        file = open(filename)
        self._map = json.loads(file.read())
        file.close()

    def run(self):
        self.initalize()
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values,_matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]
        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates = [ self._encoder.revert(matrix=_item) for _item in _iomatrix]
        self.post(_candidates)

    def appriximate(self,_df):
        """
        Perturb the columns listed under info['approximate'] with Dirichlet-distributed noise
        """
        _columns = self.info['approximate']
        _schema = {}
        for _info in self.get_schema() :
            _schema[_info['name']] = _info['type']

        for name in _columns :
            batches = np.array_split(_df[name].values,10)
            x = []
            for values in batches :
                _values = np.random.dirichlet(values)
                x += list(values + _values) if np.random.randint(0,2) else list(values - _values)
            _df[name] = np.int64(x) if 'int' in _schema[name] else np.float64(x)
        return _df

    def format(self,_df):
        pass

    def post(self,_candidates):
        _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
        _store['lock'] = True
        writer = transport.factory.instance(**_store)
        for _iodf in _candidates :
            _df = self._df.copy()
            _df[self.columns] = _iodf[self.columns]
            if 'approximate' in self.info :
                _df = self.appriximate(_df)
            writer.write(_df,schema=self.get_schema())
        pass

class factory :
    _infocache = {}

    @staticmethod
    def instance(**_args):
        """
        An instance of an object that trains and generates candidate datasets
        :param gpu          (optional) index of the gpu to be used, if using one
        :param store        {source,target}; if no target is provided output goes to the console
        :param epochs       (default 2) number of epochs to train
        :param candidates   (default 1) number of candidate datasets to generate
        :param info         {columns,sql,from}
        :param autopilot    generate output automatically after training
        :param batch        (default 2000) size of the batch
        """
        return Trainer(**_args)
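#
# Putting it together with factory.instance -- a sketch only; the provider settings, table and
# column names below are assumptions. factory.instance(...) simply returns a Trainer configured
# with the parameters documented in its docstring (store, info, epochs, candidates, gpu,
# autopilot, batch), plus the name required by Trainer.
#
#   handler = factory.instance(
#       name='patients',
#       store={'source':{'provider':'sqlite','path':'sample.db'},   # assumed source store
#              'target':{'provider':'console'}},                    # console output if no target
#       info={'columns':['age','gender'],'from':'patients'},        # assumed table and columns
#       epochs=2, candidates=1, autopilot=True)
#   handler.start()
#   handler.join()
#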