"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0

This package serves as a proxy to the overall usage of the framework.
It is designed to generate synthetic data from an original dataset using deep learning techniques.

@TODO:
    - Make GPU, EPOCHS configurable
"""
import pandas as pd
import numpy as np
import data.gan as gan
import transport
# from data.bridge import Binary
import threading
from data.maker import prepare
import calendar  # needed for the leap-year check in make_date
import copy
import os
import nujson as json
from multiprocessing import Process, RLock
from datetime import datetime, timedelta
from multiprocessing import Queue
import time

class Learner(Process):

    def __init__(self, **_args):
        super(Learner, self).__init__()
        self.ndx = 0
        self._queue = Queue()
        self.lock = RLock()
        if 'gpu' in _args:
            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
            self.gpu = int(_args['gpu'])
        else:
            self.gpu = None

        self.info = _args['info']
        self.columns = self.info['columns'] if 'columns' in self.info else None
        self.store = _args['store']

        if 'network_args' not in _args:
            self.network_args = {
                'context': self.info['context'],
                'logs': _args['logpath'] if 'logpath' in _args else 'logs',
                'max_epochs': int(_args['epochs']) if 'epochs' in _args else 2,
                'batch_size': int(_args['batch']) if 'batch' in _args else 2000
            }
        else:
            self.network_args = _args['network_args']
        self._encoder = None
        self._map = None
        self._df = _args['data'] if 'data' in _args else None

        self.name = self.__class__.__name__
        #
        # @TODO: allow for a verbose mode so we have a sense of what is going on within the network
        #
        _log = {'action': 'init', 'gpu': (self.gpu if self.gpu is not None else -1)}
        self.log(**_log)
        self.cache = []
        # self.logpath = _args['logpath'] if 'logpath' in _args else 'logs'
        # self.max_epoch
    def log(self, **_args):
        try:
            # _context = self.info['context']
            # _label = self.info['info'] if 'info' in self.info else _context
            # logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider=transport.providers.CONSOLE,context='write',lock=True)
            # _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
            # logger.write(_args)
            # self.ndx += 1
            # if hasattr(logger,'close') :
            #     logger.close()
            pass
        except Exception as e:
            print()
            print(_args)
            print(e)
            pass
        finally:
            pass
    def get_schema(self):
        # if self.store['source']['provider'] != 'bigquery' :
        #     return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]} for i in range(self._df.dtypes.shape[0])]
        # else:
        #     reader = transport.factory.instance(**self.store['source'])
        #     return reader.meta(table=self.info['from'])
        reader = transport.factory.instance(**self.store['source'])
        return reader.meta(table=self.info['from'])
    def initalize(self):
        reader = transport.factory.instance(**self.store['source'])
        _read_args = self.info
        if self._df is None:
            self._df = reader.read(**_read_args)
        columns = self.columns if self.columns else self._df.columns
        #
        # Below is a source of inefficiency; unfortunately python's type inference doesn't work well in certain cases
        # - The code below tries to address the issue (perhaps better suited for the reading components)
        _log = {}
        for name in columns:
            #
            # randomly sampling 5 elements to make sense of data-types
            if self._df[name].size < 5:
                continue
            _index = np.random.choice(np.arange(self._df[name].size), 5, False)
            no_value = [type(value) in [int, float, np.int64, np.int32, np.float32, np.float64] for value in self._df[name].values[_index]]
            no_value = 0 if np.sum(no_value) > 0 else ''
            try:
                self._df[name] = self._df[name].fillna(no_value)
            finally:
                pass
            _log[name] = self._df[name].dtypes.name
        _log = {'action': 'structure', 'input': _log}
        self.log(**_log)
        #
        # convert the data to binary here ...
        _schema = self.get_schema()
        _args = {"schema": _schema, "data": self._df, "columns": columns}
        if self._map:
            _args['map'] = self._map
        self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None
        _log = {'action': 'data-prep', 'input': {'rows': int(self._df.shape[0]), 'cols': int(self._df.shape[1])}}
        self.log(**_log)
    def get(self):
        if self.cache:
            return self.cache
        else:
            return self._queue.get() if self._queue.qsize() > 0 else []

    def listen(self):
        while True:
            _info = self._queue.get()
            self.cache.append(_info)
            # NOTE: multiprocessing.Queue has no task_done(); no acknowledgment is needed here

    def publish(self, caller):
        if hasattr(caller, '_queue'):
            _queue = caller._queue
            _queue.put(self.cache)
            # _queue.join()
        pass
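
    # -- A hypothetical hand-off between two Learner processes (illustrative,
    #    not part of the original API surface): trainer.publish(generator)
    #    puts the trainer's cached frames on the generator's _queue, and the
    #    generator can later drain them with get().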

class Trainer(Learner):
    """
    This class will perform training using a GAN
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        # self.info = _args['info']
        self.limit = int(_args['limit']) if 'limit' in _args else None
        self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
        self.generate = None
        self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
    def run(self):
        self.initalize()
        if self._encoder is None:
            #
            # @TODO Log that the dataset was empty or not statistically relevant
            return
        _space, _matrix = self._encoder.convert()
        _args = self.network_args
        if self.gpu is not None:
            _args['gpu'] = self.gpu
        _args['real'] = _matrix
        _args['candidates'] = self.candidates
        #
        # At this point we have the binary matrix, we can initiate training
        #
        beg = datetime.now()  #.strftime('%Y-%m-%d %H:%M:%S')
        gTrain = gan.Train(**_args)
        gTrain.apply()

        writer = transport.factory.instance(provider=transport.providers.FILE, context='write', path=os.sep.join([gTrain.out_dir, 'map.json']))
        writer.write(self._encoder._map, overwrite=True)
        writer.close()
        #
        # @TODO: At this point we need to generate some other objects
        #
        _args = {"network_args": self.network_args, "store": self.store, "info": self.info, "candidates": self.candidates, "data": self._df}
        if self.gpu is not None:
            _args['gpu'] = self.gpu
        g = Generator(**_args)
        # g.run()

        end = datetime.now()  #.strftime('%Y-%m-%d %H:%M:%S')
        _min = float((end - beg).seconds / 60)
        _logs = {'action': 'train', 'input': {'start': beg.strftime('%Y-%m-%d %H:%M:%S'), 'minutes': _min, "unique_counts": self._encoder._io[0]}}
        self.log(**_logs)
        self._g = g
        if self.autopilot:
            self._g.run()
        #
        # @TODO Find a way to have the data in the object ....
    def generate(self):
        if self.autopilot:
            print("Autopilot is set ... No need to call this function")
        else:
            raise Exception("Autopilot has not been set. Wait till training is finished, then use the is_alive function on the process object")

class Generator(Learner):
    def __init__(self, **_args):
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'], 'output', self.network_args['context'], 'map.json'])
        self.log(**{'action': 'init-map', 'input': {'filename': filename, 'exists': os.path.exists(filename)}})
        if os.path.exists(filename):
            file = open(filename)
            self._map = json.loads(file.read())
            file.close()
        else:
            self._map = {}
    def run(self):
        self.initalize()
        if self._encoder is None:
            #
            # @TODO Log that the dataset was empty or not statistically relevant
            return
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values, _matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]
        if self.gpu is not None:
            _args['gpu'] = self.gpu
        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates = [self._encoder.revert(matrix=_item) for _item in _iomatrix]
        _size = np.sum([len(_item) for _item in _iomatrix])
        _log = {'action': 'io-data', 'input': {'candidates': len(_candidates), 'rows': int(_size)}}
        self.log(**_log)
        # self.cache = _candidates
        self.post(_candidates)
    def approximate(self, _df):
        _columns = self.info['approximate']

        for name in _columns:
            if _df[name].size > 100:
                BATCH_SIZE = 10
            else:
                BATCH_SIZE = 1
            batches = np.array_split(_df[name].fillna(np.nan).values, BATCH_SIZE)
            _type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
            x = []
            _log = {'action': 'approximate', 'input': {'batch': BATCH_SIZE, 'col': name}}
            for values in batches:
                index = [_x not in ['', None, np.nan] for _x in values]
                if np.sum(index) == 0:
                    #
                    # Sometimes messy data has unpleasant surprises
                    continue
                _values = np.random.rand(len(values[index]))
                _values += np.std(values[index]) / 4

                values[index] = list(values[index] + _values) if np.random.randint(0, 2) else list(values[index] - _values)
                values[index] = values[index].astype(_type)
                x += values.tolist()

            if x:
                _log['input']['identical_percentage'] = 100 * (np.divide((_df[name].dropna() == x).sum(), _df[name].dropna().size))
                _df[name] = x  # np.array(x,dtype=np.int64) if 'int' in _type else np.array(x,dtype=np.float64)
            self.log(**_log)
        return _df
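
    # -- A sketch of the perturbation above on a toy integer batch (numbers
    #    are illustrative, not from a real run):
    #
    #      values = np.array([10., 12., 11.])             # one batch, no blanks
    #      noise  = np.random.rand(3) + np.std(values)/4  # uniform noise + std/4
    #      out    = (values + noise).astype(np.int64)     # or (values - noise)
    #
    #    'identical_percentage' then reports how many values survived unchanged.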
    def make_date(self, **_args):
        """
        :param year    initial value
        :param offset  (optional) list of day offsets used to derive additional dates
        :param field   (optional) field name, used to look up a format in self.info['format']
        """
        if _args['year'] in ['', None, np.nan]:
            return None
        year = int(_args['year'])

        offset = _args['offset'] if 'offset' in _args else 0
        month = np.random.randint(1, 13)
        if month == 2:
            _end = 29 if calendar.isleap(year) else 28
        else:
            _end = 31 if month in [1, 3, 5, 7, 8, 10, 12] else 30
        day = np.random.randint(1, _end + 1)
        #-- synthetic date
        _date = datetime(year=year, month=month, day=day, minute=0, hour=0, second=0)
        FORMAT = '%Y-%m-%d'
        _name = _args['field'] if 'field' in _args else None
        if 'format' in self.info and _name in self.info['format']:
            # _name = _args['field']
            FORMAT = self.info['format'][_name]

        # print ([_name, FORMAT, _date.strftime(FORMAT)])
        r = []
        if offset:
            r = [_date.strftime(FORMAT)]
            for _delta in offset:
                _date = _date + timedelta(_delta)
                r.append(_date.strftime(FORMAT))
            return r
        else:
            return _date.strftime(FORMAT)
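
    # -- Usage sketch for make_date (year and offsets are hypothetical):
    #
    #      self.make_date(year=1980, field='dob')              # -> e.g. '1980-07-14'
    #      self.make_date(year=1980, field='dob', offset=[30, 60])
    #      # -> e.g. ['1980-07-14', '1980-08-13', '1980-10-12'] (offsets accumulate)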
    def format(self, _df, _schema):
        r = {}
        for _item in _schema:
            name = _item['name']
            if _item['type'].upper() in ['DATE', 'DATETIME', 'TIMESTAMP']:
                FORMAT = '%Y-%m-%d'
                try:
                    #
                    #-- Sometimes data isn't all it's meant to be
                    SIZE = -1
                    if 'format' in self.info and name in self.info['format']:
                        FORMAT = self.info['format'][name]
                        SIZE = 10
                    elif _item['type'] in ['DATETIME', 'TIMESTAMP']:
                        FORMAT = '%Y-%m-%d %H:%M:%S'
                        SIZE = 19

                    if SIZE > 0:
                        values = pd.to_datetime(_df[name], format=FORMAT).astype(np.datetime64)
                        # _df[name] = [_date[:SIZE].strip() for _date in values]

                        # _df[name] = _df[name].astype(str)
                        r[name] = FORMAT
                        # _df[name] = pd.to_datetime(_df[name], format=FORMAT) #.astype('datetime64[ns]')
                        if _item['type'] in ['DATETIME', 'TIMESTAMP']:
                            pass  #;_df[name] = _df[name].fillna('').astype('datetime64[ns]')
                except Exception as e:
                    pass
                finally:
                    pass
            else:
                #
                # Because types are inferred on the basis of the sample being processed they can sometimes be wrong
                # To help disambiguate we add the schema information
                _type = None
                if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower():
                    _type = np.int64
                elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower():
                    _type = np.float64
                if _type:
                    _df[name] = _df[name].fillna(0).replace('', 0).replace(' ', 0).replace('NA', 0).replace('nan', 0).astype(_type)
                # else:
                #     _df[name] = _df[name].astype(str)
        # _df = _df.replace('NaT','').replace('NA','')

        if r:
            self.log(**{'action': 'format', 'input': r})
        return _df
    def post(self, _candidates):
        if 'target' in self.store:
            _store = self.store['target']
            _store['lock'] = True
            _store['context'] = 'write'  #-- Just in case
            if 'table' not in _store:
                _store['table'] = self.info['from']
        else:
            _store = None

        N = 0
        for _iodf in _candidates:
            _df = self._df.copy()
            _df[self.columns] = _iodf[self.columns]
            N += _df.shape[0]
            #
            # @TODO:
            #   Improve formatting with better post-processing pipeline
            if 'approximate' in self.info:
                _df = self.approximate(_df)
            if 'make_date' in self.info:
                for name in self.info['make_date']:
                    # iname = self.info['make_date']['init_field']
                    iname = self.info['make_date'][name]
                    years = _df[iname]
                    _dates = [self.make_date(year=_year, field=name) for _year in years]
                    if _dates:
                        _df[name] = _dates

            _schema = self.get_schema()
            # _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema]
            _df = self.format(_df, _schema)
            _log = [{"name": _schema[i]['name'], "dataframe": _df[_df.columns[i]].dtypes.name, "schema": _schema[i]['type']} for i in np.arange(len(_schema))]
            self.log(**{"action": "consolidate", "input": _log})
            # w = transport.factory.instance(doc='observation',provider='mongodb',context='write',db='IOV01_LOGS',auth_file='/home/steve/dev/transport/mongo.json')
            # w.write(_df)
            # cols = [name for name in _df.columns if name.endswith('datetime')]
            # print (_df[cols])
            if _store:
                writer = transport.factory.instance(**_store)
                if _store['provider'] == 'bigquery':
                    writer.write(_df, schema=[], table=self.info['from'])
                else:
                    writer.write(_df, table=self.info['from'])
            else:
                self.cache.append(_df)

        self.log(**{'action': 'write', 'input': {'rows': N, 'candidates': len(_candidates)}})

class Shuffle(Generator):
    """
    This is a method that will yield data with low utility
    """
    def __init__(self, **_args):
        super().__init__(**_args)

    def run(self):
        np.random.seed(1)
        self.initalize()
        _index = np.arange(self._df.shape[0])
        np.random.shuffle(_index)
        np.random.shuffle(_index)
        _iocolumns = self.info['columns']
        _ocolumns = list(set(self._df.columns) - set(_iocolumns))
        # _iodf = pd.DataFrame(self._df[_ocolumns],self._df.loc[_index][_iocolumns],index=np.arange(_index.size))
        _iodf = pd.DataFrame(self._df[_iocolumns].copy(), index=np.arange(_index.size))
        # self._df = self._df.loc[_index][_ocolumns].join(_iodf)
        self._df = self._df.loc[_index][_ocolumns]
        self._df.index = np.arange(self._df.shape[0])
        self._df = self._df.join(_iodf)
        #
        # The following is a full shuffle
        self._df = self._df.loc[_index]
        self._df.index = np.arange(self._df.shape[0])

        _log = {'action': 'io-data', 'input': {'candidates': 1, 'rows': int(self._df.shape[0])}}
        self.log(**_log)
        try:
            self.post([self._df])
            self.log(**{'action': 'completed', 'input': {'candidates': 1, 'rows': int(self._df.shape[0])}})
        except Exception as e:
            # print (e)
            self.log(**{'action': 'failed', 'input': {'msg': str(e), 'info': self.info}})

class apply:
    TRAIN, GENERATE, RANDOM = 'train', 'generate', 'random'

class factory:
    _infocache = {}
    @staticmethod
    def instance(**_args):
        """
        An instance of an object that trains and generates candidate datasets
        :param gpu        (optional) index of the gpu to be used, if using one
        :param store      {source, target}; if no target is provided output goes to the console
        :param epochs     (default 2) number of epochs to train
        :param candidates (default 1) number of candidates to generate
        :param info       {columns, sql, from}
        :param autopilot  will generate output automatically
        :param batch      (default 2k) size of the batch
        """
        if _args['apply'] in [apply.RANDOM]:
            pthread = Shuffle(**_args)
        elif _args['apply'] == apply.GENERATE:
            pthread = Generator(**_args)
        else:
            pthread = Trainer(**_args)
        if 'start' in _args and _args['start'] == True:
            pthread.start()
        return pthread
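
# -- A minimal usage sketch (the provider, table and column names below are
#    hypothetical; adjust 'store' and 'info' to your environment):
#
#      from data import maker
#
#      _handler = maker.factory.instance(
#          apply=maker.apply.TRAIN,
#          store={'source': {'provider': 'bigquery'}, 'target': {'provider': 'console'}},
#          info={'context': 'demo', 'from': 'patients', 'columns': ['race', 'gender']},
#          epochs=2,
#          candidates=1,
#          start=True
#      )
#      _handler.join()  # block until the training/generation process completes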