Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from lasio.las_items import CurveItem
- from scipy import stats
- import numpy as np
- import pickle
- import lasio
- import string
- import langid
- # from common import settings
- # from utils.storages import MediaStorage
- # from wells.functions import decode_las
- # from wells.models import WellLogFile
- class DataMatchingService:
- def __init__(
- self,
- well_log_file,#: WellLogFile,
- language,#: str,
- confidence_level,#: float = 0.9,
- dict_of_mnemonic,
- ) -> None:
- # if settings.ENVIRONMENT in ['prod', 'dev']:
- # file = MediaStorage().open(well_log_file.file.name).file.read().decode('cp1251')
- # else:
- # file = well_log_file.file.path
- self.file = lasio.read(well_log_file)
- self.loaded_model = pickle.load(open(f'wl_18_unit_model_{language}.pickle', 'rb'))[0]
- self.language = language
- self.confidence_level = confidence_level
- self.mnemonics = dict_of_mnemonic#well_log_file.mnemonics
- # " read dict_of_mnemonic "
- # self.dict_of_mnemonic = dict_of_mnemonic
- def get_mnemonic(self, curve: CurveItem):# -> None | str:
- found_logs = []
- for mnemonic in self.mnemonics:
- if curve.mnemonic.lower() in mnemonic['possible_mnemonics']:
- found_logs.append(mnemonic['inlog_mnemonic'])
- mnemonic = None
- if len(found_logs) > 0:
- mnemonic = found_logs[0]
- return mnemonic
- def get_predicted_unit(self, curve: CurveItem):# -> None | str:
- predicted_unit = None
- if self.mnemonics:
- log_values = curve.data
- log_values = log_values[~np.isnan(log_values)]
- if len(log_values) > 0:
- median = np.median(log_values)
- mean = np.mean(log_values)
- mode = stats.mode(log_values)[0][0]
- min_ = np.min(log_values)
- max_ = np.max(log_values)
- input_ = np.array([[median, mean, mode, min_, max_]]).reshape(1, -1)
- probs = self.loaded_model.predict_proba(input_)
- if any(k >= self.confidence_level for k in probs[0]):
- predicted_unit = self.loaded_model.predict(input_)[0]
- return predicted_unit
- def get_found_unit(self, curve: CurveItem):# -> str:
- found_units = []
- for mnem_element in self.mnemonics:
- if curve.mnemonic.lower() in mnem_element['possible_mnemonics']:
- print('=======', mnem_element['inlog_unit_and_conversion_' + self.language])
- found_units.append(
- [
- [i for i in mnem_element['inlog_unit_and_conversion_' + self.language]
- if i['value'] == 1][0]['scale']
- ]
- )
- found_unit = None
- if len(found_units) > 0:
- found_unit = found_units[0][0]
- return found_unit
- def get_conversion_func(self, curve: CurveItem, found_unit: str, predicted_unit: str):# -> None | float:
- conversion_func = None
- if curve.unit.lower() == found_unit and found_unit is not None:
- conversion_func = float(1)
- if curve.unit.lower() != found_unit and found_unit is not None:
- conversion_func_list = []
- for mnem_element in self.mnemonics:
- if curve.mnemonic.lower() in mnem_element['possible_mnemonics']:
- current_lang = langid.classify(curve.unit.lower())[0]
- if len([i for i in mnem_element['inlog_unit_and_conversion_' + current_lang] if i['scale'] == curve.unit.lower()]) > 0:
- conversion_func_list.append(
- [i for i in mnem_element['inlog_unit_and_conversion_' + current_lang]
- if i['scale'] == curve.unit.lower()][0]['value']
- )
- if len(conversion_func_list) > 0:
- print(conversion_func_list)
- conversion_func = conversion_func_list[0]
- print('пппппппппппп',curve.mnemonic.lower(), conversion_func)
- return conversion_func
- def get_depth_values(self, curve: CurveItem):# -> None | str:
- # print('values',curve.data)
- return curve.data.tolist()
- def get_matching_matrix(self):# -> list:
- output = []
- for curve_id, curve in enumerate(self.file.curves):
- print(curve)
- print('unit curve',curve.unit.lower())
- #===extract depth values
- depth_values = self.file.data[:,[0, curve_id]]
- depth_values_no_nans = depth_values[~np.isnan(depth_values).any(axis=1)]
- depthes = list(depth_values_no_nans[:,0])
- values = list(depth_values_no_nans[:,1])
- #===extract depth values
- mnemonic = self.get_mnemonic(curve)
- predicted_unit = self.get_predicted_unit(curve)
- found_unit = self.get_found_unit(curve)
- conversion_func = self.get_conversion_func(curve, found_unit, predicted_unit)
- output.append(
- [
- curve.descr,
- curve.mnemonic,
- mnemonic,
- curve.unit.lower(),
- predicted_unit,
- found_unit,
- conversion_func,
- depthes,
- values
- ]
- )
- return output
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement