From ed66370fdff37b75cc590c1c817b9f322b4d718a Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 30 May 2023 15:01:51 -0500 Subject: [PATCH] bug fix: ETL jobs and streamline --- transport/etl.py | 87 ++++++++++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/transport/etl.py b/transport/etl.py index 83c6147..83018e6 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -91,20 +91,24 @@ class ETL (Process): super().__init__() self.name = _args['id'] if 'id' in _args else 'UNREGISTERED' - if 'provider' not in _args['source'] : - #@deprecate - self.reader = transport.factory.instance(**_args['source']) - else: - # - # This is the new interface - _args['source']['context'] = 'read' + # if 'provider' not in _args['source'] : + # #@deprecate + # self.reader = transport.factory.instance(**_args['source']) + # else: + # # + # # This is the new interface + # _args['source']['context'] = 'read' - self.reader = transport.instance(**_args['source']) + # self.reader = transport.instance(**_args['source']) + # # do we have an sql query provided or not .... # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None - self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None - self._oargs = _args['target'] #transport.factory.instance(**_args['target']) + # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None + # self._oargs = _args['target'] #transport.factory.instance(**_args['target']) + self._source = dict(_args ['source'],**{'context':'write'}) + self._target = dict(_args['target'],**{'context':'read','lock':True}) + self.JOB_COUNT = _args['jobs'] self.jobs = [] # self.logger = transport.factory.instance(**_args['logger']) @@ -113,46 +117,57 @@ class ETL (Process): ETL.logger.info(**_args) def run(self): - if self.cmd : - idf = self.reader.read(**self.cmd) - else: - idf = self.reader.read() - idf = pd.DataFrame(idf) - # idf = idf.replace({np.nan: None}, inplace = True) + # if self.cmd : + # idf = self.reader.read(**self.cmd) + # else: + # idf = self.reader.read() + # idf = pd.DataFrame(idf) + # # idf = idf.replace({np.nan: None}, inplace = True) - idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] - self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) + # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] + # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) # # writing the data to a designated data source # try: - - - self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) - rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) - + _log = {"name":self.name,"rows":{"input":0,"output":0}} + _reader = transport.factory.instance(**self._source) + if 'table' in self._source : + _df = _reader.read() + else: + _df = _reader.read(**self._source['cmd']) + _log['rows']['input'] = _df.shape[0] # - # @TODO: locks - for i in np.arange(self.JOB_COUNT) : - # _id = ' '.join([str(i),' table ',self.name]) - indexes = rows[i] - segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') - _name = "partition-"+str(i) - if segment.shape[0] == 0 : - continue + # Let's write the input data-frame to the target ... + _writer = transport.factory.instance(**self._target) + _writer.write(_df) + _log['rows']['output'] = _df.shape[0] + + # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) + # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) + + # # + # # @TODO: locks + # for i in np.arange(self.JOB_COUNT) : + # # _id = ' '.join([str(i),' table ',self.name]) + # indexes = rows[i] + # segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') + # _name = "partition-"+str(i) + # if segment.shape[0] == 0 : + # continue - proc = Post(target = self._oargs,rows = segment,name=_name) - self.jobs.append(proc) - proc.start() + # proc = Post(target = self._oargs,rows = segment,name=_name) + # self.jobs.append(proc) + # proc.start() - self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) + # self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) # while self.jobs : # jobs = [job for job in proc if job.is_alive()] # time.sleep(1) except Exception as e: print (e) - + self.log(**_log) def is_done(self): self.jobs = [proc for proc in self.jobs if proc.is_alive()] return len(self.jobs) == 0