From 2fdc7c8f5c92c1159dc8d716f10d23a352d61892 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 13 Apr 2022 10:07:27 -0500 Subject: [PATCH] bug fix --- data/maker/__init__.py | 221 +++++------------------------------------ 1 file changed, 26 insertions(+), 195 deletions(-) diff --git a/data/maker/__init__.py b/data/maker/__init__.py index 382c209..3acddc1 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -21,181 +21,6 @@ import json from multiprocessing import Process, RLock from datetime import datetime, timedelta -class ContinuousToDiscrete : - ROUND_UP = 2 - @staticmethod - def binary(X,n=4) : - """ - This function will convert a continous stream of information into a variety a bit stream of bins - """ - values = np.array(X).astype(np.float32) - BOUNDS = ContinuousToDiscrete.bounds(values,n) - matrix = np.repeat(np.zeros(n),len(X)).reshape(len(X),n) - - - @staticmethod - def bounds(x,n): - # return np.array_split(x,n) - values = np.round(x,ContinuousToDiscrete.ROUND_UP) - return list(pd.cut(values,n).categories) - - - - @staticmethod - def continuous(X,BIN_SIZE=4) : - """ - This function will approximate a binary vector given boundary information - :X binary matrix - :BIN_SIZE - """ - BOUNDS = ContinuousToDiscrete.bounds(X,BIN_SIZE) - - values = [] - # _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE) - # # # print (BOUNDS) - l = {} - for i in np.arange(len(X)): #value in X : - - value = X[i] - - for item in BOUNDS : - if value >= item.left and value <= item.right : - values += [np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP)] - break - # values += [ np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP) for item in BOUNDS if value >= item.left and value <= item.right ] - - - # # values = [] - # for row in _BINARY : - # # ubound = BOUNDS[row.index(1)] - # index = np.where(row == 1)[0][0] - - # ubound = BOUNDS[ index ].right - # lbound = BOUNDS[ index ].left - - # x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float) - # values.append(x_) - - # lbound = ubound - - # values = [np.random.uniform() for item in BOUNDS] - - return values - - -def train (**_args): - """ - :params sql - :params store - """ - - _inputhandler = prepare.Input(**_args) - values,_matrix = _inputhandler.convert() - args = {"real":_matrix,"context":_args['context']} - _map = {} - if 'store' in _args : - # - # This - - args['store'] = copy.deepcopy(_args['store']['logs']) - if 'args' in _args['store']: - args['store']['args']['doc'] = _args['context'] - else: - - args['store']['doc'] = _args['context'] - logger = transport.factory.instance(**args['store']) - args['logger'] = logger - - for key in _inputhandler._map : - beg = _inputhandler._map[key]['beg'] - end = _inputhandler._map[key]['end'] - values = _inputhandler._map[key]['values'].tolist() - _map[key] = {"beg":beg,"end":end,"values":np.array(values).astype(str).tolist()} - info = {"rows":_matrix.shape[0],"cols":_matrix.shape[1],"map":_map} - print() - # print ([_args['context'],_inputhandler._io]) - logger.write({"module":"gan-train","action":"data-prep","context":_args['context'],"input":_inputhandler._io}) - - args['logs'] = _args['logs'] if 'logs' in _args else 'logs' - args ['max_epochs'] = _args['max_epochs'] - args['matrix_size'] = _matrix.shape[0] - args['batch_size'] = 2000 - if 'partition' in _args : - args['partition'] = _args['partition'] - if 'gpu' in _args : - args['gpu'] = _args['gpu'] - # os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' - - trainer = gan.Train(**args) - # - # @TODO: Write the map.json in the output directory for the logs - # - # f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']),'w') - f = open(os.sep.join([trainer.out_dir,'map.json']),'w') - f.write(json.dumps(_map)) - f.close() - - trainer.apply() - pass - -def get(**args): - """ - This function will restore a checkpoint from a persistant storage on to disk - """ - pass -def generate(**_args): - """ - This function will generate a set of records, before we must load the parameters needed - :param data - :param context - :param logs - """ - _args['logs'] = _args['logs'] if 'logs' in _args else 'logs' - partition = _args['partition'] if 'partition' in _args else None - if not partition : - MAP_FOLDER = os.sep.join([_args['logs'],'output',_args['context']]) - # f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json'])) - else: - MAP_FOLDER = os.sep.join([_args['logs'],'output',_args['context'],str(partition)]) - # f = open(os.sep.join([_args['logs'],'output',_args['context'],str(partition),'map.json'])) - f = open(os.sep.join([MAP_FOLDER,'map.json'])) - _map = json.loads(f.read()) - f.close() - # - # - # if 'file' in _args : - # df = pd.read_csv(_args['file']) - # else: - # df = _args['data'] if not isinstance(_args['data'],str) else pd.read_csv(_args['data']) - args = {"context":_args['context'],"max_epochs":_args['max_epochs'],"candidates":_args['candidates']} - args['logs'] = _args['logs'] if 'logs' in _args else 'logs' - args ['max_epochs'] = _args['max_epochs'] - # args['matrix_size'] = _matrix.shape[0] - args['batch_size'] = 2000 - args['partition'] = 0 if 'partition' not in _args else _args['partition'] - args['row_count'] = _args['data'].shape[0] - # - # @TODO: perhaps get the space of values here ... (not sure it's a good idea) - # - _args['map'] = _map - _inputhandler = prepare.Input(**_args) - values,_matrix = _inputhandler.convert() - args['values'] = np.array(values) - if 'gpu' in _args : - args['gpu'] = _args['gpu'] - - handler = gan.Predict (**args) - lparams = {'columns':None} - if partition : - lparams['partition'] = partition - handler.load_meta(**lparams) - # - # Let us now format the matrices by reverting them to a data-frame with values - # - - candidates = handler.apply(candidates=args['candidates']) - return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates] - class Learner(Process): def __init__(self,**_args): @@ -211,7 +36,7 @@ class Learner(Process): self.info = _args['info'] self.columns = self.info['columns'] if 'columns' in self.info else None self.store = _args['store'] - self.logger = transport.factory.instance(_args['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True) + if 'network_args' not in _args : self.network_args ={ 'context':self.info['context'] , @@ -228,12 +53,18 @@ class Learner(Process): # # @TODO: allow for verbose mode so we have a sens of what is going on within the newtork # - if self.logger : - _args = {'module':self.name,'action':'init','context':self.info['context'],'gpu':(self.gpu if self.gpu is not None else -1)} - self.logger.write(_args) + + _log = {'module':self.name,'action':'init','context':self.info['context'],'gpu':(self.gpu if self.gpu is not None else -1)} + self.log(**_log) # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs' # sel.max_epoc + def log(self,**_args): + logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True) + logger.write(_args) + if hasattr(logger,'close') : + logger.close() + def get_schema(self): if self.store['source']['provider'] != 'bigquery' : return [{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])] @@ -253,9 +84,9 @@ class Learner(Process): if self._map : _args['map'] = self._map self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None - if self.logger : - _args = {'module':self.name,'action':'data-prep','input':{'rows':self._df.shape[0],'cols':self._df.shape[1]} } - self.logger.write(_args) + + _log = {'module':self.name,'action':'data-prep','input':{'rows':self._df.shape[0],'cols':self._df.shape[1]} } + self.log(**_log) class Trainer(Learner): """ This will perform training using a GAN @@ -301,10 +132,10 @@ class Trainer(Learner): _args['gpu'] = self.gpu g = Generator(**_args) # g.run() - if self.logger : - end = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - logs = {'module':self.name,'action':'train','input':{'start':beg,'end':end}} - self.logger.write(logs) + + end = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + _logs = {'module':self.name,'action':'train','input':{'start':beg,'end':end}} + self.log(**_logs) self.generate = g if self.autopilot : self.generate.run() @@ -347,10 +178,10 @@ class Generator (Learner): gHandler.load_meta(columns=None) _iomatrix = gHandler.apply() _candidates= [ self._encoder.revert(matrix=_item) for _item in _iomatrix] - if self.logger : - _size = np.sum([len(_item) for _item in _iomatrix]) - _log = {'module':self.name,'action':'io-data','input':{'candidates':len(_candidates),'rows':_size}} - self.logger.write(_log) + + _size = np.sum([len(_item) for _item in _iomatrix]) + _log = {'module':self.name,'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}} + self.log(**_log) self.post(_candidates) def approximate(self,_df): _columns = self.info['approximate'] @@ -373,10 +204,10 @@ class Generator (Learner): values[index] = values[index].astype(_type) x += values.tolist() if x : - _log['input']['diff'] = 1 - np.divide( (_df[name].dropna() == x).sum(),_df[name].dropna().size) + _log['input']['diff_pct'] = 100 * (1 - np.divide( (_df[name].dropna() == x).sum(),_df[name].dropna().size)) _df[name] = x #np.array(x,dtype=np.int64) if 'int' in _type else np.arry(x,dtype=np.float64) - if self.logger : - self.logger.write(_log) + + self.log(**_log) return _df def make_date(self,**_args) : """ @@ -446,8 +277,8 @@ class Generator (Learner): _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema] writer.write(_df,schema=_schema) - if self.logger : - self.logger.write({'module':self.name,'action':'write','input':{'rows':N,'candidates':len(_candidates)}}) + + self.log(**{'module':self.name,'action':'write','input':{'rows':N,'candidates':len(_candidates)}}) class factory : _infocache = {} @staticmethod