# data-maker/data/maker/__init__.py
"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0
This package serves as a proxy to the overall usage of the framework.
It is designed to generate synthetic data from an original dataset using deep learning techniques.
@TODO:
- Make configurable GPU, EPOCHS
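
Example (a minimal, hypothetical usage sketch; provider, table and column names are illustrative):

    from data.maker import factory, apply

    _args = {'apply': apply.TRAIN, 'epochs': 10, 'autopilot': True, 'start': True,
             'store': {'source': {...}, 'logger': {...}},
             'info': {'from': 'people', 'columns': ['gender', 'race']}}
    pthread = factory.instance(**_args)    # returns a started Trainer process
    pthread.join()                         # with autopilot, a Generator runs once training ends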
"""
import pandas as pd
import numpy as np
import data.gan as gan
import transport
# from data.bridge import Binary
import threading
from data.maker import prepare
from data.maker.state import State
import copy
import os
import nujson as json
from multiprocessing import Process, RLock
from datetime import datetime, timedelta
from multiprocessing import Queue
from data.maker.version import __version__
import time
class Learner(Process):
def __init__(self,**_args):
super(Learner, self).__init__()
self._arch = {'init':_args}
self.ndx = 0
self._queue = Queue()
self.lock = RLock()
if 'gpu' in _args :
os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
self.gpu = int(_args['gpu'])
else:
self.gpu = None
self.info = _args['info']
if 'context' not in self.info :
self.info['context'] = self.info['from']
self.columns = self.info['columns'] if 'columns' in self.info else None
self.store = _args['store']
if 'network_args' not in _args :
self.network_args ={
'context':self.info['context'] ,
'logs':_args['logs'] if 'logs' in _args else 'logs',
'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
'batch_size':int (_args['batch']) if 'batch' in _args else 2000
}
else:
self.network_args = _args['network_args']
self._encoder = None
self._map = None
self._df = _args['data'] if 'data' in _args else None
self.name = self.__class__.__name__
#
# @TODO: allow for verbose mode so we have a sense of what is going on within the network
#
_log = {'action':'init','gpu':(self.gpu if self.gpu is not None else -1)}
self.log(**_log)
self.cache = []
# self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
# sel.max_epoc
self.logger = None
if 'logger' in self.store :
self.logger = transport.factory.instance(**self.store['logger'])
self.autopilot = False #-- to be set by caller
self._initStateSpace()
def _initStateSpace(self):
"""
Initialize the state-space for the data-maker. The state-space functions are pre/post-processing functions applied to the data as follows:
    - Trainer   -> pre-processing
    - Generator -> post-processing
The specification of a state-space in the configuration file is as follows:
    state:{pre:{path,pipeline:[]}, post:{path,pipeline:[]}}
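
For example (a hypothetical specification; the module path and function names are illustrative):

    "state" : {
        "pre" : {"path":"plugins/clean.py", "pipeline":[{"name":"impute_dates", "args":{}}]},
        "post": {"path":"plugins/clean.py", "pipeline":[{"name":"recompute_age", "args":{}}]}
    }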
"""
self._states = None
if 'state' in self.info :
try:
_config = self.info ['state']
self._states = State.instance(_config)
except Exception as e:
print (e)
pass
finally:
# __info = (pd.DataFrame(self._states)[['name','path','args']]).to_dict(orient='records')
if self._states :
__info = {}
# print (self._states)
for key in self._states :
_pipeline = self._states[key]
# __info[key] = ([{'name':_payload['name']} for _payload in _pipeline])
__info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key] if _item ]
self.log(object='state-space',action='load',input=__info)
def log(self,**_args):
try:
_context = self.info['context']
_label = self.info['info'] if 'info' in self.info else _context
# logger =
_args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
if 'logger' in self.store :
logger = transport.factory.instance(**self.store['logger'])
logger.write(_args)
self.ndx += 1
# if hasattr(logger,'close') :
# logger.close()
pass
except Exception as e:
# print ()
# print (_args)
# print (e)
pass
finally:
pass
def get_schema(self):
# if self.store['source']['provider'] != 'bigquery' :
# return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])]
# else:
# reader = transport.factory.instance(**self.store['source'])
# return reader.meta(table=self.info['from'])
reader = transport.factory.instance(**self.store['source'])
return reader.meta(table=self.info['from'])
def initalize(self):
reader = transport.factory.instance(**self.store['source'])
_read_args= self.info
if self._df is None :
self._df = reader.read(**_read_args)
#
# NOTE : PRE
# At this point we apply pre-processing of the data if there were ever a need for it
#
_log = {}
HAS_STATES = self._states is not None and 'pre' in self._states
NOT_GENERATING = self.name in ['Trainer','Shuffle']
IS_AUTOPILOT = self.autopilot
#
# allow calling pre-conditions if either of the conditions is true
# 1. states and not generating
# 2. IS_GENERATING and states and not autopilot
_ALLOW_PRE_CALL = (HAS_STATES and NOT_GENERATING) or (NOT_GENERATING is False and HAS_STATES and IS_AUTOPILOT is False)
if _ALLOW_PRE_CALL :
# if HAS_STATES and NOT_GENERATING or (HAS_STATES and IS_AUTOPILOT is False and NOT_GENERATING is False):
_logs = {'action':'status','input':{'pre':self._states['pre']}}
_beg = list(self._df.shape)
self._df = State.apply(self._df,self._states['pre'])
_end = list(self._df.shape)
_logs['input']['size'] = _beg,_end
self.log(**_logs)
#
#
columns = self.columns if self.columns else self._df.columns
#
# Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases
# - The code below tries to address the issue (Perhaps better suited for the reading components)
for name in columns :
#
# randomly sampling 5 elements to make sense of data-types
if self._df[name].size < 5 :
continue
_index = np.random.choice(np.arange(self._df[name].shape[0]),5,False)
no_value = [type(value) in [int,float,np.int64,np.int32,np.float32,np.float64] for value in self._df[name].values[_index] if value is not None]
no_value = 0 if np.sum(no_value) > 0 else ''
try:
self._df[name] = self._df[name].fillna(no_value)
except Exception as e:
print (['.... skipping ',name,no_value])
finally:
pass
# _log[name] = self._df[name].dtypes.name
# _log[name] = reader.meta()
# _log = {'action':'structure','input':_log}
# self.log(**_log)
#
# convert the data to binary here ...
_schema = self.get_schema()
_args = {"schema":_schema,"data":self._df,"columns":columns}
if self._map :
_args['map'] = self._map
self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None
_log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } }
self.log(**_log)
def get(self):
if self.cache :
return self.cache
else:
return self._queue.get() if self._queue.qsize() > 0 else []
def listen(self):
while True :
_info = self._queue.get()
self.cache.append(_info)
# NOTE: multiprocessing.Queue does not implement task_done(); no acknowledgement is needed here
def publish(self,caller):
if hasattr(caller,'_queue') :
_queue = caller._queue
_queue.put(self.cache)
# _queue.join()
pass
class Trainer(Learner):
"""
This will perform training using a GAN
"""
def __init__(self,**_args):
super().__init__(**_args)
# self.info = _args['info']
self.limit = int(_args['limit']) if 'limit' in _args else None
self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
self.generate = None
self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
self.checkpoint_skips = _args['checkpoint_skips'] if 'checkpoint_skips' in _args else None
def run(self):
self.initalize()
if self._encoder is None :
#
# @TODO Log that the dataset was empty or not statistically relevant
return
_space,_matrix = self._encoder.convert()
_args = self.network_args
if self.gpu is not None :
_args['gpu'] = self.gpu
_args['real'] = _matrix
_args['candidates'] = self.candidates
if 'logger' in self.store :
_args['logger'] = transport.factory.instance(**self.store['logger'])
if self.checkpoint_skips :
_args['checkpoint_skips'] = self.checkpoint_skips
#
# At this point we have the binary matrix, we can initiate training
#
beg = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S')
gTrain = gan.Train(**_args)
gTrain.apply()
writer = transport.factory.instance(provider=transport.providers.FILE,context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
writer.write(self._encoder._map,overwrite=True)
writer.close()
#
# @TODO: At this point we need to generate some other objects
#
_args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
_args['logs'] = self.network_args['logs']
_args['autopilot'] = self.autopilot
if self.gpu is not None :
_args['gpu'] = self.gpu
#
# Let us find the epoch with the smallest loss; the checkpoints below are sorted by loss (ascending)
#
_epochs = [_e for _e in gTrain.logs['epochs'] if _e['path'] != '']
_epochs.sort(key=lambda _item: _item['loss'],reverse=False)
_args['network_args']['max_epochs'] = _epochs[0]['epochs']
self.log(action='autopilot',input={'epoch':_epochs[0]})
# g.run()
end = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S')
_min = float((end-beg).total_seconds()/ 60)
_logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}}
self.log(**_logs)
if self.autopilot :
# g = Generator(**_args)
g = Generator(**self._arch['init'])
self._g = g
self._g.run()
#
#@TODO Find a way to have the data in the object ....
def generate (self):
if self.autopilot :
print( "Autopilot is set ... No need to call this function")
else:
raise Exception( "Autopilot has not been set. Wait till training is finished, then use the is_alive function on the process object")
class Generator (Learner):
def __init__(self,**_args):
super().__init__(**_args)
#
# We need to load the mapping information for the space we are working with ...
#
self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
# filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
_suffix = self.network_args['context']
filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'meta-'+_suffix+'.json'])
self.log(**{'action':'init-map','input':{'filename':filename,'exists':os.path.exists(filename)}})
if os.path.exists(filename):
file = open(filename)
self._map = json.loads(file.read())
file.close()
else:
self._map = {}
self.autopilot = False if 'autopilot' not in _args else _args['autopilot']
def run(self):
self.initalize()
if self._encoder is None :
#
# @TODO Log that the dataset was empty or not statistically relevant
return
#
# The values will be returned because we have provided _map information from the constructor
#
values,_matrix = self._encoder.convert()
_args = self.network_args
_args['map'] = self._map
_args['values'] = np.array(values)
_args['row_count'] = self._df.shape[0]
if self.gpu is not None :
_args['gpu'] = self.gpu
if 'logger' in self.store :
_args['logger'] = transport.factory.instance(**self.store['logger'])
gHandler = gan.Predict(**_args)
gHandler.load_meta(columns=None)
_iomatrix = gHandler.apply()
_candidates= [ self._encoder.revert(matrix=_item) for _item in _iomatrix]
_size = np.sum([len(_item) for _item in _iomatrix])
_log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}}
self.log(**_log)
# self.cache = _candidates
self.post(_candidates)
def approximate(self,_df):
_columns = self.info['approximate']
for name in _columns :
if _df[name].size > 100 :
BATCH_SIZE = 10
else:
BATCH_SIZE = 1
batches = np.array_split(_df[name].fillna(np.nan).values,BATCH_SIZE)
_type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
x = []
_log = {'action':'approximate','input':{'batch':BATCH_SIZE,'col':name}}
for values in batches :
index = [ _x not in ['',None,np.nan] for _x in values]
if np.sum(index) == 0:
#
# Sometimes messy data has unpleasant surprises
continue
_values = np.random.rand( len(values[index]))
_values += np.std(values[index]) / 4
values[index] = list(values[index] + _values ) if np.random.randint(0,2) else list(values[index] - _values)
values[index] = values[index].astype(_type)
x += values.tolist()
if x :
_log['input']['identical_percentage'] = 100 * (np.divide( (_df[name].dropna() == x).sum(),_df[name].dropna().size))
_df[name] = x #np.array(x,dtype=np.int64) if 'int' in _type else np.array(x,dtype=np.float64)
self.log(**_log)
return _df
def make_date(self,**_args) :
"""
:param year   initial year value (required)
:param offset optional list of day offsets; when provided, a list of dates is returned (offsets accumulate)
:param field  optional field name used to look up a date format in self.info['format']
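
Example (illustrative values):
    make_date(year=1990, field='date_of_birth')   # -> '1990-07-14' (month and day are random)
    make_date(year=1990, offset=[30, 60])         # -> [initial date, +30 days, +90 days] (offsets accumulate)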
"""
if _args['year'] in ['',None,np.nan] :
return None
year = int(_args['year'])
offset = _args['offset'] if 'offset' in _args else 0
month = np.random.randint(1,13)
if month == 2:
_end = 28 if year % 4 != 0 else 29
else:
_end = 31 if month in [1,3,5,7,8,10,12] else 30
day = np.random.randint(1,_end + 1)
#-- synthetic date
_date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0)
FORMAT = '%Y-%m-%d'
_name = _args['field'] if 'field' in _args else None
if 'format' in self.info and _name in self.info['format']:
# _name = _args['field']
FORMAT = self.info['format'][_name]
# print ([_name,FORMAT, _date.strftime(FORMAT)])
r = []
if offset :
r = [_date.strftime(FORMAT)]
for _delta in offset :
_date = _date + timedelta(_delta)
r.append(_date.strftime(FORMAT))
return r
else:
return _date.strftime(FORMAT)
pass
def _format(self,_df,_schema):
"""
:param _df     data-frame being processed
:param _schema table schema with types
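
Example of a schema (BigQuery-style field descriptors; field names are illustrative):
    [{'name':'person_id','type':'INT64'}, {'name':'birth_date','type':'DATE'}, {'name':'race','type':'STRING'}]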
"""
_columns = [_item['name'] for _item in _schema if _item['name'] in _df.columns]
_map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
# pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
for _item in _schema :
_name = _item['name']
if _item['type'] not in _map :
continue
_pointer = _map[_item['type']]
try:
if type(_pointer).__name__ == 'type':
if _item['type'] in ['INT64','FLOAT64'] :
novalue = np.int64(0) if _item['type'] == 'INT64' else np.float64(0)
elif _item['type'] == 'STRING' :
novalue = ''
if _item['type'] in ['INT64','FLOAT64','STRING'] :
_df[_name] = _df[_name].fillna(novalue)
#
# This is not guaranteed to work but may help ...
_df[_name] = _df[_name].values.astype(_pointer)
else:
_df[_name] = _pointer(_df[_name])
pass
except Exception as e:
pass
# bqw = transport.factory.instance(**_store['target'])
# bqw.write(_df,schema=_schema)
return _df #[_columns]
def post(self,_candidates):
if 'target' in self.store :
_store = self.store['target']
_store['lock'] = True
_store['context'] = 'write' #-- Just in case
if 'table' not in _store :
_store['table'] = self.info['from']
else:
_store = None
N = 0
_haslist = np.sum([type(_item)==list for _item in self.columns]) > 0 if self.columns else False
_schema = self.get_schema()
#
# If the schema doesn't match the data we need to skip it
# This happens when the data comes from a query, the post processing needs to handle this
#
# _names = [_field['name'] for _field in _schema]
# _columns = _candidates[0].columns.tolist()
# _common = list( set(_columns) & set(_names) )
# if not (len(_common) == len(_columns) and len(_names) == len(_common)) :
# _schema = None
for _iodf in _candidates :
_df = self._df.copy()
if self.columns and _haslist is False:
_df[self.columns] = _iodf[self.columns]
else:
#
# In here we have the case of all attributes have undergone random permutations
#
_df = _iodf
N += _df.shape[0]
if self._states and 'post' in self._states:
_df = State.apply(_df,self._states['post'])
#
# Let us format the data frame so as to be able to minimize write errors
#
if _schema :
_df = self._format(_df,_schema)
# _df = self.format(_df,_schema)
# _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
self.log(**{"action":"consolidate","input":{"rows":N,"candidate":_candidates.index(_iodf)}})
if _store :
_log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
writer = transport.factory.instance(**_store)
if _store['provider'] == 'bigquery' and _schema:
try:
_log['schema'] = _schema
writer.write(_df,schema=_schema)
except Exception as e:
print (e)
writer.write(_df)
else:
writer.write(_df)
self.log(**_log)
else:
self.cache.append(_df)
self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}})
class Shuffle(Generator):
"""
This class yields data with low utility by shuffling (permuting) column values
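
Example (illustrative): with info['columns'] = ['race', 'gender', ['first_name','last_name']],
'race' and 'gender' are shuffled together as one group, 'first_name'/'last_name' are shuffled
together as another group, and every column not listed is shuffled independently.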
"""
def __init__(self,**_args):
super().__init__(**_args)
if 'data' not in _args :
reader = transport.factory.instance(**self.store['source'])
self._df = reader.read(sql=self.info['sql'])
def run(self):
self.initalize()
#
# The column specification may mix plain column names and lists of columns;
# the names in a list are kept together and shuffled as a single group
_invColumns = []
_colNames = []
_ucolNames= []
_rmColumns = []
for _item in self.info['columns'] :
if type(_item) == list :
_invColumns.append(_item)
_rmColumns += _item
elif _item in self._df.columns.tolist():
_colNames.append(_item)
#
# At this point we build the matrix of elements we are interested in, taking into account any unspecified columns
#
if _colNames :
_invColumns.append(_colNames)
_ucolNames = list(set(self._df.columns) - set(_colNames) - set(_rmColumns))
if _ucolNames :
_invColumns += [ [_name] for _name in _ucolNames]
_xdf = pd.DataFrame()
_index = np.arange(self._df.shape[0])
np.random.seed(1) #-- seed once so successive groups get different permutations (re-seeding inside the loop would shuffle every group identically)
for _columns in _invColumns :
_tmpdf = self._df[_columns].copy()
np.random.shuffle(_index)
# _values = _tmpdf.values[_index]
#_tmpdf = _tmpdf.iloc[_index]
_tmpdf = pd.DataFrame(_tmpdf.values[_index],columns=_columns)
if _xdf.shape[0] == 0 :
_xdf = _tmpdf
else:
_xdf = _xdf.join(_tmpdf)
_xdf = _xdf[self._df.columns]
self._df = _xdf
_log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
self.log(**_log)
try:
self.post([self._df])
self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
except Exception as e :
# print (e)
self.log(**{'action':'failed','input':{'msg':str(e),'info':self.info}})
class apply :
TRAIN,GENERATE,RANDOM = 'train','generate','random'
class factory :
_infocache = {}
@staticmethod
def instance(**_args):
"""
An instance of an object that trains and generates candidate datasets
:param gpu (optional) index of the gpu to be used if using one
:param store {source,target}; if no target is provided, output goes to the console
:param epochs (default 2) number of epochs to train
:param candidates(default 1) number of candidates to generate
:param info {columns,sql,from}
:param autopilot will generate output automatically
:param batch (default 2k) size of the batch
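
Example (hypothetical values; the store/info entries depend on your transport configuration):

    _handler = factory.instance(apply=apply.GENERATE, gpu=0, epochs=10, candidates=2,
                                store={'source': {...}, 'target': {...}},
                                info={'from': 'people', 'columns': ['gender', 'race']})
    _handler.start()
    _handler.join()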
"""
#
if _args['apply'] in [apply.RANDOM] :
pthread = Shuffle(**_args)
elif _args['apply'] == apply.GENERATE :
pthread = Generator(**_args)
else:
pthread= Trainer(**_args)
if 'start' in _args and _args['start'] == True :
pthread.start()
return pthread
class plugins:
@staticmethod
def load(_config):
"""
This function attempts to load the plugins to ensure they are valid
_config configuration for plugin specifications {pre:{pipeline,path},post:{pipeline,path}}
"""