From f26795387ef60690512ee33b3dcfaee2ca7c5c10 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 7 Apr 2021 15:30:59 -0500 Subject: [PATCH] feature: bootstrap-like with candidates --- data/gan.py | 26 +++++++++---- data/maker/__init__.py | 14 +++++-- data/maker/prepare/__init__.py | 2 +- pipeline.py | 71 +++++++++++++++++++++++++++++++--- 4 files changed, 94 insertions(+), 19 deletions(-) diff --git a/data/gan.py b/data/gan.py index dd8ea6a..643e838 100644 --- a/data/gan.py +++ b/data/gan.py @@ -67,8 +67,9 @@ class GNet : self.NUM_GPUS = 0 else: self.NUM_GPUS = len(self.GPU_CHIPS) + # os.environ['CUDA_VISIBLE_DEVICES'] = str(self.GPU_CHIPS[0]) - self.PARTITION = args['partition'] + self.PARTITION = args['partition'] if 'partition' in args else None # if self.NUM_GPUS > 1 : # os.environ['CUDA_VISIBLE_DEVICES'] = "4" @@ -117,9 +118,14 @@ class GNet : for key in ['train','output'] : self.mkdir(os.sep.join([self.log_dir,key])) self.mkdir (os.sep.join([self.log_dir,key,self.CONTEXT])) + if 'partition' in args : + self.mkdir (os.sep.join([self.log_dir,key,self.CONTEXT,str(args['partition'])])) self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT]) self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT]) + if 'partition' in args : + self.train_dir = os.sep.join([self.train_dir,str(args['partition'])]) + self.out_dir = os.sep.join([self.out_dir,str(args['partition'])]) # if self.logger : # We will clear the logs from the data-store @@ -130,7 +136,7 @@ class GNet : # db.backup.insert({'name':column,'logs':list(db[column].find()) }) # db[column].drop() - def load_meta(self,column): + def load_meta(self,**args): """ This function is designed to accomodate the uses of the sub-classes outside of a strict dependency model. Because prediction and training can happen independently @@ -145,6 +151,9 @@ class GNet : setattr(self,key,value) self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT]) self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT]) + if 'partition' in args : + self.train_dir = os.sep.join([self.train_dir,str(args['partition'])]) + self.out_dir = os.sep.join([self.out_dir,str(args['partition'])]) def log_meta(self,**args) : @@ -265,9 +274,9 @@ class Generator (GNet): #tf.add_to_collection('glosses', loss) tf.compat.v1.add_to_collection('glosses', loss) return loss, loss - def load_meta(self, column): - super().load_meta(column) - self.discriminator.load_meta(column) + def load_meta(self, **args): + super().load_meta(**args) + self.discriminator.load_meta(**args) def network(self,**args) : """ This function will build the network that will generate the synthetic candidates @@ -454,6 +463,7 @@ class Train (GNet): # - determine if the GPU/CPU are busy # for i in self.GPU_CHIPS : #range(self.NUM_GPUS): + with tf.device('/gpu:%d' % i): with tf.name_scope('%s_%d' % ('TOWER', i)) as scope: if self._LABEL is not None : @@ -559,9 +569,9 @@ class Predict(GNet): # self.MISSING_VALUES = args['no_value'] # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value'] - def load_meta(self, column): - super().load_meta(column) - self.generator.load_meta(column) + def load_meta(self, **args): + super().load_meta(**args) + self.generator.load_meta(**args) self.ROW_COUNT = self.oROW_COUNT def apply(self,**args): suffix = self.CONTEXT #self.get.suffix() diff --git a/data/maker/__init__.py b/data/maker/__init__.py index bfd6a5f..803590a 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -112,7 +112,8 @@ def train (**_args): args ['max_epochs'] = _args['max_epochs'] args['matrix_size'] = _matrix.shape[0] args['batch_size'] = 2000 - args['partition'] = 0 if 'partition' not in _args else _args['partition'] + if 'partition' in _args : + args['partition'] = _args['partition'] if 'gpu' in _args : args['gpu'] = _args['gpu'] # os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' @@ -121,7 +122,8 @@ def train (**_args): # # @TODO: Write the map.json in the output directory for the logs # - f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']),'w') + # f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']),'w') + f = open(os.sep.join([trainer.out_dir,'map.json']),'w') f.write(json.dumps(_map)) f.close() @@ -140,7 +142,11 @@ def generate(**_args): :param context :param logs """ - f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json'])) + partition = _args['partition'] if 'partition' in _args else None + if not partition : + f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json'])) + else: + f = open(os.sep.join([_args['logs'],'output',_args['context'],str(partition),'map.json'])) _map = json.loads(f.read()) f.close() # if 'file' in _args : @@ -165,7 +171,7 @@ def generate(**_args): args['gpu'] = _args['gpu'] handler = gan.Predict (**args) - handler.load_meta(None) + handler.load_meta(column=None) # # Let us now format the matrices by reverting them to a data-frame with values # diff --git a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py index ecb47bd..5ace56a 100644 --- a/data/maker/prepare/__init__.py +++ b/data/maker/prepare/__init__.py @@ -237,7 +237,7 @@ class Input : # # @NOTE: For some reason, there is an out of memory error created here, this seems to fix it (go figure) # - _matrix = np.array([np.repeat(0,cols.size) for i in range(row_count)]) + _matrix = np.array([np.repeat(0,cols.size) for i in range(0,row_count)]) [np.put(_matrix[i], np.where(cols == rows[i]) ,1)for i in np.arange(row_count) if np.where(cols == rows[i])[0].size > 0] # else: # _matrix = cp.zeros([row_count,cols.size]) diff --git a/pipeline.py b/pipeline.py index d73f1fc..3f8358b 100644 --- a/pipeline.py +++ b/pipeline.py @@ -146,6 +146,8 @@ class Components : _args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))] if 'gpu' in args : _args['gpu'] = self.set_gpu(gpu=args['gpu']) + if 'partition' in args : + _args['partition'] = args['partition'] if df.shape[0] and df.shape[0] : # # We have a full blown matrix to be processed @@ -154,7 +156,7 @@ class Components : print ("... skipping training !!") if 'autopilot' in ( list(args.keys())) : - + args['data'] = df print (['autopilot mode enabled ....',args['context']]) self.generate(args) @@ -171,6 +173,7 @@ class Components : r = np.random.dirichlet(values+.001) #-- dirichlet doesn't work on values with zeros _sd = values[values > 0].std() _me = values[values > 0].mean() + _mi = values.min() x = [] _type = values.dtype for index in np.arange(values.size) : @@ -182,7 +185,7 @@ class Components : value = values[index] - (values[index] * r[index]) # # randomly shifting the measurements - if np.random.choice([0,1],1)[0] and _me > _sd: + if np.random.choice([0,1],1)[0] and _me > _sd : if np.random.choice([0,1],1)[0] : value = value * np.divide(_me,_sd) else: @@ -273,6 +276,9 @@ class Components : args['candidates'] = 1 if 'candidates' not in args else int(args['candidates']) if 'gpu' in args : args['gpu'] = self.set_gpu(gpu=args['gpu']) + # if 'partition' in args : + # args['logs'] = os.sep.join([args['logs'],str(args['partition'])]) + _info = {"module":"gan-prep","action":"prune","shape":{"rows":args['data'].shape[0],"columns":args['data'].shape[1]}} logger.write(_info) if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 : @@ -459,12 +465,18 @@ if __name__ == '__main__' : # COLUMNS = DATA.columns # DATA = np.array_split(DATA,PART_SIZE) # args['schema'] = schema + GPU_CHIPS = SYS_ARGS['gpu'] if 'gpu' in SYS_ARGS else None + if GPU_CHIPS and type(GPU_CHIPS) != list : + GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS] + if 'gpu' in SYS_ARGS : + args['gpu'] = GPU_CHIPS + jobs = [] if 'generate' in SYS_ARGS : # # Let us see if we have partitions given the log folder content = os.listdir( os.sep.join([args['logs'],'train',args['context']])) - generator = Components() + # if ''.join(content).isnumeric() : # # @@ -508,13 +520,60 @@ if __name__ == '__main__' : # else: # generator.generate(args) # Components.generate(args) - generator.generate(args) + if '--all-chips' in SYS_ARGS and GPU_CHIPS: + index = 0 + jobs = [] + for _id in GPU_CHIPS : + _args = copy.deepcopy(args) + _args['gpu'] = [int(_gpu)] + _args['partition'] = index + index += 1 + make = lambda _params: (Components()).generate(_params) + job = Process(target=make,args=( dict(_args),)) + job.name = 'Trainer # ' + str(index) + job.start() + jobs.append(job) + pass + else: + generator = Components() + generator.generate(args) else: # DATA = np.array_split(DATA,PART_SIZE) - agent = Components() - agent.train(**args) + # + # Let us create n-jobs across n-gpus, The assumption here is the data that is produced will be a partition + # @TODO: Find better name for partition + # + if GPU_CHIPS and '--all-chips' in SYS_ARGS: + index = 0 + + for _gpu in GPU_CHIPS : + _args = copy.deepcopy(args) + _args['gpu'] = [int(_gpu)] + _args['partition'] = index + index += 1 + make = lambda _params: (Components()).train(**_params) + job = Process(target=make,args=( dict(_args),)) + job.name = 'Trainer # ' + str(index) + job.start() + jobs.append(job) + + + + + else: + # + # The choice of the chip will be made internally + agent = Components() + agent.train(**args) + # + # If we have any obs we should wait till they finish + # + while len(jobs)> 0 : + jobs = [job for job in jobs if job.is_alive()] + time.sleep(2) + # jobs = [] # for index in range(0,PART_SIZE) : # if 'focus' in args and int(args['focus']) != index :