# data-maker/data/maker/__init__.py
#
# 592 lines
# 23 KiB
# Python

"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0
This package serves as a proxy to the overall usage of the framework.
This package is designed to generate synthetic data from an original dataset using deep learning techniques
@TODO:
- Make configurable GPU, EPOCHS
"""
import pandas as pd
import numpy as np
import data.gan as gan
import transport
# from data.bridge import Binary
import threading
from data.maker import prepare
from data.maker.state import State
import copy
import os
import nujson as json
from multiprocessing import Process, RLock
from datetime import datetime, timedelta
from multiprocessing import Queue
import time
class Learner(Process):
    """
    Base process shared by the trainer, the generator and the shuffler.
    It keeps the job description (info), the data-store configuration (store), the arguments
    handed to the network and the helpers used to read/prepare the data and to log progress.
    """
    def __init__(self,**_args):
        """
        :param info         job description, must provide 'context' (optionally 'columns', 'from', 'state', 'info')
        :param store        data-store configuration, 'source' is read from, 'logger' (optional) receives the logs
        :param gpu          (optional) index of the gpu, it is also exported through CUDA_VISIBLE_DEVICES
        :param network_args (optional) ready-made network arguments, otherwise they are built from logs, epochs and batch
        :param logs         (default 'logs') value stored under network_args['logs']
        :param epochs       (default 2) value stored under network_args['max_epochs']
        :param batch        (default 2000) value stored under network_args['batch_size']
        :param data         (optional) data-frame to be used instead of reading the source
        """
        super(Learner, self).__init__()
        self.ndx = 0
        self._queue = Queue()
        self.lock = RLock()
        if 'gpu' in _args :
            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
            self.gpu = int(_args['gpu'])
        else:
            self.gpu = None
        self.info = _args['info']
        self.columns = self.info['columns'] if 'columns' in self.info else None
        self.store = _args['store']
        if 'network_args' not in _args :
            self.network_args = {
                'context':self.info['context'],
                'logs':_args['logs'] if 'logs' in _args else 'logs',
                'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
                'batch_size':int(_args['batch']) if 'batch' in _args else 2000
            }
        else:
            self.network_args = _args['network_args']
        self._encoder = None
        self._map = None
        self._df = _args['data'] if 'data' in _args else None
        self.name = self.__class__.__name__
        #
        # @TODO: allow for verbose mode so we have a sense of what is going on within the network
        #
        _log = {'action':'init','gpu':(self.gpu if self.gpu is not None else -1)}
        self.log(**_log)
        self.cache = []
        self.logger = None
        if 'logger' in self.store :
            self.logger = transport.factory.instance(**self.store['logger'])
        self.autopilot = False #-- to be set by caller
        self._initStateSpace()
    def _initStateSpace(self):
        """
        Initializing state-space for the data-maker, The state-space functions are used as pre-post processing functions applied to the data accordingly i.e
            - Trainer -> pre-processing
            - Generation -> post processing
        The specifications of a state space in the configuration file is as such
            state:{pre:{path,pipeline:[]}, post:{path,pipeline:[]}}
        A state-space that can not be loaded is reported on the console and ignored (self._states remains None)
        """
        self._states = None
        if 'state' not in self.info :
            return
        try:
            self._states = State.instance(self.info['state'])
        except Exception as e:
            print (e)
        if self._states :
            #
            # Only the name, arguments and path of every step are logged
            __info = {}
            for key in self._states :
                __info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key]]
            self.log(object='state-space',action='load',input=__info)
    def log(self,**_args):
        """
        Writes a log entry to the logger of the store, nothing happens if the store has no 'logger'
        The entry is made of the keyword arguments and the index (ndx), module, table, context and label of the job
        Logging is best-effort: an error raised while logging is never propagated to the caller
        """
        if 'logger' not in self.store :
            return
        try:
            _context = self.info['context']
            _label = self.info['info'] if 'info' in self.info else _context
            #
            # 'from' is optional in the job description, its absence must not discard the log entry
            _table = self.info['from'] if 'from' in self.info else None
            _args = dict({'ndx':self.ndx,'module':self.name,'table':_table,'context':_context,'info':_label,**_args})
            #
            # A new logger instance is created for every entry
            logger = transport.factory.instance(**self.store['logger'])
            logger.write(_args)
            self.ndx += 1
        except Exception as e:
            #
            # deliberate: a failing logger must not interrupt training/generation
            pass
    def get_schema(self):
        """
        :return the meta-data of the table info['from'] as reported by the source reader
        """
        reader = transport.factory.instance(**self.store['source'])
        return reader.meta(table=self.info['from'])
    def initalize(self):
        """
        Reads the data (unless it was provided to the constructor), applies the pre-processing
        pipeline when applicable, fills the missing values and builds the encoder (self._encoder).
        The encoder is None if the data-frame has no rows
        """
        if self._df is None :
            reader = transport.factory.instance(**self.store['source'])
            self._df = reader.read(**self.info)
        #
        # NOTE : PRE
        #   At this point we apply pre-processing of the data if there were ever a need for it
        #
        HAS_STATES = self._states is not None and 'pre' in self._states
        NOT_GENERATING = self.name in ['Trainer','Shuffle']
        IS_AUTOPILOT = self.autopilot
        #
        # The pre-conditions are called if there are states and either
        #   1. we are not generating
        #   2. we are generating without autopilot (with autopilot the trainer hands over its own data-frame)
        _ALLOW_PRE_CALL = HAS_STATES and (NOT_GENERATING or IS_AUTOPILOT is False)
        if _ALLOW_PRE_CALL :
            _pipeline = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states['pre']]
            _logs = {'action':'status','input':{'pre':_pipeline}}
            _beg = list(self._df.shape)
            self._df = State.apply(self._df,self._states['pre'])
            _end = list(self._df.shape)
            _logs['input']['size'] = _beg,_end
            self.log(**_logs)
        columns = self.columns if self.columns else self._df.columns
        #
        # Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases
        #   - The code below tries to address the issue (Perhaps better suited for the reading components)
        _structure = {}
        _numeric = [int,float,np.int64,np.int32,np.float32,np.float64]
        for name in columns :
            #
            # randomly sampling 5 elements to make sense of data-types, smaller columns are left untouched
            if self._df[name].size < 5 :
                continue
            _index = np.random.choice(np.arange(self._df[name].size),5,False)
            _found = [type(value) in _numeric for value in self._df[name].values[_index]]
            #
            # NOTE(review): a missing value (NaN) is a float, thus it counts as numeric in the sample above
            no_value = 0 if np.sum(_found) > 0 else ''
            self._df[name] = self._df[name].fillna(no_value)
            _structure[name] = self._df[name].dtypes.name
        self.log(**{'action':'structure','input':_structure})
        #
        # convert the data to binary here ...
        _schema = self.get_schema()
        _args = {"schema":_schema,"data":self._df,"columns":columns}
        if self._map :
            _args['map'] = self._map
        self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None
        _log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } }
        self.log(**_log)
    def get(self):
        """
        :return the cache if it has content, otherwise the next item of the queue or [] if the queue is empty
        """
        if self.cache :
            return self.cache
        return self._queue.get() if self._queue.qsize() > 0 else []
    def listen(self):
        """
        Moves whatever is put on the queue into the cache, this function never returns
        """
        while True :
            _info = self._queue.get()
            self.cache.append(_info)
            #
            # task_done only exists on joinable queues, the queue created by the constructor doesn't have it
            if hasattr(self._queue,'task_done') :
                self._queue.task_done()
    def publish(self,caller):
        """
        Puts the cache on the queue of the caller (if the caller has one)
        :param caller   object with a _queue attribute
        """
        if hasattr(caller,'_queue') :
            caller._queue.put(self.cache)
class Trainer(Learner):
    """
    This will perform training using a GAN
    """
    def __init__(self,**_args):
        """
        :param limit            (optional) stored as an integer, not used by the code of this class
        :param autopilot        (default False) if set the generator is executed right after training
        :param candidates       (default 1) number of candidates handed to the network and to the generator
        :param checkpoint_skips (optional) forwarded to the network if provided
        The remaining arguments are those of Learner
        """
        super().__init__(**_args)
        self.limit = int(_args['limit']) if 'limit' in _args else None
        self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
        #
        # NOTE: "generate" must not be assigned here, an attribute would hide the generate method of the class
        self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
        self.checkpoint_skips = _args['checkpoint_skips'] if 'checkpoint_skips' in _args else None
        self._g = None
    def run(self):
        """
        Trains the network against the encoded data, writes the encoder's map (map.json) in the output
        folder of the training and prepares a generator (self._g) that is executed if autopilot is set
        """
        self.initalize()
        if self._encoder is None :
            #
            # @TODO Log that the dataset was empty or not statistically relevant
            return
        _space,_matrix = self._encoder.convert()
        _args = self.network_args
        if self.gpu is not None :
            #
            # comparing against None because 0 is a valid gpu index
            _args['gpu'] = self.gpu
        _args['real'] = _matrix
        _args['candidates'] = self.candidates
        if 'logger' in self.store :
            _args['logger'] = transport.factory.instance(**self.store['logger'])
        if self.checkpoint_skips :
            _args['checkpoint_skips'] = self.checkpoint_skips
        #
        # At this point we have the binary matrix, we can initiate training
        #
        beg = datetime.now()
        gTrain = gan.Train(**_args)
        gTrain.apply()
        writer = transport.factory.instance(provider=transport.providers.FILE,context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
        try:
            writer.write(self._encoder._map,overwrite=True)
        finally:
            writer.close()
        #
        # Preparing the generator
        # NOTE(review): the generator receives the very same network_args dictionary that was given to the training
        #
        _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
        _args['logs'] = self.network_args['logs']
        _args['autopilot'] = self.autopilot
        if self.gpu is not None :
            _args['gpu'] = self.gpu
        #
        # Let us find the epoch with the smallest loss among those that have a path,
        # max_epochs is left as is if there is no such epoch
        #
        _epochs = [_e for _e in gTrain.logs['epochs'] if _e['path'] != '']
        if _epochs :
            _best = min(_epochs,key=lambda _item: _item['loss'])
            _args['network_args']['max_epochs'] = _best['epochs']
            self.log(action='autopilot',input={'epoch':_best})
        g = Generator(**_args)
        end = datetime.now()
        #
        # total_seconds accounts for trainings that last more than a day (seconds doesn't)
        _min = float((end-beg).total_seconds()/ 60)
        _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}}
        self.log(**_logs)
        self._g = g
        if self.autopilot :
            self._g.run()
    #
    #@TODO Find a way to have the data in the object ....
    def generate (self):
        """
        Informative only: with autopilot the generation is performed by run, otherwise an exception is raised
        """
        if self.autopilot :
            print( "Autopilot is set ... No need to call this function")
        else:
            raise Exception( "Autopilot has not been, Wait till training is finished. Use is_alive function on process object")
class Generator (Learner):
    """
    This will generate candidate datasets using a trained network and write them to the target store
    (or keep them in the cache if the store has no target)
    """
    def __init__(self,**_args):
        """
        :param candidates   (default 1) number of candidates, stored in network_args
        :param autopilot    (default False) set by the trainer when it runs the generator itself
        The remaining arguments are those of Learner
        """
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
        _found = os.path.exists(filename)
        self.log(**{'action':'init-map','input':{'filename':filename,'exists':_found}})
        if _found :
            with open(filename) as _file :
                self._map = json.loads(_file.read())
        else:
            self._map = {}
        self.autopilot = False if 'autopilot' not in _args else _args['autopilot']
    @staticmethod
    def _is_missing(_value):
        """
        :return True if the value is None, a blank string or NaN/NaT
        """
        if _value is None :
            return True
        if isinstance(_value,str) :
            return _value.strip() == ''
        try:
            return bool(pd.isna(_value))
        except (TypeError,ValueError):
            return False
    def run(self):
        """
        Generates the candidates with the network, reverts them to their original representation and posts them
        """
        self.initalize()
        if self._encoder is None :
            #
            # @TODO Log that the dataset was empty or not statistically relevant
            return
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values,_matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]
        if self.gpu is not None :
            #
            # comparing against None because 0 is a valid gpu index
            _args['gpu'] = self.gpu
        if 'logger' in self.store :
            _args['logger'] = transport.factory.instance(**self.store['logger'])
        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates= [ self._encoder.revert(matrix=_item) for _item in _iomatrix]
        _size = np.sum([len(_item) for _item in _iomatrix])
        _log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}}
        self.log(**_log)
        self.post(_candidates)
    def approximate(self,_df):
        """
        Adds noise to the columns listed in info['approximate'] (column name -> type name).
        The noise of a batch is a random number in [0,1) plus a quarter of the standard deviation of the batch,
        it is either added to or subtracted from the whole batch. Missing values are left untouched
        :param _df  data-frame to be altered
        :return the data-frame
        """
        _columns = self.info['approximate']
        for name in _columns :
            #
            # columns with more than 100 values are processed in 10 batches
            BATCH_SIZE = 10 if _df[name].size > 100 else 1
            _original = _df[name].fillna(np.nan).values
            batches = np.array_split(_original.copy(),BATCH_SIZE)
            _type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
            x = []
            _log = {'action':'approximate','input':{'batch':BATCH_SIZE,'col':name}}
            for values in batches :
                index = np.array([not Generator._is_missing(_x) for _x in values],dtype=bool)
                if index.any() :
                    _present = values[index].astype(np.float64)
                    _noise = np.random.rand(_present.size)
                    _noise += np.std(_present) / 4
                    _present = (_present + _noise) if np.random.randint(0,2) else (_present - _noise)
                    values[index] = _present.astype(_type)
                #
                # Sometimes messy data has unpleasant surprises: a batch without any value is kept as is
                # so the column keeps its length
                x += values.tolist()
            if x :
                _count = 0
                _same = 0
                for _old,_new in zip(_original,x) :
                    if Generator._is_missing(_old) :
                        continue
                    _count += 1
                    _same += 1 if _old == _new else 0
                _log['input']['identical_percentage'] = (100 * _same / _count) if _count else 0
                _df[name] = x
            self.log(**_log)
        return _df
    def make_date(self,**_args) :
        """
        Makes a random date within a given year
        :param year     initial value, None is returned if the year is missing
        :param offset   (optional) list of numbers of days, each one is added to the previous date
        :param field    (optional) name of the field, used to look up the format in info['format']
        :return a formatted date (default format %Y-%m-%d) or a list of formatted dates if an offset is provided
        """
        import calendar
        if Generator._is_missing(_args['year']) :
            return None
        year = int(_args['year'])
        offset = _args['offset'] if 'offset' in _args else 0
        month = np.random.randint(1,13)
        #
        # the upper bound of randint is exclusive, hence the +1 so the last day of the month can be drawn
        _end = calendar.monthrange(year,month)[1]
        day = np.random.randint(1,_end + 1)
        #-- synthetic date
        _date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0)
        FORMAT = '%Y-%m-%d'
        _name = _args['field'] if 'field' in _args else None
        if 'format' in self.info and _name in self.info['format']:
            FORMAT = self.info['format'][_name]
        if offset :
            r = [_date.strftime(FORMAT)]
            for _delta in offset :
                _date = _date + timedelta(_delta)
                r.append(_date.strftime(FORMAT))
            return r
        return _date.strftime(FORMAT)
    def format(self,_df,_schema):
        """
        Casts the integer/float columns of the data-frame according to the inferred types and the schema,
        for date columns the format is only checked against the data and logged (the data is not altered)
        :param _df      data-frame to be formatted
        :param _schema  list of {name,type}, fields that are not in the data-frame are ignored
        :return the data-frame
        """
        r = {}
        for _item in _schema :
            name = _item['name']
            if name not in _df.columns :
                continue
            _sqltype = _item['type'].upper()
            if _sqltype in ['DATE','DATETIME','TIMESTAMP'] :
                FORMAT = None
                if 'format' in self.info and name in self.info['format'] :
                    FORMAT = self.info['format'][name]
                elif _sqltype in ['DATETIME','TIMESTAMP'] :
                    FORMAT = '%Y-%m-%d %H:%M:%S'
                if FORMAT is None :
                    continue
                try:
                    #
                    #-- Sometimes data isn't all it's meant to be, the format is reported only if the column can be parsed with it
                    pd.to_datetime(_df[name], format=FORMAT)
                    r[name] = FORMAT
                except Exception as e:
                    pass
            else:
                #
                # Because types are inferred on the basis of the sample being processed they can sometimes be wrong
                # To help disambiguate we add the schema information
                _type = None
                if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower():
                    _type = np.int64
                elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower():
                    _type = np.float64
                if _type :
                    _df[name] = _df[name].fillna(0).replace([' ','','NA','nan'],0).astype(_type)
        if r :
            self.log(**{'action':'format','input':r})
        return _df
    def post(self,_candidates):
        """
        Merges every candidate with the original data, applies the post-processing pipeline and the formatting
        then writes the result to the target store, without a target the result is appended to the cache
        :param _candidates  list of data-frames
        """
        if 'target' in self.store :
            #
            # working on a copy, the configuration of the store is left as it was provided
            _store = dict(self.store['target'])
            _store['lock'] = True
            _store['context'] = 'write' #-- Just in case
            if 'table' not in _store :
                _store['table'] = self.info['from']
        else:
            _store = None
        #
        # The schema doesn't depend on the candidate, it is read once
        _schema = self.get_schema() if len(_candidates) > 0 else []
        N = 0
        for _iodf in _candidates :
            _df = self._df.copy()
            _columns = self.columns if self.columns else list(_iodf.columns)
            _df[_columns] = _iodf[_columns]
            N += _df.shape[0]
            if self._states and 'post' in self._states:
                _df = State.apply(_df,self._states['post'])
            #
            # NOTE: approximate and make_date are not applied here
            # @TODO: Improve formatting with better post-processing pipeline
            _df = self.format(_df,_schema)
            _log = []
            for _item in _schema :
                _name = _item['name']
                _dtype = _df[_name].dtypes.name if _name in _df.columns else None
                _log.append({"name":_name,"dataframe":_dtype,"schema":_item['type']})
            self.log(**{"action":"consolidate","input":_log})
            if _store :
                writer = transport.factory.instance(**_store)
                if _store.get('provider') == 'bigquery':
                    writer.write(_df,schema=[],table=self.info['from'])
                else:
                    writer.write(_df,table=self.info['from'])
            else:
                self.cache.append(_df)
        self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}})
class Shuffle(Generator):
    """
    This is a method that will yield data with low utility: the columns of info['columns'] are
    dissociated from the other columns by shuffling the rows (no network is involved)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def run(self):
        """
        Shuffles the data and posts the result as a single candidate, the random seed is fixed (1)
        A failure to post is logged (action 'failed') instead of being raised
        """
        np.random.seed(1)
        self.initalize()
        _index = np.arange(self._df.shape[0])
        np.random.shuffle(_index)
        np.random.shuffle(_index)
        _iocolumns = list(self.info['columns'])
        #
        # The order of the remaining columns is the one of the data-frame (a set difference has no stable order)
        _ocolumns = [_name for _name in self._df.columns if _name not in _iocolumns]
        #
        # The rows are addressed by position so the shuffle doesn't depend on the index of the data-frame:
        # the other columns are shuffled and joined with the io columns in their original order
        _iodf = self._df[_iocolumns].reset_index(drop=True)
        _odf = self._df.iloc[_index][_ocolumns].reset_index(drop=True)
        self._df = _odf.join(_iodf)
        #
        # The following is a full shuffle
        self._df = self._df.iloc[_index].reset_index(drop=True)
        _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
        self.log(**_log)
        try:
            self.post([self._df])
            self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
        except Exception as e :
            #
            # the message is logged as text, an exception object can not be serialized by every logger
            self.log(**{'action':'failed','input':{'msg':str(e),'info':self.info}})
