adding shuffle feature to be used for very large spaces
This commit is contained in:
parent f26795387e
commit 6a6352169c

pipeline.py (88 changed lines)
@@ -198,6 +198,52 @@ class Components :
         return values
         pass
+    def shuffle(self,args):
+        """ Shuffle designated columns of a dataset instead of generating it; intended for very large spaces. """
+        schema = None
+        if 'data' in args :
+            df = args['data']
+        else:
+            reader = factory.instance(**args['store']['source'])
+            if 'file' in args :
+                df = pd.read_csv(args['file'])
+            else:
+                if 'row_limit' in args and 'sql' in args :
+                    df = reader.read(sql=args['sql'],limit=args['row_limit'])
+                else:
+                    df = reader.read(sql=args['sql'])
+            if 'schema' not in args and hasattr(reader,'meta') and 'file' not in args :
+                schema = reader.meta(table=args['from'])
+                schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
+        #
+        # We are shuffling the designated columns and will be approximating the others
+        #
+        x_cols = []     #-- columns to be approximated
+        _cols  = []     #-- columns to be ignored
+        if 'continuous' in args :
+            x_cols = args['continuous']
+        if 'ignore' in args and 'columns' in args['ignore'] :
+            _cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
+
+        for name in list(set(df.columns) - set(_cols)) :
+            i = np.arange(df.shape[0])
+            np.random.shuffle(i)
+            if name in x_cols :
+                df[name] = self.approximate(df[name].values)
+            df[name] = df.iloc[i][name].values #-- .values keeps the permuted order; pandas would otherwise re-align on the index
+        self.post(data=df,schema=schema,store=args['store']['target'])
+    def post(self,**_args) :
+        _schema = _args['schema'] if 'schema' in _args else None
+        writer  = factory.instance(**_args['store'])
+        _df     = _args['data']
+        if _schema :
+            for _item in _schema :
+                if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
+                    _df[_item['name']] = _df[_item['name']].astype(str)
+            writer.write(_df,schema=_schema,table=args['from']) #-- args here is the module-level dict built under __main__
+        else:
+            writer.write(_df,table=args['from'])
+
     # @staticmethod
     def generate(self,args):
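A note on the shuffle method added above: each retained column is permuted independently, which keeps every column's marginal distribution intact while breaking row-level linkage across columns. Below is a minimal standalone sketch of that idea using only pandas and numpy; the project-specific factory reader/writer, approximate, and get_ignore are left out, and the column names are made up.

import numpy as np
import pandas as pd

def shuffle_columns(df, ignore=None):
    # Permute every column independently, except the ones listed in `ignore`.
    ignore = set(ignore or [])
    out = df.copy()
    for name in set(out.columns) - ignore:
        i = np.arange(out.shape[0])
        np.random.shuffle(i)
        # .values prevents pandas from re-aligning the permuted rows back onto the index
        out[name] = out.iloc[i][name].values
    return out

if __name__ == '__main__':
    df = pd.DataFrame({'age': [34, 51, 29, 46], 'zip': ['37203', '37212', '37027', '37064']})
    print(shuffle_columns(df, ignore=['zip']))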
@@ -338,20 +384,25 @@ class Components :

                 _df = pd.DataFrame.join(df,_df)

-                if _schema :
-                    for _item in _schema :
-                        if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
-                            _df[_item['name']] = _df[_item['name']].astype(str)
-                    pass
-                if _schema :
-                    writer.write(_df[cols],schema=_schema,table=args['from'])
-                else:
-                    writer.write(_df[cols],table=args['from'])
-                    # writer.write(df,table=table)
-                    pass
+                # if _schema :
+                #     for _item in _schema :
+                #         if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
+                #             _df[_item['name']] = _df[_item['name']].astype(str)
+                #     pass
+                _params = {'data':_df,'store' : ostore}
+                if _schema :
+                    _params['schema'] = _schema
+                self.post(**_params)
+                # if _schema :
+                #     writer.write(_df[cols],schema=_schema,table=args['from'])
+                # else:
+                #     writer.write(_df[cols],table=args['from'])
                 pass

                 # #
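The hunk above rewires generate() to delegate its final write to self.post, which, per the first hunk, casts any DATE/TIMESTAMP/DATETIME column to string before handing the frame to the writer. A small illustration of that cast, assuming a pandas frame and a schema list shaped like the one post receives (the column names and values are made up):

import pandas as pd

_schema = [{"name": "admit_date", "type": "DATE"}, {"name": "age", "type": "INTEGER"}]
_df = pd.DataFrame({"admit_date": pd.to_datetime(["2020-01-01", "2020-02-15"]), "age": [34, 51]})

# Cast temporal columns to string so the downstream writer receives serializable values
for _item in _schema:
    if _item['type'] in ['DATE', 'TIMESTAMP', 'DATETIME']:
        _df[_item['name']] = _df[_item['name']].astype(str)

print(_df.dtypes)   # admit_date is now object (string), age stays int64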
@@ -537,7 +588,20 @@ if __name__ == '__main__' :
         else:
             generator = Components()
             generator.generate(args)
+    elif 'shuffle' in SYS_ARGS :
+        index = 0
+        if GPU_CHIPS and '--all-chips' in SYS_ARGS :
+            for index in GPU_CHIPS :
+                publisher = lambda _params: ( Components() ).shuffle(_params)
+                job = Process(target=publisher,args=(dict(args),)) #-- trailing comma so args is passed as a 1-tuple
+                job.name = 'Shuffler #' + str(index)
+                job.start()
+                jobs.append(job)
+        else:
+            shuffler = Components()
+            shuffler.shuffle(args)
+        pass
     else:

         # DATA = np.array_split(DATA,PART_SIZE)
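For reference, a minimal standalone sketch of the fan-out pattern used above: one Process per chip identifier, each receiving its own copy of the argument dict. The shuffle stand-in, the chip list, and the final join are illustrative additions and not part of the commit.

from multiprocessing import Process

def run_shuffle(params):
    # stand-in for (Components()).shuffle(params)
    print('shuffling with', params)

if __name__ == '__main__':
    GPU_CHIPS = [0, 1]                  # assumed chip identifiers
    args = {'from': 'my_table'}         # assumed job parameters
    jobs = []
    for index in GPU_CHIPS:
        job = Process(target=run_shuffle, args=(dict(args),))
        job.name = 'Shuffler #' + str(index)
        job.start()
        jobs.append(job)
    for job in jobs:
        job.join()                      # wait for every shuffler to finish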
setup.py (2 changed lines)
@@ -5,7 +5,7 @@ import sys

 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {"name":"data-maker",
-        "version":"1.4.4",
+        "version":"1.4.5",
         "author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
         "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']