data-maker/data/bridge.py

"""
Create a pseudonym map for a given BigQuery table as follows :
table, field, values, counts, encoded
"""
import pandas as pd
import numpy as np
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import json
import threading
import sys
import os
import itertools
DATASET_SUFFIX = '_pseudo'
PSEUDO_TABLENAME = 'map'
SYS_ARGS = {'context':''}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i].replace('-','')
SYS_ARGS[key] = 1
            if i + 1 < N and not sys.argv[i+1].startswith('--'):
                value = sys.argv[i + 1].strip()
            if key and value:
                SYS_ARGS[key] = value
i += 2
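#
# A sketch of the resulting argument map (hypothetical invocation) :
#   python bridge.py --pseudo --dataset foo --table person --key ./account.json
# would yield
#   SYS_ARGS = {'context':'', 'pseudo':1, 'dataset':'foo', 'table':'person', 'key':'./account.json'}
#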
class void :
pass
class pseudonym :
@staticmethod
def meta(**args) :
"""
:key Bigquery private key (service account)
:dataset dataset of the input table
:table table name
:filter optional filter (SQL statement)
"""
credentials = service_account.Credentials.from_service_account_file(args['key'])
SQL = ["SELECT * FROM :dataset.:table"]
# if 'filter' in args :
# SQL += ['WHERE',args['filter']]
dataset = args['dataset']
table = args['table']
SQL = " ".join(SQL+["LIMIT 1"]).replace(":dataset",dataset).replace(":table",table)
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
return df.columns
@staticmethod
def apply(**args):
"""
This function applies the
"""
columns = pseudonym.meta(**args)
#
# we need to make the schema here
client = bq.Client.from_service_account_json(args['key'])
datasets = list(client.list_datasets())
dataset_name = args['dataset']+DATASET_SUFFIX
if np.sum( [ 1*(item.dataset_id == dataset_name) for item in datasets]) == 0:
#-- make the target dataset
dataset = bq.Dataset(client.dataset(dataset_name))
client.create_dataset(dataset)
for name in columns :
p = dict(args,**{"field":name})
p['filter'] = '' if 'filter' not in args else args['filter']
pseudonym.post(**p)
#
# let us submit the query
pass
@staticmethod
def post(**args) :
"""
This function will submit a query to bigquery for insertion
"""
SQL = " ".join(['SELECT DISTINCT CAST(',args['field']," AS STRING) AS values, COUNT(*) as counts FROM :dataset.:table :filter"]).replace(':dataset',args['dataset'])
SQL = SQL.replace(':table',args['table'])
if args['filter'].strip() != '' :
SQL = SQL.replace(":filter", "WHERE "+args['filter'])
else:
SQL = SQL.replace(":filter","")
SQL += " ".join(['GROUP BY ',args['field'],'ORDER BY 1 '])
TABLE_NAME = ".".join([args['dataset'],args['table']])
credentials = service_account.Credentials.from_service_account_file(args['key'])
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
df['table'] = args['table']
df['field'] = args['field']
# df['filter']= args['filter']
N = df.shape[0] + 10000
beg = np.random.randint(11,200)
df['encoded'] = np.random.choice(np.arange(beg,N),df.shape[0],replace=False)
df = df[['table','field','values','counts','encoded']]
# print (df.head()[:5])
# sys.stdout.flush()
TABLE_NAME = ".".join([args['dataset']+DATASET_SUFFIX,PSEUDO_TABLENAME])
df.to_gbq(TABLE_NAME,credentials=credentials,if_exists='append',chunksize=10000)
# df.to_gbq(TABLE_NAME.replace('.','_pseudo.'),credentials=credentials,if_exists='append')
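    #
    # For illustration only (hypothetical values) : after post() runs for field 'gender' of table 'person',
    # the <dataset>_pseudo.map table holds rows such as
    #   table   field   values  counts  encoded
    #   person  gender  female  5120    87
    #   person  gender  male    4731    163
    # where 'encoded' is a random, non-repeating integer that serves as the pseudonym of each distinct value
    #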
class Builder :
"""
This class will build a dataset from encoded values
"""
def encode(self,**args):
"""
This function will create pseudonyms for a given table from the mapping tables
"""
SQL = "SELECT * FROM :dataset.:table limit 1".replace(':dataset',args['dataset']).replace(":table",args['table'])
credentials = service_account.Credentials.from_service_account_file(args['key'])
columns = pd.read_gbq(SQL,credentials=credentials,dialect='standard').columns.tolist()
TEMPLATE = ['(SELECT encoded FROM :dataset'+DATASET_SUFFIX+'.'+PSEUDO_TABLENAME,"WHERE table=':table' AND field = ':name' AND CAST(values AS STRING)=CAST(:table.:name AS STRING ) ) as :name"]
SQL = ["SELECT"]
FIELDS = []
FILTER = args['filter'] if 'filter' in args else ""
for field in columns :
FIELDS += [" ".join(TEMPLATE).replace(":name",field)]
# if field in FILTER :
# FILTER = FILTER.replace(field,'values')
SQL += [",\n\t".join(FIELDS)]
SQL += ['FROM :dataset.:table']
if FILTER != "" :
SQL += ["WHERE ",FILTER]
return ("\n".join(SQL).replace(":dataset",args['dataset']).replace(':table',args['table']) )
def process(self,**args):
"""
:dataset
:table
:key
"""
pseudonym.apply(**args)
def decode(self,**args):
"""
This function should be able to take a pseudonymized data frame and convert it to original values
...
"""
pass
class Binary :
"""
    This is a utility class to import and export data to/from a binary (one-hot) matrix
"""
def __stream(self,column,size=-1) :
"""
This function will convert a column into a binary matrix with the value-space representing each column of the resulting matrix
:column a column vector i.e every item is a row
"""
# values = np.unique(column)
# values = column.dropna().unique()
# values.sort()
# column = column.values
values = self.get_column(column,size)
column = column.values
#
# Let's treat the case of missing values i.e nulls
#
row_count,col_count = column.size,values.size
# if row_count * col_count > size and row_count < size:
matrix = [ np.zeros(col_count,dtype=np.float32) for i in np.arange(row_count)]
#
# let's create a binary matrix of the feature that was passed in
# The indices of the matrix are inspired by classical x,y axis
if col_count > 0 and values.size > 1:
for yi in np.arange(row_count) :
value = column[yi]
# if value not in values :
# continue
xi = np.where(values == value)
if xi and xi[0].size > 0:
xi = xi[0][0] #-- column index
matrix[yi][xi] = 1
return pd.DataFrame(matrix,columns=values)
def apply(self,column,size):
return self.__stream(column,size)
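    #
    # Minimal usage sketch (hypothetical data) : one-hot encode a small series
    #
    #   import pandas as pd
    #   Binary().apply(pd.Series(['a','b','a','c']), -1)
    #
    # returns a 4 x 3 data-frame whose columns are the sorted distinct values a, b, c
    # and where each row carries a single 1 marking that row's value
    #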
def get_column(self,column,size=-1):
"""
This function will return the columns that are available for processing ...
"""
values = column.dropna().value_counts().index.values
if size > 0 and column.size > size:
values = values[:size]
values.sort()
return values
    def get_missing(self,column,size=-1):
        """
        Return the distinct values that get_column drops when only the `size` most frequent values are kept
        """
values = column.dropna().value_counts().index.values
if size > 0 and column.size > size :
values = values[size:]
else:
values = np.array([])
values.sort()
        return values.tolist()
def _get_column_values(self,column,size=-1):
values = column.dropna().unique()
values.sort()
#
# Let's treat the case of missing values i.e nulls
#
row_count,col_count = column.size,values.size
if col_count > size and size > 0:
# N = np.divide(size,row_count).astype(int)
# N =
i = np.random.choice(col_count,size)
            values = values[i]
col_count = size
return values
def _Export(self,df) :
"""
This function will convert a data-frame to a binary matrix
        :return matrix
"""
#
# This will give us a map of how each column was mapped to a bitstream
        _map = df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0)
#
        # We merge the per-column bit streams into a single matrix
_matrix = _map.apply(lambda row: list(list(itertools.chain(*row.values.tolist()))),axis=1)
_matrix = np.matrix([list(item) for item in _matrix]).astype(np.float32)
#
# let's format the map so we don't have an unreasonable amount of data
#
columns = _map.columns.tolist()
beg = 0
end = 0
_map = _map.loc[0]
_m = {}
for name in columns :
end += _map[name].size
_m[name] = {"start":beg,"end":end}
beg = end
# return _m,_matrix.astype(np.float32)
return _matrix
def Import(self,df,values,_map):
"""
This function will convert a binary stream into a
:values original/pseudonymed values
:_map field map of the binary matrix
"""
r = pd.DataFrame(None,columns=_map.keys())
for key in _map:
i = np.arange(_map[key]['start'],_map[key]['end'])
columns = values[key]
r[key] = df[i].apply(lambda row: np.array( columns)[row==1][0], axis=1 )
return r
pass
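    #
    # For illustration (hypothetical layout) : Import() expects something like
    #   _map   = {'gender': {'start':0,'end':2}, 'race': {'start':2,'end':5}}
    #   values = {'gender': ['female','male'], 'race': ['asian','black','white']}
    # i.e. columns 0-1 of the binary matrix encode 'gender' and columns 2-4 encode 'race' ;
    # each row is mapped back to the value whose bit is set
    #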
# has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
# has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
# df = pd.DataFrame({"fname":['james','james','steve','kevin','kevin'],"lname":["bond","dean","nyemba",'james','johnson']})
# df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32)
if __name__ == '__main__' :
"""
Run the program from the command line passing the following mandatory arguments
    python bridge.py <[--pseudo|--export <PATH>]> --dataset <dataset> --table <tablename> --key <service account json> [--filter <table filter>]
    --pseudo will create pseudonyms for a given table
--export will export data to a specified location
"""
df = pd.read_csv('sample.csv')
print ( df.race.value_counts())
print ( (Binary()).apply(df['race'], 3))
# has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
# has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
# if has_basic and has_action :
# builder = Builder()
# if 'export' in SYS_ARGS :
# print ()
# print ("exporting ....")
# if not os.path.exists(SYS_ARGS['export']) :
# os.mkdir(SYS_ARGS['export'])
# SQL = builder.encode(**SYS_ARGS)
# #
# # Assuming the user wants to filter the records returned :
# #
# credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
# df = pd.read_gbq(SQL,credentials =credentials,dialect='standard')
# FILENAME = os.sep.join([SYS_ARGS['export'],SYS_ARGS['table']+'.csv'])
# #
# # This would allow us to export it to wherever we see fit
# print (FILENAME)
# df.to_csv(FILENAME,index=False)
# f = open(FILENAME.replace('.csv','.sql'),'w+')
# f.write(SQL)
# f.close()
# elif 'pseudo' in SYS_ARGS :
# builder.process(**SYS_ARGS)
# else:
# print ("")
# print (SYS_ARGS.keys())
# print ("has basic ",has_basic)
# print ("has action ",has_action)
# pseudonym.apply(table='person',dataset='wgan_original',key='./curation-test-2.json')
# args = {"dataset":"wgan_original","table":"observation","key":"./curation-test-2.json"}
# builder = Builder()
# # builder.encode(dataset='wgan_original',table='person',key='./curation-test-2.json')
# builder.process(**args)