Compare commits


No commits in common. "master" and "dev" have entirely different histories.
master ... dev

7 changed files with 123 additions and 230 deletions

View File

@@ -103,12 +103,11 @@ class GNet :
         CHECKPOINT_SKIPS = int(args['checkpoint_skips']) if 'checkpoint_skips' in args else int(self.MAX_EPOCHS/10)
         CHECKPOINT_SKIPS = 1 if CHECKPOINT_SKIPS < 1 else CHECKPOINT_SKIPS
         # if self.MAX_EPOCHS < 2*CHECKPOINT_SKIPS :
         #     CHECKPOINT_SKIPS = 2
         # self.CHECKPOINTS = [1,self.MAX_EPOCHS] + np.repeat( np.divide(self.MAX_EPOCHS,CHECKPOINT_SKIPS),CHECKPOINT_SKIPS ).cumsum().astype(int).tolist()
         self.CHECKPOINTS = np.repeat(CHECKPOINT_SKIPS, self.MAX_EPOCHS/ CHECKPOINT_SKIPS).cumsum().astype(int).tolist()
         self.ROW_COUNT = args['real'].shape[0] if 'real' in args else 100
         self.CONTEXT = args['context']
         self.ATTRIBUTES = {"id":args['column_id'] if 'column_id' in args else None,"synthetic":args['column'] if 'column' in args else None}
@@ -288,17 +287,8 @@ class Generator (GNet):
     """
     def __init__(self,**args):
-        if 'trainer' not in args :
-            GNet.__init__(self,**args)
-            self.discriminator = Discriminator(**args)
-        else:
-            _args = {}
-            _trainer = args['trainer']
-            for key in vars(_trainer) :
-                value = getattr(_trainer,key)
-                setattr(self,key,value)
-                _args[key] = value
-            self.discriminator = Discriminator(**_args)
+        GNet.__init__(self,**args)
+        self.discriminator = Discriminator(**args)
     def loss(self,**args):
         fake = args['fake']
         label = args['label']
@@ -667,9 +657,7 @@ class Predict(GNet):
         fake = self.generator.network(inputs=z, label=label)
         init = tf.compat.v1.global_variables_initializer()
-        print ([self.CHECKPOINTS])
-        # saver = tf.compat.v1.train.Saver()
-        saver = tf.compat.v1.train.Saver(max_to_keep=len(self.CHECKPOINTS))
+        saver = tf.compat.v1.train.Saver()
         df = pd.DataFrame()
         CANDIDATE_COUNT = args['candidates'] if 'candidates' in args else 1 #0 if self.ROW_COUNT < 1000 else 100
         candidates = []

View File

@@ -22,7 +22,7 @@ import nujson as json
 from multiprocessing import Process, RLock
 from datetime import datetime, timedelta
 from multiprocessing import Queue
-from data.maker.version import __version__
 import time
@@ -33,7 +33,6 @@ class Learner(Process):
         super(Learner, self).__init__()
-        self._arch = {'init':_args}
         self.ndx = 0
         self._queue = Queue()
         self.lock = RLock()
@@ -45,8 +44,6 @@ class Learner(Process):
         self.gpu = None
         self.info = _args['info']
-        if 'context' not in self.info :
-            self.info['context'] = self.info['from']
         self.columns = self.info['columns'] if 'columns' in self.info else None
         self.store = _args['store']
@@ -100,12 +97,9 @@ class Learner(Process):
         # __info = (pd.DataFrame(self._states)[['name','path','args']]).to_dict(orient='records')
         if self._states :
             __info = {}
-            # print (self._states)
             for key in self._states :
-                _pipeline = self._states[key]
-                # __info[key] = ([{'name':_payload['name']} for _payload in _pipeline])
-                __info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key] if _item ]
+                __info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key]]
             self.log(object='state-space',action='load',input=__info)
@@ -179,23 +173,19 @@ class Learner(Process):
         for name in columns :
             #
             # randomly sampling 5 elements to make sense of data-types
             if self._df[name].size < 5 :
                 continue
-            _index = np.random.choice(np.arange(self._df[name].shape[0]),5,False)
-            no_value = [type(value) in [int,float,np.int64,np.int32,np.float32,np.float64] for value in self._df[name].values[_index] if value is not None]
+            _index = np.random.choice(np.arange(self._df[name].size),5,False)
+            no_value = [type(value) in [int,float,np.int64,np.int32,np.float32,np.float64] for value in self._df[name].values[_index]]
             no_value = 0 if np.sum(no_value) > 0 else ''
             try:
                 self._df[name] = self._df[name].fillna(no_value)
-            except Exception as e:
-                print (['.... skipping ',name,no_value])
             finally:
                 pass
-            # _log[name] = self._df[name].dtypes.name
-            # _log[name] = reader.meta()
-        # _log = {'action':'structure','input':_log}
-        # self.log(**_log)
+            _log[name] = self._df[name].dtypes.name
+        _log = {'action':'structure','input':_log}
+        self.log(**_log)
         #
         # convert the data to binary here ...
         _schema = self.get_schema()
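The sampling block above guesses whether a column is numeric from five randomly drawn values and picks the fill value accordingly (0 for numeric columns, an empty string otherwise). A rough standalone sketch of the same idea, using an invented data frame:

```python
import numpy as np
import pandas as pd

# toy data, for illustration only
df = pd.DataFrame({'age': [34, None, 51, 28, 45, 61],
                   'city': ['Nashville', None, 'Memphis', 'Franklin', 'Knoxville', 'Jackson']})

for name in df.columns:
    if df[name].size < 5:
        continue
    # sample a handful of values to guess the column's type
    _index = np.random.choice(np.arange(df[name].size), 5, False)
    numeric = [type(value) in [int, float, np.int64, np.int32, np.float32, np.float64]
               for value in df[name].values[_index]]
    # numeric columns are back-filled with 0, everything else with an empty string
    no_value = 0 if np.sum(numeric) > 0 else ''
    df[name] = df[name].fillna(no_value)
```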
@@ -280,23 +270,18 @@ class Trainer(Learner):
         #
         _epochs = [_e for _e in gTrain.logs['epochs'] if _e['path'] != '']
         _epochs.sort(key=lambda _item: _item['loss'],reverse=False)
         _args['network_args']['max_epochs'] = _epochs[0]['epochs']
         self.log(action='autopilot',input={'epoch':_epochs[0]})
+        g = Generator(**_args)
         # g.run()
         end = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S')
         _min = float((end-beg).seconds/ 60)
         _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}}
         self.log(**_logs)
+        self._g = g
         if self.autopilot :
-            # g = Generator(**_args)
-            g = Generator(**self._arch['init'])
-            self._g = g
             self._g.run()
         #
         #@TODO Find a way to have the data in the object ....
@@ -315,15 +300,10 @@ class Generator (Learner):
         #
         # We need to load the mapping information for the space we are working with ...
         #
         self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
-        # filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
-        _suffix = self.network_args['context']
-        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'meta-',_suffix+'.json'])
+        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
         self.log(**{'action':'init-map','input':{'filename':filename,'exists':os.path.exists(filename)}})
         if os.path.exists(filename):
             file = open(filename)
             self._map = json.loads(file.read())
             file.close()
@@ -432,51 +412,66 @@ class Generator (Learner):
         return _date.strftime(FORMAT)
     pass
-    def _format(self,_df,_schema):
-        """
-        :_df     data-frame being processed
-        :_schema table schema with types
-        """
-        _columns = [_item['name'] for _item in _schema if _item['name'] in _df.columns]
-        _map = {'INT64':np.int64,'FLOAT64':np.float64,'DATE':np.datetime64,'TIMESTAMP':(lambda field: pd.to_datetime(field).dt.tz_localize(None))}
-        # pd.to_datetime(_df.measurement_datetime).dt.tz_localize(None)
+    def format(self,_df,_schema):
+        r = {}
         for _item in _schema :
-            _name = _item['name']
-            if _item['type'] not in _map :
-                continue
-            _pointer = _map[_item['type']]
-            try:
-                if type(_pointer).__name__ == 'type':
-                    if _item['type'] in ['INT64','FLOAT64'] :
-                        novalue = np.int64(0) if _item['type'] == 'INT64' else np.float64(0)
-                    elif _item['type'] == 'STRING' :
-                        novalue = ''
-                    if _item['type'] in ['INT64','FLOAT64','STRING'] :
-                        _df[_name] = _df[_name].fillna(novalue)
-                    #
-                    # This is not guaranteed to work but may help ...
-                    _df[_name] = _df[_name].values.astype(_pointer)
-                else:
-                    _df[_name] = _pointer(_df[_name])
-                pass
-            except Exception as e:
-                pass
-        # bqw = transport.factory.instance(**_store['target'])
-        # bqw.write(_df,schema=_schema)
-        return _df #[_columns]
+            name = _item['name']
+            if _item['type'].upper() in ['DATE','DATETIME','TIMESTAMP'] :
+                FORMAT = '%Y-%m-%d'
+                try:
+                    #
+                    #-- Sometimes data isn't all it's meant to be
+                    SIZE = -1
+                    if 'format' in self.info and name in self.info['format'] :
+                        FORMAT = self.info['format'][name]
+                        SIZE = 10
+                    elif _item['type'] in ['DATETIME','TIMESTAMP'] :
+                        FORMAT = '%Y-%m-%-d %H:%M:%S'
+                        SIZE = 19
+                    if SIZE > 0 :
+                        values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64)
+                        # _df[name] = [_date[:SIZE].strip() for _date in values]
+                        # _df[name] = _df[name].astype(str)
+                        r[name] = FORMAT
+                        # _df[name] = pd.to_datetime(_df[name], format=FORMAT) #.astype('datetime64[ns]')
+                    if _item['type'] in ['DATETIME','TIMESTAMP']:
+                        pass #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
+                except Exception as e:
+                    pass
+                finally:
+                    pass
+            else:
+                #
+                # Because types are inferred on the basis of the sample being processed they can sometimes be wrong
+                # To help disambiguate we add the schema information
+                _type = None
+                if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower():
+                    _type = np.int
+                elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower():
+                    _type = np.float
+                if _type :
+                    _df[name] = _df[name].fillna(0).replace(' ',0).replace('',0).replace('NA',0).replace('nan',0).astype(_type)
+                # else:
+                #     _df[name] = _df[name].astype(str)
+        # _df = _df.replace('NaT','').replace('NA','')
+        if r :
+            self.log(**{'action':'format','input':r})
+        return _df
+        pass
     def post(self,_candidates):
         if 'target' in self.store :
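The reworked format method above leans on the table schema to coerce columns whose types pandas may have inferred loosely from the data sample. A toy sketch of the same schema-driven coercion, heavily simplified relative to the method in the diff (the schema entries and data below are invented for illustration):

```python
import numpy as np
import pandas as pd

_schema = [{'name': 'person_id', 'type': 'INT64'},
           {'name': 'birth_date', 'type': 'DATE'}]
_df = pd.DataFrame({'person_id': ['1', '2', 'NA'],
                    'birth_date': ['2001-05-02', '1987-11-23', '1990-01-15']})

for _item in _schema:
    name, _type = _item['name'], _item['type'].upper()
    if _type in ['DATE', 'DATETIME', 'TIMESTAMP']:
        # parse dates with an explicit format rather than letting pandas guess
        _df[name] = pd.to_datetime(_df[name], format='%Y-%m-%d')
    elif 'INT' in _type or 'FLOAT' in _type:
        # replace placeholder strings before casting to the schema's numeric type
        _df[name] = _df[name].replace('NA', 0).astype(np.int64 if 'INT' in _type else np.float64)
```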
@@ -488,67 +483,42 @@ class Generator (Learner):
         else:
             _store = None
         N = 0
-        _haslist = np.sum([type(_item)==list for _item in self.columns]) > 0
-        _schema = self.get_schema()
-        #
-        # If the schema doesn't match the data we need to skip it
-        # This happens when the data comes from a query, the post processing needs to handle this
-        #
-        # _names = [_field['name'] for _field in _schema]
-        # _columns = _candidates[0].columns.tolist()
-        # _common = list( set(_columns) & set(_names) )
-        # if not (len(_common) == len(_columns) and len(_names) == len(_common)) :
-        #     _schema = None
         for _iodf in _candidates :
             _df = self._df.copy()
-            if self.columns and _haslist is False:
-                _df[self.columns] = _iodf[self.columns]
-            else:
-                #
-                # In here we have the case of all attributes have undergone random permutations
-                #
-                _df = _iodf
+            _df[self.columns] = _iodf[self.columns]
             N += _df.shape[0]
             if self._states and 'post' in self._states:
                 _df = State.apply(_df,self._states['post'])
             #
-            # Let us format the data frame so as to be able to minimize write errors
-            #
-            if _schema :
-                _df = self._format(_df,_schema)
-            # _df = self.format(_df,_schema)
-            # _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
-            self.log(**{"action":"consolidate","input":{"rows":N,"candidate":_candidates.index(_iodf)}})
+            #@TODO:
+            # Improve formatting with better post-processing pipeline
+            # if 'approximate' in self.info :
+            #     _df = self.approximate(_df)
+            # if 'make_date' in self.info :
+            #     for name in self.info['make_date'] :
+            #         # iname = self.info['make_date']['init_field']
+            #         iname = self.info['make_date'][name]
+            #         years = _df[iname]
+            #         _dates = [self.make_date(year=_year,field=name) for _year in years]
+            #         if _dates :
+            #             _df[name] = _dates
+            _schema = self.get_schema()
+            _df = self.format(_df,_schema)
+            _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
+            self.log(**{"action":"consolidate","input":_log})
             if _store :
-                _log = {'action':'write','input':{'table':self.info['from'],'schema':[],'rows':_df.shape[0]}}
                 writer = transport.factory.instance(**_store)
-                if _store['provider'] == 'bigquery' and _schema:
-                    try:
-                        _log['schema'] = _schema
-                        writer.write(_df,schema=_schema)
-                    except Exception as e:
-                        print (e)
-                        writer.write(_df)
+                if _store['provider'] == 'bigquery':
+                    writer.write(_df,schema=[],table=self.info['from'])
                 else:
-                    writer.write(_df)
-                self.log(**_log)
+                    writer.write(_df,table=self.info['from'])
             else:
                 self.cache.append(_df)
@@ -563,63 +533,30 @@ class Shuffle(Generator):
     """
     def __init__(self,**_args):
         super().__init__(**_args)
-        if 'data' not in _args :
-            reader = transport.factory.instance(**self.store['source'])
-            self._df = reader.read(sql=self.info['sql'])
     def run(self):
+        np.random.seed(1)
         self.initalize()
-        #
-        # If we are given lists of columns instead of a list-of-list
-        # unpack the list
-        _invColumns = []
-        _colNames = []
-        _ucolNames= []
-        _rmColumns = []
-        for _item in self.info['columns'] :
-            if type(_item) == list :
-                _invColumns.append(_item)
-                _rmColumns += _item
-            elif _item in self._df.columns.tolist():
-                _colNames.append(_item)
-        #
-        # At this point we build the matrix of elements we are interested in considering the any unspecified column
-        #
-        if _colNames :
-            _invColumns.append(_colNames)
-            _ucolNames = list(set(self._df.columns) - set(_colNames) - set(_rmColumns))
-            if _ucolNames :
-                _invColumns += [ [_name] for _name in _ucolNames]
-        _xdf = pd.DataFrame()
-        _xdf = pd.DataFrame()
         _index = np.arange(self._df.shape[0])
-        for _columns in _invColumns :
-            _tmpdf = self._df[_columns].copy()[_columns]
-            np.random.seed(1)
-            np.random.shuffle(_index)
-            # _values = _tmpdf.values[_index]
-            #_tmpdf = _tmpdf.iloc[_index]
-            _tmpdf = pd.DataFrame(_tmpdf.values[_index],columns=_columns)
-            if _xdf.shape[0] == 0 :
-                _xdf = _tmpdf
-            else:
-                _xdf = _xdf.join(_tmpdf)
-        _xdf = _xdf[self._df.columns]
-        self._df = _xdf
+        np.random.shuffle(_index)
+        np.random.shuffle(_index)
+        _iocolumns = self.info['columns']
+        _ocolumns = list(set(self._df.columns) - set(_iocolumns) )
+        # _iodf = pd.DataFrame(self._df[_ocolumns],self._df.loc[_index][_iocolumns],index=np.arange(_index.size))
+        _iodf = pd.DataFrame(self._df[_iocolumns].copy(),index = np.arange(_index.size))
+        # self._df = self._df.loc[_index][_ocolumns].join(_iodf)
+        self._df = self._df.loc[_index][_ocolumns]
+        self._df.index = np.arange(self._df.shape[0])
+        self._df = self._df.join(_iodf)
+        #
+        # The following is a full shuffle
+        self._df = self._df.loc[_index]
+        self._df.index = np.arange(self._df.shape[0])
         _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
         self.log(**_log)
         try:
             self.post([self._df])
             self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
         except Exception as e :
@@ -643,7 +580,6 @@ class factory :
         """
-        #
         if _args['apply'] in [apply.RANDOM] :
             pthread = Shuffle(**_args)
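The Shuffle hunk above swaps a per-group permutation (each list of columns re-shuffled with its own pass over the index, so related columns stay paired) for a single full-row shuffle. A standalone sketch of the column-group variant being removed, with made-up data, to illustrate the behavior:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'first_name': ['Ana', 'Bob', 'Cid'],
                   'last_name':  ['Lee', 'Diaz', 'Kim'],
                   'zip':        ['37203', '37212', '37027']})

# groups of columns that get permuted together, e.g. name parts stay paired
_invColumns = [['first_name', 'last_name'], ['zip']]

_index = np.arange(df.shape[0])
_xdf = pd.DataFrame()
for _columns in _invColumns:
    np.random.shuffle(_index)                                    # fresh permutation per group
    _tmp = pd.DataFrame(df[_columns].values[_index], columns=_columns)
    _xdf = _tmp if _xdf.shape[0] == 0 else _xdf.join(_tmp)
_xdf = _xdf[df.columns]                                          # restore the original column order
```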

View File

@@ -69,7 +69,7 @@ class Date(Post):
     """
     """
     pass
 class Approximate(Post):
     def apply(**_args):
         pass

View File

@@ -20,7 +20,7 @@ import os
 class State :
     @staticmethod
-    def apply(_data,lpointers,_config={}):
+    def apply(_data,lpointers):
         """
         This function applies a pipeline against a given data-frame, the calling code must decide whether it is a pre/post
         :_data data-frame
@@ -31,22 +31,12 @@ class State :
                 continue
             pointer = _item['module']
-            if type(pointer).__name__ != 'function':
-                _args = _item['args'] if 'args' in _item else {}
-            else:
-                pointer = _item['module']
-                _args = _item['args'] if 'args' in _item else {}
+            _args = _item['args']
             _data = pointer(_data,_args)
         return _data
     @staticmethod
     def instance(_args):
-        """
-        """
         pre = []
         post=[]
@@ -55,20 +45,8 @@ class State :
             #
             # If the item has a path property is should be ignored
             path = _args[key]['path'] if 'path' in _args[key] else ''
-            # out[key] = [ State._build(dict(_item,**{'path':path})) if 'path' not in _item else State._build(_item) for _item in _args[key]['pipeline']]
-            out[key] = []
-            for _item in _args[key]['pipeline'] :
-                if type(_item).__name__ == 'function':
-                    _stageInfo = {'module':_item,'name':_item.__name__,'args':{},'path':''}
-                    pass
-                else:
-                    if 'path' in _item :
-                        _stageInfo = State._build(dict(_item,**{'path':path}))
-                    else :
-                        _stageInfo= State._build(_item)
-                out[key].append(_stageInfo)
-            # print ([out])
+            out[key] = [ State._build(dict(_item,**{'path':path})) if 'path' not in _item else State._build(_item) for _item in _args[key]['pipeline']]
         return out
         # if 'pre' in _args:
         #     path = _args['pre']['path'] if 'path' in _args['pre'] else ''
@ -90,18 +68,11 @@ class State :
pass pass
@staticmethod @staticmethod
def _build(_args): def _build(_args):
"""
This function builds the object {module,path} where module is extracted from a file (if needed)
:param _args dictionary containing attributes that can be value pair
It can also be a function
"""
#
# In the advent an actual pointer is passed we should do the following
_info = State._extract(_args) _info = State._extract(_args)
# _info = dict(_args,**_info) # _info = dict(_args,**_info)
_info['module'] = State._instance(_info) _info['module'] = State._instance(_info)
return _info if _info['module'] is not None else None return _info if _info['module'] is not None else None
@staticmethod @staticmethod
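State.apply, shown in the hunk above, simply walks a list of stage descriptors and calls each stage's module on the data frame. A minimal sketch of a pipeline that would satisfy that loop (the stage functions and column name here are invented examples, not part of the repository):

```python
import pandas as pd

def drop_empty(_data, _args):
    # example stage: drop rows with missing values
    return _data.dropna(how=_args.get('how', 'any'))

def upper_case(_data, _args):
    # example stage: upper-case the column named in the stage's args
    _data[_args['column']] = _data[_args['column']].str.upper()
    return _data

pipeline = [{'name': 'drop_empty', 'module': drop_empty, 'args': {'how': 'any'}, 'path': ''},
            {'name': 'upper_case', 'module': upper_case, 'args': {'column': 'city'}, 'path': ''}]

_data = pd.DataFrame({'city': ['nashville', None, 'memphis']})
for _item in pipeline:
    pointer = _item['module']
    _args = _item['args']
    _data = pointer(_data, _args)
```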

View File

@@ -1 +0,0 @@
-__version__='1.7.6'

View File

@@ -1,10 +1,10 @@
 from setuptools import setup, find_packages
 import os
 import sys
-import version
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
-args = {"name":"data-maker","version":version.__version__,
+args = {"name":"data-maker","version":"1.6.4",
     "author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vumc.org","license":"MIT",
     "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://github.com/lnyemba/data-transport.git','tensorflow']

View File

@@ -1 +0,0 @@
-data/maker/version.py