parser/healthcareio/x12/plugins/default/common.py

from typing import Any
import numpy as np
import json
from multiprocessing import Process, RLock
import os
import io
import queue
import transport
from transport import providers
class Store(Process):
"""
This is the data-store service that will handle read/writes
"""
dataStore = None
@staticmethod
def init(**_args):
#
# Placeholder: capture the store configuration; actual initialization is not implemented yet
#
if Store.dataStore is None :
_args = _args['store']
else:
pass
@staticmethod
def reset():
pass
class X12DOCUMENT (Process):
"""
X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
"""
_queue = queue.Queue()
class MODE :
#
# The following allows us to handle either raw content (a stream) or a filename.
# Raw content is wrapped in io.StringIO so that it can be processed as if it were a file.
#
NAMES,STREAM = 'NAMES','STREAM'
class ConfigHandler :
def format(self,**_args):
"""
This function formats variations of an element's parsing rules
:info list of entries of the form {index, field|label, map}
"""
_info = _args['info']
_ref = {}
for _item in _info :
_index = str(_item['index'])
_field = _item['field'] if 'field' in _item else None
_label = _item['label'] if 'label' in _item else None
if _field :
_ref[_index] = {'field':_field}
elif _label :
_ref[_index] = {'label':_label}
return {'@ref':_ref}
def _getColumnsIndexes(self,_columns,_indexes,_map):
"""
This function returns the columns and indexes, merging in a parsing map if one is provided
:param _columns default column names
:param _indexes default positional indexes
:param _map     parsing map (field:index)
"""
# @TODO: ensure the lengths are the same for adequate usage downstream ...
_xcolumns,_xindexes = list(_map.keys()), list(_map.values())
keys,values = _xcolumns + _columns,_xindexes + _indexes
_config = dict(zip(keys,values))
_outColumns,_outIndexes = list(_config.keys()),list(_config.values())
return _outColumns,_outIndexes
def _getObjectAtributes(self,_config):
_field = _config['field'] if 'field' in _config else {}
_label = _config['label'] if 'label' in _config else {}
return _field,_label
def consolidate(self,**_args):
#
# This function overlays the element-specific configuration (map, @ref, field, label) on top of the default columns/indexes
#
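#
# Illustrative sketch (segment, columns, indexes and map are made up for the example):
#
#   self.consolidate(row=['NM1','IL','1','DOE','JOHN'],columns=['type','last_name'],index=[1,3],config={'map':{'first_name':4}})
#   # -> {'columns':['first_name','type','last_name'],'index':[4,1,3],'field':{},'label':{}}
#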
# _columns,_indexes = [],[]
_columns,_indexes = _args['columns'],_args['index']
_map = {}
_config = _args['config'] if 'config' in _args else {}
_field,_label = self._getObjectAtributes(_config)
if 'map' in _config :
_map = _args['config']['map']
_columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
if '@ref' in _config :
# _columns,_indexes = [],[]
_row = _args['row']
_ref = _config['@ref']
for _anchor in _ref:
if _anchor == _row[1].strip() :
_field,_label = self._getObjectAtributes(_ref[_anchor])
_map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
if _map :
_columns,_indexes = self._getColumnsIndexes([],[],_map)
else:
# print ([_anchor,_indexes,_columns])
_map = dict(zip(_columns,_indexes))
pass
break
# _columns,_indexes = _columns + _map.keys()
_out = {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
return _out
def legacy(self,**_args):
#
# This function returns the legacy configuration (default parsing)
#
_config = _args['config'] if 'config' in _args else {}
_field,_label = self._getObjectAtributes(_config)
_columns,_indexes = [],[]
if 'map' in _config :
_columns = list(_config['map'].keys())
_indexes = list(_config['map'].values())
return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
def override(self,**_args):
return _args['columns'],_args['indexes']
def __init__(self,**_args):
super().__init__()
self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
self.lastelement = {} # Caches the last parsed element so that related segments (e.g. N3/N4) can be folded back into it
if 'files' in _args :
self.files = _args['files']
self._config = _args['config'] if 'config' in _args else {}
#
# NM1 is an open-ended segment and is therefore cached in order to recreate the hierarchy (N1-N4)
# @TODO:
# -add this to the configuration
#
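#
# e.g. an N3 or N4 address segment that follows an NM1 is folded back into the
# object produced for that NM1 (see the hierarchy handling in parse)
#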
self._hierarchy = {'NM1':['N1','N2','N3','N4']}
self._document = []
self._x12FileType = None
self._configHandler = X12DOCUMENT.ConfigHandler()
#
#-- The files need to be classified as either claims (837) or remits (835)
#
if 'store' not in self._config :
self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
else:
self._store_args = self._config['store']
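#
# Sketch of a store configuration (the database example below is hypothetical and
# only illustrates the shape of the dictionary; providers.CONSOLE is the default):
#
#   {'provider':providers.CONSOLE}
#   {'provider':'mongodb','db':'healthcareio','doc':'claims'}   #-- hypothetical keys, depends on the transport provider
#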
def init(self,_header):
"""
Expected elements must include ST (transaction set header)
"""
pass
def merge (self,_x,_y):
"""
This function merges two objects _x, _y : on common keys, lists are concatenated, dictionaries are merged, and scalar values are overwritten by _y
"""
_zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
if _zcols :
_out = dict(_x,**{})
for _key in _y.keys() :
if not _key in _zcols :
_out[_key] = _y[_key]
else:
if type(_out[_key]) == list :
_out[_key] += _y[_key]
elif type(_out[_key]) == dict:
_out[_key] = dict(_out[_key],**_y[_key])
else:
_out[_key] = _y[_key]
return _out
else:
return dict(_x,**_y)
def split(self,content):
"""
This function splits the content of an x12 document into a header and HL blocks
:content x12 document in raw format (text)
"""
#_content = content.split('~')
_content = content.split('HL')
xchar = '~\n' if '~\n' in _content[0] else '~'
_header = _content[:1][0].split(xchar) #.split('~')
_blocks = ['HL*'+_item for _item in _content[1:]]
# xchar = '~\n' if '~\n' in _blocks[0] else '~'
_blocks = [_item.split(xchar) for _item in _blocks ]
# for row in _content :
# if not _blocks and not row.startswith('HL') :
# _header.append(row)
# else:
# _blocks.append(row)
return {'header':_header,'blocks':_blocks}
def parse (self,columns,index,**_args):
"""
This function encapsulates how an x12 document element is processed
:columns list of attributes that make up the object
:index   indexes of the said attributes within the element
:_args
- row    raw x12 element (string or list)
- config configuration of the element; this should indicate the functions to apply to the parsed values
"""
_ELEMENT = _args['row'][0]
#
# get the right configuration from the _config object
_config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
# _field = _config['field'] if 'field' in _config else None
# _label = _config['label'] if 'label' in _config else None
_map = _config['map'] if 'map' in _config else {}
#
# Let's see if the fields/labels need to be overridden
# columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
# _field = _field if not _refField else _refField
# _label = _label if not _refLabel else _refLabel
#
# @TODO:
# There should be a parsing priority, i.e. plugin first, then configuration
#
if 'plugin-context' in _args :
_config = _args['plugin-context'] #dict(_config,**_args['plugin-context'])
_outInfo = self._configHandler.consolidate(row=_args['row'],columns=columns,index=index,config=_config)
_field,_label = _outInfo['field'],_outInfo['label']
_columns,_index = _outInfo['columns'],_outInfo['index']
if 'row' in _args:
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
_index = np.array(_index)
#
# Sometimes the _row doesn't have all the expected indexes; we compensate for that below.
# This minimizes parsing errors caused by disconnects between the configuration and x12 element variations (inconsistent formats)
#
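# e.g. an index array [1,2,6] against a 4-element row pads the row with three
# empty strings so that position 6 resolves to '' instead of raising an IndexError
#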
if np.max(_index) > len(_row) -1 :
_delta = 1 + np.max(_index) - len(_row)
_row = _row + np.repeat('',_delta).tolist()
_row = np.array(_row)
_info = dict(zip(_columns,_row[_index].tolist()))
_document = _args['document'] if 'document' in _args else {}
#
# @TODO:
# Apply parsing/casting function to the object retrieved
# _apply(_info) #-- the object will be processed accordingly
#
#
# @TODO:
# The objects parsed must be augmented against the appropriate ones e.g: NM1 <- N1,N2,N3,N4
# - Find a way to drive this from a configuration ...
#
if _field :
if not _field in _document :
_item = {_field:_info}
else:
_item = self.merge(_document[_field],_info)
elif _label :
if not _label in _document :
_item = {_label:[_info]}
else:
_item = _document[_label] + [_info]
else:
_item = _info
if _ELEMENT in self._hierarchy and _field:
# print ([_field,_item])
self.lastelement = _item
pass
else:
for key in self._hierarchy :
if _ELEMENT in self._hierarchy[key] :
_ikey = list(self.lastelement.keys())[0]
_oldinfo = self.lastelement[_ikey]
if type(_oldinfo) != dict :
#
# This is a case we should log somewhere to flag that an issue occurred
#
# self.log(action='error',input=_row)
pass
else:
_item = {_ikey: self.merge(_oldinfo,_item)}
break
pass
return _item
else:
#
# No row was provided; surface the configuration for debugging and return the columns untouched
#
print (_config)
return columns
def elements(self):
"""
This function returns the x12 elements (segments) supported, as specified by the X12 standard; by convention these are the uppercase attributes that act as segment handlers
"""
return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
def pointers(self):
"""
This function returns the handler function associated with each supported element ...
:return dictionary of Element:Function
"""
_attr = self.elements()
_pointers = [getattr(self,_name) for _name in _attr]
return dict(zip(_attr,_pointers))
def set(self,_info,_document,_config):
_attrName,_attrType = None,None
if 'label' in _config :
_attrType = 'label'
_attrName = _config['label']
elif 'field' in _config :
_attrType = 'field'
_attrName = _config['field']
if _attrName :
if _attrName not in _document :
_document[_attrName] = [] if _attrType == 'label' else {}
#
# @TODO: make sure we don't have a case of an attribute being overridden
if type(_document[_attrName]) == list :
_document[_attrName] += [_info]
else:
_document[_attrName] = dict(_document[_attrName],**_info)
# _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
return _document
return dict(_document,**_info)
pass
def log (self,**_args):
print(_args)
def parseBlocks (self,_blocks,_header):
"""
This function parses blocks and returns the resulting documents to the caller.
Blocks of a document are made of transactional loops that constitute a patient's claim
"""
_tmp = {}
_documents = []
for _content in _blocks :
_body = self.apply(_content,header=_header)
_doc = self.merge(_header,_body)
# self.log(action='parse',section='body',input=_content[0])
if _doc and 'claim_id' in _doc:
# X12DOCUMENT._queue.put(_document)
# self.log(action='parse',section='document')
_documents += [self.merge(_tmp,_doc)]
_tmp = {}
else:
#
# The document is being built and not yet ready
_tmp = self.merge(_tmp,_doc)
return _documents
def run(self):
"""
This function triggers the parsing workflow for each file (or raw stream) in self.files
"""
for _filename in self.files :
# self.log(action='parse',section='file',input=_filename)
try:
_documents = []
_parts = []
if os.sep in _filename and os.path.exists(_filename) :
_reader = open(_filename)
else:
#
# This is a raw stream; we wrap it so it can be read as if it were a file
#
_reader = io.StringIO(_filename)
#
# Let us read the content; logging of the mode (file vs stream) could be added here
_content = _reader.read()
if hasattr(_reader,'close') :
_reader.close()
_info = self.split(_content)
_fileType=self.init(_content)
_header = self.apply(_info['header'])
if _info['blocks'] :
#
# processing blocks for the current claim
#
_documents = self.parseBlocks(_info['blocks'],_header)
except Exception as e:
#
# @TODO: Log this issue for later analysis ...
print (e)
pass
#
# Post the parsed documents to the configured data-store
#
if _documents :
# print (_header['header'])
_writer = transport.factory.instance(**self._store_args)
self.post(document=_documents,writer=_writer)
def post(self,**_args):
"""
This function is intended to post content to a given location
:param document parsed document(s) to be written
:param writer   transport writer; when absent the documents are put on the internal queue
"""
_writer = _args['writer'] if 'writer' in _args else None
_document = _args['document']
if not _writer:
X12DOCUMENT._queue.put(_document)
else:
_writer.write(_document)
def _getConfig(self,_chunk):
#
# Let us determine what kind of file we are dealing with, so we can extract the configuration
# For this we need to look for the ST loop ...
#
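# e.g. a segment like 'ST*835*000000001' yields the x12 file type '835', which is
# then used to look up the matching section of self._config
#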
line = [line for line in _chunk if line and line[:2] == 'ST' ]
if line :
#
# We found the header of the block, so we can set the default configuration
#
self._x12FileType = line[0].split('*')[1].strip()
_config = {}
if self._x12FileType :
_config = self._config[self._x12FileType]
return _config
def apply(self,_chunk, header = {}):
"""
A chunk is a group of elements obtained by splitting on HL; within each chunk are x12 loops such as HL, CLM, ISA
"""
_document,_cached = {},{}
_pointers = self.pointers()
_config = self._getConfig(_chunk)
#
# The configuration comes from the file, let's run this in merge mode
# _config = self._configHandler.merge
_pid = None
for line in _chunk :
segments = line.split('*')
_ELEMENT = segments[0]
if _ELEMENT not in _pointers or not _ELEMENT:
continue
if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
_pid = _ELEMENT
if _pid not in _cached :
_cached [_pid] = {}
_pointer = _pointers[_ELEMENT]
_args = {'row':segments,'document':_document,'header':header,'config':(_config)}
_parsedLine = _pointer(**_args)
if _pid in _cached :
_cached[_pid] = self.merge(_cached[_pid],_parsedLine)
else:
_cached[_pid] = _parsedLine
#
# Let's create the documents as we understand them to be
# @TODO: Create a log so there can be visibility into the parser
#
_document = {}
for _id in _cached :
# print ('patient' in _cached[_id] )
_document = self.merge(_document,_cached[_id])
return _document