""" (c) 2019 Data Maker, hiplab.mc.vanderbilt.edu version 1.0.0 This package serves as a proxy to the overall usage of the framework. This package is designed to generate synthetic data from a dataset from an original dataset using deep learning techniques @TODO: - Make configurable GPU, EPOCHS """ import pandas as pd import numpy as np import data.gan as gan import transport # from data.bridge import Binary import threading from data.maker import prepare from data.maker.state import State import copy import os import nujson as json from multiprocessing import Process, RLock from datetime import datetime, timedelta from multiprocessing import Queue from data.maker.version import __version__ import time class Learner(Process): def __init__(self,**_args): super(Learner, self).__init__() self._arch = {'init':_args} self.ndx = 0 self._queue = Queue() self.lock = RLock() if 'gpu' in _args : os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu']) self.gpu = int(_args['gpu']) else: self.gpu = None self.info = _args['info'] if 'context' not in self.info : self.info['context'] = self.info['from'] self.columns = self.info['columns'] if 'columns' in self.info else None self.store = _args['store'] if 'network_args' not in _args : self.network_args ={ 'context':self.info['context'] , 'logs':_args['logs'] if 'logs' in _args else 'logs', 'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2, 'batch_size':int (_args['batch']) if 'batch' in _args else 2000 } else: self.network_args = _args['network_args'] self._encoder = None self._map = None self._df = _args['data'] if 'data' in _args else None self.name = self.__class__.__name__ # # @TODO: allow for verbose mode so we have a sens of what is going on within the newtork # _log = {'action':'init','gpu':(self.gpu if self.gpu is not None else -1)} self.log(**_log) self.cache = [] # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs' # sel.max_epoc self.logger = None if 'logger' in self.store : self.logger = transport.factory.instance(**self.store['logger']) self.autopilot = False #-- to be set by caller self._initStateSpace() def _initStateSpace(self): """ Initializing state-space for the data-maker, The state-space functions are used as pre-post processing functions applied to the data accordingly i.e - Trainer -> pre-processing - Generation -> post processing The specifications of a state space in the configuration file is as such state:{pre:{path,pipeline:[]}, post:{path,pipeline:[]}} """ self._states = None if 'state' in self.info : try: _config = self.info ['state'] self._states = State.instance(_config) except Exception as e: print (e) pass finally: # __info = (pd.DataFrame(self._states)[['name','path','args']]).to_dict(orient='records') if self._states : __info = {} # print (self._states) for key in self._states : _pipeline = self._states[key] # __info[key] = ([{'name':_payload['name']} for _payload in _pipeline]) __info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key] if _item ] self.log(object='state-space',action='load',input=__info) def log(self,**_args): try: _context = self.info['context'] _label = self.info['info'] if 'info' in self.info else _context # logger = _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args}) if 'logger' in self.store : logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider=transport.providers.CONSOLE,context='write',lock=True) logger.write(_args) self.ndx += 1 # if hasattr(logger,'close') : # logger.close() pass except Exception as e: # print () # print (_args) # print (e) pass finally: pass def get_schema(self): # if self.store['source']['provider'] != 'bigquery' : # return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])] # else: # reader = transport.factory.instance(**self.store['source']) # return reader.meta(table=self.info['from']) reader = transport.factory.instance(**self.store['source']) return reader.meta(table=self.info['from']) def initalize(self): reader = transport.factory.instance(**self.store['source']) _read_args= self.info if self._df is None : self._df = reader.read(**_read_args) # # NOTE : PRE # At this point we apply pre-processing of the data if there were ever a need for it # _log = {} HAS_STATES = self._states is not None and 'pre' in self._states NOT_GENERATING = self.name in ['Trainer','Shuffle'] IS_AUTOPILOT = self.autopilot # # allow calling pre-conditions if either of the conditions is true # 1. states and not generating # 2. IS_GENERATING and states and not autopilot _ALLOW_PRE_CALL = (HAS_STATES and NOT_GENERATING) or (NOT_GENERATING is False and HAS_STATES and IS_AUTOPILOT is False) if _ALLOW_PRE_CALL : # if HAS_STATES and NOT_GENERATING or (HAS_STATES and IS_AUTOPILOT is False and NOT_GENERATING is False): _logs = {'action':'status','input':{'pre':self._states['pre']}} _beg = list(self._df.shape) self._df = State.apply(self._df,self._states['pre']) _end = list(self._df.shape) _logs['input']['size'] = _beg,_end self.log(**_log) # # columns = self.columns if self.columns else self._df.columns # # Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases # - The code below tries to address the issue (Perhaps better suited for the reading components) for name in columns : # # randomly sampling 5 elements to make sense of data-types if self._df[name].size < 5 : continue _index = np.random.choice(np.arange(self._df[name].size),5,False) no_value = [type(value) in [int,float,np.int64,np.int32,np.float32,np.float64] for value in self._df[name].values[_index]] no_value = 0 if np.sum(no_value) > 0 else '' try: self._df[name] = self._df[name].fillna(no_value) except Exception as e: print (['.... skipping ',name,no_value]) finally: pass _log[name] = self._df[name].dtypes.name _log = {'action':'structure','input':_log} self.log(**_log) # # convert the data to binary here ... _schema = self.get_schema() _args = {"schema":_schema,"data":self._df,"columns":columns} if self._map : _args['map'] = self._map self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None _log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } } self.log(**_log) def get(self): if self.cache : return self.cache if len(self.cache) > 0 else(self.cache if not self.cache else self.cache[0]) else: return self._queue.get() if self._queue.qsize() > 0 else [] def listen(self): while True : _info = self._queue.get() self.cache.append(_info) self._queue.task_done() def publish(self,caller): if hasattr(caller,'_queue') : _queue = caller._queue _queue.put(self.cache) # _queue.join() pass class Trainer(Learner): """ This will perform training using a GAN """ def __init__(self,**_args): super().__init__(**_args) # self.info = _args['info'] self.limit = int(_args['limit']) if 'limit' in _args else None self.autopilot = _args['autopilot'] if 'autopilot' in _args else False self.generate = None self.candidates = int(_args['candidates']) if 'candidates' in _args else 1 self.checkpoint_skips = _args['checkpoint_skips'] if 'checkpoint_skips' in _args else None def run(self): self.initalize() if self._encoder is None : # # @TODO Log that the dataset was empty or not statistically relevant return _space,_matrix = self._encoder.convert() _args = self.network_args if self.gpu : _args['gpu'] = self.gpu _args['real'] = _matrix _args['candidates'] = self.candidates if 'logger' in self.store : _args['logger'] = transport.factory.instance(**self.store['logger']) if self.checkpoint_skips : _args['checkpoint_skips'] = self.checkpoint_skips # # At this point we have the binary matrix, we can initiate training # beg = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S') gTrain = gan.Train(**_args) gTrain.apply() writer = transport.factory.instance(provider=transport.providers.FILE,context='write',path=os.sep.join([gTrain.out_dir,'map.json'])) writer.write(self._encoder._map,overwrite=True) writer.close() # # @TODO: At this point we need to generate another some other objects # _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df} _args['logs'] = self.network_args['logs'] _args['autopilot'] = self.autopilot if self.gpu : _args['gpu'] = self.gpu # # Let us find the smallest, the item is sorted by loss on disk # _epochs = [_e for _e in gTrain.logs['epochs'] if _e['path'] != ''] _epochs.sort(key=lambda _item: _item['loss'],reverse=False) _args['network_args']['max_epochs'] = _epochs[0]['epochs'] self.log(action='autopilot',input={'epoch':_epochs[0]}) # g.run() end = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S') _min = float((end-beg).seconds/ 60) _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}} self.log(**_logs) if self.autopilot : # g = Generator(**_args) g = Generator(**self._arch['init']) self._g = g self._g.run() # #@TODO Find a way to have the data in the object .... def generate (self): if self.autopilot : print( "Autopilot is set ... No need to call this function") else: raise Exception( "Autopilot has not been, Wait till training is finished. Use is_alive function on process object") class Generator (Learner): def __init__(self,**_args): super().__init__(**_args) # # We need to load the mapping information for the space we are working with ... # self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1 # filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json']) _suffix = self.network_args['context'] filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'meta-',_suffix,'.json']) self.log(**{'action':'init-map','input':{'filename':filename,'exists':os.path.exists(filename)}}) if os.path.exists(filename): file = open(filename) self._map = json.loads(file.read()) file.close() else: self._map = {} self.autopilot = False if 'autopilot' not in _args else _args['autopilot'] def run(self): self.initalize() if self._encoder is None : # # @TODO Log that the dataset was empty or not statistically relevant return # # The values will be returned because we have provided _map information from the constructor # values,_matrix = self._encoder.convert() _args = self.network_args _args['map'] = self._map _args['values'] = np.array(values) _args['row_count'] = self._df.shape[0] if self.gpu : _args['gpu'] = self.gpu if 'logger' in self.store : _args['logger'] = transport.factory.instance(**self.store['logger']) gHandler = gan.Predict(**_args) gHandler.load_meta(columns=None) _iomatrix = gHandler.apply() _candidates= [ self._encoder.revert(matrix=_item) for _item in _iomatrix] _size = np.sum([len(_item) for _item in _iomatrix]) _log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}} self.log(**_log) # self.cache = _candidates self.post(_candidates) def approximate(self,_df): _columns = self.info['approximate'] for name in _columns : if _df[name].size > 100 : BATCH_SIZE = 10 else: BATCH_SIZE = 1 batches = np.array_split(_df[name].fillna(np.nan).values,BATCH_SIZE) _type = np.int64 if 'int' in self.info['approximate'][name]else np.float64 x = [] _log = {'action':'approximate','input':{'batch':BATCH_SIZE,'col':name}} for values in batches : index = [ _x not in ['',None,np.nan] for _x in values] if np.sum(index) == 0: # # Sometimes messy data has unpleasant surprises continue _values = np.random.rand( len(values[index])) _values += np.std(values[index]) / 4 values[index] = list(values[index] + _values )if np.random.randint(0,2) else list(values[index] - _values) values[index] = values[index].astype(_type) x += values.tolist() if x : _log['input']['identical_percentage'] = 100 * (np.divide( (_df[name].dropna() == x).sum(),_df[name].dropna().size)) _df[name] = x #np.array(x,dtype=np.int64) if 'int' in _type else np.arry(x,dtype=np.float64) self.log(**_log) return _df def make_date(self,**_args) : """ :param year initial value """ if _args['year'] in ['',None,np.nan] : return None year = int(_args['year']) offset = _args['offset'] if 'offset' in _args else 0 month = np.random.randint(1,13) if month == 2: _end = 28 if year % 4 != 0 else 29 else: _end = 31 if month in [1,3,5,7,8,10,12] else 30 day = np.random.randint(1,_end) #-- synthetic date _date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0) FORMAT = '%Y-%m-%d' _name = _args['field'] if 'field' in _args else None if 'format' in self.info and _name in self.info['format']: # _name = _args['field'] FORMAT = self.info['format'][_name] # print ([_name,FORMAT, _date.strftime(FORMAT)]) r = [] if offset : r = [_date.strftime(FORMAT)] for _delta in offset : _date = _date + timedelta(_delta) r.append(_date.strptime(FORMAT)) return r else: return _date.strftime(FORMAT) pass def format(self,_df,_schema): r = {} for _item in _schema : name = _item['name'] if _item['type'].upper() in ['DATE','DATETIME','TIMESTAMP'] : FORMAT = '%Y-%m-%d' try: # #-- Sometimes data isn't all it's meant to be SIZE = -1 if 'format' in self.info and name in self.info['format'] : FORMAT = self.info['format'][name] SIZE = 10 elif _item['type'] in ['DATETIME','TIMESTAMP'] : FORMAT = '%Y-%m-%-d %H:%M:%S' SIZE = 19 if SIZE > 0 : values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64) # _df[name] = [_date[:SIZE].strip() for _date in values] # _df[name] = _df[name].astype(str) r[name] = FORMAT # _df[name] = pd.to_datetime(_df[name], format=FORMAT) #.astype('datetime64[ns]') if _item['type'] in ['DATETIME','TIMESTAMP']: pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]') except Exception as e: pass finally: pass else: # # Because types are inferred on the basis of the sample being processed they can sometimes be wrong # To help disambiguate we add the schema information _type = None if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower(): _type = np.int elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower(): _type = np.float if _type : _df[name] = _df[name].fillna(0).replace(' ',0).replace('',0).replace('NA',0).replace('nan',0).astype(_type) # else: # _df[name] = _df[name].astype(str) # _df = _df.replace('NaT','').replace('NA','') if r : self.log(**{'action':'format','input':r}) return _df pass def post(self,_candidates): if 'target' in self.store : _store = self.store['target'] if 'target' in self.store else {'provider':'console'} _store['lock'] = True _store['context'] = 'write' #-- Just in case if 'table' not in _store : _store['table'] = self.info['from'] else: _store = None N = 0 for _iodf in _candidates : _df = self._df.copy() if self.columns : _df[self.columns] = _iodf[self.columns] N += _df.shape[0] if self._states and 'post' in self._states: _df = State.apply(_df,self._states['post']) # # # #@TODO: # # Improve formatting with better post-processing pipeline # if 'approximate' in self.info : # _df = self.approximate(_df) # if 'make_date' in self.info : # for name in self.info['make_date'] : # # iname = self.info['make_date']['init_field'] # iname = self.info['make_date'][name] # years = _df[iname] # _dates = [self.make_date(year=_year,field=name) for _year in years] # if _dates : # _df[name] = _dates _schema = self.get_schema() _df = self.format(_df,_schema) _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ] self.log(**{"action":"consolidate","input":_log}) if _store : writer = transport.factory.instance(**_store) if _store['provider'] == 'bigquery': writer.write(_df,schema=[],table=self.info['from']) else: writer.write(_df,table=self.info['from']) else: self.cache.append(_df) self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}}) class Shuffle(Generator): """ This is a method that will yield data with low utility """ def __init__(self,**_args): super().__init__(**_args) if 'data' not in _args : reader = transport.factory.instance(**self.store['source']) self._df = reader.read(sql=self.info['sql']) def run(self): self.initalize() # # If we are given lists of columns instead of a list-of-list # unpack the list _invColumns = [] _colNames = [] _ucolNames= [] for _item in self.info['columns'] : if type(_item) == list : _invColumns.append(_item) elif _item in self._df.columns.tolist(): _colNames.append(_item) # # At this point we build the matrix of elements we are interested in considering the any unspecified column # if _colNames : _invColumns.append(_colNames) _ucolNames = list(set(self._df.columns) - set(_colNames)) if _ucolNames : _invColumns += [ [_name] for _name in _ucolNames] _xdf = pd.DataFrame() _xdf = pd.DataFrame() _index = np.arange(self._df.shape[0]) for _columns in _invColumns : _tmpdf = self._df[_columns].copy()[_columns] np.random.seed(1) np.random.shuffle(_index) # _values = _tmpdf.values[_index] #_tmpdf = _tmpdf.iloc[_index] _tmpdf = pd.DataFrame(_tmpdf.values[_index],columns=_columns) if _xdf.shape[0] == 0 : _xdf = _tmpdf else: _xdf = _xdf.join(_tmpdf) _xdf = _xdf[self._df.columns] self._df = _xdf _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}} self.log(**_log) try: self.post([self._df]) self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}}) except Exception as e : # print (e) self.log(**{'action':'failed','input':{'msg':e,'info':self.info}}) class apply : TRAIN,GENERATE,RANDOM = 'train','generate','random' class factory : _infocache = {} @staticmethod def instance(**_args): """ An instance of an object that trains and generates candidate datasets :param gpu (optional) index of the gpu to be used if using one :param store {source,target} if no target is provided console will be output :param epochs (default 2) number of epochs to train :param candidates(default 1) number of candidates to generate :param info {columns,sql,from} :param autopilot will generate output automatically :param batch (default 2k) size of the batch """ # if _args['apply'] in [apply.RANDOM] : pthread = Shuffle(**_args) elif _args['apply'] == apply.GENERATE : pthread = Generator(**_args) else: pthread= Trainer(**_args) if 'start' in _args and _args['start'] == True : pthread.start() return pthread class plugins: @staticmethod def load(_config): """ This function attempts to load the plugins to insure they are valid _config configuration for plugin specifications {pre:{pipeline,path},post:{pipeline,path}} """