class apply :
    """
    Names of the operations that can be requested through the 'apply' argument of factory.instance
    """
    TRAIN = 'train'
    GENERATE = 'generate'
    RANDOM = 'random'
class factory :
    _infocache = {}
    @staticmethod
    def instance(**_args):
        """
        An instance of an object that trains and generates candidate datasets
        :param apply    (default train) operation to perform: 'train', 'generate' or 'random' (shuffle), any other value trains
        :param start    (optional) if True the process is started before it is returned
        :param gpu (optional) index of the gpu to be used if using one
        :param store {source,target} if no target is provided console will be output
        :param epochs (default 2) number of epochs to train
        :param candidates(default 1) number of candidates to generate
        :param info {columns,sql,from}
        :param autopilot will generate output automatically
        :param batch (default 2k) size of the batch
        :return the process (Shuffle, Generator or Trainer)
        """
        #
        # a missing 'apply' is handled like an unknown value i.e we train
        _mode = _args['apply'] if 'apply' in _args else apply.TRAIN
        if _mode == apply.RANDOM :
            pthread = Shuffle(**_args)
        elif _mode == apply.GENERATE :
            pthread = Generator(**_args)
        else:
            pthread = Trainer(**_args)
        if 'start' in _args and _args['start'] == True :
            pthread.start()
        return pthread