diff --git a/pipeline.py b/pipeline.py
index 3f8358b..9d33873 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -198,6 +198,68 @@ class Components :
 
             return values
         pass
+    def shuffle(self,_args):
+        """
+        Shuffle every column that is not ignored, independently of the others, and post
+        the result to the target store. Columns listed under 'continuous' are approximated
+        before they are shuffled.
+
+        :_args  dictionary with 'data' (data-frame), 'file' (csv path) or 'sql' (optional 'row_limit'),
+                'store' ({'source':...,'target':...}), 'from' (table name) and optionally
+                'continuous', 'ignore' and 'schema'
+        """
+        args = _args
+        reader = None
+        if 'data' in args :
+            df = args['data']
+        elif 'file' in args :
+            df = pd.read_csv(args['file'])
+        else:
+            reader = factory.instance(**args['store']['source'])
+            if 'row_limit' in args and 'sql' in args:
+                df = reader.read(sql=args['sql'],limit=args['row_limit'])
+            else:
+                df = reader.read(sql=args['sql'])
+        schema = None
+        if 'schema' not in args and hasattr(reader,'meta') and 'file' not in args:
+            schema = reader.meta(table=args['from'])
+            schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
+        #
+        # Every column that is not ignored gets shuffled, the continuous ones are approximated first
+        #
+        x_cols = args['continuous'] if 'continuous' in args else [] #-- columns to be approximated
+        _cols = [] #-- columns to be ignored
+        if 'ignore' in args and 'columns' in args['ignore'] :
+            _cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
+
+        for name in list (set(df.columns) - set(_cols)) :
+            if name in x_cols :
+                df[name] = self.approximate(df[name].values)
+            #
+            # The raw values are assigned (not a series) otherwise pandas would re-align
+            # the rows on the index and the column would end up in its original order
+            #
+            df[name] = df[name].values[np.random.permutation(df.shape[0])]
+        self.post(data=df,schema=schema,store=args['store']['target'],table=args['from'])
+    def post(self,**_args) :
+        """
+        Write a data-frame to a data-store
+        :data   data-frame to be written
+        :store  parameters of the target store, handed to factory.instance
+        :table  name of the table to write to
+        :schema optional list of {"name":...,"type":...}, date/time fields are cast to strings
+        """
+        _schema = _args['schema'] if 'schema' in _args else None
+        writer = factory.instance(**_args['store'])
+        _df = _args['data']
+        _table = _args['table']
+        if _schema :
+            for _item in _schema :
+                if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
+                    _df[_item['name']] = _df[_item['name']].astype(str)
+            writer.write(_df,schema=_schema,table=_table)
+        else:
+            writer.write(_df,table=_table)
     # @staticmethod
     def generate(self,args):
 
@@ -338,20 +400,13 @@ class Components :
             _df = pd.DataFrame.join(df,_df)
 
 
-            if _schema :
-                for _item in _schema :
-                    if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
-                        _df[_item['name']] = _df[_item['name']].astype(str)
 
-                pass
 
+            _params = {'data':_df,'store' : ostore,'table':args['from']}
             if _schema :
-                writer.write(_df[cols],schema=_schema,table=args['from'])
-            else:
-                writer.write(_df[cols],table=args['from'])
-                # writer.write(df,table=table)
-            pass
-        else:
+                _params ['schema'] = _schema
+            self.post(**_params)
+            pass
         #
         #
 
@@ -537,7 +592,20 @@ if __name__ == '__main__' :
         else:
             generator = Components()
             generator.generate(args)
-    
+    elif 'shuffle' in SYS_ARGS :
+        #
+        # The flag has to be looked up, a bare string in the condition is always true
+        #
+        if GPU_CHIPS and ('all-chips' in SYS_ARGS or '--all-chips' in SYS_ARGS) :
+            for index in GPU_CHIPS :
+                # args must be a tuple, a bare dictionary would be unpacked key by key
+                job = Process (target = Components().shuffle,args=( dict(args),))
+                job.name = 'Shuffler #' + str(index)
+                job.start()
+                jobs.append(job)
+        else:
+            shuffler = Components()
+            shuffler.shuffle(args)
     else:
         # DATA = np.array_split(DATA,PART_SIZE)
 
diff --git a/setup.py b/setup.py
index 544f4b3..4eb869f 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ import sys
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {"name":"data-maker",
-    "version":"1.4.4",
+    "version":"1.4.5",
     "author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
     "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']