#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import time
import os
from multiprocessing import Process, Lock
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import data.maker
import copy
from data.params import SYS_ARGS

#
# The configuration array is now loaded and we will execute the pipeline as follows
class Components:
    lock = Lock()

    class KEYS:
        PIPELINE_KEY = 'pipeline'
        SQL_FILTER = 'filter'

    @staticmethod
    def get_filter(**args):
        #
        # Format a single filter entry (field, qualifier, value) into a SQL condition
        if args['qualifier'] == 'IN':
            return ' '.join([args['field'], args['qualifier'], '(', args['value'], ')'])
        else:
            return ' '.join([args['field'], args['qualifier'], args['value']])
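    # A minimal illustration of how get_filter renders a condition (the field names and values are hypothetical):
    #   Components.get_filter(field='person_id', qualifier='IN', value='1,2,3')   -> "person_id IN ( 1,2,3 )"
    #   Components.get_filter(field='year_of_birth', qualifier='>', value='1980') -> "year_of_birth > 1980"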
    @staticmethod
    def get_logger(**args):
        return factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})
    @staticmethod
    def get(args):
        """
        This function returns a data-frame provided a BigQuery SQL statement with conditions (and limits for testing purposes).
        The function must be wrapped in a lambda; this makes testing easier and keeps a change of data store transparent to the rest of the code (vital when testing).
        :sql        basic sql statement
        :condition  optional condition and filters
        """
        SQL = args['sql']
        if Components.KEYS.SQL_FILTER in args:
            FILTER_KEY = Components.KEYS.SQL_FILTER
            SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]]
            # condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')'])
            condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER])
            SQL = " ".join([SQL, 'WHERE', condition])

        SQL = SQL.replace(':dataset', args['dataset'])
        if 'limit' in args:
            SQL = SQL + ' LIMIT ' + args['limit']
        #
        # let's log the sql query that has been performed here
        logger = factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})
        logger.write({"module": "bigquery", "action": "read", "input": {"sql": SQL}})

        credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
        return df
        # return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
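    # A hedged usage sketch (the dataset and context names below are placeholders, not taken from a real config):
    #   df = Components.get({'sql': 'SELECT * FROM :dataset.measurement', 'dataset': 'my_dataset',
    #                        'context': 'measurement', 'limit': '100'})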
    @staticmethod
    def split(X, MAX_ROWS=3, PART_SIZE=3):
        return list(pd.cut(np.arange(X.shape[0] + 1), PART_SIZE).categories)
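    # For illustration: with a frame of 9 rows and PART_SIZE=3, split returns the 3 pd.cut intervals covering
    # np.arange(10), i.e. roughly (-0.009, 3.0], (3.0, 6.0], (6.0, 9.0] (bin edges are approximate).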
    def format_schema(self, schema):
        _schema = {}
        for _item in schema:
            _type = int
            _value = 0
            if _item.field_type == 'FLOAT':
                _type = float
            elif _item.field_type != 'INTEGER':
                _type = str
                _value = ''
            _schema[_item.name] = _type
        return _schema
    def get_ignore(self, **_args):
        if 'columns' in _args and 'data' in _args:
            _df = _args['data']
            terms = _args['columns']
            return [name for name in _df.columns if np.sum([int(field in name) for field in terms])]

        return []
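    # Note: the match is a substring test, so (with made-up column names) ignoring 'value' would drop both
    # 'value_as_number' and 'value_source_value' from the frame.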
    def set_gpu(self, **_args):
        if 'gpu' in _args:
            gpu = _args['gpu'] if type(_args['gpu']) != str else [_args['gpu']]
            _index = str(gpu[0])
            os.environ['CUDA_VISIBLE_DEVICES'] = _index
            return gpu
        else:
            return None
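    # e.g. set_gpu(gpu=[1]) pins the process to GPU 1 by exporting CUDA_VISIBLE_DEVICES='1'
    # (only the first chip in the list is exported here).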
    def train(self, **args):
        """
        This function will perform training on the basis of a given pointer that reads data
        """
        schema = None
        if 'file' in args:
            df = pd.read_csv(args['file'])
            del args['file']
        elif 'data' not in args:
            reader = factory.instance(**args['store']['source'])
            if 'row_limit' in args:
                df = reader.read(sql=args['sql'], limit=args['row_limit'])
            else:
                df = reader.read(sql=args['sql'])
            schema = reader.meta(table=args['from']) if hasattr(reader, 'meta') and 'from' in args else None
        else:
            df = args['data']

        #
        # df = df.fillna('')
        if schema:
            _schema = []
            for _item in schema:
                _type = int
                _value = 0
                if _item.field_type == 'FLOAT':
                    _type = float
                elif _item.field_type != 'INTEGER':
                    _type = str
                    _value = ''
                _schema += [{"name": _item.name, "type": _item.field_type}]
                df[_item.name] = df[_item.name].fillna(_value).astype(_type)
            args['schema'] = _schema
            # df[_item.name] = df[_item.name].astype(_type)
        _args = copy.deepcopy(args)
        # _args['store'] = args['store']['source']
        _args['data'] = df

        #
        # The columns that are continuous should also be skipped because they don't need to be synthesized (as such)
        if 'continuous' in args:
            x_cols = args['continuous']
        else:
            x_cols = []

        if 'ignore' in args and 'columns' in args['ignore']:
            _cols = self.get_ignore(data=df, columns=args['ignore']['columns'])
            _args['data'] = df[list(set(df.columns) - set(_cols))]
        #
        # We need to make sure that continuous columns are removed
        if x_cols:
            _args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))]
        if 'gpu' in args:
            _args['gpu'] = self.set_gpu(gpu=args['gpu'])
        if 'partition' in args:
            _args['partition'] = args['partition']
        if df.shape[0] and df.shape[1]:
            #
            # We have a full blown matrix to be processed
            print('-- Training --')
            data.maker.train(**_args)
        else:
            print("... skipping training !!")

        if 'autopilot' in (list(args.keys())):
            args['data'] = df
            print(['autopilot mode enabled ....', args['context']])
            self.generate(args)

        pass
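    # A hedged sketch of the kind of arguments train expects; the keys mirror what this method reads,
    # while the values below are illustrative and not taken from a real configuration:
    #
    #   args = {'context': 'measurement', 'from': 'measurement',
    #           'sql': 'SELECT * FROM :dataset.measurement',
    #           'store': {'source': {...}, 'logs': {...}, 'target': {...}},
    #           'continuous': ['value_as_number'],   # columns approximated rather than synthesized
    #           'ignore': {'columns': ['_id']},      # substring patterns of columns to drop
    #           'logs': 'logs', 'gpu': [0]}
    #   Components().train(**args)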
    def approximate(self, values):
        """
        :param values   array of values to be approximated
        """
        if values.dtype in [int, float]:
            #
            # @TODO: create bins?
            r = np.random.dirichlet(values + .001)  # -- dirichlet doesn't work on values with zeros
            _sd = values[values > 0].std()
            _me = values[values > 0].mean()
            _mi = values.min()
            x = []
            _type = values.dtype
            for index in np.arange(values.size):
                if np.random.choice([0, 1], 1)[0]:
                    value = values[index] + (values[index] * r[index])
                else:
                    value = values[index] - (values[index] * r[index])
                #
                # randomly shifting the measurements
                if np.random.choice([0, 1], 1)[0] and _me > _sd:
                    if np.random.choice([0, 1], 1)[0]:
                        value = value * np.divide(_me, _sd)
                    else:
                        value = value + (np.divide(_me, _sd))
                value = int(value) if _type == int else np.round(value, 2)
                x.append(value)
            np.random.shuffle(x)
            return np.array(x)
        else:
            return values

        pass
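    # Illustrative only: given an integer series such as np.array([10, 12, 15]), approximate returns a shuffled
    # array of the same size whose entries are randomly perturbed around the original values.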
    def shuffle(self, _args):
        reader = None  # only created when the data has to be read from the source store
        if 'data' in _args:
            df = _args['data']
        else:
            reader = factory.instance(**_args['store']['source'])
            if 'file' in _args:
                df = pd.read_csv(_args['file'])
            elif 'data' in _args:
                df = _args['data']
            else:
                if 'row_limit' in _args and 'sql' in _args:
                    df = reader.read(sql=_args['sql'], limit=_args['row_limit'])
                else:
                    df = reader.read(sql=_args['sql'])
        schema = None
        if 'schema' not in _args and hasattr(reader, 'meta') and 'file' not in _args:
            schema = reader.meta(table=_args['from'])
            schema = [{"name": _item.name, "type": _item.field_type} for _item in schema]
        #
        # We are shuffling the designated columns and will be approximating the others
        #
        x_cols = []     # -- columns to be approximated
        _cols = []      # -- columns to be ignored
        if 'continuous' in _args:
            x_cols = _args['continuous']
        if 'ignore' in _args and 'columns' in _args['ignore']:
            _cols = self.get_ignore(data=df, columns=_args['ignore']['columns'])

        columns = _args['columns'] if 'columns' in _args else df.columns
        columns = list(set(columns) - set(_cols))

        for name in columns:
            i = np.arange(df.shape[0])
            np.random.shuffle(i)
            if name in x_cols:
                if df[name].unique().size > 0:
                    df[name] = self.approximate(df.iloc[i][name].fillna(0).values)
                # df[name] = df[name].astype(str)
            # pass
        df.index = np.arange(df.shape[0])

        self.post(data=df, schema=schema, store=_args['store']['target'])
    def post(self, **_args):
        table = _args['from'] if 'from' in _args else _args['store']['table']
        _schema = _args['schema'] if 'schema' in _args else None
        writer = factory.instance(**_args['store'])
        _df = _args['data']
        if _schema:
            columns = []
            for _item in _schema:
                name = _item['name']
                _type = str
                _value = 0
                if _item['type'] in ['DATE', 'TIMESTAMP', 'DATETIMESTAMP', 'DATETIME']:
                    if _item['type'] in ['DATE', 'TIMESTAMP', 'DATETIME']:
                        #
                        # There is an issue with missing dates that needs to be resolved.
                        # For some reason a missing date/time here will cause the types to turn into timestamp (problem).
                        # The following is a hack to address the issue (alas), assuming 10-digit dates and 'NaT' for missing date values (pandas specification).
                        #
                        _df[name] = _df[name].apply(lambda value: None if str(value) == 'NaT' else (str(value)[:10]) if _item['type'] in ['DATE', 'DATETIME'] else str(value))
                        # _df[name] = _df[name].dt.date
                        # _df[name] = pd.to_datetime(_df[name].fillna(''),errors='coerce')
                    else:
                        _df[name] = pd.to_datetime(_df[name])
                else:
                    _value = 0
                    if _item['type'] == 'INTEGER':
                        _type = np.int64
                    elif _item['type'] in ['FLOAT', 'NUMERIC']:
                        _type = np.float64
                    else:
                        _value = ''
                    _df[name] = _df[name].fillna(_value)  # .astype(_type)
                columns.append(name)

        fields = _df.columns.tolist()
        if not writer.has(table=table) and _args['store']['provider'] != 'bigquery':
            _map = {'STRING': 'VARCHAR(256)', 'INTEGER': 'BIGINT'} if 'provider' in _args['store'] and _args['store']['provider'] != 'bigquery' else {}
            _params = {'map': _map, 'table': table}
            if _schema:
                _params['schema'] = _schema
            else:
                _params['fields'] = fields
            writer.make(**_params)

        fields = _df.columns.tolist()
        _df = _df[fields]
        # writer.fields = fields
        if _args['store']['provider'] == 'bigquery':
            print(['_______ POSTING ______________', table])
            print(['_______________', _df.shape[0], '___________________'])
            writer.write(_df.astype(object), schema=_schema, table=table)
        else:
            writer.table = table
            writer.write(_df)
        # else:
        #     writer.write(_df,table=args['from'])
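    # Hedged usage sketch (store/table names are placeholders; the store dict must be whatever the configured
    # transport factory expects): post writes a frame to the target store, creating the table first for
    # non-bigquery providers when it does not exist yet.
    #
    #   Components().post(data=df, schema=[{"name": "person_id", "type": "INTEGER"}],
    #                     store={'provider': 'bigquery', 'table': 'people', ...})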
    def finalize(self, args):
        """
        This function performs post-processing operations on a synthetic table, i.e.:
            - remove duplicate keys
            - remove orphaned keys
        """
        reader = factory.instance(**args['store']['source'])
        logger = factory.instance(**args['store']['logs'])

        target = args['store']['target']['args']['dataset']
        source = args['store']['source']['args']['dataset']
        table = args['from']
        schema = reader.meta(table=args['from'])
        #
        # keys :
        unique_field = "_".join([args['from'], 'id']) if 'unique_fields' not in args else args['unique_fields']
        fields = [item.name if item.name != unique_field else "y." + item.name for item in schema]
        SQL = [
            "SELECT :fields FROM ",
            "(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x", "INNER JOIN",
            "(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table ORDER BY RAND()) y",
            "ON y.row_number = x.row_number"
        ]
        SQL = " ".join(SQL).replace(":fields", ",".join(fields)).replace(":table", table).replace(":source", source).replace(":target", target)
        SQL = SQL.replace(":unique_field", unique_field)
        #
        # Use a native job to get this done ...
        #
        client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"])
        job = bq.QueryJobConfig()
        job.destination = client.dataset(target).table(table)
        job.use_query_cache = True
        job.allow_large_results = True
        # job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
        job.write_disposition = "WRITE_TRUNCATE"
        job.priority = 'BATCH'
        r = client.query(SQL, location='US', job_config=job)
        logger.write({"job": r.job_id, "action": "finalize", "args": {"sql": SQL, "source": " ".join([source, table]), "destination": ".".join([target, table])}})
        #
        # Keep a log of what just happened...
        #
        otable = ".".join([args['store']['source']['args']['dataset'], args['from']])
        dtable = ".".join([args['store']['target']['args']['dataset'], args['from']])
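    # For intuition (dataset/table/field names below are illustrative), the rendered query pairs every row of the
    # synthetic table with a randomly ordered primary key drawn from the source table, e.g.:
    #
    #   SELECT y.measurement_id,person_id,... FROM
    #   (SELECT ROW_NUMBER() OVER() AS row_number,* FROM target_dataset.measurement) x INNER JOIN
    #   (SELECT ROW_NUMBER() OVER() AS row_number, measurement_id FROM source_dataset.measurement ORDER BY RAND()) y
    #   ON y.row_number = x.row_number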
    def generate(self, args):
        """
        This function will generate data and store it to a given data store (the configured target).
        """
        store = args['store']['logs']
        if 'args' in store:
            store['args']['doc'] = args['context']
        else:
            store['doc'] = args['context']
        logger = factory.instance(**store)  # type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}
        ostore = args['store']['target']
        writer = factory.instance(**ostore)

        schema = args['schema'] if 'schema' in args else None
        if 'data' in args:
            df = args['data']
        else:
            reader = factory.instance(**args['store']['source'])
            if 'row_limit' in args:
                df = reader.read(sql=args['sql'], limit=args['row_limit'])
            else:
                df = reader.read(sql=args['sql'])
            if 'schema' not in args and hasattr(reader, 'meta'):
                schema = reader.meta(table=args['from'])
                schema = [{"name": _item.name, "type": _item.field_type} for _item in schema]
        # else:
        #     #
        #     # This will account for autopilot mode ...
        #     df = args['data']

        _cast = {}
        if schema:
            for _item in schema:
                dtype = str
                name = _item['name']
                novalue = 0
                if _item['type'] in ['INTEGER', 'NUMERIC']:
                    dtype = np.int64
                elif _item['type'] == 'FLOAT':
                    dtype = np.float64
                else:
                    novalue = ''
                # _cast[schema['name']] = dtype
                df[name] = df[name].fillna(novalue).astype(dtype)

        _info = {"module": "gan-prep", "action": "read", "shape": {"rows": df.shape[0], "columns": df.shape[1]}, "schema": schema}
        logger.write(_info)

        _dc = pd.DataFrame()
        # for mdf in df :
        args['data'] = df.copy()
        #
        # The columns that are continuous should also be skipped because they don't need to be synthesized (as such)
        if 'continuous' in args:
            x_cols = args['continuous']
        else:
            x_cols = []

        if 'ignore' in args and 'columns' in args['ignore']:
            _cols = self.get_ignore(data=df, columns=args['ignore']['columns'])
            args['data'] = args['data'][list(set(df.columns) - set(_cols))]
        #
        # We need to remove the continuous columns from the data-frame
        # @TODO: Abstract this !!
        #
        real_df = pd.DataFrame()
        if x_cols:
            args['data'] = args['data'][list(set(args['data'].columns) - set(x_cols))]
            real_df = df[x_cols].copy()

        args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
        if 'gpu' in args:
            args['gpu'] = self.set_gpu(gpu=args['gpu'])
        # if 'partition' in args :
        #     args['logs'] = os.sep.join([args['logs'],str(args['partition'])])

        _info = {"module": "gan-prep", "action": "prune", "shape": {"rows": args['data'].shape[0], "columns": args['data'].shape[1]}}
        logger.write(_info)
        if args['data'].shape[0] > 0 and args['data'].shape[1] > 0:
            candidates = (data.maker.generate(**args))
        else:
            candidates = [df]

        # if 'sql.BQWriter' in ostore['type'] :
        _columns = None
        skip_columns = []
        _schema = schema
        if schema:
            cols = [_item['name'] for _item in _schema]
        else:
            cols = df.columns.tolist()
        _info = {"module": "gan-prep", "action": "selection", "input": {"candidates": len(candidates), "features": cols}}
        logger.write(_info)
        for _df in candidates:
            #
            # we need to format the fields here to make sure we have something cohesive
            #
            if not skip_columns:
                if 'ignore' in args and 'columns' in args['ignore']:
                    skip_columns = self.get_ignore(data=_df, columns=args['ignore']['columns'])
            #
            # We perform a series of set operations to ensure that the following conditions are met:
            #   - the synthetic dataset only has fields that need to be synthesized
            #   - the original dataset has all the fields except those that need to be synthesized
            #
            _df = _df[list(set(_df.columns) - set(skip_columns))].copy()
            if x_cols:
                _approx = {}
                for _col in x_cols:
                    if real_df[_col].unique().size > 0:
                        _df[_col] = self.approximate(real_df[_col].values)
                        _approx[_col] = {
                            "io": {"min": _df[_col].min().astype(float), "max": _df[_col].max().astype(float), "mean": _df[_col].mean().astype(float), "sd": _df[_col].values.std().astype(float), "missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float), "zeros": _df[_col].where(_df[_col] == 0).dropna().count().astype(float)},
                            "real": {"min": real_df[_col].min().astype(float), "max": real_df[_col].max().astype(float), "mean": real_df[_col].mean().astype(float), "sd": real_df[_col].values.std().astype(float), "missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float), "zeros": real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}
                        }
                    else:
                        _df[_col] = -1
                logger.write({"module": "gan-generate", "action": "approximate", "status": _approx})
            if set(df.columns) & set(_df.columns):
                _columns = list(set(df.columns) - set(_df.columns))
                df = df[_columns]

            #
            # Let us merge the datasets here and have a comprehensive dataset
            _df = pd.DataFrame.join(df, _df)
            _params = {'data': _df, 'store': ostore, 'from': args['from']}
            if _schema:
                _params['schema'] = _schema
            _info = {"module": "gan-prep", "action": "write", "input": {"rows": _df.shape[0], "cols": _df.shape[1]}}
            logger.write(_info)
            self.post(**_params)
            # print (['_______ posting _________________',_df.shape])
            break

        pass
        # else:
        #     pass

    def bind(self, **_args):
        print(_args)


if __name__ == '__main__':
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    f = open(filename)
    _config = json.loads(f.read())
    f.close()
    PIPELINE = _config['pipeline']
    index = SYS_ARGS['index']
    if index.isnumeric():
        index = int(SYS_ARGS['index'])
    else:
        #
        # The index provided is a key to a pipeline entry, mainly the context
        #
        N = len(PIPELINE)
        f = [i for i in range(0, N) if PIPELINE[i]['context'] == index]
        index = f[0] if f else 0
    #
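    # Hedged CLI sketch, assuming SYS_ARGS parses `--key value` style flags; the flag names mirror the
    # SYS_ARGS keys used in this file and the file/context names are placeholders:
    #   python pipeline.py --config config.json --index measurement --train --gpu 0
    #   python pipeline.py --config config.json --index 0 --generate --all-chips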
print ( " ..::: " , PIPELINE [ index ] [ ' context ' ] , ' :::.. ' )
2020-03-04 17:49:18 +00:00
args = ( PIPELINE [ index ] )
2020-03-25 22:43:23 +00:00
for key in _config :
if key == ' pipeline ' or key in args :
#
# skip in case of pipeline or if key exists in the selected pipeline (provided by index)
#
continue
args [ key ] = _config [ key ]
2020-04-02 05:04:05 +00:00
2020-03-04 17:49:18 +00:00
args = dict ( args , * * SYS_ARGS )
2020-04-14 21:24:02 +00:00
if ' matrix_size ' in args :
args [ ' matrix_size ' ] = int ( args [ ' matrix_size ' ] )
2020-04-01 05:53:56 +00:00
if ' batch_size ' not in args :
args [ ' batch_size ' ] = 2000 #if 'batch_size' not in args else int(args['batch_size'])
2020-03-04 20:30:40 +00:00
if ' dataset ' not in args :
args [ ' dataset ' ] = ' combined20191004v2_deid '
2022-01-13 21:05:00 +00:00
args [ ' logs ' ] = args [ ' logs ' ] if ' logs ' in args else ' logs '
2020-03-08 13:48:38 +00:00
PART_SIZE = int ( args [ ' part_size ' ] ) if ' part_size ' in args else 8
2020-03-04 17:49:18 +00:00
#
# @TODO:
# Log what was initiated so we have context of this processing ...
#
2020-04-15 20:51:53 +00:00
2021-04-13 22:41:30 +00:00
GPU_CHIPS = args [ ' gpu ' ] if ' gpu ' in args else None
2021-04-07 20:30:59 +00:00
if GPU_CHIPS and type ( GPU_CHIPS ) != list :
GPU_CHIPS = [ int ( _id . strip ( ) ) for _id in GPU_CHIPS . split ( ' , ' ) ] if type ( GPU_CHIPS ) == str else [ GPU_CHIPS ]
if ' gpu ' in SYS_ARGS :
args [ ' gpu ' ] = GPU_CHIPS
jobs = [ ]
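    # e.g. a command-line value of '0,1' becomes GPU_CHIPS = [0, 1]; a bare integer such as 1 becomes [1].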
    if 'generate' in SYS_ARGS:
        #
        # Let us see if we have partitions given the log folder
        content = os.listdir(os.sep.join([args['logs'], 'train', args['context']]))
        if 'all-chips' in SYS_ARGS and GPU_CHIPS:
            index = 0
            jobs = []
            for _gpu in GPU_CHIPS:
                _args = copy.deepcopy(args)
                _args['gpu'] = [int(_gpu)]
                _args['partition'] = int(_gpu)  # index
                index += 1
                make = lambda _params: (Components()).generate(_params)
                job = Process(target=make, args=(dict(_args),))
                job.name = 'Trainer # ' + str(index)
                job.start()
                jobs.append(job)
            pass
        else:
            generator = Components()
            generator.generate(args)
    elif 'bind' in SYS_ARGS:
        import binder
        _args = _config['_map']
        _args['store'] = copy.deepcopy(_config['store'])
        if 'init' in SYS_ARGS:
            #
            # Creating and persisting the map ...
            print(['.... Binding Initialization'])
            # jobs = binder.Init(**_args)
            _mapped = binder.Init(**_args)
            _schema = [{"name": _name, "type": "INTEGER"} for _name in _mapped.columns.tolist()]
            publisher = lambda _params: (Components()).post(**_params)
            _args = {'data': _mapped, 'store': _config['store']['target']}
            _args['store']['table'] = '_map'
            if _args['store']['provider'] == 'bigquery':
                _args['schema'] = _schema

            job = Process(target=publisher, args=(_args,))
            job.start()
            jobs = [job]
        else:
            #
            # Applying the map of k on a particular dataset
            #
            index = int(SYS_ARGS['index'])
            _args['config'] = _config['pipeline'][index]
            _args['original_key'] = _config['original_key'] if 'original_key' in _config else 'person_id'
            table = _config['pipeline'][index]['from']
            _df = binder.ApplyOn(**_args)
            _df = np.array_split(_df, PART_SIZE)
            jobs = []
            print(['Publishing ', PART_SIZE, ' PARTITION'])
            for data in _df:
                publisher = lambda _params: (Components()).post(**_params)
                _args = {'data': data, 'store': _config['store']['target']}
                _args['store']['table'] = table
                print(_args['store'])
                job = Process(target=publisher, args=(_args,))
                job.name = "Publisher " + str(len(jobs) + 1)
                job.start()
                jobs.append(job)
    elif 'shuffle' in SYS_ARGS:
        index = 0
        if GPU_CHIPS and 'all-chips' in SYS_ARGS:
            for index in GPU_CHIPS:
                publisher = lambda _params: (Components()).shuffle(_params)
                job = Process(target=publisher, args=(args,))
                job.name = 'Shuffler # ' + str(index)
                job.start()
                jobs.append(job)
        else:
            shuffler = Components()
            shuffler.shuffle(args)
        pass
    elif 'train' in SYS_ARGS:
        # DATA = np.array_split(DATA,PART_SIZE)
        #
        # Let us create n jobs across n gpus; the assumption here is that the data produced will be a partition
        # @TODO: Find a better name for partition
        #
        if GPU_CHIPS and 'all-chips' in SYS_ARGS:
            index = 0
            print(['... launching ', len(GPU_CHIPS), ' jobs ', args['context']])
            for _gpu in GPU_CHIPS:
                _args = copy.deepcopy(args)
                _args['gpu'] = [int(_gpu)]
                _args['partition'] = int(_gpu)  # index
                index += 1
                make = lambda _params: (Components()).train(**_params)
                job = Process(target=make, args=(_args,))
                job.name = 'Trainer # ' + str(index)
                job.start()
                jobs.append(job)
        else:
            #
            # The choice of the chip will be made internally
            agent = Components()
            agent.train(**args)
    #
    # If we have any jobs we should wait till they finish
    #
    DIRTY = 0
    if (len(jobs)):
        print(['.... waiting on ', len(jobs), ' jobs'])
    while len(jobs) > 0:
        DIRTY = 1
        jobs = [job for job in jobs if job.is_alive()]
        time.sleep(2)
    if DIRTY:
        print(["..:: jobs finished "])
    #
    # We need to harmonize the keys, if any; we do this for shuffle or generate operations.
    # This holds true for bigquery-to-bigquery only.
    IS_BIGQUERY = _config['store']['source']['provider'] == _config['store']['target']['provider'] and _config['store']['source']['provider'] == 'bigquery'

    # if 'bind' not in SYS_ARGS and IS_BIGQUERY and ('autopilot' in SYS_ARGS or 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS)) :
    #     #
    #     # We should pull all the primary keys and regenerate them in order to insure some form of consistency
    #     #
    #
    #     print (["..:: Finalizing process"])
    #     (Components()).finalize(args)