bug fix: ETL jobs and streamline

This commit is contained in:
Steve Nyemba 2023-05-30 15:01:51 -05:00
parent 695c10e797
commit ed66370fdf
1 changed files with 51 additions and 36 deletions

View File

@ -91,20 +91,24 @@ class ETL (Process):
super().__init__() super().__init__()
self.name = _args['id'] if 'id' in _args else 'UNREGISTERED' self.name = _args['id'] if 'id' in _args else 'UNREGISTERED'
if 'provider' not in _args['source'] : # if 'provider' not in _args['source'] :
#@deprecate # #@deprecate
self.reader = transport.factory.instance(**_args['source']) # self.reader = transport.factory.instance(**_args['source'])
else: # else:
# # #
# This is the new interface # # This is the new interface
_args['source']['context'] = 'read' # _args['source']['context'] = 'read'
# self.reader = transport.instance(**_args['source'])
self.reader = transport.instance(**_args['source'])
# #
# do we have an sql query provided or not .... # do we have an sql query provided or not ....
# self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
self._oargs = _args['target'] #transport.factory.instance(**_args['target']) # self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
self._source = dict(_args ['source'],**{'context':'write'})
self._target = dict(_args['target'],**{'context':'read','lock':True})
self.JOB_COUNT = _args['jobs'] self.JOB_COUNT = _args['jobs']
self.jobs = [] self.jobs = []
# self.logger = transport.factory.instance(**_args['logger']) # self.logger = transport.factory.instance(**_args['logger'])
@ -113,46 +117,57 @@ class ETL (Process):
ETL.logger.info(**_args) ETL.logger.info(**_args)
def run(self): def run(self):
if self.cmd : # if self.cmd :
idf = self.reader.read(**self.cmd) # idf = self.reader.read(**self.cmd)
else: # else:
idf = self.reader.read() # idf = self.reader.read()
idf = pd.DataFrame(idf) # idf = pd.DataFrame(idf)
# idf = idf.replace({np.nan: None}, inplace = True) # # idf = idf.replace({np.nan: None}, inplace = True)
idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] # idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT) # self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
# #
# writing the data to a designated data source # writing the data to a designated data source
# #
try: try:
_log = {"name":self.name,"rows":{"input":0,"output":0}}
_reader = transport.factory.instance(**self._source)
self.log(module='write',action='partitioning',jobs=self.JOB_COUNT) if 'table' in self._source :
rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT) _df = _reader.read()
else:
_df = _reader.read(**self._source['cmd'])
_log['rows']['input'] = _df.shape[0]
# #
# @TODO: locks # Let's write the input data-frame to the target ...
for i in np.arange(self.JOB_COUNT) : _writer = transport.factory.instance(**self._target)
# _id = ' '.join([str(i),' table ',self.name]) _writer.write(_df)
indexes = rows[i] _log['rows']['output'] = _df.shape[0]
segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
_name = "partition-"+str(i)
if segment.shape[0] == 0 :
continue
proc = Post(target = self._oargs,rows = segment,name=_name) # self.log(module='write',action='partitioning',jobs=self.JOB_COUNT)
self.jobs.append(proc) # rows = np.array_split(np.arange(0,idf.shape[0]),self.JOB_COUNT)
proc.start()
self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0]) # #
# # @TODO: locks
# for i in np.arange(self.JOB_COUNT) :
# # _id = ' '.join([str(i),' table ',self.name])
# indexes = rows[i]
# segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
# _name = "partition-"+str(i)
# if segment.shape[0] == 0 :
# continue
# proc = Post(target = self._oargs,rows = segment,name=_name)
# self.jobs.append(proc)
# proc.start()
# self.log(module='write',action='working',segment=str(self.name),table=self.name,rows=segment.shape[0])
# while self.jobs : # while self.jobs :
# jobs = [job for job in proc if job.is_alive()] # jobs = [job for job in proc if job.is_alive()]
# time.sleep(1) # time.sleep(1)
except Exception as e: except Exception as e:
print (e) print (e)
self.log(**_log)
def is_done(self): def is_done(self):
self.jobs = [proc for proc in self.jobs if proc.is_alive()] self.jobs = [proc for proc in self.jobs if proc.is_alive()]
return len(self.jobs) == 0 return len(self.jobs) == 0