data-maker/data/maker/__init__.py

308 lines
12 KiB
Python
Raw Normal View History

"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0
This package serves as a proxy to the overall usage of the framework.
This package is designed to generate synthetic data from an original dataset using deep learning techniques
@TODO:
- Make configurable GPU, EPOCHS
"""
import pandas as pd
import numpy as np
2020-01-05 05:02:15 +00:00
import data.gan as gan
2022-04-11 23:33:07 +00:00
import transport
2022-04-12 04:27:25 +00:00
# from data.bridge import Binary
2020-02-11 18:00:16 +00:00
import threading as thread
2021-03-29 16:10:57 +00:00
from data.maker import prepare
import copy
import os
import json
2022-04-11 23:33:07 +00:00
from multiprocessing import Process, RLock
2022-04-12 04:27:25 +00:00
from datetime import datetime, timedelta
2021-03-29 16:10:57 +00:00
2022-04-11 23:33:07 +00:00
class Learner(Process):
    """
    Base process shared by the training and the generation steps.

    It keeps the dataset description (info), the store configuration (store), the network
    parameters (network_args) and provides logging, schema lookup and data preparation helpers.
    """

    def __init__(self, **_args):
        """
        :param info         dataset description, 'context' and 'from' are read here, 'columns' is optional
        :param store        store configuration, 'source' is used to read data, 'logger' is optional
        :param gpu          (optional) index of the gpu to be used
        :param network_args (optional) ready-made network parameters, when absent they are built from
                            logpath (default 'logs'), epochs (default 2) and batch (default 2000)
        :param data         (optional) data-frame to use instead of reading from the source
        """
        super(Learner, self).__init__()
        self.ndx = 0    # -- sequence number attached to every log entry
        if 'gpu' in _args:
            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
            self.gpu = int(_args['gpu'])
        else:
            self.gpu = None
        self.info = _args['info']
        self.columns = self.info['columns'] if 'columns' in self.info else None
        self.store = _args['store']

        if 'network_args' not in _args:
            self.network_args = {
                'context': self.info['context'],
                'logs': _args['logpath'] if 'logpath' in _args else 'logs',
                'max_epochs': int(_args['epochs']) if 'epochs' in _args else 2,
                'batch_size': int(_args['batch']) if 'batch' in _args else 2000
            }
        else:
            self.network_args = _args['network_args']
        self._encoder = None
        self._map = None
        self._df = _args['data'] if 'data' in _args else None
        self.name = self.__class__.__name__ + '::' + self.info['from']
        self.name = self.name.replace('?', '')
        #
        # @TODO: allow for verbose mode so we have a sense of what is going on within the network
        #
        _log = {'action': 'init', 'context': self.info['context'], 'gpu': (self.gpu if self.gpu is not None else -1)}
        self.log(**_log)

    def log(self, **_args):
        """
        Write a log entry, the entry is prefixed with the sequence number, the module name and the context
        The logger is taken from store['logger'] when provided, otherwise the console is used
        :param _args    key/value pairs that make up the log entry
        """
        if 'logger' in self.store:
            logger = transport.factory.instance(**self.store['logger'])
        else:
            logger = transport.factory.instance(provider='console', context='write', lock=True)
        _args = {'ndx': self.ndx, 'module': self.name, 'info': self.info['context'], **_args}
        logger.write(_args)
        self.ndx += 1
        if hasattr(logger, 'close'):
            logger.close()

    def get_schema(self):
        """
        :return the schema of the data:
            - any provider other than bigquery: a list of {name,type} built from the data-frame dtypes
            - bigquery: whatever the reader's meta function returns for the table info['from']
        """
        if self.store['source']['provider'] != 'bigquery':
            #
            # A single pass over the dtypes, str(dtype) is the same text astype(str) would produce
            return [{'name': _name, 'type': str(_type)} for _name, _type in self._df.dtypes.items()]
        reader = transport.factory.instance(**self.store['source'])
        return reader.meta(table=self.info['from'])

    def initalize(self):
        """
        Load the data (unless it was handed to the constructor) and build the encoder.
        The encoder is left to None when the data-frame has no rows
        """
        if self._df is None:
            #
            # The reader is only created when there is something to read
            reader = transport.factory.instance(**self.store['source'])
            _read_args = self.info
            self._df = reader.read(**_read_args)
        columns = self.columns if self.columns else self._df.columns
        #
        # convert the data to binary here ...
        _args = {"schema": self.get_schema(), "data": self._df, "columns": columns}
        if self._map:
            _args['map'] = self._map
        self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None

        _log = {'action': 'data-prep', 'input': {'rows': self._df.shape[0], 'cols': self._df.shape[1]}}
        self.log(**_log)
2022-04-11 23:33:07 +00:00
class Trainer(Learner):
    """
    This will perform training using a GAN
    """

    def __init__(self, **_args):
        """
        Accepts the parameters of Learner in addition to :
        :param limit        (optional) stored as an integer, not used within this class
        :param autopilot    (default False) when set, generation is run right after training
        :param candidates   (default 1) number of candidates handed to the network and the generator
        """
        super().__init__(**_args)
        self.limit = int(_args['limit']) if 'limit' in _args else None
        self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
        #
        # NOTE(review): this instance attribute hides the generate() method defined below,
        # calling trainer.generate() on an instance resolves to this attribute. Kept as is
        # because callers may read the attribute -- confirm which one is intended
        self.generate = None
        self.candidates = int(_args['candidates']) if 'candidates' in _args else 1

    def run(self):
        """
        Encode the data, train the network, save the encoder map to map.json in the
        training output folder and build the Generator (it is run when autopilot is set)
        """
        self.initalize()
        if self._encoder is None:
            #
            # @TODO Log that the dataset was empty or not statistically relevant
            return
        _space, _matrix = self._encoder.convert()
        _args = self.network_args
        #
        # The gpu index 0 is a valid index, hence the comparison against None
        if self.gpu is not None:
            _args['gpu'] = self.gpu
        _args['real'] = _matrix
        _args['candidates'] = self.candidates
        #
        # At this point we have the binary matrix, we can initiate training
        #
        beg = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        gTrain = gan.Train(**_args)
        gTrain.apply()
        writer = transport.factory.instance(provider='file', context='write', path=os.sep.join([gTrain.out_dir, 'map.json']))
        writer.write(self._encoder._map, overwrite=True)
        writer.close()
        #
        # @TODO: At this point we need to generate some other objects
        #
        _args = {"network_args": self.network_args, "store": self.store, "info": self.info, "candidates": self.candidates, "data": self._df}
        if self.gpu is not None:
            _args['gpu'] = self.gpu
        g = Generator(**_args)
        end = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        _logs = {'action': 'train', 'input': {'start': beg, 'end': end, "unique_counts": self._encoder._io[0]}}
        self.log(**_logs)
        self.generate = g
        if self.autopilot:
            self.generate.run()

    def generate(self):
        """
        Placeholder that tells the caller generation is driven by the autopilot flag
        :raises Exception when autopilot is not set
        """
        if self.autopilot:
            print("Autopilot is set ... No need to call this function")
        else:
            raise Exception("Autopilot has not been, Wait till training is finished. Use is_alive function on process object")
class Generator(Learner):
    """
    This will generate candidate datasets from a trained network and write them to the target store
    """

    def __init__(self, **_args):
        """
        Accepts the parameters of Learner in addition to :
        :param candidates   (default 1) number of candidates to generate
        The encoder map is loaded from <logs>/output/<context>/map.json
        """
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'], 'output', self.network_args['context'], 'map.json'])
        with open(filename) as file:
            self._map = json.loads(file.read())

    def run(self):
        """
        Encode the data, ask the network for candidate matrices, revert them to data-frames and post them
        """
        self.initalize()
        if self._encoder is None:
            #
            # @TODO Log that the dataset was empty or not statistically relevant
            return
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values, _matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]
        #
        # The gpu index 0 is a valid index, hence the comparison against None
        if self.gpu is not None:
            _args['gpu'] = self.gpu
        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates = [self._encoder.revert(matrix=_item) for _item in _iomatrix]
        _size = np.sum([len(_item) for _item in _iomatrix])
        _log = {'action': 'io-data', 'input': {'candidates': len(_candidates), 'rows': int(_size)}}
        self.log(**_log)
        self.post(_candidates)

    @staticmethod
    def _is_missing(_value):
        """
        :return True when the value is None, NaN (of any float type) or an empty string
        """
        return bool(pd.isnull(_value)) or (isinstance(_value, str) and _value == '')

    @staticmethod
    def _schema_entry(_item):
        """
        :return {name,type} for a schema item, whether the item is a dictionary (data-frame based schema)
                or an object exposing name and field_type (schema returned by the reader)
        """
        if isinstance(_item, dict):
            return {'name': _item['name'], 'type': _item['type']}
        return {'name': _item.name, 'type': _item.field_type}

    def approximate(self, _df):
        """
        Add noise to the columns listed in info['approximate'] (column name -> type, 'int...' or anything else for float)
        The noise is drawn per batch from a dirichlet distribution and either added or subtracted
        :param _df  data-frame to be altered
        :return the data-frame with the approximated columns
        """
        _columns = self.info['approximate']
        for name in _columns:
            BATCH_SIZE = 10 if _df[name].size > 100 else 1
            #
            # We work on a private copy so the in-place updates below never touch the frame's own buffer
            batches = np.array_split(np.array(_df[name].fillna(np.nan).values), BATCH_SIZE)
            _type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
            x = []
            _log = {'action': 'approximate', 'input': {'batch': BATCH_SIZE, 'col': name}}
            for values in batches:
                #
                # A membership test against np.nan misses NaN values read out of a numeric array
                # (NaN is never equal to itself), hence the explicit check
                index = np.array([not self._is_missing(_x) for _x in values], dtype=bool)
                if index.any():
                    _values = np.random.dirichlet(values[index].astype(_type))
                    values[index] = list(values[index] + _values) if np.random.randint(0, 2) else list(values[index] - _values)
                    values[index] = values[index].astype(_type)
                x += values.tolist()
            if x:
                #
                # The comparison is restricted to the rows that had a value to begin with,
                # both sides have the same length regardless of missing values
                present = _df[name].notnull().values
                _before = _df[name][present].reset_index(drop=True)
                _after = pd.Series(x)[present].reset_index(drop=True)
                _log['input']['diff_pct'] = 100 * (1 - np.divide((_before == _after).sum(), int(present.sum())))
                _df[name] = x
                self.log(**_log)
        return _df

    def make_date(self, **_args):
        """
        Build a random date within a given year
        :param year     initial value, None is returned when it is missing (None, NaN or empty string)
        :param offset   (optional) list of day offsets, each one is added to the previous date
        :param field    (optional) name of the field, used to look up a format in info['format']
        :return a formatted date, or a list of formatted dates when offsets are provided
        """
        import calendar

        if self._is_missing(_args['year']):
            return None
        year = int(_args['year'])
        offset = _args['offset'] if 'offset' in _args else 0
        month = np.random.randint(1, 13)
        #
        # The upper bound of randint is exclusive, +1 makes the last day of the month reachable.
        # The calendar provides the exact number of days (leap years included)
        _end = calendar.monthrange(year, month)[1]
        day = np.random.randint(1, _end + 1)
        # -- synthetic date
        _date = datetime(year=year, month=month, day=day)
        FORMAT = '%Y-%m-%d'
        if 'format' in self.info and 'field' in _args and _args['field'] in self.info['format']:
            _name = _args['field']
            FORMAT = self.info['format'][_name]
        if offset:
            r = [_date.strftime(FORMAT)]
            for _delta in offset:
                _date = _date + timedelta(_delta)
                r.append(_date.strftime(FORMAT))
            return r
        return _date.strftime(FORMAT)

    def format(self, _df, _schema):
        """
        Cast the columns declared as DATETIME or TIMESTAMP in the schema to datetime64[ns]
        :param _df      data-frame to be formatted
        :param _schema  list of {name,type}
        :return the formatted data-frame
        """
        for _item in _schema:
            name = _item['name']
            if _item['type'].upper() in ['DATETIME', 'TIMESTAMP']:
                # NOTE(review): values are expected as '%Y-%m-%d %H:%M:%S', the default format of
                # make_date is '%Y-%m-%d' -- confirm info['format'] is set for such fields
                _df[name] = pd.to_datetime(_df[name], format='%Y-%m-%d %H:%M:%S').astype('datetime64[ns]')
        return _df

    def post(self, _candidates):
        """
        Merge every candidate into a copy of the original data, apply the post-processing
        (approximate, make_date, format) and write the result to the target store (console by default)
        :param _candidates  list of data-frames returned by the encoder
        """
        #
        # We work on a copy so the configuration handed to us is left untouched
        _store = dict(self.store['target']) if 'target' in self.store else {'provider': 'console'}
        _store['lock'] = True
        _store['context'] = 'write'     # -- Just in case
        if 'table' not in _store:
            _store['table'] = self.info['from']
        writer = transport.factory.instance(**_store)
        #
        # Same fallback as the data preparation step when no columns were specified
        # (assumes the candidates carry the columns given to the encoder -- TODO confirm)
        columns = self.columns if self.columns else self._df.columns.tolist()
        #
        # The schema does not depend on the candidate, it is resolved once
        _schema = [self._schema_entry(_item) for _item in self.get_schema()]
        N = 0
        for _iodf in _candidates:
            _df = self._df.copy()
            _df[columns] = _iodf[columns]
            N += _df.shape[0]
            #
            # @TODO:
            #   Improve formatting with better post-processing pipeline
            if 'approximate' in self.info:
                _df = self.approximate(_df)
            if 'make_date' in self.info:
                for name in self.info['make_date']:
                    iname = self.info['make_date'][name]
                    years = _df[iname]
                    _dates = [self.make_date(year=year, field=name) for year in years]
                    if _dates:
                        _df[name] = _dates
            _df = self.format(_df, _schema)
            writer.write(_df, schema=_schema)
        if hasattr(writer, 'close'):
            writer.close()

        self.log(**{'action': 'write', 'input': {'rows': N, 'candidates': len(_candidates)}})
2022-04-11 23:33:07 +00:00
class factory:
    """
    Entry point of the package, it hands out the object that drives training and generation
    """
    _infocache = {}

    @staticmethod
    def instance(**_args):
        """
        Build the object that trains a network and generates candidate datasets
        :param gpu          (optional) index of the gpu to be used if using one
        :param store        {source,target} if no target is provided console will be output
        :param epochs       (default 2) number of epochs to train
        :param candidates   (default 1) number of candidates to generate
        :param info         {columns,sql,from}
        :param autopilot    will generate output automatically
        :param batch        (default 2k) size of the batch
        :return a Trainer built from the arguments as provided
        """
        _trainer = Trainer(**_args)
        return _trainer