"""Seismic Wave Analysis Tool — tkinter GUI for loading, checking and analysing CHST seismic data."""
import os
import shutil
import subprocess
import threading
import time
import tkinter as tk
from datetime import datetime
from tkinter import ttk, messagebox, filedialog

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytz
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from PIL import Image, ImageTk
from scipy.stats import pearsonr
- class SeismicWaveAnalyzer:
- def __init__(self, root):
- self.root = root
- self.root.withdraw()
- # Directories
- self.data_dir = r"C:\SeismicWaveData"
- self.output_base_dir = os.path.join(self.data_dir, "Outputs")
- try:
- os.makedirs(self.data_dir, exist_ok=True)
- os.makedirs(self.output_base_dir, exist_ok=True)
- except Exception as e:
- messagebox.showerror("Error", f"Failed to create directories: {str(e)}")
- self.root.destroy()
- return
- self.show_splash_screen()
- def show_splash_screen(self):
- splash = tk.Toplevel()
- splash.title("Welcome")
- splash.configure(bg="#ffffff")
- splash.attributes('-fullscreen', True)
- splash.overrideredirect(True)
- try:
- img1 = Image.open(r"C:\SeismicWaveData\picture1.png")
- img1 = img1.resize((200, 200), Image.LANCZOS)
- photo1 = ImageTk.PhotoImage(img1)
- img1_label = tk.Label(splash, image=photo1, bg="#ffffff")
- img1_label.image = photo1
- img1_label.place(x=20, y=20)
- except Exception:
- img1_placeholder = tk.Label(splash, text="[Reference Image 1 Placeholder]", width=20, height=10, bg="#d3d3d3")
- img1_placeholder.place(x=20, y=20)
- try:
- img2 = Image.open(r"C:\SeismicWaveData\picture2.png")
- img2 = img2.resize((200, 200), Image.LANCZOS)
- photo2 = ImageTk.PhotoImage(img2)
- img2_label = tk.Label(splash, image=photo2, bg="#ffffff")
- img2_label.image = photo2
- img2_label.place(relx=1.0, y=20, anchor="ne")
- except Exception:
- img2_placeholder = tk.Label(splash, text="[Reference Image 2 Placeholder]", width=20, height=10, bg="#d3d3d3")
- img2_placeholder.place(relx=1.0, y=20, anchor="ne")
- text_frame = tk.Frame(splash, bg="#ffffff")
- text_frame.place(relx=0.5, rely=0.5, anchor="center")
- tk.Label(text_frame, text="Seismic Wave Analysis Tool", font=("Segoe UI", 20, "bold"), bg="#ffffff").pack(pady=10)
- tk.Label(text_frame, text="An FTT Mode Project", font=("Segoe UI", 16), bg="#ffffff").pack(pady=5)
- tk.Label(text_frame, text="Instrumentation & Engineering Geophysics Group", font=("Segoe UI", 14), bg="#ffffff").pack(pady=5)
- tk.Label(text_frame, text="Dr N Satyavani, Project Lead", font=("Segoe UI", 14, "italic"), bg="#ffffff").pack(pady=5)
- splash.after(6000, lambda: self.launch_main_window(splash))
    def launch_main_window(self, splash):
        """Destroy the splash window and build/configure the main window state.

        Initializes the colour theme, all per-tab widget placeholders, the
        loaded-data holders and the tkinter variables, then builds the GUI
        and starts the status-bar clock.
        """
        splash.destroy()
        self.root.deiconify()
        self.root.title("Seismic Wave Analysis Tool")
        self.root.geometry("1000x700")
        self.root.state('zoomed')  # NOTE(review): 'zoomed' is Windows-specific — confirm target platform
        # Colour palette consumed by apply_theme().
        self.light_theme = {
            "bg": "#ffffff",
            "fg": "#212121",
            "entry_bg": "#f5f5f5",
            "accent": "#003087",
            "button_fg": "#ffffff",
        }
        self.root.configure(bg=self.light_theme["bg"])
        # Treeview widgets are created later by the per-tab setup_* methods.
        self.raw_tree = None
        self.quality_tree = None
        self.analysis_tree = None
        self.summary_tree = None
        self.layers_tree = None
        self.correlation_tree = None
        self.realtime_tree = None
        # Plot placeholders (label shown before a figure exists, the canvas itself).
        self.plot_label = None
        self.plot_canvas = None
        # Loaded dataset and derived analysis products.
        self.current_data = None
        self.raw_data = None
        self.current_file = None
        self.deviations = None
        self.quality_issues = None
        self.layers = None
        self.correlation_data = None
        self.realtime_data = None
        self.current_fig = None
        # Raw-data table row-selection state and depth-range filter inputs.
        self.selected_rows = {}
        self.raw_data_min_depth_var = tk.StringVar()
        self.raw_data_max_depth_var = tk.StringVar()
        self.realtime_running = False
        # Visualization options: which parameters are plotted (all on by default).
        self.plot_options = {
            "Vp/Vs Ratio": tk.BooleanVar(value=True),
            "Poisson's Ratio": tk.BooleanVar(value=True),
            "Shear Modulus": tk.BooleanVar(value=True),
            "Bulk Modulus": tk.BooleanVar(value=True),
            "Young's Modulus": tk.BooleanVar(value=True),
            "Lame's Lambda": tk.BooleanVar(value=True),
            "Travel Time Deviations": tk.BooleanVar(value=True),
        }
        # Depth-range filter used by the Visualizations tab.
        self.min_depth_var = tk.StringVar()
        self.max_depth_var = tk.StringVar()
        self.setup_gui()
        self.update_clock()
    def setup_gui(self):
        """Build the menu bar, file controls, notebook tabs, export buttons and
        status bar, then apply the theme and populate the file dropdown."""
        # Menu bar with a single File > Exit entry.
        menubar = tk.Menu(self.root)
        self.root.config(menu=menubar)
        file_menu = tk.Menu(menubar, tearoff=0)
        menubar.add_cascade(label="File", menu=file_menu)
        file_menu.add_command(label="Exit", command=self.root.quit)
        # Top row: upload/template/import/refresh buttons plus the file selector.
        file_frame = ttk.Frame(self.root)
        file_frame.pack(pady=10, padx=10, fill=tk.X)
        ttk.Label(file_frame, text="Upload CHST File:", font=("Segoe UI", 10)).pack(side=tk.LEFT, padx=5)
        upload_button = ttk.Button(file_frame, text="Upload", command=self.upload_file, style="Custom.TButton")
        upload_button.pack(side=tk.LEFT, padx=5)
        ttk.Button(file_frame, text="Create Template File", command=self.create_template_file, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(file_frame, text="Import CSV to PDF", command=self.import_csv_to_pdf, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        refresh_button = ttk.Button(file_frame, text="Refresh", command=self.refresh_file_list, style="Custom.TButton")
        refresh_button.pack(side=tk.LEFT, padx=5)
        ttk.Label(file_frame, text="Select File:", font=("Segoe UI", 10)).pack(side=tk.LEFT, padx=5)
        self.file_var = tk.StringVar()
        self.file_combobox = ttk.Combobox(file_frame, textvariable=self.file_var, state="readonly")
        self.file_combobox.pack(side=tk.LEFT, padx=5)
        # Selecting a file from the dropdown triggers the full load pipeline.
        self.file_combobox.bind("<<ComboboxSelected>>", self.load_file_data)
        # Notebook with one tab per view; each setup_*_tab builds its contents.
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(pady=10, padx=10, fill=tk.BOTH, expand=True)
        self.raw_data_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.raw_data_frame, text="Raw Data")
        self.setup_raw_data_tab()
        self.quality_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.quality_frame, text="Quality Check")
        self.setup_quality_tab()
        self.analysis_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.analysis_frame, text="Analysis")
        self.setup_analysis_tab()
        self.summary_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.summary_frame, text="Summary")
        self.setup_summary_tab()
        self.layers_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.layers_frame, text="Layers")
        self.setup_layers_tab()
        self.correlation_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.correlation_frame, text="Correlation Analysis")
        self.setup_correlation_tab()
        self.realtime_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.realtime_frame, text="Real-Time")
        self.setup_realtime_tab()
        self.visualization_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.visualization_frame, text="Visualizations")
        self.setup_visualization_tab()
        # Export buttons, one per exportable artifact.
        export_frame = ttk.Frame(self.root)
        export_frame.pack(pady=5, fill=tk.X)
        ttk.Button(export_frame, text="Export Raw Data to CSV", command=self.export_raw_data, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(export_frame, text="Export Quality Check to CSV", command=self.export_quality_check, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(export_frame, text="Export Analysis to PDF", command=self.export_analysis_to_pdf, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(export_frame, text="Export Summary to CSV", command=self.export_summary, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(export_frame, text="Export Layers to CSV", command=self.export_layers, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(export_frame, text="Export Correlation to CSV", command=self.export_correlation, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        ttk.Button(export_frame, text="Export Detailed Report", command=self.export_detailed_report, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
        # Status bar (left: messages, right: live IST clock).
        self.status_var = tk.StringVar()
        self.clock_var = tk.StringVar()
        status_frame = ttk.Frame(self.root)
        status_frame.pack(side=tk.BOTTOM, fill=tk.X)
        status_bar = tk.Label(status_frame, textvariable=self.status_var, bd=1, relief=tk.SUNKEN, anchor=tk.W, font=("Segoe UI", 10))
        status_bar.pack(side=tk.LEFT, fill=tk.X, expand=True)
        clock_label = tk.Label(status_frame, textvariable=self.clock_var, bd=1, relief=tk.SUNKEN, anchor=tk.E, font=("Segoe UI", 10))
        clock_label.pack(side=tk.RIGHT)
        self.apply_theme()
        self.refresh_file_list()
- def apply_theme(self):
- style = ttk.Style()
- style.theme_use('clam')
- style.configure("Custom.TFrame", background=self.light_theme["bg"])
- style.configure("Custom.TButton",
- background=self.light_theme["accent"],
- foreground=self.light_theme["button_fg"],
- font=("Segoe UI", 10),
- padding=8)
- style.configure("TCombobox", fieldbackground=self.light_theme["entry_bg"], foreground=self.light_theme["fg"])
- style.configure("Treeview", background=self.light_theme["entry_bg"], foreground=self.light_theme["fg"], fieldbackground=self.light_theme["entry_bg"])
- style.configure("Treeview.Heading", background=self.light_theme["bg"], foreground=self.light_theme["fg"])
- def update_clock(self):
- ist = pytz.timezone('Asia/Kolkata')
- current_time = datetime.now(ist).strftime("%a, %b %d, %Y %I:%M %p IST")
- self.clock_var.set(current_time)
- self.root.after(1000, self.update_clock)
- def setup_raw_data_tab(self):
- selection_frame = ttk.Frame(self.raw_data_frame)
- selection_frame.pack(fill=tk.X, pady=5)
- ttk.Label(selection_frame, text="Select Depth Range (m):").pack(side=tk.LEFT, padx=5)
- ttk.Entry(selection_frame, textvariable=self.raw_data_min_depth_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Label(selection_frame, text="to").pack(side=tk.LEFT)
- ttk.Entry(selection_frame, textvariable=self.raw_data_max_depth_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Button(selection_frame, text="Apply Selection", command=self.apply_selection, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(selection_frame, text="Reset Selection", command=self.reset_selection, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- frame = ttk.Frame(self.raw_data_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.raw_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.raw_tree.pack(fill=tk.BOTH, expand=True)
- def setup_quality_tab(self):
- frame = ttk.Frame(self.quality_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.quality_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.quality_tree.pack(fill=tk.BOTH, expand=True)
- def setup_analysis_tab(self):
- frame = ttk.Frame(self.analysis_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- control_frame = ttk.Frame(frame)
- control_frame.pack(fill=tk.X, pady=5)
- ttk.Button(control_frame, text="Run Monte Carlo Simulation", command=self.run_monte_carlo, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.analysis_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.analysis_tree.pack(fill=tk.BOTH, expand=True)
- def setup_summary_tab(self):
- frame = ttk.Frame(self.summary_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.summary_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.summary_tree.pack(fill=tk.BOTH, expand=True)
- def setup_layers_tab(self):
- frame = ttk.Frame(self.layers_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.layers_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.layers_tree.pack(fill=tk.BOTH, expand=True)
- def setup_correlation_tab(self):
- frame = ttk.Frame(self.correlation_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- control_frame = ttk.Frame(frame)
- control_frame.pack(fill=tk.X, pady=5)
- ttk.Button(control_frame, text="Upload SPT/Sonic Log File", command=self.upload_correlation_file, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(control_frame, text="Run Correlation Analysis", command=self.run_correlation_analysis, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.correlation_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.correlation_tree.pack(fill=tk.BOTH, expand=True)
- self.correlation_plot_label = ttk.Label(scrollable_frame, text="Upload SPT/Sonic Log file to view correlations.")
- self.correlation_plot_label.pack(fill=tk.BOTH, expand=True)
- def setup_realtime_tab(self):
- frame = ttk.Frame(self.realtime_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- control_frame = ttk.Frame(frame)
- control_frame.pack(fill=tk.X, pady=5)
- ttk.Button(control_frame, text="Start Real-Time Acquisition", command=self.start_realtime, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(control_frame, text="Stop Real-Time Acquisition", command=self.stop_realtime, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=canvas.xview)
- scrollable_frame = ttk.Frame(canvas)
- scrollable_frame.bind(
- "<Configure>",
- lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
- )
- canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
- self.realtime_tree = ttk.Treeview(scrollable_frame, show="headings")
- self.realtime_tree.pack(fill=tk.BOTH, expand=True)
- self.realtime_plot_label = ttk.Label(scrollable_frame, text="Start real-time acquisition to view data.")
- self.realtime_plot_label.pack(fill=tk.BOTH, expand=True)
- def setup_visualization_tab(self):
- frame = ttk.Frame(self.visualization_frame, style="Custom.TFrame")
- frame.pack(pady=10, fill=tk.BOTH, expand=True)
- options_frame = ttk.Frame(frame)
- options_frame.pack(fill=tk.X, pady=5)
- ttk.Label(options_frame, text="Select Parameters to Plot:", font=("Segoe UI", 10)).pack(side=tk.LEFT, padx=5)
- for param, var in self.plot_options.items():
- ttk.Checkbutton(options_frame, text=param, variable=var, command=self.update_plots).pack(side=tk.LEFT, padx=5)
- depth_frame = ttk.Frame(options_frame)
- depth_frame.pack(side=tk.LEFT, padx=10)
- ttk.Label(depth_frame, text="Depth Range (m):").pack(side=tk.LEFT)
- ttk.Entry(depth_frame, textvariable=self.min_depth_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Label(depth_frame, text="to").pack(side=tk.LEFT)
- ttk.Entry(depth_frame, textvariable=self.max_depth_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Button(depth_frame, text="Update Plot", command=self.update_plots, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(options_frame, text="Save Plots", command=self.save_plots, style="Custom.TButton").pack(side=tk.RIGHT, padx=5)
- self.canvas = tk.Canvas(frame)
- scrollbar_y = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self.canvas.yview)
- scrollbar_x = ttk.Scrollbar(frame, orient=tk.HORIZONTAL, command=self.canvas.xview)
- self.scrollable_frame = ttk.Frame(self.canvas)
- self.scrollable_frame.bind(
- "<Configure>",
- lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
- )
- self.canvas.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
- scrollbar_y.pack(side=tk.RIGHT, fill=tk.Y)
- scrollbar_x.pack(side=tk.BOTTOM, fill=tk.X)
- self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
- self.plot_label = ttk.Label(self.scrollable_frame, text="Select a file to view visualizations.")
- self.plot_label.pack(fill=tk.BOTH, expand=True)
- def import_csv_to_pdf(self):
- file_path = filedialog.askopenfilename(filetypes=[("CSV Files", "*.csv"), ("All Files", "*.*")])
- if not file_path:
- return
- try:
- df = pd.read_csv(file_path)
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(os.path.basename(file_path))[0]
- tex_filename = f"{input_filename}_data_{timestamp}.tex"
- tex_path = os.path.join(output_dir, tex_filename)
- pdf_filename = f"{input_filename}_data_{timestamp}.pdf"
- pdf_path = os.path.join(output_dir, pdf_filename)
- latex_content = r"""
- \documentclass[a4paper,12pt]{article}
- \usepackage{booktabs}
- \usepackage{longtable}
- \usepackage[margin=1in]{geometry}
- \usepackage{amsmath}
- \usepackage{amsfonts}
- \usepackage{noto}
- \begin{document}
- \section*{Seismic Data Table}
- \begin{longtable}{@{}""" + "c" * len(df.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in df.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in df.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- \end{document}
- """
- with open(tex_path, 'w', encoding='utf-8') as f:
- f.write(latex_content)
- os.system(f'latexmk -pdf -outdir="{output_dir}" "{tex_path}"')
- if os.path.exists(pdf_path):
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Imported CSV and generated PDF: {pdf_path} at {timestamp}")
- messagebox.showinfo("Success", f"CSV imported and PDF generated successfully at {pdf_path}.")
- else:
- raise Exception("PDF compilation failed.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to import CSV or generate PDF: {str(e)}")
- def export_analysis_to_pdf(self):
- if self.current_data is None:
- messagebox.showwarning("No Data", "Please load a valid file before exporting analysis.")
- return
- try:
- analysis_df = self.analyze_data(self.current_data)
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- tex_filename = f"{input_filename}_analysis_{timestamp}.tex"
- tex_path = os.path.join(output_dir, tex_filename)
- pdf_filename = f"{input_filename}_analysis_{timestamp}.pdf"
- pdf_path = os.path.join(output_dir, pdf_filename)
- latex_content = r"""
- \documentclass[a4paper,12pt]{article}
- \usepackage{booktabs}
- \usepackage{longtable}
- \usepackage[margin=1in]{geometry}
- \usepackage{amsmath}
- \usepackage{amsfonts}
- \usepackage{noto}
- \begin{document}
- \section*{Seismic Analysis Report}
- \begin{longtable}{@{}""" + "c" * len(analysis_df.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in analysis_df.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in analysis_df.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- \end{document}
- """
- with open(tex_path, 'w', encoding='utf-8') as f:
- f.write(latex_content)
- os.system(f'latexmk -pdf -outdir="{output_dir}" "{tex_path}"')
- if os.path.exists(pdf_path):
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported analysis to PDF: {pdf_path} at {timestamp}")
- messagebox.showinfo("Success", f"Analysis exported successfully to {pdf_path}.")
- else:
- raise Exception("PDF compilation failed.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export analysis to PDF: {str(e)}")
- def refresh_file_list(self):
- files = [f for f in os.listdir(self.data_dir) if f.endswith(('.xlsx', '.csv'))]
- self.file_combobox['values'] = files
- self.file_var.set("")
- self.clear_tabs()
- self.status_var.set("Please select a file from the dropdown.")
- def upload_file(self):
- file_path = filedialog.askopenfilename(filetypes=[
- ("Excel and CSV Files", "*.xlsx *.csv"),
- ("Excel Files", "*.xlsx"),
- ("CSV Files", "*.csv"),
- ("All Files", "*.*")
- ])
- if file_path:
- try:
- dest_path = os.path.join(self.data_dir, os.path.basename(file_path))
- shutil.copy(file_path, dest_path)
- self.refresh_file_list()
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Uploaded: {os.path.basename(file_path)} at {timestamp}")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to upload file: {str(e)}")
- def create_template_file(self):
- try:
- columns = ["Depth (m)", "Elevation (m)", "P wave time (ms)", "S wave time (ms)"]
- data_rows = [
- [3.00, 100.00, 5.00, 10.00],
- [4.50, 98.50, 5.50, 11.00],
- [6.00, 97.00, 6.00, 12.00]
- ]
- df_template = pd.DataFrame(data_rows, columns=columns)
- template_path = os.path.join(self.data_dir, "template_seismic_data.xlsx")
- df_template.to_excel(template_path, index=False)
- self.refresh_file_list()
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Created template file: template_seismic_data.xlsx at {timestamp}")
- messagebox.showinfo("Success", "Template file created successfully: template_seismic_data.xlsx")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to create template file: {str(e)}")
- def find_header_row(self, df):
- for idx, row in df.iterrows():
- row_values = [str(val).strip() if pd.notna(val) else "" for val in row.values]
- if "Depth" in row_values:
- depth_col_idx = row_values.index("Depth")
- if idx + 1 < len(df):
- next_row = df.iloc[idx + 1]
- next_row_values = [str(val).strip() if pd.notna(val) else "" for val in next_row.values]
- if depth_col_idx < len(next_row_values) and next_row_values[depth_col_idx] == "(m)":
- return idx
- return None
    def load_file_data(self, event=None):
        """Load the file chosen in the dropdown, normalise its columns, and
        refresh every tab (raw data, quality, analysis, summary, layers,
        visualisations).

        Supports .xlsx (scans every sheet for the header) and .csv. The header
        is located by find_header_row(): a 'Depth' cell with '(m)' directly
        beneath it; the two header rows are read as a MultiIndex and flattened.
        On any failure an error dialog is shown and the tabs are cleared.
        """
        selected_file = self.file_var.get()
        if not selected_file:
            return
        self.clear_tabs()
        self.current_file = selected_file
        file_path = os.path.join(self.data_dir, selected_file)
        try:
            if selected_file.endswith('.xlsx'):
                # Scan each sheet until one contains the two-row header.
                xl = pd.ExcelFile(file_path)
                sheet_names = xl.sheet_names
                header_row = None
                target_sheet = None
                for sheet in sheet_names:
                    df_sheet = pd.read_excel(file_path, sheet_name=sheet, header=None)
                    header_row = self.find_header_row(df_sheet)
                    if header_row is not None:
                        target_sheet = sheet
                        break
                if header_row is None:
                    raise ValueError("Could not find 'Depth' followed by '(m)' in any sheet of the file.")
                # Two-row header: name row plus units row.
                df = pd.read_excel(file_path, sheet_name=target_sheet, header=[header_row, header_row + 1])
                # Flatten MultiIndex columns to "Name (unit)" strings, suffixing
                # duplicate names with a running count to keep them unique.
                col_names = []
                col_counts = {}
                for col in df.columns:
                    col_name = f"{col[0]} {col[1]}".strip() if col[1] and pd.notna(col[1]) else col[0]
                    if col_name in col_counts:
                        col_counts[col_name] += 1
                        col_names.append(f"{col_name} {col_counts[col_name]}")
                    else:
                        col_counts[col_name] = 0
                        col_names.append(col_name)
                df.columns = col_names
            elif selected_file.endswith('.csv'):
                # Same header detection and column flattening as the Excel branch.
                df_temp = pd.read_csv(file_path, header=None)
                header_row = self.find_header_row(df_temp)
                if header_row is None:
                    raise ValueError("Could not find 'Depth' followed by '(m)' in the CSV file.")
                df = pd.read_csv(file_path, header=[header_row, header_row + 1])
                col_names = []
                col_counts = {}
                for col in df.columns:
                    col_name = f"{col[0]} {col[1]}".strip() if col[1] and pd.notna(col[1]) else col[0]
                    if col_name in col_counts:
                        col_counts[col_name] += 1
                        col_names.append(f"{col_name} {col_counts[col_name]}")
                    else:
                        col_counts[col_name] = 0
                        col_names.append(col_name)
                df.columns = col_names
            else:
                raise ValueError("Unsupported file format. Please upload an Excel (.xlsx) or CSV (.csv) file.")
            # Normalise labels: strip embedded newlines and canonicalise the
            # depth column name.
            df.columns = [col.replace('\n', ' ') for col in df.columns]
            df.columns = ['Depth (m)' if col.startswith('Depth') else col for col in df.columns]
            # Keep the first 'Elevation' column that actually holds data and
            # drop any duplicate elevation columns.
            elevation_cols = [col for col in df.columns if col.startswith('Elevation')]
            if elevation_cols:
                valid_elevation_col = None
                for col in elevation_cols:
                    if df[col].notna().any():
                        valid_elevation_col = col
                        break
                if valid_elevation_col:
                    df.rename(columns={valid_elevation_col: 'Elevation (m)'}, inplace=True)
                    other_elevation_cols = [col for col in elevation_cols if col != valid_elevation_col]
                    df.drop(columns=other_elevation_cols, inplace=True)
            # Drop all-empty columns, then rows that are empty outside 'Depth (m)'.
            df = df.dropna(axis=1, how='all')
            df = df.loc[:, (df.notna().any()) | (df.columns == 'Depth (m)')].dropna(subset=df.columns.difference(['Depth (m)']), how='all')
            # Rows without a depth value cannot be analysed.
            df_raw = df[df['Depth (m)'].notna()]
            self.raw_data = df_raw.copy()
            self.current_data = self.raw_data.copy()
            required_columns = ['Depth (m)', 'Elevation (m)']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")
            # Refresh every tab from the newly loaded data.
            self.display_raw_data(self.raw_data)
            analysis_df = self.analyze_data(self.current_data)
            self.display_analysis(analysis_df)
            self.check_data_quality(self.current_data)
            self.display_quality_check()
            self.display_summary(analysis_df)
            self.identify_layers(analysis_df)
            self.display_layers()
            self.plot_visualizations(self.current_data, analysis_df)
            ist = pytz.timezone('Asia/Kolkata')
            timestamp = datetime.now(ist).strftime("%I:%M %p IST")
            self.status_var.set(f"Loaded: {selected_file} at {timestamp}")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to load file: {str(e)}")
            self.clear_tabs()
- def on_select_row(self, event):
- item = self.raw_tree.identify_row(event.y)
- if not item:
- return
- idx = int(self.raw_tree.index(item))
- current_value = self.selected_rows.get(idx, False)
- self.selected_rows[idx] = not current_value
- self.raw_tree.set(item, "Select", "✔" if self.selected_rows[idx] else "")
- def display_raw_data(self, df):
- for item in self.raw_tree.get_children():
- self.raw_tree.delete(item)
- full_df = self.raw_data
- columns = ["Select"] + list(full_df.columns)
- self.raw_tree["columns"] = columns
- for col in columns:
- self.raw_tree.heading(col, text=col)
- self.raw_tree.column(col, anchor=tk.CENTER, stretch=True)
- self.selected_rows = {}
- for idx, row in full_df.iterrows():
- formatted_row = [""] + [str(val) if pd.notna(val) else "" for val in row]
- self.raw_tree.insert("", tk.END, values=formatted_row)
- self.selected_rows[idx] = False
- self.raw_tree.column("Select", width=50, minwidth=50)
- self.raw_tree.bind("<Button-1>", self.on_select_row)
- for col in columns[1:]:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in full_df.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- self.raw_tree.column(col, width=max_length * 10, minwidth=150)
- def apply_selection(self):
- if self.raw_data is None:
- return
- df = self.raw_data.copy()
- try:
- min_depth = float(self.raw_data_min_depth_var.get()) if self.raw_data_min_depth_var.get() else df['Depth (m)'].min()
- max_depth = float(self.raw_data_max_depth_var.get()) if self.raw_data_max_depth_var.get() else df['Depth (m)'].max()
- if min_depth > max_depth:
- min_depth, max_depth = max_depth, min_depth
- df = df[(df['Depth (m)'] >= min_depth) & (df['Depth (m)'] <= max_depth)]
- except ValueError:
- pass
- selected_indices = [idx for idx, selected in self.selected_rows.items() if selected]
- if selected_indices:
- df = df.iloc[selected_indices]
- self.current_data = df
- analysis_df = self.analyze_data(self.current_data)
- self.display_analysis(analysis_df)
- self.check_data_quality(self.current_data)
- self.display_quality_check()
- self.display_summary(analysis_df)
- self.identify_layers(analysis_df)
- self.display_layers()
- self.plot_visualizations(self.current_data, analysis_df)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Applied selection: {len(self.current_data)} rows at {timestamp}")
- def reset_selection(self):
- if self.raw_data is None:
- return
- self.current_data = self.raw_data.copy()
- self.raw_data_min_depth_var.set("")
- self.raw_data_max_depth_var.set("")
- self.selected_rows = {idx: False for idx in range(len(self.raw_data))}
- for item in self.raw_tree.get_children():
- self.raw_tree.set(item, "Select", "")
- analysis_df = self.analyze_data(self.current_data)
- self.display_analysis(analysis_df)
- self.check_data_quality(self.current_data)
- self.display_quality_check()
- self.display_summary(analysis_df)
- self.identify_layers(analysis_df)
- self.display_layers()
- self.plot_visualizations(self.current_data, analysis_df)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Reset selection: {len(self.current_data)} rows at {timestamp}")
- def check_data_quality(self, df):
- p_wave_cols = [col for col in df.columns if "P wave time" in col]
- s_wave_cols = [col for col in df.columns if "S wave time" in col]
- if not p_wave_cols or not s_wave_cols:
- self.quality_issues = pd.DataFrame(columns=['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)', 'Flag Reason'])
- return
- df['First P-wave Time (µs)'] = df[p_wave_cols].min(axis=1) * 1000
- df['First S-wave Time (µs)'] = df[s_wave_cols].min(axis=1) * 1000
- p_median = df['First P-wave Time (µs)'].median()
- p_std = df['First P-wave Time (µs)'].std()
- s_median = df['First S-wave Time (µs)'].median()
- s_std = df['First S-wave Time (µs)'].std()
- depth_diff = df['Depth (m)'].diff()
- non_monotonic_indices = depth_diff[depth_diff <= 0].index
- quality_issues = []
- for idx, row in df.iterrows():
- p_time = row['First P-wave Time (µs)']
- s_time = row['First S-wave Time (µs)']
- depth = row['Depth (m)']
- flags = []
- if pd.notna(p_time):
- if p_time < 0.5:
- flags.append("P-wave time too small (< 0.5 µs)")
- if abs(p_time - p_median) > 2 * p_std:
- flags.append("P-wave time outlier")
- if pd.notna(s_time):
- if s_time < 0.5:
- flags.append("S-wave time too small (< 0.5 µs)")
- if abs(s_time - s_median) > 2 * s_std:
- flags.append("S-wave time outlier")
- if idx in non_monotonic_indices and idx > 0:
- flags.append("Depth not monotonically increasing")
- if flags:
- quality_issues.append([depth, p_time, s_time, "; ".join(flags)])
- if quality_issues:
- self.quality_issues = pd.DataFrame(quality_issues, columns=['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)', 'Flag Reason'])
- else:
- self.quality_issues = pd.DataFrame(columns=['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)', 'Flag Reason'])
- def display_quality_check(self):
- for item in self.quality_tree.get_children():
- self.quality_tree.delete(item)
- columns = list(self.quality_issues.columns)
- self.quality_tree["columns"] = columns
- for col in columns:
- self.quality_tree.heading(col, text=col)
- self.quality_tree.column(col, anchor=tk.CENTER)
- for idx, row in self.quality_issues.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) for val in row]
- self.quality_tree.insert("", tk.END, values=formatted_row)
- for col in columns:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in self.quality_issues.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- min_width = 300 if col == "Flag Reason" else 150
- self.quality_tree.column(col, width=max_length * 10, minwidth=min_width)
- def run_monte_carlo(self):
- if self.current_data is None:
- messagebox.showwarning("No Data", "Please load a valid file before running Monte Carlo simulation.")
- return
- try:
- df = self.current_data.copy()
- n_iterations = 1000
- time_noise_std = 0.01 # 1% noise in travel times
- p_wave_cols = [col for col in df.columns if "P wave time" in col]
- s_wave_cols = [col for col in df.columns if "S wave time" in col]
- if not p_wave_cols or not s_wave_cols:
- raise ValueError("P-wave or S-wave time columns not found.")
- results = {
- 'P-wave Velocity (m/s)': [],
- 'S-wave Velocity (m/s)': [],
- 'Vp/Vs Ratio': []
- }
- for _ in range(n_iterations):
- df_sim = df.copy()
- for col in p_wave_cols:
- df_sim[col] = df_sim[col] * (1 + np.random.normal(0, time_noise_std, len(df_sim)))
- for col in s_wave_cols:
- df_sim[col] = df_sim[col] * (1 + np.random.normal(0, time_noise_std, len(df_sim)))
- analysis_sim = self.analyze_data(df_sim)
- for param in results.keys():
- results[param].append(analysis_sim[param].values)
- for param in results:
- results[param] = np.array(results[param])
- mean = np.mean(results[param], axis=0)
- ci_lower = np.percentile(results[param], 2.5, axis=0)
- ci_upper = np.percentile(results[param], 97.5, axis=0)
- df[f'{param} Mean'] = mean
- df[f'{param} CI Lower'] = ci_lower
- df[f'{param} CI Upper'] = ci_upper
- self.current_data = df
- self.display_analysis(self.current_data)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Monte Carlo simulation completed at {timestamp}")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to run Monte Carlo simulation: {str(e)}")
- def upload_correlation_file(self):
- file_path = filedialog.askopenfilename(filetypes=[
- ("Excel and CSV Files", "*.xlsx *.csv"),
- ("Excel Files", "*.xlsx"),
- ("CSV Files", "*.csv"),
- ("All Files", "*.*")
- ])
- if file_path:
- try:
- if file_path.endswith('.xlsx'):
- df = pd.read_excel(file_path)
- elif file_path.endswith('.csv'):
- df = pd.read_csv(file_path)
- else:
- raise ValueError("Unsupported file format.")
- required_cols = ['Depth (m)']
- if not all(col in df.columns for col in required_cols):
- raise ValueError("SPT/Sonic Log file must contain 'Depth (m)' column.")
- self.correlation_data = df
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Uploaded SPT/Sonic Log file: {os.path.basename(file_path)} at {timestamp}")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to upload SPT/Sonic Log file: {str(e)}")
- def run_correlation_analysis(self):
- if self.current_data is None or self.correlation_data is None:
- messagebox.showwarning("No Data", "Please load both CHST and SPT/Sonic Log files.")
- return
- try:
- seismic_df = self.analyze_data(self.current_data)
- correlation_df = self.correlation_data
- merged_df = pd.merge(seismic_df, correlation_df, on='Depth (m)', how='inner')
- if merged_df.empty:
- raise ValueError("No matching depths found between CHST and SPT/Sonic Log data.")
- correlation_results = []
- seismic_params = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio']
- correlation_params = [col for col in correlation_df.columns if col != 'Depth (m)']
- for s_param in seismic_params:
- for c_param in correlation_params:
- if merged_df[s_param].notna().sum() > 1 and merged_df[c_param].notna().sum() > 1:
- corr, p_value = pearsonr(merged_df[s_param].dropna(), merged_df[c_param].dropna())
- correlation_results.append([s_param, c_param, corr, p_value])
- self.correlation_results = pd.DataFrame(correlation_results, columns=['Seismic Parameter', 'Correlation Parameter', 'Pearson Correlation', 'P-Value'])
- self.display_correlation()
- self.plot_correlation(merged_df)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Correlation analysis completed at {timestamp}")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to run correlation analysis: {str(e)}")
- def display_correlation(self):
- for item in self.correlation_tree.get_children():
- self.correlation_tree.delete(item)
- columns = list(self.correlation_results.columns)
- self.correlation_tree["columns"] = columns
- for col in columns:
- self.correlation_tree.heading(col, text=col)
- self.correlation_tree.column(col, anchor=tk.CENTER)
- for idx, row in self.correlation_results.iterrows():
- formatted_row = [f"{val:.3f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) for val in row]
- self.correlation_tree.insert("", tk.END, values=formatted_row)
- for col in columns:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in self.correlation_results.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- self.correlation_tree.column(col, width=max_length * 10, minwidth=150)
- def plot_correlation(self, merged_df):
- if hasattr(self, 'correlation_canvas') and self.correlation_canvas:
- self.correlation_canvas.get_tk_widget().destroy()
- if hasattr(self, 'correlation_fig') and self.correlation_fig:
- plt.close(self.correlation_fig)
- self.correlation_plot_label.pack_forget()
- seismic_params = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)']
- correlation_params = [col for col in merged_df.columns if col not in seismic_params + ['Depth (m)']]
- if not correlation_params:
- self.correlation_plot_label = ttk.Label(self.correlation_tree.master, text="No valid correlation parameters found.")
- self.correlation_plot_label.pack(fill=tk.BOTH, expand=True)
- return
- n_plots = len(seismic_params) * len(correlation_params)
- rows = (n_plots + 2) // 3
- self.correlation_fig, axes = plt.subplots(rows, 3, figsize=(15, 4 * rows))
- axes = axes.flatten()
- plot_idx = 0
- for s_param in seismic_params:
- for c_param in correlation_params:
- axes[plot_idx].scatter(merged_df[s_param], merged_df[c_param], c='b', alpha=0.5)
- axes[plot_idx].set_xlabel(s_param)
- axes[plot_idx].set_ylabel(c_param)
- axes[plot_idx].set_title(f'{s_param} vs {c_param}')
- axes[plot_idx].grid(True)
- plot_idx += 1
- for i in range(plot_idx, len(axes)):
- axes[i].axis('off')
- plt.tight_layout()
- self.correlation_canvas = FigureCanvasTkAgg(self.correlation_fig, master=self.correlation_tree.master)
- self.correlation_canvas.draw()
- self.correlation_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
- def start_realtime(self):
- if self.realtime_running:
- return
- self.realtime_running = True
- self.realtime_data = pd.DataFrame(columns=['Depth (m)', 'Elevation (m)', 'P wave time (ms)', 'S wave time (ms)'])
- self.display_realtime()
- self.realtime_thread = threading.Thread(target=self.realtime_acquisition, daemon=True)
- self.realtime_thread.start()
- self.status_var.set("Started real-time acquisition.")
- def stop_realtime(self):
- self.realtime_running = False
- self.status_var.set("Stopped real-time acquisition.")
    def realtime_acquisition(self):
        """Background polling loop: ingest new rows from realtime_chst_data.csv
        once per second.

        Runs on the daemon thread created by start_realtime() and exits when
        self.realtime_running is cleared by stop_realtime().
        """
        realtime_file = os.path.join(self.data_dir, "realtime_chst_data.csv")
        while self.realtime_running:
            try:
                if os.path.exists(realtime_file):
                    df_new = pd.read_csv(realtime_file)
                    if not df_new.empty:
                        # Merge into the running buffer, de-duplicating on depth.
                        # NOTE(review): this rebinds self.realtime_data from a
                        # worker thread while the Tk thread may read it — relies
                        # on the whole-object swap being effectively atomic;
                        # confirm this is acceptable.
                        self.realtime_data = pd.concat([self.realtime_data, df_new]).drop_duplicates(subset=['Depth (m)']).reset_index(drop=True)
                        # Widget updates must happen on the Tk main loop.
                        self.root.after(0, self.update_realtime_display)
            except Exception as e:
                # Best-effort loop: log and keep polling rather than kill the thread.
                print(f"Real-time error: {str(e)}")
            time.sleep(1)
    def update_realtime_display(self):
        """Refresh the real-time table and the velocity plot.

        Scheduled via root.after() from the acquisition thread so both run on
        the Tk main loop.
        """
        self.display_realtime()
        self.plot_realtime()
- def display_realtime(self):
- for item in self.realtime_tree.get_children():
- self.realtime_tree.delete(item)
- columns = list(self.realtime_data.columns)
- self.realtime_tree["columns"] = columns
- for col in columns:
- self.realtime_tree.heading(col, text=col)
- self.realtime_tree.column(col, anchor=tk.CENTER)
- for idx, row in self.realtime_data.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) for val in row]
- self.realtime_tree.insert("", tk.END, values=formatted_row)
- for col in columns:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in self.realtime_data.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- self.realtime_tree.column(col, width=max_length * 10, minwidth=150)
- def plot_realtime(self):
- if hasattr(self, 'realtime_canvas') and self.realtime_canvas:
- self.realtime_canvas.get_tk_widget().destroy()
- if hasattr(self, 'realtime_fig') and self.realtime_fig:
- plt.close(self.realtime_fig)
- self.realtime_plot_label.pack_forget()
- if self.realtime_data.empty:
- self.realtime_plot_label = ttk.Label(self.realtime_tree.master, text="No real-time data available.")
- self.realtime_plot_label.pack(fill=tk.BOTH, expand=True)
- return
- analysis_df = self.analyze_data(self.realtime_data)
- self.realtime_fig, ax = plt.subplots(figsize=(10, 5))
- ax.plot(analysis_df['Depth (m)'], analysis_df['P-wave Velocity (m/s)'], 'b-', label='P-wave Velocity')
- ax.plot(analysis_df['Depth (m)'], analysis_df['S-wave Velocity (m/s)'], 'r-', label='S-wave Velocity')
- ax.set_xlabel('Depth (m)')
- ax.set_ylabel('Velocity (m/s)')
- ax.set_title('Real-Time Seismic Velocities')
- ax.legend()
- ax.grid(True)
- self.realtime_canvas = FigureCanvasTkAgg(self.realtime_fig, master=self.realtime_tree.master)
- self.realtime_canvas.draw()
- self.realtime_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
    def analyze_data(self, df):
        """Derive interval velocities and elastic moduli from travel times.

        Expects a 'Depth (m)' column, an 'Elevation (m)' column, and at least
        one "P wave time" and one "S wave time" column (presumably in ms — the
        *1000 below converts to µs; TODO confirm units against the loader).
        Returns a DataFrame with depth, first-arrival times, the derived
        parameters, and ±10% sensitivity columns for each parameter.
        Side effect: stores rows with anomalous travel-time jumps in
        self.deviations. Raises ValueError if time columns are missing.
        """
        df = df.copy()  # never mutate the caller's frame
        df['Depth (m)'] = pd.to_numeric(df['Depth (m)'], errors='coerce')
        p_wave_cols = [col for col in df.columns if "P wave time" in col]
        s_wave_cols = [col for col in df.columns if "S wave time" in col]
        if not p_wave_cols or not s_wave_cols:
            raise ValueError("P-wave or S-wave time columns not found in the data.")
        for col in p_wave_cols + s_wave_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        # First arrival = earliest pick across all receiver columns; ms -> µs.
        df['First P-wave Time (µs)'] = df[p_wave_cols].min(axis=1) * 1000
        df['First S-wave Time (µs)'] = df[s_wave_cols].min(axis=1) * 1000
        # A "deviation" is a negative arrival-time increment, or an increment
        # more than 2σ from the mean increment between consecutive depths.
        df['P-wave Time Diff'] = df['First P-wave Time (µs)'].diff()
        df['S-wave Time Diff'] = df['First S-wave Time (µs)'].diff()
        p_mean_diff = df['P-wave Time Diff'].mean()
        p_std_diff = df['P-wave Time Diff'].std()
        s_mean_diff = df['S-wave Time Diff'].mean()
        s_std_diff = df['S-wave Time Diff'].std()
        df['P-wave Deviation'] = (df['P-wave Time Diff'] < 0) | (abs(df['P-wave Time Diff'] - p_mean_diff) > 2 * p_std_diff)
        df['S-wave Deviation'] = (df['S-wave Time Diff'] < 0) | (abs(df['S-wave Time Diff'] - s_mean_diff) > 2 * s_std_diff)
        self.deviations = df[df['P-wave Deviation'] | df['S-wave Deviation']][['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)']]
        # Interval velocities: Δdepth / Δtime between consecutive rows
        # (time µs -> s via /10**6). Row 0 and any interval with non-positive
        # Δdepth or Δtime stays 0.
        df['P-wave Velocity (m/s)'] = 0.0
        df['S-wave Velocity (m/s)'] = 0.0
        for i in range(1, len(df)):
            delta_depth = df['Depth (m)'].iloc[i] - df['Depth (m)'].iloc[i-1]
            delta_p_time = (df['First P-wave Time (µs)'].iloc[i] - df['First P-wave Time (µs)'].iloc[i-1]) / 10**6 if pd.notna(df['First P-wave Time (µs)'].iloc[i]) and pd.notna(df['First P-wave Time (µs)'].iloc[i-1]) else 0
            delta_s_time = (df['First S-wave Time (µs)'].iloc[i] - df['First S-wave Time (µs)'].iloc[i-1]) / 10**6 if pd.notna(df['First S-wave Time (µs)'].iloc[i]) and pd.notna(df['First S-wave Time (µs)'].iloc[i-1]) else 0
            if delta_depth > 0 and delta_p_time > 0:
                df.loc[df.index[i], 'P-wave Velocity (m/s)'] = delta_depth / delta_p_time
            if delta_depth > 0 and delta_s_time > 0:
                df.loc[df.index[i], 'S-wave Velocity (m/s)'] = delta_depth / delta_s_time
        # Zero out any infinities/NaNs produced by the ratios below.
        df['P-wave Velocity (m/s)'] = df['P-wave Velocity (m/s)'].replace([np.inf, -np.inf], 0).fillna(0)
        df['S-wave Velocity (m/s)'] = df['S-wave Velocity (m/s)'].replace([np.inf, -np.inf], 0).fillna(0)
        df['Vp/Vs Ratio'] = df['P-wave Velocity (m/s)'] / df['S-wave Velocity (m/s)']
        df['Vp/Vs Ratio'] = df['Vp/Vs Ratio'].replace([np.inf, -np.inf], 0).fillna(0)
        # Isotropic elastic relations follow; ν from the Vp/Vs ratio.
        vp_vs = df['Vp/Vs Ratio']
        df['Poisson\'s Ratio'] = ((vp_vs**2 - 2) / (2 * (vp_vs**2 - 1))).replace([np.inf, -np.inf], 0).fillna(0)
        rho = 2500  # assumed bulk density (units presumably kg/m³) — TODO confirm for the site
        # G = ρ·Vs²  (Pa -> GPa via /1e9)
        df['Shear Modulus (GPa)'] = (rho * (df['S-wave Velocity (m/s)']**2)) / 1e9
        df['Shear Modulus (GPa)'] = df['Shear Modulus (GPa)'].replace([np.inf, -np.inf], 0).fillna(0)
        # K = ρ·(Vp² - 4/3·Vs²)
        df['Bulk Modulus (GPa)'] = (rho * (df['P-wave Velocity (m/s)']**2 - (4/3) * df['S-wave Velocity (m/s)']**2)) / 1e9
        df['Bulk Modulus (GPa)'] = df['Bulk Modulus (GPa)'].replace([np.inf, -np.inf], 0).fillna(0)
        # E = ρ·Vs²·(3Vp² - 4Vs²)/(Vp² - Vs²)
        num = 3 * df['P-wave Velocity (m/s)']**2 - 4 * df['S-wave Velocity (m/s)']**2
        denom = df['P-wave Velocity (m/s)']**2 - df['S-wave Velocity (m/s)']**2
        df['Young\'s Modulus (GPa)'] = (rho * df['S-wave Velocity (m/s)']**2 * num / denom) / 1e9
        df['Young\'s Modulus (GPa)'] = df['Young\'s Modulus (GPa)'].replace([np.inf, -np.inf], 0).fillna(0)
        # λ = K - 2G/3
        df['Lame\'s Lambda (GPa)'] = df['Bulk Modulus (GPa)'] - (2/3) * df['Shear Modulus (GPa)']
        df['Lame\'s Lambda (GPa)'] = df['Lame\'s Lambda (GPa)'].replace([np.inf, -np.inf], 0).fillna(0)
        params = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio',
                  'Poisson\'s Ratio', 'Shear Modulus (GPa)', 'Bulk Modulus (GPa)',
                  'Young\'s Modulus (GPa)', 'Lame\'s Lambda (GPa)']
        # Append ±10% sensitivity columns for every derived parameter.
        analysis_df = df.copy()
        for param in params:
            analysis_df[f"{param} +10%"] = analysis_df[param] * 1.10
            analysis_df[f"{param} -10%"] = analysis_df[param] * 0.90
        # Keep depth/elevation/arrival times plus every parameter (and its
        # sensitivity variants, matched by prefix).
        analysis_cols = ['Depth (m)', 'Elevation (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)'] + \
            [col for col in analysis_df.columns if col.startswith(tuple(params)) or col in params]
        return analysis_df[analysis_cols]
- def identify_layers(self, analysis_df):
- layers = []
- threshold = 0.20
- for i in range(1, len(analysis_df)):
- prev_p_velocity = analysis_df['P-wave Velocity (m/s)'].iloc[i-1]
- curr_p_velocity = analysis_df['P-wave Velocity (m/s)'].iloc[i]
- prev_s_velocity = analysis_df['S-wave Velocity (m/s)'].iloc[i-1]
- curr_s_velocity = analysis_df['S-wave Velocity (m/s)'].iloc[i]
- depth = analysis_df['Depth (m)'].iloc[i]
- p_change = abs(curr_p_velocity - prev_p_velocity) / prev_p_velocity if prev_p_velocity != 0 else 0
- s_change = abs(curr_s_velocity - prev_s_velocity) / prev_s_velocity if prev_s_velocity != 0 else 0
- if p_change > threshold or s_change > threshold:
- reason = []
- if p_change > threshold:
- reason.append(f"P-wave velocity change: {p_change:.2%}")
- if s_change > threshold:
- reason.append(f"S-wave velocity change: {s_change:.2%}")
- layers.append([depth, curr_p_velocity, curr_s_velocity, "; ".join(reason)])
- if layers:
- self.layers = pd.DataFrame(layers, columns=['Depth (m)', 'P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Reason'])
- else:
- self.layers = pd.DataFrame(columns=['Depth (m)', 'P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Reason'])
- def display_layers(self):
- for item in self.layers_tree.get_children():
- self.layers_tree.delete(item)
- columns = list(self.layers.columns)
- self.layers_tree["columns"] = columns
- for col in columns:
- self.layers_tree.heading(col, text=col)
- self.layers_tree.column(col, anchor=tk.CENTER, stretch=True)
- for idx, row in self.layers.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) for val in row]
- self.layers_tree.insert("", tk.END, values=formatted_row)
- for col in columns:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in self.layers.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- min_width = 1200 if col == "Reason" else 150
- self.layers_tree.column(col, width=max(max_length * 20, min_width), minwidth=min_width, stretch=True)
- def display_analysis(self, analysis_df):
- for item in self.analysis_tree.get_children():
- self.analysis_tree.delete(item)
- columns = list(analysis_df.columns)
- self.analysis_tree["columns"] = columns
- for col in columns:
- self.analysis_tree.heading(col, text=col)
- self.analysis_tree.column(col, anchor=tk.CENTER)
- for idx, row in analysis_df.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- self.analysis_tree.insert("", tk.END, values=formatted_row)
- for col in columns:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in analysis_df.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- self.analysis_tree.column(col, width=max_length * 10, minwidth=150)
- def compute_summary_stats(self, analysis_df):
- params = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio',
- 'Poisson\'s Ratio', 'Shear Modulus (GPa)', 'Bulk Modulus (GPa)',
- 'Young\'s Modulus (GPa)', 'Lame\'s Lambda (GPa)']
- summary_data = []
- for param in params:
- data = analysis_df[param]
- summary_data.append([
- param,
- data.mean(),
- data.median(),
- data.std(),
- data.min(),
- data.max()
- ])
- return pd.DataFrame(summary_data, columns=['Parameter', 'Mean', 'Median', 'Std Dev', 'Min', 'Max'])
- def display_summary(self, analysis_df):
- summary_df = self.compute_summary_stats(analysis_df)
- for item in self.summary_tree.get_children():
- self.summary_tree.delete(item)
- columns = list(summary_df.columns)
- self.summary_tree["columns"] = columns
- for col in columns:
- self.summary_tree.heading(col, text=col)
- self.summary_tree.column(col, anchor=tk.CENTER)
- for idx, row in summary_df.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) for val in row]
- self.summary_tree.insert("", tk.END, values=formatted_row)
- for col in columns:
- data_lengths = [len(str(row.get(col, ""))) if pd.notna(row.get(col)) else 0 for _, row in summary_df.iterrows()]
- data_max = max(data_lengths, default=0)
- max_length = max(len(str(col)), data_max)
- self.summary_tree.column(col, width=max_length * 10, minwidth=150)
    def plot_visualizations(self, df, analysis_df):
        """Redraw the parameter-vs-depth plot grid for the selected parameters.

        df: current raw data (not read directly here; kept for caller
        symmetry). analysis_df: output of analyze_data(). The depth window
        comes from the min/max entry fields; which subplots appear is driven
        by the self.plot_options checkbox variables.
        """
        # Tear down the previous canvas/figure and placeholder label.
        if self.plot_canvas:
            self.plot_canvas.get_tk_widget().destroy()
        if self.current_fig:
            plt.close(self.current_fig)
        self.plot_label.pack_forget()
        # Depth window from the UI; swapped if entered backwards, full range on
        # unparsable input.
        try:
            min_depth = float(self.min_depth_var.get()) if self.min_depth_var.get() else analysis_df['Depth (m)'].min()
            max_depth = float(self.max_depth_var.get()) if self.max_depth_var.get() else analysis_df['Depth (m)'].max()
            if min_depth > max_depth:
                min_depth, max_depth = max_depth, min_depth
        except ValueError:
            min_depth = analysis_df['Depth (m)'].min()
            max_depth = analysis_df['Depth (m)'].max()
        plot_df = analysis_df[(analysis_df['Depth (m)'] >= min_depth) & (analysis_df['Depth (m)'] <= max_depth)]
        # Restrict the deviation overlay (built by analyze_data) to the same window.
        plot_deviations = self.deviations
        if plot_deviations is not None and not plot_deviations.empty:
            plot_deviations = plot_deviations[(plot_deviations['Depth (m)'] >= min_depth) & (plot_deviations['Depth (m)'] <= max_depth)]
        plots_to_show = [param for param, var in self.plot_options.items() if var.get()]
        if not plots_to_show:
            self.plot_label = ttk.Label(self.scrollable_frame, text="Select at least one parameter to plot.")
            self.plot_label.pack(fill=tk.BOTH, expand=True)
            return
        # Grid layout: three subplots per row.
        num_plots = len(plots_to_show)
        rows = (num_plots + 2) // 3
        fig, axes = plt.subplots(rows, 3, figsize=(15, 4 * rows))
        axes = axes.flatten()
        plot_index = 0
        # (option name, line style, y-column/label, title) for each available plot.
        plot_configs = [
            ("Vp/Vs Ratio", 'g-', 'Vp/Vs Ratio', 'Vp/Vs Ratio vs Depth'),
            ("Poisson's Ratio", 'b-', 'Poisson\'s Ratio', 'Poisson\'s Ratio vs Depth'),
            ("Shear Modulus", 'r-', 'Shear Modulus (GPa)', 'Shear Modulus vs Depth'),
            ("Bulk Modulus", 'c-', 'Bulk Modulus (GPa)', 'Bulk Modulus vs Depth'),
            ("Young's Modulus", 'm-', 'Young\'s Modulus (GPa)', 'Young\'s Modulus vs Depth'),
            ("Lame's Lambda", 'y-', 'Lame\'s Lambda (GPa)', 'Lame\'s Lambda vs Depth'),
            ("Travel Time Deviations", None, None, 'Travel Time Deviations')
        ]
        for param, color, y_label, title in plot_configs:
            if param not in plots_to_show:
                continue
            if param == "Travel Time Deviations":
                # Special case: plot the flagged arrival times, or a
                # "none detected" placeholder when there are no deviations.
                if plot_deviations is not None and not plot_deviations.empty:
                    axes[plot_index].plot(plot_deviations['Depth (m)'], plot_deviations['First P-wave Time (µs)'], 'r-', label='P-wave Deviations')
                    axes[plot_index].plot(plot_deviations['Depth (m)'], plot_deviations['First S-wave Time (µs)'], 'b-', label='S-wave Deviations')
                    axes[plot_index].set_xlabel('Depth (m)', fontsize=12)
                    axes[plot_index].set_ylabel('Travel Time (µs)', fontsize=12)
                    axes[plot_index].set_title('Travel Time Deviations', fontsize=14)
                    axes[plot_index].grid(True)
                    axes[plot_index].legend()
                else:
                    axes[plot_index].text(0.5, 0.5, 'No Deviations Detected', horizontalalignment='center', verticalalignment='center')
                    axes[plot_index].set_title('Travel Time Deviations', fontsize=14)
                    axes[plot_index].grid(True)
            else:
                axes[plot_index].plot(plot_df['Depth (m)'], plot_df[y_label], color, label=y_label)
                axes[plot_index].set_xlabel('Depth (m)', fontsize=12)
                axes[plot_index].set_ylabel(y_label, fontsize=12)
                axes[plot_index].set_title(title, fontsize=14)
                axes[plot_index].grid(True)
                axes[plot_index].legend()
            plot_index += 1
        # Blank out unused grid cells in the last row.
        for i in range(plot_index, len(axes)):
            axes[i].axis('off')
        plt.tight_layout()
        self.current_fig = fig
        self.plot_canvas = FigureCanvasTkAgg(fig, master=self.scrollable_frame)
        self.plot_canvas.draw()
        self.plot_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
- def update_plots(self):
- if self.current_data is None:
- return
- analysis_df = self.analyze_data(self.current_data)
- self.plot_visualizations(self.current_data, analysis_df)
- def save_plots(self):
- if self.current_fig is None:
- messagebox.showwarning("No Plot", "No plots available to save.")
- return
- file_path = filedialog.asksaveasfilename(defaultextension=".png", filetypes=[("PNG Files", "*.png"), ("All Files", "*.*")])
- if file_path:
- try:
- self.current_fig.savefig(file_path, dpi=300, bbox_inches='tight')
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Saved plot to {file_path} at {timestamp}")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to save plot: {str(e)}")
- def export_raw_data(self):
- if self.current_data is None:
- messagebox.showwarning("No Data", "Please load a valid file before exporting raw data.")
- return
- try:
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- csv_filename = f"{input_filename}_raw_data_{timestamp}.csv"
- csv_path = os.path.join(output_dir, csv_filename)
- self.current_data.to_csv(csv_path, index=False)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported raw data to {csv_path} at {timestamp}")
- messagebox.showinfo("Success", f"Raw data exported successfully to {csv_path}.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export raw data: {str(e)}")
- def export_quality_check(self):
- if self.quality_issues is None or self.quality_issues.empty:
- messagebox.showwarning("No Data", "No quality check data available to export.")
- return
- try:
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- csv_filename = f"{input_filename}_quality_check_{timestamp}.csv"
- csv_path = os.path.join(output_dir, csv_filename)
- self.quality_issues.to_csv(csv_path, index=False)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported quality check to {csv_path} at {timestamp}")
- messagebox.showinfo("Success", f"Quality check exported successfully to {csv_path}.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export quality check: {str(e)}")
- def export_summary(self):
- if self.current_data is None:
- messagebox.showwarning("No Data", "Please load a valid file before exporting summary.")
- return
- try:
- analysis_df = self.analyze_data(self.current_data)
- summary_df = self.compute_summary_stats(analysis_df)
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- csv_filename = f"{input_filename}_summary_{timestamp}.csv"
- csv_path = os.path.join(output_dir, csv_filename)
- summary_df.to_csv(csv_path, index=False)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported summary to {csv_path} at {timestamp}")
- messagebox.showinfo("Success", f"Summary exported successfully to {csv_path}.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export summary: {str(e)}")
- def export_layers(self):
- if self.layers is None or self.layers.empty:
- messagebox.showwarning("No Data", "No layer data available to export.")
- return
- try:
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- csv_filename = f"{input_filename}_layers_{timestamp}.csv"
- csv_path = os.path.join(output_dir, csv_filename)
- self.layers.to_csv(csv_path, index=False)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported layers to {csv_path} at {timestamp}")
- messagebox.showinfo("Success", f"Layers exported successfully to {csv_path}.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export layers: {str(e)}")
- def export_correlation(self):
- if self.correlation_results is None or self.correlation_results.empty:
- messagebox.showwarning("No Data", "No correlation data available to export.")
- return
- try:
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- csv_filename = f"{input_filename}_correlation_{timestamp}.csv"
- csv_path = os.path.join(output_dir, csv_filename)
- self.correlation_results.to_csv(csv_path, index=False)
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported correlation data to {csv_path} at {timestamp}")
- messagebox.showinfo("Success", f"Correlation data exported successfully to {csv_path}.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export correlation data: {str(e)}")
- def export_detailed_report(self):
- if self.current_data is None:
- messagebox.showwarning("No Data", "Please load a valid file before exporting detailed report.")
- return
- try:
- analysis_df = self.analyze_data(self.current_data)
- summary_df = self.compute_summary_stats(analysis_df)
- timestamp = datetime.now().strftime('%Y%m%d_%H%M')
- output_dir = os.path.join(self.output_base_dir, timestamp)
- os.makedirs(output_dir, exist_ok=True)
- input_filename = os.path.splitext(self.current_file)[0]
- tex_filename = f"{input_filename}_detailed_report_{timestamp}.tex"
- tex_path = os.path.join(output_dir, tex_filename)
- pdf_filename = f"{input_filename}_detailed_report_{timestamp}.pdf"
- pdf_path = os.path.join(output_dir, pdf_filename)
- latex_content = r"""
- \documentclass[a4paper,12pt]{article}
- \usepackage{booktabs}
- \usepackage{longtable}
- \usepackage[margin=1in]{geometry}
- \usepackage{amsmath}
- \usepackage{amsfonts}
- \usepackage{noto}
- \begin{document}
- \section*{Detailed Seismic Analysis Report}
- \subsection*{Raw Data}
- \begin{longtable}{@{}""" + "c" * len(self.current_data.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in self.current_data.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in self.current_data.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- \subsection*{Quality Check}
- """
- if self.quality_issues is not None and not self.quality_issues.empty:
- latex_content += r"""
- \begin{longtable}{@{}""" + "c" * len(self.quality_issues.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in self.quality_issues.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in self.quality_issues.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- """
- else:
- latex_content += r"No quality issues detected."
- latex_content += r"""
- \subsection*{Analysis}
- \begin{longtable}{@{}""" + "c" * len(analysis_df.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in analysis_df.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in analysis_df.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- \subsection*{Summary}
- \begin{longtable}{@{}""" + "c" * len(summary_df.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in summary_df.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in summary_df.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- \subsection*{Identified Layers}
- """
- if self.layers is not None and not self.layers.empty:
- latex_content += r"""
- \begin{longtable}{@{}""" + "c" * len(self.layers.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in self.layers.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in self.layers.iterrows():
- formatted_row = [f"{val:.2f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- """
- else:
- latex_content += r"No significant layer boundaries detected."
- latex_content += r"""
- \subsection*{Correlation Analysis}
- """
- if self.correlation_results is not None and not self.correlation_results.empty:
- latex_content += r"""
- \begin{longtable}{@{}""" + "c" * len(self.correlation_results.columns) + r"""@{}}
- \toprule
- """ + " & ".join([f"\\textbf{{{col}}}" for col in self.correlation_results.columns]) + r""" \\
- \midrule
- \endhead
- """
- for _, row in self.correlation_results.iterrows():
- formatted_row = [f"{val:.3f}" if isinstance(val, (int, float)) and pd.notna(val) else str(val) if pd.notna(val) else "" for val in row]
- latex_content += " & ".join(formatted_row) + r" \\ \midrule" + "\n"
- latex_content += r"""
- \bottomrule
- \end{longtable}
- """
- else:
- latex_content += r"No correlation analysis data available."
- latex_content += r"""
- \end{document}
- """
- with open(tex_path, 'w', encoding='utf-8') as f:
- f.write(latex_content)
- os.system(f'latexmk -pdf -outdir="{output_dir}" "{tex_path}"')
- if os.path.exists(pdf_path):
- ist = pytz.timezone('Asia/Kolkata')
- timestamp = datetime.now(ist).strftime("%I:%M %p IST")
- self.status_var.set(f"Exported detailed report to {pdf_path} at {timestamp}")
- messagebox.showinfo("Success", f"Detailed report exported successfully to {pdf_path}.")
- else:
- raise Exception("PDF compilation failed.")
- except Exception as e:
- messagebox.showerror("Error", f"Failed to export detailed report: {str(e)}")
- def clear_tabs(self):
- for tree in [self.raw_tree, self.quality_tree, self.analysis_tree, self.summary_tree, self.layers_tree, self.correlation_tree, self.realtime_tree]:
- if tree:
- for item in tree.get_children():
- tree.delete(item)
- if self.plot_canvas:
- self.plot_canvas.get_tk_widget().destroy()
- self.plot_canvas = None
- if self.current_fig:
- plt.close(self.current_fig)
- self.current_fig = None
- self.plot_label = ttk.Label(self.scrollable_frame, text="Select a file to view visualizations.")
- self.plot_label.pack(fill=tk.BOTH, expand=True)
- if hasattr(self, 'correlation_canvas') and self.correlation_canvas:
- self.correlation_canvas.get_tk_widget().destroy()
- self.correlation_canvas = None
- if hasattr(self, 'correlation_fig') and self.correlation_fig:
- plt.close(self.correlation_fig)
- self.correlation_fig = None
- self.correlation_plot_label = ttk.Label(self.correlation_tree.master, text="Upload SPT/Sonic Log file to view correlations.")
- self.correlation_plot_label.pack(fill=tk.BOTH, expand=True)
- if hasattr(self, 'realtime_canvas') and self.realtime_canvas:
- self.realtime_canvas.get_tk_widget().destroy()
- self.realtime_canvas = None
- if hasattr(self, 'realtime_fig') and self.realtime_fig:
- plt.close(self.realtime_fig)
- self.realtime_fig = None
- self.realtime_plot_label = ttk.Label(self.realtime_tree.master, text="Start real-time acquisition to view data.")
- self.realtime_plot_label.pack(fill=tk.BOTH, expand=True)
- self.current_data = None
- self.raw_data = None
- self.quality_issues = None
- self.layers = None
- self.correlation_results = None
- self.realtime_data = None
- self.selected_rows = {}
if __name__ == "__main__":
    # Create the root window, attach the analyzer (kept in a local so it
    # is not garbage-collected), and enter the Tk event loop.
    main_window = tk.Tk()
    analyzer = SeismicWaveAnalyzer(main_window)
    main_window.mainloop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement