Merge pull request #12 from lnyemba/v2.0.4
bug fix: use plugins to refer to plugins
This commit is contained in:
commit
a4481865ea
|
@ -59,8 +59,8 @@ class IReader(IO):
|
||||||
def __init__(self,_agent,pipeline=None):
|
def __init__(self,_agent,pipeline=None):
|
||||||
super().__init__(_agent,pipeline)
|
super().__init__(_agent,pipeline)
|
||||||
def read(self,**_args):
|
def read(self,**_args):
|
||||||
if 'pipeline' in _args :
|
if 'plugins' in _args :
|
||||||
self._init_plugins(_args['pipeline'])
|
self._init_plugins(_args['plugins'])
|
||||||
_data = self._agent.read(**_args)
|
_data = self._agent.read(**_args)
|
||||||
if self._plugins and self._plugins.ratio() > 0 :
|
if self._plugins and self._plugins.ratio() > 0 :
|
||||||
_data = self._plugins.apply(_data)
|
_data = self._plugins.apply(_data)
|
||||||
|
@ -71,8 +71,8 @@ class IWriter(IO):
|
||||||
def __init__(self,_agent,pipeline=None):
|
def __init__(self,_agent,pipeline=None):
|
||||||
super().__init__(_agent,pipeline)
|
super().__init__(_agent,pipeline)
|
||||||
def write(self,_data,**_args):
|
def write(self,_data,**_args):
|
||||||
if 'pipeline' in _args :
|
if 'plugins' in _args :
|
||||||
self._init_plugins(_args['pipeline'])
|
self._init_plugins(_args['plugins'])
|
||||||
if self._plugins and self._plugins.ratio() > 0 :
|
if self._plugins and self._plugins.ratio() > 0 :
|
||||||
_data = self._plugins.apply(_data)
|
_data = self._plugins.apply(_data)
|
||||||
|
|
||||||
|
@ -82,10 +82,6 @@ class IWriter(IO):
|
||||||
# The ETL object in its simplest form is an aggregation of read/write objects
|
# The ETL object in its simplest form is an aggregation of read/write objects
|
||||||
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
|
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
|
||||||
|
|
||||||
def _ProcessWriter (_data,_args):
|
|
||||||
writer = transport.get.writer(**_args)
|
|
||||||
writer.write(_data)
|
|
||||||
|
|
||||||
class IETL(IReader) :
|
class IETL(IReader) :
|
||||||
"""
|
"""
|
||||||
This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
|
This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
|
||||||
|
@ -105,14 +101,6 @@ class IETL(IReader) :
|
||||||
|
|
||||||
for _kwargs in self._targets :
|
for _kwargs in self._targets :
|
||||||
self.post(_data,**_kwargs)
|
self.post(_data,**_kwargs)
|
||||||
# pthread = Process(target=_ProcessWriter,args=(_data,_kwargs))
|
|
||||||
# pthread.start()
|
|
||||||
# self.jobs.append(pthread)
|
|
||||||
|
|
||||||
# if not self._hasParentProcess :
|
|
||||||
# while self.jobs :
|
|
||||||
# jobs = [pthread for pthread in self.jobs if pthread.is_alive()]
|
|
||||||
# time.sleep(1)
|
|
||||||
|
|
||||||
return _data
|
return _data
|
||||||
def post (self,_data,**_args) :
|
def post (self,_data,**_args) :
|
||||||
|
|
Loading…
Reference in New Issue