import json
import warnings
from itertools import combinations

import lasio
import numpy as np
import pandas as pd
from more_itertools import consecutive_groups
from tqdm import tqdm

warnings.simplefilter(action='ignore', category=FutureWarning)
class DataQualityAssessmentService:
    def __init__(self, well_log_file_path: str, num_point_to_be_short: int = 100,
                 norm_step: float = 0.1, allowable_constant_points: int = 50,
                 corr_mnems_coef: float = 1):
        """ read dictionary of mnemonics """
        with open("dict_mnemonics_units_v2_lith_wl_strata_full.txt", "r") as fp:
            self.dict_mnemonics_units = json.load(fp)
        """ read list of strange nan values """
        with open("strange_nans.json", "r") as fp:
            self.strange_nans = json.load(fp)
        """ read dict of value ranges for each mnemonic """
        with open("mnemonic_min_max_value_range.json", "r", encoding='utf-8') as fp:
            self.dict_value_ranges = json.load(fp)
        self.constant_mnemonics = []
        for mnemonic in self.dict_mnemonics_units.keys():
            if self.dict_mnemonics_units[mnemonic]['constant_mnemonic']:
                self.constant_mnemonics += self.dict_mnemonics_units[mnemonic]['possible_mnemonics']
        """ read las file """
        self.las = lasio.read(well_log_file_path)
        mnemonics = [i.lower() for i in self.las.keys()]
        self.df = pd.DataFrame(self.las.data, columns=mnemonics)
        """ criterion verdicts: empty string = passed, 'не пройден' = failed """
        self.accept = ''
        self.reject = 'не пройден'
        """ set params for criteria """
        self.num_point_to_be_short = num_point_to_be_short  # (~10 m at a 0.1 m depth step)
        self.norm_step = norm_step
        self.allowable_constant_points = allowable_constant_points
        self.corr_mnems_coef = corr_mnems_coef
    def get_strange_nans(self) -> tuple:
        nan_in_df_list = [nan for nan in self.strange_nans['Nans'] if nan in self.df.values]
        df_without_lith_strata = self.df[[i for i in self.df.columns if i not in ['lith', 'strata']]]
        details = ''
        if len(nan_in_df_list) > 0:
            good_nans = sum([df_without_lith_strata[i].isna().sum() for i in df_without_lith_strata.columns])
            strange_nans_found = sum([np.sum(df_without_lith_strata.values == x) for x in nan_in_df_list])
            # Share of "strange" placeholder values among all missing values
            share_of_strange_values = strange_nans_found / (good_nans + strange_nans_found)
            if strange_nans_found > 0:
                details = 'Найдено ' + str(strange_nans_found) + ' измерен. со значениями из списка: ' + str(nan_in_df_list)
            if share_of_strange_values > 0:
                return self.reject, details
        return self.accept, details
    # Replace strange placeholder values with NaN
    def replace_strange_nans(self) -> pd.DataFrame:
        nan_in_df_list = [nan for nan in self.strange_nans['Nans'] if nan in self.df.values]
        # If no strange values are present, return the dataframe as is
        if len(nan_in_df_list) == 0:
            df_return = self.df
        # Otherwise replace every strange value with NaN
        else:
            for tr_ind in nan_in_df_list:
                self.df = self.df.replace(tr_ind, np.nan)
            df_return = self.df.astype('float64')
        return df_return
    # Find empty columns
    def get_empty_columns(self) -> tuple:
        # Replace strange NaN placeholders first
        df = self.replace_strange_nans()
        # Quality checking
        initial_cols = len(df.columns)
        non_empty_cols = len(df.dropna(how='all', axis=1).columns)
        empty_col_mnemonics = [col for col in df.columns if col not in df.dropna(how='all', axis=1).columns]
        details = ''
        if initial_cols != non_empty_cols:
            details = 'Найдены пустые колонки в списке мнемоник ' + str(empty_col_mnemonics)
            return self.reject, details
        return self.accept, details
    def get_cols_and_dept_mnem(self) -> tuple:
        cols = list(self.df.columns)
        lower_df_keys = [i.lower() for i in cols]
        dept_mnem = [
            value for value in lower_df_keys
            if value in self.dict_mnemonics_units['DEPT']['possible_mnemonics']
        ][0]
        dept_mnem = self.df.columns[lower_df_keys.index(dept_mnem)]
        return cols, dept_mnem
    # Find gaps in depth in logs (in number of measurements)
    def get_event_data_loss(self) -> tuple:
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        gaps_point = 0
        point_in_intervals_filled_should_be = 0
        dict_details = {}
        for col_name in cols:
            not_null_col_subdf = self.df[self.df[col_name].notnull()]
            last_top_nan_ind = not_null_col_subdf.index[0]
            first_bottom_nan_ind = not_null_col_subdf.index[-1] + 1
            sub_df = self.df[last_top_nan_ind:first_bottom_nan_ind]
            nan_gap_values = len(sub_df[sub_df[col_name].isna()])
            if nan_gap_values > 0:
                dict_details[col_name] = nan_gap_values
            gaps_point += nan_gap_values
            point_in_intervals_filled_should_be += len(sub_df)
        share_of_gap_values = gaps_point / point_in_intervals_filled_should_be
        details = ''
        if share_of_gap_values > 0:
            details = ('Пропущено измерений ' + str(list(dict_details.values())) +
                       ' в списке мнемоник ' + str(list(dict_details.keys())) + ' соответственно')
            return self.reject, details
        return self.accept, details
    # Values are out of range, for example negative values, or values greater than the maximum / less than the minimum
    # def get_values_out_of_range(self):
    #     for col in self.df.columns:
    #         self.dict_value_ranges
    #     raise NotImplementedError
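    # --- Hedged sketch (not in the original paste): one possible implementation of the
    # get_values_out_of_range stub above. It assumes self.dict_value_ranges maps a
    # lower-case mnemonic to a [min, max] pair; the real JSON layout may differ. ---
    def get_values_out_of_range_sketch(self) -> tuple:
        dict_details = {}
        for col in self.df.columns:
            if col not in self.dict_value_ranges:
                continue  # no known physical range for this mnemonic
            min_val, max_val = self.dict_value_ranges[col]  # assumed [min, max] layout
            values = self.df[col].dropna()
            out_of_range = values[(values < min_val) | (values > max_val)]
            if len(out_of_range) > 0:
                dict_details[col] = len(out_of_range)
        details = ''
        if len(dict_details) > 0:
            details = 'Значения вне диапазона в мнемониках ' + str(dict_details)
            return self.reject, details
        return self.accept, details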
    # Spikes or sudden changes which are implausible for the domain (recognize through gradients and max deviations)
    # def get_value_spikes(self):
    #     raise NotImplementedError
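    # --- Hedged sketch (not in the original paste): spike detection via first differences,
    # flagging points whose gradient deviates by more than `sigma_factor` standard deviations.
    # The threshold is an assumption, not a value from the original code. ---
    def get_value_spikes_sketch(self, sigma_factor: float = 6.0) -> tuple:
        dict_details = {}
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        for col in cols:
            log = self.df[col].dropna().to_numpy(dtype=float)
            if len(log) < 3:
                continue
            grad = np.diff(log)
            if grad.std() == 0:
                continue  # constant log, no spikes by this definition
            n_spikes = int(np.sum(np.abs(grad - grad.mean()) > sigma_factor * grad.std()))
            if n_spikes > 0:
                dict_details[col] = n_spikes
        details = ''
        if len(dict_details) > 0:
            details = 'Найдены всплески значений в мнемониках ' + str(dict_details)
            return self.reject, details
        return self.accept, details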
    # Non-regular depth step: step greater than norm_step (10 cm by default)
    def get_wrong_depth_step(self) -> tuple:
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        error = ''
        try:
            depth_diff = np.diff(list(self.df[dept_mnem]))
        except (TypeError, ValueError):
            error = 'Файл содержит строковые значения'
            return 'ошибка', error
        greater_steps_list = [i for i in depth_diff if round(i, 2) > self.norm_step]
        percent = round(len(greater_steps_list) / len(self.df[dept_mnem]) * 100, 3)
        details = ''
        if len(greater_steps_list) > 0:
            details = ('Шаг записи превышает ' + str(self.norm_step) + ' м в ' + str(percent) +
                       ' % измерений и варьируется от ' + str(round(min(greater_steps_list), 3)) +
                       ' до ' + str(round(max(greater_steps_list), 3)) + ' м')
            return self.reject, details
        return self.accept, details
    # The value is not at the optimal level of detail
    # def get_rounded_measurement_value(self):
    #     raise NotImplementedError
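    # --- Hedged sketch (not in the original paste): flag logs whose values are unchanged by
    # rounding to `decimals` places, which suggests the measurement is stored below its
    # optimal level of detail. `decimals` is an assumption; integer-coded logs (lith/strata)
    # will naturally be flagged and may need excluding. ---
    def get_rounded_measurement_value_sketch(self, decimals: int = 1) -> tuple:
        target_cols = []
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        for col in cols:
            log = self.df[col].dropna().to_numpy(dtype=float)
            if len(log) == 0:
                continue
            if np.allclose(log, np.round(log, decimals)):
                target_cols.append(col)
        details = ''
        if len(target_cols) > 0:
            details = ('Значения округлены (не более ' + str(decimals) +
                       ' знаков после запятой) в мнемониках ' + str(target_cols))
            return self.reject, details
        return self.accept, details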
    # Small changes which are not in the process but result
    # from inaccurate measurements (recognize with a low-pass filter)
    # def get_signal_noise(self):
    #     raise NotImplementedError
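    # --- Hedged sketch (not in the original paste): estimate measurement noise as the
    # residual after a simple moving-average low-pass filter. The window length and the
    # 5 % noise-share threshold are assumptions. ---
    def get_signal_noise_sketch(self, window: int = 11, max_noise_share: float = 0.05) -> tuple:
        dict_details = {}
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        for col in cols:
            log = self.df[col].dropna().to_numpy(dtype=float)
            if len(log) <= window:
                continue
            kernel = np.ones(window) / window
            smoothed = np.convolve(log, kernel, mode='same')  # crude low-pass filter
            noise = log - smoothed
            signal_range = log.max() - log.min()
            if signal_range == 0:
                continue  # constant log, nothing to estimate
            noise_share = np.abs(noise).mean() / signal_range
            if noise_share > max_noise_share:
                dict_details[col] = round(float(noise_share), 4)
        details = ''
        if len(dict_details) > 0:
            details = 'Высокий уровень шума в мнемониках ' + str(dict_details)
            return self.reject, details
        return self.accept, details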
    # Helper formerly used inside get_data_not_updated; kept for reference
    # def get_const_subsequent_in_log_found(self, col: str) -> str:
    #     log = np.array(self.df[col])
    #     ind_of_const_subsequent_values = list(np.where(log[1:] == log[:-1])[0])
    #     const_subsequent_values_groups = [list(group) for group in consecutive_groups(ind_of_const_subsequent_values)]
    #     found_const_intervals = [
    #         i for i in const_subsequent_values_groups
    #         if len(i) > self.allowable_constant_points
    #     ]
    #     if len(found_const_intervals) > 0:
    #         return 'found'  # , col
    #     else:
    #         return 'not found'  # , col
    # Find intervals where a log keeps a constant value for too long (data not updated)
    def get_data_not_updated(self) -> tuple:
        # allowable_constant_points = 50 corresponds to ~5 m at a 0.1 m step
        columns_ = [col for col in self.df.columns if col not in self.constant_mnemonics]
        target_cols = []
        dict_details = []
        for col in columns_:
            log = np.array(self.df[col])
            ind_of_const_subsequent_values = list(np.where(log[1:] == log[:-1])[0])
            const_subsequent_values_groups = [list(group) for group in consecutive_groups(ind_of_const_subsequent_values)]
            found_const_intervals = [
                i for i in const_subsequent_values_groups
                if len(i) > self.allowable_constant_points
            ]
            if len(found_const_intervals) > 0:
                target_cols.append(col)
                dict_details.append([
                    [log[interval[0]] for interval in found_const_intervals],
                    [len(interval) for interval in found_const_intervals],
                ])
        details = ''
        if len(target_cols) > 0:
            details = ''.join([
                'В ' + str(target_cols[i]) + ' непрерывные постоянные значения из списка ' +
                str(dict_details[i][0]) + ' с кол-ом измерений из списка ' +
                str(dict_details[i][1]) + ' соответственно. '
                for i in range(len(target_cols))
            ])
            return self.reject, details
        return self.accept, details
    # Values which are normally correlated behave unexpectedly
    # def get_divergent_despite_correlation(self):
    #     raise NotImplementedError
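    # --- Hedged sketch (not in the original paste): for pairs of logs that correlate strongly
    # overall, flag depth windows where the rolling correlation collapses. The window size and
    # both correlation thresholds are assumptions. ---
    def get_divergent_despite_correlation_sketch(self, window: int = 100,
                                                 strong: float = 0.8, weak: float = 0.2) -> tuple:
        dict_details = {}
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        for col_a, col_b in combinations(cols, 2):
            pair = self.df[[col_a, col_b]].dropna()
            if len(pair) < 2 * window:
                continue
            overall = pair[col_a].corr(pair[col_b])
            if pd.isna(overall) or abs(overall) < strong:
                continue  # not a normally-correlated pair
            rolling = pair[col_a].rolling(window).corr(pair[col_b])
            n_divergent = int((rolling.abs() < weak).sum())
            if n_divergent > 0:
                dict_details[(col_a, col_b)] = n_divergent
        details = ''
        if len(dict_details) > 0:
            details = 'Коррелирующие мнемоники расходятся в парах ' + str(dict_details)
            return self.reject, details
        return self.accept, details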
    # Units of measurement are presented
    def get_presence_of_units(self) -> tuple:
        cols_with_empty_units = [curve.mnemonic.lower() for curve in self.las.curves if curve.unit in ['', ' ']]
        details = ''
        if len(cols_with_empty_units) > 0:
            details = 'Мнемоники без ед.измерения: ' + ', '.join(cols_with_empty_units)
            return self.reject, details
        return self.accept, details
    # Different data formats, e.g. float vs. string etc.
    def get_data_formats(self) -> tuple:
        target_col = []
        for col in self.df.columns:
            try:
                self.df[col].astype(float)
            except (TypeError, ValueError):
                target_col.append(col)
        details = ''
        if len(target_col) > 0:
            details = 'В мнемониках из списка ' + str(target_col) + ' присутствуют не числовые (строковые) значения'
            return self.reject, details
        return self.accept, details
    # Duplicated values in columns
    def get_repeated_columns(self) -> tuple:
        pair_of_cols = list(combinations(self.df.columns, 2))
        try:
            coffel_list = [np.corrcoef(self.df[list(i)].dropna().values.T)[0][1] for i in pair_of_cols]
            mnems_pairs_correlated = '; '.join(['(' + ', '.join(pair_of_cols[i]) + ')'
                                                for i in range(len(coffel_list))
                                                if coffel_list[i] >= self.corr_mnems_coef])
            coef_correlated = ', '.join([str(round(i, 2)) for i in coffel_list if i >= self.corr_mnems_coef])
        except (TypeError, ValueError):
            error = 'Файл содержит строковые значения'
            return 'ошибка', error
        share_of_pair_with_1_correlation = len([x for x in coffel_list if x >= self.corr_mnems_coef]) / len(coffel_list)
        details = ''
        if share_of_pair_with_1_correlation > 0:
            details = ('Коэф. корреляции пар мнемоник ' + mnems_pairs_correlated +
                       ' превышают или равны значению ' + str(self.corr_mnems_coef) +
                       ' и равны ' + coef_correlated + ' соответственно')
            return self.reject, details
        return self.accept, details
    # Duplicated mnemonic names
    def get_repeated_mnemonics(self) -> tuple:
        details = ''
        if len(list(set(self.df.columns))) < len(self.df.columns):
            details = ('Мнемоники из списка ' +
                       ', '.join(set(self.df.columns[np.where(self.df.columns.duplicated())[0]])) +
                       ' повторяются в файле')
            return self.reject, details
        return self.accept, details
    # Find logs with too few recorded points
    def get_short_data_history(self) -> tuple:
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        colum_with_short_story = []
        for col_name in cols:
            if len(self.df[self.df[col_name].notnull()]) < self.num_point_to_be_short:
                colum_with_short_story.append(col_name)
        details = ''
        if len(colum_with_short_story) > 0:
            details = ('Мнемоники из списка ' + str(colum_with_short_story) +
                       ' имеют короткую историю записи данных, < ' +
                       str(self.num_point_to_be_short) + ' последовательных измерен.')
            return self.reject, details
        return self.accept, details
    # Lithology codes must be integer-valued
    def get_lithology_number(self) -> tuple:
        details = ''
        lith_mnem_in_df = [i for i in self.df.columns if i in self.dict_mnemonics_units['LITH']['possible_mnemonics']]
        if len(lith_mnem_in_df) > 0:
            if False in list(set(self.df[lith_mnem_in_df[0]].dropna() % 1 == 0)):
                details = 'Код литологии не целочисленный для мнемоники ' + str(lith_mnem_in_df[0])
                return self.reject, details
        return self.accept, details
    # The level of noise changes over time/depth
    # def get_inconsistent_noise_level(self):
    #     raise NotImplementedError
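    # --- Hedged sketch (not in the original paste): compare local noise (std of first
    # differences) between the shallow and deep halves of each log; a large ratio hints at a
    # noise level that changes with depth. The 3x ratio threshold is an assumption. ---
    def get_inconsistent_noise_level_sketch(self, max_ratio: float = 3.0) -> tuple:
        dict_details = {}
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        for col in cols:
            log = self.df[col].dropna().to_numpy(dtype=float)
            if len(log) < 2 * self.num_point_to_be_short:
                continue  # too short to compare halves reliably
            diff = np.diff(log)
            half = len(diff) // 2
            std_top, std_bottom = diff[:half].std(), diff[half:].std()
            if min(std_top, std_bottom) == 0:
                continue  # constant half, ratio undefined
            ratio = max(std_top, std_bottom) / min(std_top, std_bottom)
            if ratio > max_ratio:
                dict_details[col] = round(float(ratio), 2)
        details = ''
        if len(dict_details) > 0:
            details = 'Уровень шума меняется с глубиной в мнемониках ' + str(dict_details)
            return self.reject, details
        return self.accept, details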
    # There are subpopulations that have different variabilities from others (detect via the Goldfeld-Quandt test)
    # def get_heteroscedasticity(self):
    #     raise NotImplementedError
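    # --- Hedged sketch (not in the original paste): heteroscedasticity check using the
    # Goldfeld-Quandt test from statsmodels (an extra dependency the original may not use);
    # the 0.05 significance level is an assumption. ---
    def get_heteroscedasticity_sketch(self, alpha: float = 0.05) -> tuple:
        import statsmodels.api as sm
        from statsmodels.stats.diagnostic import het_goldfeldquandt
        dict_details = {}
        cols, dept_mnem = self.get_cols_and_dept_mnem()
        cols.remove(dept_mnem)
        for col in cols:
            sub_df = self.df[[dept_mnem, col]].dropna()
            if len(sub_df) < 2 * self.num_point_to_be_short:
                continue  # too short to split into two comparable subsamples
            y = sub_df[col].to_numpy(dtype=float)
            x = sm.add_constant(sub_df[dept_mnem].to_numpy(dtype=float))
            _, p_value, _ = het_goldfeldquandt(y, x)
            if p_value < alpha:
                dict_details[col] = round(float(p_value), 4)
        details = ''
        if len(dict_details) > 0:
            details = 'Гетероскедастичность (p-value теста Гольдфельда-Квандта) в мнемониках ' + str(dict_details)
            return self.reject, details
        return self.accept, details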
    # def get_extremum_depths(self) -> tuple:
    #     _, dept_mnem = self.get_cols_and_dept_mnem()
    #     df = self.df.dropna(how='all')
    #     min_depth, max_depth = min(df[dept_mnem]), max(df[dept_mnem])
    #     return min_depth, max_depth
    def estimate_data_quality(self) -> dict:
        # in_development = 'in_development'  # verdict for criteria not implemented yet
        answer_get_strange_nans, details_get_strange_nans = self.get_strange_nans()
        answer_get_empty_columns, details_get_empty_columns = self.get_empty_columns()
        answer_event_data_loss, details_event_data_loss = self.get_event_data_loss()
        answer_wrong_depth_step, details_wrong_depth_step = self.get_wrong_depth_step()
        answer_data_not_updated, details_data_not_updated = self.get_data_not_updated()
        answer_presence_of_units, details_presence_of_units = self.get_presence_of_units()
        answer_data_formats, details_data_formats = self.get_data_formats()
        answer_repeated_columns, details_repeated_columns = self.get_repeated_columns()
        answer_repeated_mnemonics, details_repeated_mnemonics = self.get_repeated_mnemonics()
        answer_short_data_history, details_short_data_history = self.get_short_data_history()
        answer_lithology_number, details_lithology_number = self.get_lithology_number()
        return {
            'data_formats': [
                answer_data_formats,
                'Data Formats',
                'Данные в численном формате',
                'Different data formats, e.g. float vs. string etc.',
                'Наличие неприемлемых форматов данных, например, строк',
                details_data_formats,
            ],
            'strange_nans': [
                answer_get_strange_nans,
                'Strange NaNs',
                'Странные пустые значения',
                "For example, '9999', '-999.25', ...",
                "Например, '9999', '-999.25', ...",
                details_get_strange_nans,
            ],
            'empty_columns': [
                answer_get_empty_columns,
                'Empty Columns',
                'Пустые колонки',
                'There are empty columns',
                'Наличие хотя бы одной пустой колонки в файле ГИС',
                details_get_empty_columns,
            ],
            'event_data_loss': [
                answer_event_data_loss,
                'Event Data Loss',
                'Пропуски в данных',
                'There are gaps in the event data/depths',
                'Наличие пропусков в данных по глубине',
                details_event_data_loss,
            ],
            # 'values_out_of_range': [
            #     in_development,
            #     'Values out of Range',
            #     'Значения вне физ. диапазона',
            #     'Values are out of ranges, for example, negative values '
            #     'or greater than the maximum, less than the minimum',
            #     'Значения выходят за пределы диапазона, например, отрицательные '
            #     'значения или больше макс., меньше мин. допустимого значения',
            # ],
            # 'value_spikes': [
            #     in_development,
            #     'Value Spikes',
            #     'Всплеск значений',
            #     'Spikes or sudden changes which are implausible for '
            #     'the domain (recognize through gradients and max deviations)',
            #     'Всплески или внезапные изменения, неправдоподобные для записей '
            #     'ГИС (распознавание по градиентам и максимальным отклонениям)',
            # ],
            'wrong_depth_step': [
                answer_wrong_depth_step,
                'Wrong Depth Step',
                'Нарушение шага глубины',
                'Non-regular depth step, step more than ' + str(self.norm_step) + ' m (set this value)',
                'Неравномерный шаг глубины, шаг более ' + str(self.norm_step) + ' м (задайте это значение)',
                details_wrong_depth_step,
            ],
            # 'rounded_measurement_value': [
            #     in_development,
            #     'Rounded Measurement Value',
            #     'Округленное значение',
            #     'The value is not at the optimal level of detail',
            #     'Значение не соответствует оптимальному уровню детализации',
            # ],
            # 'signal_noise': [
            #     in_development,
            #     'Signal Noise',
            #     'Шумное локальное измерение',
            #     'Small changes which are not in the process but result '
            #     'from inaccurate measurements (recognize with a low-pass filter)',
            #     'Небольшие изменения, которые не происходят в процессе, но являются '
            #     'результатом неточных измерений (распознать с помощью фильтра нижних частот)',
            # ],
            'data_not_updated': [
                answer_data_not_updated,
                'Data Not Updated',
                'Отсутствие обновления данных',
                'Data is not up-to-date (sensors might still display old values)',
                'Данные не обновляются (датчики могут отображать старые/предыдущие значения)',
                details_data_not_updated,
            ],
            # 'divergent_despite_correlation': [
            #     in_development,
            #     'Divergent Despite Correlation',
            #     'Расходящиеся, несмотря на корреляцию',
            #     'Values which are normally correlated behave unexpectedly',
            #     'Значения, которые обычно коррелированы, ведут себя подозрительно',
            # ],
            'presence_of_units': [
                answer_presence_of_units,
                'Presence of Units',
                'Наличие всех единиц измерений',
                'Units of measurement are presented',
                'Представлены все единицы измерения для мнемоник',
                details_presence_of_units,
            ],
            'repeated_columns': [
                answer_repeated_columns,
                'Repeated Columns',
                'Повторяющиеся столбцы',
                'Duplicated values in columns',
                'Повторяющиеся значения в столбцах',
                details_repeated_columns,
            ],
            'repeated_mnemonics': [
                answer_repeated_mnemonics,
                'Repeated Mnemonics',
                'Повторяющиеся мнемоники',
                'Duplicated mnemonic names',
                'Повторяющиеся названия мнемоник',
                details_repeated_mnemonics,
            ],
            'short_data_history': [
                answer_short_data_history,
                'Short Data History',
                'Короткая запись данных',
                'The history of recorded data is too short for a good analysis',
                'История записи данных слишком коротка для качественного анализа',
                details_short_data_history,
            ],
            'float_lithology_id': [
                answer_lithology_number,
                'Non-integer values of lithology indices',
                'Не целочисленные значения индексов литологии',
                'Non-integer values of lithology indices',
                'Не целочисленные значения индексов литологии',
                details_lithology_number,
            ],
            # 'inconsistent_noise_level': [
            #     in_development,
            #     'Inconsistent Noise Level',
            #     'Непостоянный уровень шума',
            #     'The level of noise changes over time/depth',
            #     'Уровень шума меняется со временем/глубиной',
            # ],
            # 'heteroscedasticity': [
            #     in_development,
            #     'Heteroscedasticity',
            #     'Гетероскедастичность',
            #     'There are subpopulations that have different variabilities '
            #     'from others (detect via the Goldfeld-Quandt test)',
            #     'Наличие субпопуляций, отличающихся изменчивостью '
            #     'от других (тест Гольдфельда-Квандта)',
            # ],
        }
pd.set_option('display.max_colwidth', None)

# `las_interpret_staratigraph_pathes` is assumed to be defined earlier in the notebook:
# a nested dict {field: {cluster: {well: [las_path, ...]}}}
first_well = list(las_interpret_staratigraph_pathes['Месторождение 1']['без куста'].keys())[0]
las_path = las_interpret_staratigraph_pathes['Месторождение 1']['без куста'][first_well][0]
data_quality = DataQualityAssessmentService(las_path, num_point_to_be_short=100, norm_step=0.1, corr_mnems_coef=1)
d_q_dict = data_quality.estimate_data_quality()

criteria_name = [d_q_dict[i][2] for i in d_q_dict.keys()]
criteria_name_details = []
for ind, i in enumerate(criteria_name):
    criteria_name_details.append(str(ind) + '. ' + i)
    criteria_name_details.append(str(ind) + '. ' + 'Детали')

criteria_well_results = pd.DataFrame([], columns=['Скважина'] + ['Качество данных, %'] + criteria_name_details)
well_names = las_interpret_staratigraph_pathes['Месторождение 1']['без куста'].keys()
for well in tqdm(well_names):
    las_path = las_interpret_staratigraph_pathes['Месторождение 1']['без куста'][well][0]
    data_quality = DataQualityAssessmentService(las_path, num_point_to_be_short=100, norm_step=0.1, corr_mnems_coef=1)
    d_q_dict = data_quality.estimate_data_quality()
    criteria_answer = [d_q_dict[i][0] for i in d_q_dict.keys()]
    criteria_details = [d_q_dict[i][-1] for i in d_q_dict.keys()]
    # Interleave verdicts and details to match the criteria_name_details column order
    val = []
    for i in range(len(criteria_name)):
        val.append(criteria_answer[i])
        val.append(criteria_details[i])
    dict_criteria_well = {}
    for i in range(len(val)):
        dict_criteria_well[criteria_name_details[i]] = val[i]
    dict_criteria_well['Качество данных, %'] = round(len([i for i in dict_criteria_well.values() if i == '']) / len(dict_criteria_well) * 100, 2)
    dict_criteria_well['Скважина'] = well
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent
    criteria_well_results = pd.concat([criteria_well_results, pd.DataFrame([dict_criteria_well])], ignore_index=True)