# data-maker/data/maker/prepare/__init__.py
#
# 262 lines
# 10 KiB
# Python

"""
(c) 2018 - 2021, Vanderbilt University Medical Center
Steve L. Nyemba, steve.l.nyemba@vumc.org
This file is designed to handle preconditions for a generative adversarial network:
- The file will read/get data from a source specified by transport (or data-frame)
- The class will convert the data to a binary vector
- The class will also help rebuild the data from a binary matrix.
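Usage :
    handler = Input(data=df, columns=['age', 'race'])
    values, matrix = handler.convert()          # binary matrix for the selected columns
    frame = handler.revert(matrix=matrix)       # data-frame rebuilt from the binary matrix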
"""
import transport
import json
import pandas as pd
import numpy as np
# import cupy as cp
import sys
import os
#
# The following is to address the issue over creating a large matrix ...
#
# from multiprocessing import Process, Queue
# if 'GPU' in os.environ :
# import cupy as np
# else:
# import numpy as np
class void:
    """Empty class with no attributes or behavior of its own."""
class Hardware:
    """
    Placeholder meant to enable the selection of the hardware to run on (GPU, device index or CPU).
    No behavior is implemented at this point.
    """
class Input:
    """
    This class wraps a data-frame and performs a variety of operations on it :
        - determine the feature space i.e the columns that can be synthesized
        - convert the selected columns into a binary matrix (one bit per distinct value)
        - maintain a map (column -> matrix positions and values) used to rebuild the data from a binary matrix
    """
    # A column needs at least this many distinct values to be considered for synthesis
    MIN_SPACE_SIZE = 2

    def __init__(self, **_args):
        """
        :param data     data-frame that holds the data (required, a KeyError is raised otherwise)
        :param columns  optional list of columns to be synthesized
        :param schema   optional dictionary, column name -> data-type
        :param map      optional pre-computed map (column -> {beg,end,values}), it is updated in place by convert
        :param sql      optional, when present the instance is initialized via _initsql instead of _initdata
        """
        self._schema = _args.get('schema', {})
        self.df = _args['data']
        if 'sql' in _args:
            self._initsql(**_args)
        else:
            self._initdata(**_args)
        self._map = _args.get('map', {})
        #
        # @TODO: GPU support (cupy) was sketched here in the past but it is not implemented
        #

    def _initsql(self, **_args):
        """
        This function will initialize the columns on the basis of the data already held by the instance.
        The data-store/sql parameters are not used to read data here, only the optional columns are considered
        :param columns  optional list of columns to be synthesized
        """
        _kwargs = {'data': self.df}
        if 'columns' in _args:
            _kwargs['columns'] = _args['columns']
        self._initcols(**_kwargs)

    def _initcols(self, **_args):
        """
        This function will initialize the columns to be synthesized and/or determine which ones can be synthesized
        The data considered is always self.df
        :param columns  optional columns to be synthesized, if missing or empty the columns are inferred
        """
        row_count = self.df.shape[0]
        cols = _args.get('columns')
        self.columns = self.df.columns.tolist()
        #
        # Number of distinct (non-null) values found in every column
        _counts = self.df.apply(lambda col: col.dropna().unique().size)
        _df = pd.DataFrame(_counts).T
        if cols:
            self._columns = cols
        else:
            #
            # We look into the counts and make a judgment call:
            # columns where every row is distinct (identifiers) or with less than MIN_SPACE_SIZE values are skipped
            self._columns = [name for name, size in _counts.items()
                             if size != row_count and size >= self.MIN_SPACE_SIZE]
        self._io = _df.to_dict(orient='records')

    def _initdata(self, **_args):
        """
        This function will initialize the class with a data-frame and columns of interest (if any)
        :param data     data-frame that holds the data (already stored in self.df by the constructor)
        :param columns  columns that need to be synthesized if any
        """
        self._initcols(**_args)

    def convert(self, **_args):
        """
        This function will convert a data-frame into a binary matrix and update the map that allows the values to be mapped back
        :param columns  list of columns to account for (just in case the original assumptions don't hold)
        :param column   single column to account for (ignored if columns is provided)
        :param data     optional data-frame to convert, by default the data-frame of the instance is used
        :return         (values, matrix), values only holds the value spaces that were already in the map before the call
        """
        if 'columns' in _args:
            columns = _args['columns']
        elif 'column' in _args:
            columns = [_args['column']]
        else:
            columns = self._columns
        _df = _args.get('data', self.df)
        #
        # At this point we have the list of features we want to use
        i = 0
        _parts = []
        _values = []
        for name in columns:
            #
            # In case we have dataset with incomplete value space, we should still be able to generate something meaningful
            # i.e a value space found in the map takes precedence over the values found in the data
            #
            values = list(self._map[name]['values']) if name in self._map else None
            cols, _matrix = self.tobinary(_df[name], values)
            if name not in self._map:
                self._map[name] = {"beg": i, "end": i + len(cols), "values": cols}
            i += len(cols)
            _parts.append(_matrix)
            #
            # NOTE(review): values is only set for columns already in the map, hence the list returned is empty
            # the first time a column is converted. This is kept as is, to be confirmed against the callers.
            if values:
                _values += values
        #
        # @NOTE:
        # The matrices are assembled once (column wise), regardless of the number of rows they hold
        # The map should allow us to be able to convert or reconvert the binary matrix to whatever we want ...
        #
        _m = np.concatenate(_parts, axis=1) if _parts else np.array([])
        return _values, _m

    def revert(self, **_args):
        """
        This function will take in a binary matrix and based on the map of values it will repopulate it with values
        :param matrix   binary matrix (rows x bits), either the full matrix or the matrix of a single column
        :param column   optional column name, if specified only this column is rebuilt
        :return         data-frame with a column per attribute rebuilt, rows without any bit set yield ''
        """
        _column = _args.get('column')
        matrix = _args['matrix']
        r = {}
        for key, _item in self._map.items():
            if _column and key != _column:
                continue
            _beg, _end = _item['beg'], _item['end']
            columns = np.array(_item['values'])
            #
            # @NOTE: We are accessing matrices in terms of [row,col],
            # The beg,end variables are for the columns in the matrix (mini matrix), _end is not included
            # A matrix generated for a single column (convert(column=...)) is as wide as the value space,
            # in that case the offsets of the full matrix do not apply and the matrix is used as is
            #
            if _column and matrix.shape[1] == _end - _beg:
                _matrix = matrix
            else:
                _matrix = matrix[:, _beg:_end]
            #
            # Replace the bits by their actual values, the first bit set in a row determines the value
            # @TODO: Find ways to do this on a GPU (for big data) or across threads
            #
            _decoded = []
            for row in _matrix:
                _hits = np.where(row == 1)[0]
                _decoded.append(columns[_hits[0]] if _hits.size > 0 else '')
            r[key] = _decoded
        return pd.DataFrame(r)

    def tobinary(self, rows, cols=None):
        """
        This function will compile a binary matrix from a vector of values (one row per value, one bit per distinct value)
        :param rows     pandas series, np.array or list of values
        :param cols     a space of values if it were to be different from the current sample (list or np.array)
        :return         (cols, matrix) where cols is an np.array of the value space and matrix an integer matrix
        """
        if cols is None or len(cols) == 0:
            #
            # No value space was provided, it is inferred from the sample (order of first appearance)
            cols = pd.Series(rows).unique()
        cols = np.array(cols)
        #
        # The rows are accessed by position, a series would otherwise be accessed by label
        # and fail (or pick the wrong row) whenever its index is not 0..n-1
        _rows = np.asarray(rows)
        row_count = len(_rows)
        _matrix = np.zeros((row_count, cols.size), dtype=int)
        for _index, _value in enumerate(cols):
            #
            # Values that can not be matched (nulls, values outside of the space) leave the row with no bit set
            _matrix[np.where(_rows == _value)[0], _index] = 1
        return cols, _matrix
if __name__ == '__main__':
    #
    # Basic smoke test: convert a single column of a sample file to a binary matrix and back
    #
    df = pd.read_csv('../../sample.csv')
    _input = Input(data=df, columns=['age', 'race'])
    #
    # convert returns a tuple (values, matrix), only the matrix has a shape and can be reverted
    _values, _m = _input.convert(column='age')
    print(_m.shape)
    print(_input.revert(matrix=_m, column='age'))
    #
    # The _metadf attribute is never assigned by Input, the distinct value counts (_io) are shown instead
    print(_input._io)