diff --git a/data/maker/__init__.py b/data/maker/__init__.py index 77effb3..bce8d65 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -28,6 +28,7 @@ class Learner(Process): super(Learner, self).__init__() self.ndx = 0 + self.lock = RLock() if 'gpu' in _args : os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu']) @@ -63,13 +64,21 @@ class Learner(Process): # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs' # sel.max_epoc def log(self,**_args): - logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True) - _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'info':self.info['context'],**_args}) - logger.write(_args) - self.ndx += 1 - if hasattr(logger,'close') : - logger.close() - + self.lock.acquire() + try: + logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True) + _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'info':self.info['context'],**_args}) + logger.write(_args) + self.ndx += 1 + if hasattr(logger,'close') : + logger.close() + except Exception as e: + print () + print (_args) + print (e) + pass + finally: + self.lock.release() def get_schema(self): if self.store['source']['provider'] != 'bigquery' : return [{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])] @@ -88,9 +97,8 @@ class Learner(Process): _args = {"schema":self.get_schema(),"data":self._df,"columns":columns} if self._map : _args['map'] = self._map - self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None - - _log = {'action':'data-prep','input':{'rows':self._df.shape[0],'cols':self._df.shape[1]} } + self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None + _log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } } self.log(**_log) class Trainer(Learner): """ @@ -139,7 +147,7 @@ class Trainer(Learner): # g.run() end = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S') - _min = float(timedelta(end,beg).seconds/ 60) + _min = float((end-beg).seconds/ 60) _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}} self.log(**_logs) self.generate = g @@ -293,12 +301,27 @@ class Generator (Learner): writer.write(_df,schema=_schema) self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}}) -class Shuffle(Trainer): +class Shuffle(Generator): """ This is a method that will yield data with low utility """ def __init__(self,**_args): super().__init__(self) + def run(self): + + + self.initalize() + _index = np.arange(self._df.shape[0]) + np.random.shuffle(_index) + _iocolumns = self.info['columns'] + _ocolumns = list(set(self._df.columns) - set(_iocolumns) ) + _iodf = pd.DataFrame(self._df[_ocolumns],self._df.loc[_index][_iocolumns],index=np.arange(self._df.shape[0])) + self._df = self._df[_ocolumns].join(_iodf) + + + _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}} + self.log(**_log) + self.post([self._df]) class factory : _infocache = {} @staticmethod @@ -313,4 +336,9 @@ class factory : :param autopilot will generate output automatically :param batch (default 2k) size of the batch """ - return Trainer(**_args) \ No newline at end of file + if 'apply' not in _args : + return Trainer(**_args) + elif _args['apply'] == 'shuffe' : + return Shuffle(**_args) + elif _args['apply'] == 'generate' : + return Generator(**_args) \ No newline at end of file diff --git a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py index 1bf4872..50fcfdf 100644 --- a/data/maker/prepare/__init__.py +++ b/data/maker/prepare/__init__.py @@ -95,8 +95,9 @@ class Input : # MIN_SPACE_SIZE = 2 # self._columns = cols if cols else _df.apply(lambda col:None if col[0] == row_count or col[0] < MIN_SPACE_SIZE else col.name).dropna().tolist() # self._io = _df.to_dict(orient='records') - _df = self.df.nunique().T / self.df.shape[0] - self._io = pd.DataFrame(_df).astype(float).to_dict(orient='records') + _df = pd.DataFrame(self.df.nunique().T / self.df.shape[0]).T + self._io = (_df.to_dict(orient='records')) + except Exception as e: print (e) self._io = [] diff --git a/setup.py b/setup.py index 3822df5..c96877b 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,8 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() -args = {"name":"data-maker","version":"1.5.0","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", +args = {"name":"data-maker","version":"1.5.1", + "author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vumc.org","license":"MIT", "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]} args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow'] args['url'] = 'https://hiplab.mc.vanderbilt.edu/aou/data-maker.git'