bug fix: crash with dataset & epochs

This commit is contained in:
Steve Nyemba 2022-09-16 18:18:15 -05:00
parent 4398212caf
commit 0efd4b13bc
5 changed files with 167 additions and 44 deletions

View File

@ -13,17 +13,19 @@ This package is designed to generate synthetic data from a dataset from an origi
After installing the easiest way to get started is as follows (using pandas). The process is as follows:
Read about [data-transport on github](https://github.com/lnyemba/data-transport) or on [healthcareio.the-phi.com/git/code/transport](https://healthcareio.the-phi.com/git/code/transport.git)
**Train the GAN on the original/raw dataset**
1. We define the data sources
The sources will consists in source, target and logger20.
import pandas as pd
import data.maker
import transport
from transport import providers
df = pd.read_csv('sample.csv')
column = 'gender'
id = 'id'
context = 'demo'
data.maker.train(context=context,data=df,column=column,id=id,logs='logs')
The trainer will store the data on disk (for now) in a structured folder that will hold training models that will be used to generate the synthetic data.

View File

@ -3,3 +3,4 @@ from data.params import SYS_ARGS
import transport
from multiprocessing import Process, Queue
from data.maker import prepare
from data.maker import state

View File

@ -100,6 +100,13 @@ class GNet :
self.TOTAL_BATCHSIZE = self.BATCHSIZE_PER_GPU * self.NUM_GPUS
self.STEPS_PER_EPOCH = 256 #int(np.load('ICD9/train.npy').shape[0] / 2000)
self.MAX_EPOCHS = 10 if 'max_epochs' not in args else int(args['max_epochs'])
CHECKPOINT_SKIPS = 10
if self.MAX_EPOCHS < 2*CHECKPOINT_SKIPS :
CHECKPOINT_SKIPS = 2
self.CHECKPOINTS = np.repeat( np.divide(self.MAX_EPOCHS,CHECKPOINT_SKIPS),CHECKPOINT_SKIPS ).cumsum().astype(int).tolist()
self.ROW_COUNT = args['real'].shape[0] if 'real' in args else 100
self.CONTEXT = args['context']
self.ATTRIBUTES = {"id":args['column_id'] if 'column_id' in args else None,"synthetic":args['column'] if 'column' in args else None}
@ -120,14 +127,18 @@ class GNet :
for key in ['train','output'] :
self.mkdir(os.sep.join([self.log_dir,key]))
self.mkdir (os.sep.join([self.log_dir,key,self.CONTEXT]))
if 'partition' in args :
self.mkdir (os.sep.join([self.log_dir,key,self.CONTEXT,str(args['partition'])]))
# if 'partition' in args :
# self.mkdir (os.sep.join([self.log_dir,key,self.CONTEXT,str(args['partition'])]))
self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT])
self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT])
if 'partition' in args :
self.train_dir = os.sep.join([self.train_dir,str(args['partition'])])
self.out_dir = os.sep.join([self.out_dir,str(args['partition'])])
for checkpoint in self.CHECKPOINTS :
self.mkdir (os.sep.join([self.train_dir,str(checkpoint)]))
self.mkdir (os.sep.join([self.out_dir,str(checkpoint)]))
# if self.logger :
# We will clear the logs from the data-store
@ -150,12 +161,13 @@ class GNet :
attr = json.loads((open(_name)).read())
for key in attr :
value = attr[key]
setattr(self,key,value)
if not hasattr(self,key):
setattr(self,key,value)
self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT])
self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT])
if 'partition' in args :
self.train_dir = os.sep.join([self.train_dir,str(args['partition'])])
self.out_dir = os.sep.join([self.out_dir,str(args['partition'])])
# if 'partition' in args :
# self.train_dir = os.sep.join([self.train_dir,str(args['partition'])])
# self.out_dir = os.sep.join([self.out_dir,str(args['partition'])])
def log_meta(self,**args) :
@ -183,15 +195,24 @@ class GNet :
suffix = self.CONTEXT #self.get.suffix()
_name = os.sep.join([self.out_dir,'meta-'+suffix])
f = open(_name+'.json','w')
f.write(json.dumps(_object))
# f = open(_name+'.json','w')
# f.write(json.dumps(_object))
# f.close()
for _info in [{"name":os.sep.join([self.out_dir,'meta-'+suffix+'.json']),"data":_object},{"name":os.sep.join([self.out_dir,'epochs.json']),"data":self.logs['epochs'] if 'epochs' in self.logs else []}] :
f = open(_info['name'],'w')
f.write(json.dumps(_info['data']))
f.close()
return _object
def mkdir (self,path):
if not os.path.exists(path) :
if os.sep in path :
pass
root = []
for loc in path.split(os.sep) :
for loc in path.strip().split(os.sep) :
if loc == '' :
root.append(os.sep)
root.append(loc)
if not os.path.exists(os.sep.join(root)) :
os.mkdir(os.sep.join(root))
@ -278,8 +299,10 @@ class Generator (GNet):
tf.compat.v1.add_to_collection('glosses', loss)
return loss, loss
def load_meta(self, **args):
super().load_meta(**args)
# super().load_meta(**args)
self.discriminator.load_meta(**args)
def network(self,**args) :
"""
This function will build the network that will generate the synthetic candidates
@ -381,6 +404,7 @@ class Train (GNet):
self.logger.write({"module":"gan-train","action":"start","input":{"partition":self.PARTITION,"meta":self.meta} } )
# self.log (real_shape=list(self._REAL.shape),label_shape = self._LABEL.shape,meta_data=self.meta)
def load_meta(self, column):
"""
@ -445,7 +469,7 @@ class Train (GNet):
else :
dataset = tf.data.Dataset.from_tensor_slices(features_placeholder)
# labels_placeholder = None
dataset = dataset.repeat(10000)
dataset = dataset.repeat(20000)
dataset = dataset.batch(batch_size=self.BATCHSIZE_PER_GPU)
dataset = dataset.prefetch(1)
@ -472,9 +496,11 @@ class Train (GNet):
if self._LABEL is not None :
(real, label) = iterator.get_next()
else:
real = iterator.get_next()
label= None
loss, w = self.loss(scope=scope, stage=stage, real=real, label=label)
#tf.get_variable_scope().reuse_variables()
tf.compat.v1.get_variable_scope().reuse_variables()
#vars_ = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=stage)
@ -507,6 +533,7 @@ class Train (GNet):
# init = tf.global_variables_initializer()
init = tf.compat.v1.global_variables_initializer()
logs = []
self.logs['epochs'] = []
#with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess:
with tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess:
@ -536,25 +563,41 @@ class Train (GNet):
logs.append({"epoch": int(epoch),"distance":float(-w_sum/(self.STEPS_PER_EPOCH*2)) })
# if epoch % self.MAX_EPOCHS == 0:
if epoch in [5,10,20,50,75, self.MAX_EPOCHS] :
# if epoch in [5,10,20,50,75, self.MAX_EPOCHS] :
if epoch in self.CHECKPOINTS or int(epoch) == 1:
# suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic']
suffix = self.CONTEXT #self.get.suffix()
_name = os.sep.join([self.train_dir,suffix])
_name = os.sep.join([self.train_dir,str(epoch),suffix])
# saver.save(sess, self.train_dir, write_meta_graph=False, global_step=epoch)
saver.save(sess, _name, write_meta_graph=False, global_step=epoch)
#
#
if self.logger :
row = {"module":"gan-train","action":"logs","input":{"partition":self.PARTITION,"logs":logs}} #,"model":pickle.dump(sess)}
self.logger.write(row)
#
#
logs = [{"path":_name,"epochs":int(epoch),"loss":float(-w_sum/(self.STEPS_PER_EPOCH*2))}]
if self.logger :
# row = {"module":"gan-train","action":"epochs","input":{"logs":logs}} #,"model":pickle.dump(sess)}
# self.logger.write(row)
self.logs['epochs'] += logs
#
# @TODO:
# We should upload the files in the checkpoint
# This would allow the learnt model to be portable to another system
#
tf.compat.v1.reset_default_graph()
#
# let's sort the epochs we've logged thus far (if any)
#
self.logs['epochs'].sort(key=lambda _item: _item['loss'])
if self.logger :
_log = {'module':'gan-train','action':'epochs','input':self.logs['epochs']}
self.logger.write(_log)
#
# @TODO:
# Make another copy of this on disk to be able to load it should we not have a logger setup
#
self.log_meta()
class Predict(GNet):
"""
This class uses synthetic data given a learned model
@ -565,6 +608,7 @@ class Predict(GNet):
self.values = args['values']
self.ROW_COUNT = args['row_count']
self.oROW_COUNT = self.ROW_COUNT
# self.MISSING_VALUES = np.nan_to_num(np.nan)
# if 'no_value' in args and args['no_value'] not in ['na','','NA'] :
# self.MISSING_VALUES = args['no_value']
@ -577,9 +621,20 @@ class Predict(GNet):
super().load_meta(**args)
self.generator.load_meta(**args)
self.ROW_COUNT = self.oROW_COUNT
#
# updating the input/output for the generator, so it points properly
#
for object in [self,self.generator] :
_train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT,str(self.MAX_EPOCHS)])
_out_dir= os.sep.join([self.log_dir,'output',self.CONTEXT,str(self.MAX_EPOCHS)])
setattr(object,'train_dir',_train_dir)
setattr(object,'out_dir',_out_dir)
def apply(self,**args):
suffix = self.CONTEXT #self.get.suffix()
model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)])
# model_dir = os.sep.join([self.train_dir,str(self.MAX_EPOCHS)])
demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo']
#
# setup computational graph

View File

@ -15,6 +15,7 @@ import transport
# from data.bridge import Binary
import threading
from data.maker import prepare
from data.maker.state import State
import copy
import os
import nujson as json
@ -25,6 +26,7 @@ from multiprocessing import Queue
import time
class Learner(Process):
def __init__(self,**_args):
@ -48,7 +50,7 @@ class Learner(Process):
if 'network_args' not in _args :
self.network_args ={
'context':self.info['context'] ,
'logs':_args['logpath'] if 'logpath' in _args else 'logs',
'logs':_args['logs'] if 'logs' in _args else 'logs',
'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
'batch_size':int (_args['batch']) if 'batch' in _args else 2000
}
@ -72,6 +74,36 @@ class Learner(Process):
self.logger = None
if 'logger' in self.store :
self.logger = transport.factory.instance(**self.store['logger'])
self.autopilot = False #-- to be set by caller
self._initStateSpace()
def _initStateSpace(self):
"""
Initializing state-space for the data-maker, The state-space functions are used as pre-post processing functions applied to the data accordingly i.e
- Trainer -> pre-processing
- Generation -> post processing
The specifications of a state space in the configuration file is as such
state:{pre:{path,pipeline:[]}, post:{path,pipeline:[]}}
"""
self._states = None
if 'state' in self.info :
try:
_config = self.info ['state']
self._states = State.instance(_config)
except Exception as e:
print (e)
pass
finally:
# __info = (pd.DataFrame(self._states)[['name','path','args']]).to_dict(orient='records')
if self._states :
__info = {}
for key in self._states :
__info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key]]
self.log(object='state-space',action='load',input=__info)
def log(self,**_args):
try:
@ -108,11 +140,36 @@ class Learner(Process):
_read_args= self.info
if self._df is None :
self._df = reader.read(**_read_args)
#
# NOTE : PRE
# At this point we apply pre-processing of the data if there were ever a need for it
#
_log = {}
HAS_STATES = self._states is not None and 'pre' in self._states
NOT_GENERATING = self.name in ['Trainer','Shuffle']
IS_AUTOPILOT = self.autopilot
#
# allow calling pre-conditions if either of the conditions is true
# 1. states and not generating
# 2. IS_GENERATING and states and not autopilot
_ALLOW_PRE_CALL = (HAS_STATES and NOT_GENERATING) or (NOT_GENERATING is False and HAS_STATES and IS_AUTOPILOT is False)
if _ALLOW_PRE_CALL :
# if HAS_STATES and NOT_GENERATING or (HAS_STATES and IS_AUTOPILOT is False and NOT_GENERATING is False):
_logs = {'action':'status','input':{'pre':self._states['pre']}}
_beg = list(self._df.shape)
self._df = State.apply(self._df,self._states['pre'])
_end = list(self._df.shape)
_logs['input']['size'] = _beg,_end
self.log(**_log)
#
#
columns = self.columns if self.columns else self._df.columns
#
# Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases
# - The code below tries to address the issue (Perhaps better suited for the reading components)
_log = {}
for name in columns :
#
# randomly sampling 5 elements to make sense of data-types
@ -201,8 +258,14 @@ class Trainer(Learner):
# @TODO: At this point we need to generate another some other objects
#
_args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
_args['logs'] = self.network_args['logs']
_args['autopilot'] = self.autopilot
if self.gpu :
_args['gpu'] = self.gpu
#
# Let us find the smallest, the item is sorted by loss ...
_args['epochs'] = gTrain.logs['epochs'][0]['epochs']
g = Generator(**_args)
# g.run()
@ -239,6 +302,7 @@ class Generator (Learner):
file.close()
else:
self._map = {}
self.autopilot = False if 'autopilot' not in _args else _args['autopilot']
def run(self):
self.initalize()
if self._encoder is None :
@ -416,33 +480,32 @@ class Generator (Learner):
_df = self._df.copy()
_df[self.columns] = _iodf[self.columns]
N += _df.shape[0]
#
#@TODO:
# Improve formatting with better post-processing pipeline
if 'approximate' in self.info :
_df = self.approximate(_df)
if 'make_date' in self.info :
for name in self.info['make_date'] :
# iname = self.info['make_date']['init_field']
iname = self.info['make_date'][name]
if self._states :
_df = State.apply(_df,self._states['post'])
# #
# #@TODO:
# # Improve formatting with better post-processing pipeline
# if 'approximate' in self.info :
# _df = self.approximate(_df)
# if 'make_date' in self.info :
# for name in self.info['make_date'] :
# # iname = self.info['make_date']['init_field']
# iname = self.info['make_date'][name]
years = _df[iname]
_dates = [self.make_date(year=_year,field=name) for _year in years]
if _dates :
_df[name] = _dates
# years = _df[iname]
# _dates = [self.make_date(year=_year,field=name) for _year in years]
# if _dates :
# _df[name] = _dates
_schema = self.get_schema()
# _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema]
_df = self.format(_df,_schema)
_log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
self.log(**{"action":"consolidate","input":_log})
# w = transport.factory.instance(doc='observation',provider='mongodb',context='write',db='IOV01_LOGS',auth_file='/home/steve/dev/transport/mongo.json')
# w.write(_df)
# cols = [name for name in _df.columns if name.endswith('datetime')]
# print (_df[cols])
if _store :
writer = transport.factory.instance(**_store)
if _store['provider'] == 'bigquery':
@ -507,8 +570,10 @@ class factory :
:param info {columns,sql,from}
:param autopilot will generate output automatically
:param batch (default 2k) size of the batch
"""
if _args['apply'] in [apply.RANDOM] :
pthread = Shuffle(**_args)
elif _args['apply'] == apply.GENERATE :