"""
Create pseudonyms map as follows :
table , field , value , enc , filter
"""
import pandas as pd
import numpy as np
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import json
import threading
import sys
import os
import itertools

DATASET_SUFFIX = '_pseudo'
PSEUDO_TABLENAME = 'map'
SYS_ARGS = {'context': ''}
if len(sys.argv) > 1:
    N = len(sys.argv)
    for i in range(1, N):
        value = None
        if sys.argv[i].startswith('--'):
            key = sys.argv[i].replace('-', '')
            SYS_ARGS[key] = 1
            if i + 1 < N:
                value = sys.argv[i + 1] = sys.argv[i + 1].strip()
            if key and value:
                SYS_ARGS[key] = value
class void:
    pass
class pseudonym:
    @staticmethod
    def meta(**args):
        """
        :key        Bigquery private key (service account)
        :dataset    dataset of the input table
        :table      table name
        :filter     optional filter (SQL statement)
        """
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        SQL = ["SELECT * FROM :dataset.:table"]
        # if 'filter' in args :
        #     SQL += ['WHERE', args['filter']]
        dataset = args['dataset']
        table = args['table']
        SQL = " ".join(SQL + ["LIMIT 1"]).replace(":dataset", dataset).replace(":table", table)
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
        return df.columns
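    # Usage sketch (hypothetical key/dataset/table names):
    #   columns = pseudonym.meta(key='./account.json', dataset='mydataset', table='person')
    # returns the column names of mydataset.person (the table is only read with LIMIT 1).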
    @staticmethod
    def apply(**args):
        """
        This function applies pseudonymization to every column of the input table and
        appends the resulting mappings to the <dataset>_pseudo.map table
        """
        columns = pseudonym.meta(**args)
        #
        # we need to make the schema here
        client = bq.Client.from_service_account_json(args['key'])
        datasets = list(client.list_datasets())
        dataset_name = args['dataset'] + DATASET_SUFFIX
        if np.sum([1 * (item.dataset_id == dataset_name) for item in datasets]) == 0:
            #-- make the target dataset
            dataset = bq.Dataset(client.dataset(dataset_name))
            client.create_dataset(dataset)
        for name in columns:
            p = dict(args, **{"field": name})
            p['filter'] = '' if 'filter' not in args else args['filter']
            #
            # let us submit the query
            pseudonym.post(**p)
    @staticmethod
    def post(**args):
        """
        This function will submit a query to bigquery and insert the resulting mappings
        """
        SQL = " ".join(['SELECT DISTINCT CAST(', args['field'], "AS STRING) AS values, COUNT(*) as counts FROM :dataset.:table :filter"]).replace(':dataset', args['dataset'])
        SQL = SQL.replace(':table', args['table'])
        if args['filter'].strip() != '':
            SQL = SQL.replace(":filter", "WHERE " + args['filter'])
        else:
            SQL = SQL.replace(":filter", "")
        SQL += " " + " ".join(['GROUP BY', args['field'], 'ORDER BY 1'])
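        #
        # Illustrative shape of the generated query (hypothetical dataset/table/field):
        #   SELECT DISTINCT CAST( gender AS STRING) AS values, COUNT(*) as counts
        #   FROM mydataset.person GROUP BY gender ORDER BY 1
        #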
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
        df['table'] = args['table']
        df['field'] = args['field']
        # df['filter'] = args['filter']
        N = df.shape[0] + 10000
        beg = np.random.randint(11, 200)
        df['encoded'] = np.random.choice(np.arange(beg, N), df.shape[0], replace=False)
        df = df[['table', 'field', 'values', 'counts', 'encoded']]
        # print (df.head()[:5])
        # sys.stdout.flush()
        TABLE_NAME = ".".join([args['dataset'] + DATASET_SUFFIX, PSEUDO_TABLENAME])
        df.to_gbq(TABLE_NAME, credentials=credentials, if_exists='append', chunksize=10000)
        # df.to_gbq(TABLE_NAME.replace('.','_pseudo.'),credentials=credentials,if_exists='append')
class Builder:
    """
    This class will build a dataset from encoded values
    """
    def encode(self, **args):
        """
        This function will create pseudonyms for a given table from the mapping tables
        """
        SQL = "SELECT * FROM :dataset.:table LIMIT 1".replace(':dataset', args['dataset']).replace(":table", args['table'])
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        columns = pd.read_gbq(SQL, credentials=credentials, dialect='standard').columns.tolist()
        TEMPLATE = ['(SELECT encoded FROM :dataset' + DATASET_SUFFIX + '.' + PSEUDO_TABLENAME, "WHERE table=':table' AND field=':name' AND CAST(values AS STRING)=CAST(:table.:name AS STRING)) as :name"]
        SQL = ["SELECT"]
        FIELDS = []
        FILTER = args['filter'] if 'filter' in args else ""
        for field in columns:
            FIELDS += [" ".join(TEMPLATE).replace(":name", field)]
            # if field in FILTER :
            #     FILTER = FILTER.replace(field,'values')
        SQL += [",\n\t".join(FIELDS)]
        SQL += ['FROM :dataset.:table']
        if FILTER != "":
            SQL += ["WHERE", FILTER]
        return ("\n".join(SQL).replace(":dataset", args['dataset']).replace(':table', args['table']))
    def process(self, **args):
        """
        :dataset    dataset of the input table
        :table      table name
        :key        Bigquery private key (service account)
        """
        pseudonym.apply(**args)
    def decode(self, **args):
        """
        This function should be able to take a pseudonymized data-frame and convert it back to the original values
        ...
        """
        pass
class Binary:
    """
    This is a utility class to import/export data to/from a binary matrix
    """
    def __stream(self, column):
        """
        This function will convert a column into a binary matrix, with the value-space representing each column of the resulting matrix
        :column a column vector, i.e. every item is a row
        """
        # values = np.unique(column)
        values = column.dropna().unique()
        values.sort()
        #
        # Let's treat the case of missing values i.e. nulls
        #
        row_count, col_count = column.size, values.size
        matrix = [np.zeros(col_count) for i in np.arange(row_count)]
        #
        # let's create a binary matrix of the feature that was passed in
        # The indices of the matrix are inspired by classical x,y axes
        if col_count > 0 and values.size > 1:
            for yi in np.arange(row_count):
                value = column[yi]
                if value not in values:
                    continue
                xi = np.where(values == value)
                xi = xi[0][0]  #-- column index
                matrix[yi][xi] = 1
        return matrix
    def Export(self, df):
        """
        This function will convert a data-frame to a binary matrix
        :return matrix
        """
        #
        # This will give us a map of how each column was mapped to a bitstream
        # _map = df.fillna(np.nan).apply(lambda column: self.__stream(column),axis=0)
        _map = df.fillna('').apply(lambda column: self.__stream(column), axis=0)
        #
        # We will merge this to have a healthy matrix
        _matrix = _map.apply(lambda row: list(itertools.chain(*row.values.tolist())), axis=1)
        _matrix = np.matrix([list(item) for item in _matrix]).astype(np.float32)
        #
        # let's format the map so we don't have an unreasonable amount of data
        #
        columns = _map.columns.tolist()
        beg = 0
        end = 0
        _map = _map.loc[0]
        _m = {}
        for name in columns:
            end += _map[name].size
            _m[name] = {"start": beg, "end": end}
            beg = end
        # return _m,_matrix.astype(np.float32)
        return _matrix
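    # Usage sketch (hypothetical data): Binary().Export(pd.DataFrame({"fname": ['james', 'steve']}))
    # returns a 2x2 float32 matrix in which each row one-hot encodes a name.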
    def Import(self, df, values, _map):
        """
        This function will convert a binary matrix back into a data-frame of original values
        :values     original / pseudonymized values
        :_map       field map of the binary matrix
        """
        r = pd.DataFrame(None, columns=list(_map.keys()))
        for key in _map:
            i = np.arange(_map[key]['start'], _map[key]['end'])
            columns = values[key]
            r[key] = df[i].apply(lambda row: np.array(columns)[row == 1][0], axis=1)
        return r
# has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
# has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
# df = pd.DataFrame({"fname":['james','james','steve','kevin','kevin'],"lname":["bond","dean","nyemba",'james','johnson']})
# df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32)
if __name__ == '__main__':
    """
    Run the program from the command line passing the following mandatory arguments
    python bridge.py <[--pseudo|--export <PATH>]> --dataset <dataset> --table <tablename> --key <service account key file> [--filter <table filter>]
        --pseudo will create pseudonyms for a given table
        --export will export data to a specified location
    """
    has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
    has_action = 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
    if has_basic and has_action:
        builder = Builder()
        if 'export' in SYS_ARGS:
            print()
            print("exporting ....")
            if not os.path.exists(SYS_ARGS['export']):
                os.mkdir(SYS_ARGS['export'])
            SQL = builder.encode(**SYS_ARGS)
            #
            # Assuming the user wants to filter the records returned:
            #
            credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
            df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
            FILENAME = os.sep.join([SYS_ARGS['export'], SYS_ARGS['table'] + '.csv'])
            #
            # This would allow us to export it to wherever we see fit
            print(FILENAME)
            df.to_csv(FILENAME, index=False)
            f = open(FILENAME.replace('.csv', '.sql'), 'w+')
            f.write(SQL)
            f.close()
        elif 'pseudo' in SYS_ARGS:
            builder.process(**SYS_ARGS)
    else:
        print("")
        print(SYS_ARGS.keys())
        print("has basic ", has_basic)
        print("has action ", has_action)
# pseudonym.apply(table='person',dataset='wgan_original',key='./curation-test-2.json')
# args = {"dataset":"wgan_original","table":"observation","key":"./curation-test-2.json"}
# builder = Builder()
# # builder.encode(dataset='wgan_original',table='person',key='./curation-test-2.json')
# builder.process(**args)