From 322b21aaacccaf2458caff72fd0a090c48c7d371 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Tue, 9 Aug 2022 12:22:07 -0500
Subject: [PATCH] bug fix: encoding/decoding to improve correlations between
 attributes

---
 data/maker/__init__.py         | 123 ++++++++++++++++++++++-----------
 data/maker/prepare/__init__.py |  57 ++++++++++++++-
 2 files changed, 136 insertions(+), 44 deletions(-)

diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 2d1e1f8..0d8bf33 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -13,13 +13,17 @@
 import numpy as np
 import data.gan as gan
 import transport
 # from data.bridge import Binary
-import threading as thread
+import threading
 from data.maker import prepare
 import copy
 import os
-import json
+import nujson as json
 from multiprocessing import Process, RLock
 from datetime import datetime, timedelta
+from multiprocessing import Queue
+
+import time
+
 
 class Learner(Process):
@@ -28,6 +32,7 @@ class Learner(Process):
 
         super(Learner, self).__init__()
         self.ndx = 0
+        self._queue = Queue()
         self.lock = RLock()
         if 'gpu' in _args :
@@ -61,34 +66,38 @@ class Learner(Process):
             _log = {'action':'init','gpu':(self.gpu if self.gpu is not None else -1)}
             self.log(**_log)
-        
+        self.cache = []
         # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
         # sel.max_epoc
     def log(self,**_args):
-        # self.lock.acquire()
+        
        try:
-            _context = self.info['context']
-            _label = self.info['info'] if 'info' in self.info else _context
-            logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True)
-            _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
-            logger.write(_args)
-            self.ndx += 1
-            if hasattr(logger,'close') :
-                logger.close()
+            # _context = self.info['context']
+            # _label = self.info['info'] if 'info' in self.info else _context
+            # logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider=transport.providers.CONSOLE,context='write',lock=True)
+            # _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
+            # logger.write(_args)
+            # self.ndx += 1
+            # if hasattr(logger,'close') :
+            #     logger.close()
+            pass
        except Exception as e:
            print ()
            print (_args)
            print (e)
            pass
        finally:
-            # self.lock.release()
+            
            pass
    def get_schema(self):
-        if self.store['source']['provider'] != 'bigquery' :
-            return []  #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])]
-        else:
-            reader = transport.factory.instance(**self.store['source'])
-            return reader.meta(table=self.info['from'])
+        # if self.store['source']['provider'] != 'bigquery' :
+        #     return []  #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])]
+        # else:
+        #     reader = transport.factory.instance(**self.store['source'])
+        #     return reader.meta(table=self.info['from'])
+        reader = transport.factory.instance(**self.store['source'])
+        return reader.meta(table=self.info['from'])
+
    def initalize(self):
        reader = transport.factory.instance(**self.store['source'])
        _read_args= self.info
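Note on the hunk that follows: it adds an in-memory hand-off channel to Learner. Workers cache what they produce and can push it through the multiprocessing.Queue created in __init__, so a caller can retrieve results even without a configured target store. A minimal stand-alone sketch of that pattern (simplified, hypothetical names; not part of the patch):

    from multiprocessing import Process, Queue

    class Worker(Process):
        def __init__(self):
            super().__init__()
            self._queue = Queue()             # channel back to the parent process
        def run(self):
            candidates = [[1, 2], [3, 4]]     # stand-in for generated data
            self._queue.put(candidates)       # publish to whoever holds the handle
        def get(self):
            return self._queue.get()          # blocks until run() has published

    if __name__ == '__main__':
        w = Worker()
        w.start()
        print(w.get())                        # [[1, 2], [3, 4]]
        w.join()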
@@ -124,6 +133,25 @@ class Learner(Process):
         self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None
         _log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } }
         self.log(**_log)
+    def get(self):
+        
+        if self.cache :
+            return self.cache if len(self.cache) > 0 else (self.cache if not self.cache else self.cache[0])
+        else:
+            return self._queue.get() if self._queue.qsize() > 0 else []
+        
+    def listen(self):
+        while True :
+            _info = self._queue.get()
+            self.cache.append(_info)
+            # self._queue.task_done() -- multiprocessing.Queue has no task_done(), get() already consumes the item
+    def publish(self,caller):
+        if hasattr(caller,'_queue') :
+            _queue = caller._queue
+            _queue.put(self.cache)
+
+            # _queue.join()
+        pass
 class Trainer(Learner):
     """
     This will perform training using a GAN
@@ -157,7 +185,8 @@
 
         gTrain = gan.Train(**_args)
         gTrain.apply()
-        writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
+        writer = transport.factory.instance(provider=transport.providers.FILE,context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
+        
         writer.write(self._encoder._map,overwrite=True)
         writer.close()
@@ -174,9 +203,14 @@
         _min = float((end-beg).seconds/ 60)
         _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}}
         self.log(**_logs)
-        self.generate = g
-        if self.autopilot :
-            self.generate.run()
+        self._g = g
+        if self.autopilot :
+            self._g.run()
+        #
+        #@TODO Find a way to have the data in the object ....
+        
+        
     def generate (self):
         if self.autopilot :
             print( "Autopilot is set ... No need to call this function")
@@ -224,6 +258,7 @@ class Generator (Learner):
             _size = np.sum([len(_item) for _item in _iomatrix])
             _log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}}
             self.log(**_log)
+            # self.cache = _candidates
         self.post(_candidates)
     def approximate(self,_df):
         _columns = self.info['approximate']
@@ -359,12 +394,14 @@ class Generator (Learner):
 
         pass
     def post(self,_candidates):
-        _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
-        _store['lock'] = True
-        _store['context'] = 'write'  #-- Just in case
-        if 'table' not in _store :
-            _store['table'] = self.info['from']
-        
+        if 'target' in self.store :
+            _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
+            _store['lock'] = True
+            _store['context'] = 'write'  #-- Just in case
+            if 'table' not in _store :
+                _store['table'] = self.info['from']
+        else:
+            _store = None
         N = 0
         for _iodf in _candidates :
             _df = self._df.copy()
@@ -397,13 +434,15 @@ class Generator (Learner):
             # w.write(_df)
             # cols = [name for name in _df.columns if name.endswith('datetime')]
             # print (_df[cols])
-            
-            writer = transport.factory.instance(**_store)
-            if _store['provider'] == 'bigquery':
-                writer.write(_df,schema=[],table=self.info['from'])
+            if _store :
+                writer = transport.factory.instance(**_store)
+                if _store['provider'] == 'bigquery':
+                    writer.write(_df,schema=[],table=self.info['from'])
+                else:
+                    writer.write(_df,table=self.info['from'])
             else:
-                writer.write(_df,table=self.info['from'])
-            
+                self.cache.append(_df)
+            
 
 
@@ -444,6 +483,8 @@ class Shuffle(Generator):
         except Exception as e :
             # print (e)
             self.log(**{'action':'failed','input':{'msg':e,'info':self.info}})
+class apply :
+    TRAIN,GENERATE,RANDOM = 'train','generate','random'
 class factory :
     _infocache = {}
     @staticmethod
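The factory hunk below switches dispatch from bare strings to the new apply constants class and funnels every branch through a single start/return path. A stand-alone sketch of that shape (stand-in worker classes, hypothetical; the real factory wires Trainer, Generator and Shuffle with the caller's arguments):

    from multiprocessing import Process

    class apply:
        TRAIN, GENERATE, RANDOM = 'train', 'generate', 'random'

    class Trainer(Process):
        def run(self):
            print('training ...')
    class Generator(Process):
        def run(self):
            print('generating ...')
    class Shuffle(Process):
        def run(self):
            print('shuffling ...')

    def instance(**_args):
        if _args['apply'] == apply.RANDOM:
            pthread = Shuffle()
        elif _args['apply'] == apply.GENERATE:
            pthread = Generator()
        else:
            pthread = Trainer()
        if _args.get('start') == True:
            pthread.start()               # the caller may also start it later
        return pthread

    if __name__ == '__main__':
        p = instance(apply=apply.GENERATE, start=True)
        p.join()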
@@ -459,12 +500,12 @@ class factory :
         :param batch (default 2k)    size of the batch
         """
         
-        if _args['apply'] == 'shuffle' :
-            return Shuffle(**_args)
-        elif _args['apply'] == 'generate' :
-            return Generator(**_args)
+        if _args['apply'] in [apply.RANDOM] :
+            pthread = Shuffle(**_args)
+        elif _args['apply'] == apply.GENERATE :
+            pthread = Generator(**_args)
         else:
             pthread= Trainer(**_args)
-    if 'start' in _args and _args['start'] == True :
-        pthread.start()
-    return pthread
\ No newline at end of file
+        if 'start' in _args and _args['start'] == True :
+            pthread.start()
+        return pthread
\ No newline at end of file
diff --git a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py
index 45fc61c..d589c17 100644
--- a/data/maker/prepare/__init__.py
+++ b/data/maker/prepare/__init__.py
@@ -47,6 +47,15 @@ class Input :
         :param sql sql query that pulls a representative sample of the data
         """
         self._schema = _args['schema'] if 'schema' in _args else {}
+        #
+        # schema data should be in a hash map for these purposes
+        #
+        if self._schema :
+            r = {}
+            for _item in self._schema :
+                r[_item['name']] = _item['type']
+            self._schema = r
+
         self.df = _args['data']
         if 'sql' not in _args :
             self._initdata(**_args)
@@ -60,6 +69,7 @@ class Input :
         #
         self._map = {} if 'map' not in _args else _args['map']
 
+        
     def _initsql(self,**_args):
         """
         This function will initialize the class on the basis of a data-store and optionally pre-defined columns to be used to be synthesized
@@ -73,6 +83,10 @@ class Input :
             self._initcols(data=self.df,columns=_args['columns'])
         
         pass
+    def _init_map(self,values):
+        self._map = dict(zip(np.arange(len(values)),values))
+        for key in self._map :
+            self._map[key] = self._map[key].tolist()
     def _initcols (self,**_args) :
         """
         This function will initialize the columns to be synthesized and/or determine which ones can be synthesized
@@ -109,7 +123,7 @@ class Input :
         """
         self._initcols(**_args)
 
-    def convert(self,**_args):
+    def _convert(self,**_args):
         """
         This function will convert a data-frame into a binary matrix and provide a map to be able to map the values back to the matrix
         :param columns in case we specify the columns to account for (just in case the original assumptions don't hold)
@@ -150,7 +164,7 @@ class Input :
 
         return _values,_m
 
-    def revert(self,**_args) :
+    def _revert(self,**_args) :
         """
         This function will take in a binary matrix and based on the map of values it will repopulate it with values
         :param _matrix binary matrix
@@ -186,7 +200,9 @@ class Input :
                 # r[key] = [columns[np.where(row == 1) [0][0] ] for row in _matrix[:,_beg:_end]]
                 
                 r[key] = [columns[np.where(row==1)[0][0]] if np.where(row==1)[0].size > 0 else '' for row in _matrix]
-
+        #
+        # we should consider decoding the matrix if possible
+        #
         return pd.DataFrame(r)
 
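The new convert/encode path in the hunk below is the heart of this fix: instead of one-hot encoding each column separately, every distinct combination of column values becomes a single code, so correlations between attributes survive the trip into binary space. A stand-alone sketch of the idea with toy data (simplified; the patch's tobinary() is replaced here by np.eye):

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({'sex': ['F', 'M', 'F'], 'state': ['TN', 'CA', 'TN']})
    columns = ['sex', 'state']

    _values = df[columns].drop_duplicates().values.tolist()   # unique row tuples
    _encoded = df[columns].apply(lambda row: _values.index(list(row)), axis=1)

    # one column per unique combination, i.e. a one-hot over the value space
    _matrix = np.eye(len(_values))[_encoded.values.astype(int)]
    print(_values)    # [['F', 'TN'], ['M', 'CA']]
    print(_matrix)    # rows 0 and 2 map to the same one-hot column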
@@ -217,4 +233,39 @@ class Input :
 
         return cols,_matrix
 
+    def convert(self,**_args):
+        if 'columns' in _args or 'column' in _args :
+            columns = _args['columns'] if 'columns' in _args else [_args['column']]
+        else:
+            columns = self._columns
+        _df = self.df if 'data' not in _args else _args['data']
+        _values,_matrix = self.encode(_df,columns)
+        _, _matrix = self.tobinary(_matrix)
+        self._init_map(_values)
+        return _values,_matrix #-- matrix has been updated !
+    def revert(self,**_args):
+        # _columns = _args['column'] if 'column' in _args else None
+        _matrix = _args['matrix']
+        # print (_matrix)
+        return self.decode(_matrix,columns=self._columns)
+        pass
+    def encode(self,df,columns) :
+        _df = df[columns].drop_duplicates()
+        _values = _df.values.tolist()
+        _encoded = df[columns].apply(lambda row: _values.index( list(row)) ,axis=1)
+        return np.array(_values),_encoded
+    def decode (self,_matrix,**_args):
+        #
+        # _matrix binary matrix
+        # _values value space given the columns
+        # columns name of the columns ...
+        #
+        
+        columns = _args['columns']
+        _values = np.array( list(self._map.values()))
+        _matrix = pd.DataFrame(_matrix) #if type(_matrix) != pd.DataFrame else _matrix
+        x = _matrix.apply(lambda row: _values[row.values == 1 ].tolist()[0] if row.values.sum() > 0 else np.repeat(None,len(columns)), axis=1).tolist()
+        return pd.DataFrame(x,columns=columns)
+
+
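decode() above reverses the trip: each one-hot row is looked up in the stored value map and expanded back into one value per original column. A stand-alone toy version (simplified; it pads all-zero rows with one None per output column, matching the corrected branch above):

    import numpy as np
    import pandas as pd

    columns = ['sex', 'state']
    _values = np.array([['F', 'TN'], ['M', 'CA']])     # the stored map
    _matrix = pd.DataFrame([[1, 0], [0, 1], [0, 0]])   # last row matches nothing

    x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0]
                      if row.values.sum() > 0 else [None] * len(columns),
                      axis=1).tolist()
    print(pd.DataFrame(x, columns=columns))
    #     sex state
    # 0     F    TN
    # 1     M    CA
    # 2  None  None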