parser/healthcareio/x12/plugins/default/_common.py

456 lines
16 KiB
Python

from typing import Any
import numpy as np
import json
from multiprocessing import Process, RLock
import os
import io
import queue
import transport
from transport import providers
class Store(Process):
"""
This is the data-store service that will handle read/writes
"""
dataStore = None
@staticmethod
def init(self,**_args):
if Store.dataStore is None :
_args = _args['store']
else:
pass
@staticmethod
def reset():
pass
class X12DOCUMENT (Process):
"""
X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
"""
_queue = queue.Queue()
class MODE :
#
# The following allow us to handle raw content (stream) or a filename
# The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
#
NAMES,STREAM = 'NAMES','STREAM'
class ConfigHandler :
def format(self,**_args):
"""
This function formats variations of an element's parsing rules
:info {index,field|label,map}
"""
_info = _args['info']
_ref = {}
for _item in _info :
_index = str(_item['index'])
_field = _item['field'] if 'field' in _item else None
_label = _item['label'] if 'label' in _item else None
if _field :
_ref[_index] = {'field':_field}
elif _label :
_ref[_index] = {'label':_label}
return {'@ref':_ref}
def _getColumnsIndexes(self,_columns,_indexes,_map):
"""
This function return columns and indexes related if a parsing map is passed
:param _columns
:param _indexes
:param _map parsing map (field:index)
"""
# @TODO: insure the lengths are the same for adequate usage downstream ...
_xcolumns,_xindexes = list(_map.keys()), list(_map.values())
keys,values = _xcolumns + _columns,_xindexes + _indexes
_config = dict(zip(keys,values))
_outColumns,_outIndexes = list(_config.keys()),list(_config.values())
return _outColumns,_outIndexes
def _getObjectAtributes(self,_config):
_field = _config['field'] if 'field' in _config else {}
_label = _config['label'] if 'label' in _config else {}
return _field,_label
def merge(self,**_args):
#
# This function overrides the old configuration with the new configuration specifications
#
# _columns,_indexes = [],[]
_columns,_indexes = _args['columns'],_args['index']
_map = {}
_config = _args['config'] if 'config' in _args else {}
_field,_label = self._getObjectAtributes(_config)
if 'map' in _config :
_map = _args['config']['map']
_columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
if '@ref' in _config :
# _columns,_indexes = [],[]
_row = _args['row']
_ref = _config['@ref']
for _anchor in _ref:
# print ([_anchor,_anchor == _row[1].strip()])
if _anchor == _row[1].strip() :
_field,_label = self._getObjectAtributes(_ref[_anchor])
_map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
if _map :
_columns,_indexes = self._getColumnsIndexes([],[],_map)
break
# _columns,_indexes = _columns + _map.keys()
return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
def legacy(self,**_args):
#
# This function returns the legacy configuration (default parsing)
#
_config = _args['config'] if 'config' in _args else {}
_field,_label = self._getObjectAtributes(_config)
_columns,_indexes = [],[]
if 'map' in _config :
_columns = list(_config['map'].keys())
_indexes = list(_config['map'].values())
return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
def override(self,**_args):
return _args['columns'],_args['indexes']
def __init__(self,**_args):
super().__init__()
self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
if 'files' in _args :
self.files = _args['files']
self._config = _args['config'] if 'config' in _args else {}
self._document = []
self._x12FileType = None
self._configHandler = X12DOCUMENT.ConfigHandler()
#
#-- The files need to be classified, the files need to be either claims or remits
#
if 'store' not in self._config :
self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
else:
self._store_args = self._config['store']
def init(self,_header):
"""
Expected Elements must include ST
"""
pass
def merge (self,_x,_y):
"""
This function will merge two objects _x, _y
"""
_zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
if _zcols :
_out = dict(_x,**{})
for _key in _y.keys() :
if not _key in _zcols :
_out[_key] = _y[_key]
else:
if type(_out[_key]) == list :
_out[_key] += _y[_key]
elif type(_out[_key]) == dict:
_out[_key] = dict(_out[_key],**_y[_key])
else:
_out[_key] = _y[_key]
return _out
else:
return dict(_x,**_y)
def split(self,content):
"""
This function will split the content of an X12 document into blocks and headers
:content x12 document in raw format (text)
"""
#_content = content.split('~')
_content = content.split('HL')
_header = _content[:1][0].split('~')
_blocks = ['HL'+_item for _item in _content[1:]]
_blocks = [_item.split('~') for _item in _blocks ]
# for row in _content :
# if not _blocks and not row.startswith('HL') :
# _header.append(row)
# else:
# _blocks.append(row)
return {'header':_header,'blocks':_blocks}
def parse (self,columns,index,**_args):
"""
This function encapulates how an x12 document element will be processed
:columns list of attributes that make up the object
:index indexes of the said items in the element
:_args
- row raw x12 element (string)
- config configuration of the element. his should indicate functions to apply against function
"""
_ELEMENT = _args['row'][0]
#
# get the right configuration from the _config object
_config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
# _field = _config['field'] if 'field' in _config else None
# _label = _config['label'] if 'label' in _config else None
_map = _config['map'] if 'map' in _config else {}
#
# Let's see if overriding the fields/labels isn't necessary
# columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
# _field = _field if not _refField else _refField
# _label = _label if not _refLabel else _refLabel
_outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
_field,_label = _outInfo['field'],_outInfo['label']
_columns,_index = _outInfo['columns'],_outInfo['index']
if 'row' in _args:
_row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
_index = np.array(_index)
#
# Sometimes the _row doesn't have all expected indexes, we will compensate
# This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
#
if np.max(_index) > len(_row) -1 :
_delta = 1 + np.max(_index) - len(_row)
_row = _row + np.repeat('',_delta).tolist()
_row = np.array(_row)
# _element = _row[0]
_configKeys = [] #list(self._config.keys())
_configTree = [] #list(self._config.values())
if 'config' in _args :
_config = _args['config']
_configKeys = list(_config.keys())
_configTree = list(_config.values())
else:
_config = {}
_info = dict(zip(_columns,_row[_index].tolist()))
_document = _args['document'] if 'document' in _args else {}
#
# Extracting configuration (minimal information)
# _config = _args['config'] if 'config' in _args else {}
# _config = self._config
# if '@ref' in _config :
# print (_config['@ref'])
# _values = _config['@ref']
# print (_values)
if _field :
if not _field in _document :
return {_field:_info}
else:
return self.merge(_document[_field],_info)
elif _label :
if not _label in _document :
return {_label:[_info]}
else:
return _document[_label] + [_info]
else:
return _info
else:
return columns
def elements(self):
"""
This function returns elements that are supported as specified by X12 standard
"""
return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
def pointers(self):
"""
This function returns pointers associated with each element ...
:return Object of Element:Function
"""
_attr = self.elements()
_pointers = [getattr(self,_name) for _name in _attr]
return dict(zip(_attr,_pointers))
def set(self,_info,_document,_config):
_attrName,_attrType = None,None
if 'label' in _config :
_attrType = 'label'
_attrName = _config['label']
elif 'field' in _config :
_attrType = 'field'
_attrName = _config['field']
if _attrName :
if _attrName not in _document :
_document[_attrName] = [] if _attrType == 'label' else {}
#
# @TODO: make sure we don't have a case of an attribute being overridden
if type(_document[_attrName]) == list :
_document[_attrName] += [_info]
else:
_document[_attrName] = dict(_document[_attrName],**_info)
# _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
return _document
return dict(_document,**_info)
pass
def log (self,**_args):
pass
def run(self):
"""
This function will trigger the workflow associated with a particular file
"""
_getContent = {
#
# For the sake of testing, the following insures
# that raw string content is handled as if it were a file
#
X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
}
_writer = transport.factory.instance(**self._store_args)
for _filename in self.files :
try:
_documents = []
_parts = []
# _content = (open(_filename)).read()
_reader = _getContent[self._mode]
_content = _reader(_filename).read()
_info = self.split(_content)
_fileType=self.init(_content)
_header = self.apply(_info['header'])
# print (json.dumps(_header))
for _content in _info['blocks'] :
_body = self.apply(_content,header=_header)
_doc = self.merge(_header,_body)
if _doc and 'claim_id' in _doc:
# X12DOCUMENT._queue.put(_document)
_documents += [_doc]
except Exception as e:
#
# @TODO: Log this issue for later analysis ...
print (e)
pass
#
# Let us post this to the documents we have, we should find a place to post it
#
if _documents :
# print (_header['header'])
self.post(document=_documents,writer=_writer)
break
def post(self,**_args):
"""
This function is intended to post content to a given location
:param document
:param writer
"""
_writer = _args['writer'] if 'writer' in _args else None
_document = _args['document']
if not _writer:
X12DOCUMENT._queue.put(_document)
else:
_writer.write(_document)
def _getConfig(self,_chunk):
#
# Let us determine what kind of file we are dealing with, so we can extract the configuration
# For this we need to look for the ST loop ...
#
line = [line for line in _chunk if line and line[:2] == 'ST' ]
if line :
#
# We found the header of the block, so we can set the default configuration
#
self._x12FileType = line[0].split('*')[1].strip()
_config = {}
if self._x12FileType :
_config = self._config[self._x12FileType]
return _config
def apply(self,_chunk, header = {}):
"""
_chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
"""
_document,_cached = {},{}
_pointers = self.pointers()
_config = self._getConfig(_chunk)
#
# The configuration comes from the file, let's run this in merge mode
# _config = self._configHandler.merge
_pid = None
for line in _chunk :
segments = line.split('*')
_ELEMENT = segments[0]
if _ELEMENT not in _pointers or not _ELEMENT:
continue
if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
_pid = _ELEMENT
if _pid not in _cached :
_cached [_pid] = {}
_pointer = _pointers[_ELEMENT]
_args = {'row':segments,'document':_document,'header':header,'config':(_config)}
_parsedLine = _pointer(**_args)
# print ([_pid,_ELEMENT,_parsedLine])
_cached[_pid] = self.merge(_cached[_pid],_parsedLine)
#
# Let's create the documents as we understand them to be
# @TODO: Create a log so there can be visibility into the parser
#
_document = {}
for _id in _cached :
# print ('patient' in _cached[_id] )
_document = self.merge(_document,_cached[_id])
return _document