- import tkinter as tk
- from tkinter import ttk, messagebox, filedialog
- from pathlib import Path
- import shutil, pandas as pd, matplotlib.pyplot as plt, numpy as np, pytz, scipy.stats, threading, time
- from datetime import datetime  # the code calls datetime.now(...) directly, so import the class rather than the module
- from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
- from PIL import Image, ImageTk
- try:
- LANCZOS = Image.Resampling.LANCZOS  # Pillow >= 9.1
- except AttributeError:
- LANCZOS = Image.LANCZOS  # fallback for older Pillow versions
- import subprocess, logging
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s-%(levelname)s-%(message)s')
- class SeismicWaveAnalyzer:
- def __init__(self, root):
- self.root = root
- self.root.withdraw()
- self.data_dir = Path.home() / "SeismicWaveData"
- self.out_dir = self.data_dir / "Outputs"
- self.data_dir.mkdir(exist_ok=True)
- self.out_dir.mkdir(exist_ok=True)
- self.show_splash()
- def show_splash(self):
- s = tk.Toplevel()
- s.title("Welcome")
- s.configure(bg="#fff")
- s.attributes('-fullscreen', True)
- s.overrideredirect(True)
- ttk.Button(s, text="Skip", command=lambda: self.launch_main(s), style="Custom.TButton").place(relx=.95, rely=.95, anchor="se")
- try:
- img = Image.open(self.data_dir / "picture1.png").resize((200, 200), LANCZOS)
- p = ImageTk.PhotoImage(img)
- tk.Label(s, image=p, bg="#fff").place(x=20, y=20)
- img_label = tk.Label(s, image=p)
- img_label.image = p
- except Exception:
- tk.Label(s, text="[Image 1]", width=20, height=10, bg="#d3d3d3").place(x=20, y=20)
- try:
- img = Image.open(self.data_dir / "picture2.png").resize((200, 200), LANCZOS)
- p = ImageTk.PhotoImage(img)
- tk.Label(s, image=p, bg="#fff").place(relx=1.0, y=20, anchor="ne")
- img_label = tk.Label(s, image=p)
- img_label.image = p
- except Exception:
- tk.Label(s, text="[Image 2]", width=20, height=10, bg="#d3d3d3").place(relx=1.0, y=20, anchor="ne")
- f = tk.Frame(s, bg="#fff")
- f.place(relx=.5, rely=.5, anchor="center")
- tk.Label(f, text="Seismic Wave Analysis", font=("Segoe UI", 20, "bold"), bg="#fff").pack(pady=10)
- tk.Label(f, text="FTT Mode Project", font=("Segoe UI", 16), bg="#fff").pack(pady=5)
- tk.Label(f, text="Instrumentation & Eng. Geophysics", font=("Segoe UI", 14), bg="#fff").pack(pady=5)
- tk.Label(f, text="Dr N Satyavani, Lead", font=("Segoe UI", 14, "italic"), bg="#fff").pack(pady=5)
- s.after(6000, lambda: self.launch_main(s))
- def launch_main(self, s):
- s.destroy()
- self.root.deiconify()
- self.root.title("Seismic Wave Analysis Tool")
- self.root.geometry("1000x700")
- self.root.state('zoomed')
- self.theme = {"bg": "#fff", "fg": "#212121", "entry_bg": "#f5f5f5", "accent": "#003087", "btn_fg": "#fff"}
- self.root.configure(bg=self.theme["bg"])
- self.raw_tree = self.qual_tree = self.ana_tree = self.sum_tree = self.lay_tree = self.corr_tree = self.rt_tree = self.plot_canvas = self.cur_fig = None
- self.cur_data = self.raw_data = self.cur_file = self.deviations = self.qual_issues = self.layers = self.corr_data = self.corr_res = self.rt_data = None
- self.plot_label = ttk.Label(self.root)
- self.sel_rows = {}
- self.min_d_var = tk.StringVar()
- self.max_d_var = tk.StringVar()
- self.rt_running = False
- self.plot_opts = {k: tk.BooleanVar(value=True) for k in ["Vp/Vs Ratio", "Poisson's Ratio", "Shear Modulus", "Bulk Modulus", "Young's Modulus", "Travel Time Deviations"]}
- self.setup_gui()
- self.update_clock()
- def setup_gui(self):
- m = tk.Menu(self.root)
- self.root.config(menu=m)
- fm = tk.Menu(m, tearoff=0)
- m.add_cascade(label="File", menu=fm)
- fm.add_command(label="Exit", command=self.root.quit)
- ff = ttk.Frame(self.root)
- ff.pack(pady=10, padx=10, fill=tk.X)
- ttk.Label(ff, text="Upload CHST:", font=("Segoe UI", 10)).pack(side=tk.LEFT, padx=5)
- ttk.Button(ff, text="Upload", command=self.upload_file, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(ff, text="Create Template", command=self.create_template, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(ff, text="Import CSV to PDF", command=self.import_csv_pdf, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(ff, text="Refresh", command=self.refresh_files, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Label(ff, text="Select File:", font=("Segoe UI", 10)).pack(side=tk.LEFT, padx=5)
- self.file_var = tk.StringVar()
- self.file_cb = ttk.Combobox(ff, textvariable=self.file_var, state="readonly")
- self.file_cb.pack(side=tk.LEFT, padx=5)
- self.file_cb.bind("<<ComboboxSelected>>", self.load_file)
- self.nb = ttk.Notebook(self.root)
- self.nb.pack(pady=10, padx=10, fill=tk.BOTH, expand=True)
- self.raw_f = ttk.Frame(self.nb)
- self.nb.add(self.raw_f, text="Raw Data")
- self.setup_raw_tab()
- self.qual_f = ttk.Frame(self.nb)
- self.nb.add(self.qual_f, text="Quality Check")
- self.setup_qual_tab()
- self.ana_f = ttk.Frame(self.nb)
- self.nb.add(self.ana_f, text="Analysis")
- self.setup_ana_tab()
- self.sum_f = ttk.Frame(self.nb)
- self.nb.add(self.sum_f, text="Summary")
- self.setup_sum_tab()
- self.lay_f = ttk.Frame(self.nb)
- self.nb.add(self.lay_f, text="Layers")
- self.setup_lay_tab()
- self.corr_f = ttk.Frame(self.nb)
- self.nb.add(self.corr_f, text="Correlation Analysis")
- self.setup_corr_tab()
- self.rt_f = ttk.Frame(self.nb)
- self.nb.add(self.rt_f, text="Real-Time")
- self.setup_rt_tab()
- self.viz_f = ttk.Frame(self.nb)
- self.nb.add(self.viz_f, text="Visualizations")
- self.setup_viz_tab()
- ef = ttk.Frame(self.root)
- ef.pack(pady=5, fill=tk.X)
- for t, c in [("Raw Data", self.export_raw), ("Quality Check", self.export_qual), ("Analysis to PDF", self.export_ana_pdf), ("Summary", self.export_sum), ("Layers", self.export_lay), ("Correlation", self.export_corr), ("Detailed Report", self.export_report)]:
- ttk.Button(ef, text=f"Export {t} to CSV", command=c, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- sf = ttk.Frame(self.root)
- sf.pack(side=tk.BOTTOM, fill=tk.X)
- self.status_var = tk.StringVar()
- tk.Label(sf, textvariable=self.status_var, bd=1, relief=tk.SUNKEN, anchor=tk.W, font=("Segoe UI", 10)).pack(side=tk.LEFT, fill=tk.X, expand=True)
- self.clock_var = tk.StringVar()
- tk.Label(sf, textvariable=self.clock_var, bd=1, relief=tk.SUNKEN, anchor=tk.E, font=("Segoe UI", 10)).pack(side=tk.RIGHT)
- self.apply_theme()
- self.refresh_files()
- def apply_theme(self):
- s = ttk.Style()
- s.theme_use('clam')
- s.configure("Custom.TButton", background=self.theme["accent"], foreground=self.theme["btn_fg"], font=("Segoe UI", 10), padding=8)
- s.map("Custom.TButton", background=[('active', '#002070')], foreground=[('active', '#fff')])
- s.configure("TCombobox", fieldbackground=self.theme["entry_bg"], foreground=self.theme["fg"])
- s.configure("Treeview", background=self.theme["entry_bg"], foreground=self.theme["fg"], fieldbackground=self.theme["entry_bg"])
- s.configure("Treeview.Heading", background=self.theme["bg"], foreground=self.theme["fg"])
- def update_clock(self):
- self.clock_var.set(datetime.now(pytz.timezone('Asia/Kolkata')).strftime("%a, %b %d, %Y %I:%M %p IST"))
- self.root.after(1000, self.update_clock)
- def setup_raw_tab(self):
- sf = ttk.Frame(self.raw_f)
- sf.pack(fill=tk.X, pady=5)
- ttk.Label(sf, text="Depth Range (m):").pack(side=tk.LEFT, padx=5)
- ttk.Entry(sf, textvariable=self.min_d_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Label(sf, text="to").pack(side=tk.LEFT)
- ttk.Entry(sf, textvariable=self.max_d_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Button(sf, text="Apply", command=self.apply_sel, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(sf, text="Reset", command=self.reset_sel, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- f = ttk.Frame(self.raw_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
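- # Scrollable region pattern used on every tab: a Canvas hosting an inner Frame,
- # with the frame's <Configure> event keeping the canvas scrollregion in sync.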
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.raw_tree = ttk.Treeview(sf, show="headings")
- self.raw_tree.pack(fill=tk.BOTH, expand=True)
- def setup_qual_tab(self):
- f = ttk.Frame(self.qual_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.qual_tree = ttk.Treeview(sf, show="headings")
- self.qual_tree.pack(fill=tk.BOTH, expand=True)
- def setup_ana_tab(self):
- f = ttk.Frame(self.ana_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- cf = ttk.Frame(f)
- cf.pack(fill=tk.X, pady=5)
- b = ttk.Button(cf, text="Run Monte Carlo", command=self.run_mc, style="Custom.TButton")
- b.pack(side=tk.LEFT, padx=5)
- b.bind("<Enter>", lambda e: self.show_tip(b, "Runs 1000 iterations with 1% noise"))
- b.bind("<Leave>", lambda e: self.hide_tip())
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.ana_tree = ttk.Treeview(sf, show="headings")
- self.ana_tree.pack(fill=tk.BOTH, expand=True)
- def show_tip(self, w, t):
- self.tip = tk.Toplevel(w)
- self.tip.wm_overrideredirect(True)
- self.tip.wm_geometry(f"+{w.winfo_rootx()+20}+{w.winfo_rooty()+20}")
- tk.Label(self.tip, text=t, background="yellow", relief="solid", borderwidth=1, font=("Segoe UI", 10)).pack()
- def hide_tip(self):
- if hasattr(self, 'tip'):
- self.tip.destroy()
- def setup_sum_tab(self):
- f = ttk.Frame(self.sum_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.sum_tree = ttk.Treeview(sf, show="headings")
- self.sum_tree.pack(fill=tk.BOTH, expand=True)
- def setup_lay_tab(self):
- f = ttk.Frame(self.lay_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.lay_tree = ttk.Treeview(sf, show="headings")
- self.lay_tree.pack(fill=tk.BOTH, expand=True)
- def setup_corr_tab(self):
- f = ttk.Frame(self.corr_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- cf = ttk.Frame(f)
- cf.pack(fill=tk.X, pady=5)
- ttk.Button(cf, text="Upload SPT/Sonic", command=self.upload_corr, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(cf, text="Run Correlation", command=self.run_corr, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.corr_tree = ttk.Treeview(sf, show="headings")
- self.corr_tree.pack(fill=tk.BOTH, expand=True)
- self.corr_plot_label = ttk.Label(sf, text="Upload SPT/Sonic to view correlations.")
- self.corr_plot_label.pack(fill=tk.BOTH, expand=True)
- def setup_rt_tab(self):
- f = ttk.Frame(self.rt_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- cf = ttk.Frame(f)
- cf.pack(fill=tk.X, pady=5)
- ttk.Button(cf, text="Start Real-Time", command=self.start_rt, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(cf, text="Stop Real-Time", command=self.stop_rt, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- c = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=c.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=c.xview)
- sf = ttk.Frame(c)
- sf.bind("<Configure>", lambda e: c.configure(scrollregion=c.bbox("all")))
- c.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- c.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- c.create_window((0, 0), window=sf, anchor="nw")
- self.rt_tree = ttk.Treeview(sf, show="headings")
- self.rt_tree.pack(fill=tk.BOTH, expand=True)
- self.rt_plot_label = ttk.Label(sf, text="Start real-time to view data.")
- self.rt_plot_label.pack(fill=tk.BOTH, expand=True)
- def setup_viz_tab(self):
- f = ttk.Frame(self.viz_f)
- f.pack(pady=10, fill=tk.BOTH, expand=True)
- of = ttk.Frame(f)
- of.pack(fill=tk.X, pady=5)
- ttk.Label(of, text="Plot Params:", font=("Segoe UI", 10)).pack(side=tk.LEFT, padx=5)
- for p, v in self.plot_opts.items():
- ttk.Checkbutton(of, text=p, variable=v, command=self.update_plots).pack(side=tk.LEFT, padx=5)
- df = ttk.Frame(of)
- df.pack(side=tk.LEFT, padx=10)
- ttk.Label(df, text="Depth Range (m):").pack(side=tk.LEFT)
- ttk.Entry(df, textvariable=self.min_d_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Label(df, text="to").pack(side=tk.LEFT)
- ttk.Entry(df, textvariable=self.max_d_var, width=10).pack(side=tk.LEFT, padx=5)
- ttk.Button(df, text="Update Plot", command=self.update_plots, style="Custom.TButton").pack(side=tk.LEFT, padx=5)
- ttk.Button(of, text="Save Plots", command=self.save_plots, style="Custom.TButton").pack(side=tk.RIGHT, padx=5)
- self.canvas = tk.Canvas(f)
- sy = ttk.Scrollbar(f, orient=tk.VERTICAL, command=self.canvas.yview)
- sx = ttk.Scrollbar(f, orient=tk.HORIZONTAL, command=self.canvas.xview)
- self.scroll_f = ttk.Frame(self.canvas)
- self.scroll_f.bind("<Configure>", lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all")))
- self.canvas.configure(yscrollcommand=sy.set, xscrollcommand=sx.set)
- sy.pack(side=tk.RIGHT, fill=tk.Y)
- sx.pack(side=tk.BOTTOM, fill=tk.X)
- self.canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- self.canvas.create_window((0, 0), window=self.scroll_f, anchor="nw")
- self.plot_label.pack(fill=tk.BOTH, expand=True)
- def export_ana_pdf(self):
- if not shutil.which('latexmk'):
- messagebox.showerror("Error", "LaTeX not installed.")
- return
- if self.cur_data is None:
- messagebox.showwarning("No Data", "Load file.")
- return
- try:
- df = self.analyze_data(self.cur_data)
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- tp = od / f"{fn}_analysis_{ts}.tex"
- lc = r"\documentclass[a4paper,12pt]{article}\usepackage{booktabs,longtable,geometry,amsmath,amsfonts,noto}\geometry{margin=1in}\begin{document}\section*{Analysis Report}\begin{longtable}{@{}" + "c" * len(df.columns) + r"@{}}\toprule" + r" & ".join([f"\\textbf{{{c}}}" for c in df.columns]) + r"\\\midrule\endhead"
- for _, r in df.iterrows():
- lc += r" & ".join([f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) if pd.notna(v) else "" for v in r]) + r"\\\midrule" + "\n"
- lc += r"\bottomrule\end{longtable}\end{document}"
- with open(tp, 'w', encoding='utf-8') as f:
- f.write(lc)
- subprocess.run(['latexmk', '-pdf', f'-outdir={od}', str(tp)], capture_output=True, text=True, check=True)
- pp = od / f"{fn}_analysis_{ts}.pdf"
- if pp.exists():
- self.status_var.set(f"Exported analysis to PDF: {pp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Exported to {pp}")
- else:
- raise Exception("PDF failed")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Analysis PDF failed: {str(e)}")
- def refresh_files(self):
- files = sorted(list(self.data_dir.glob("*.csv")) + list(self.data_dir.glob("*.xlsx")))  # the old character-class glob matched neither .csv nor .xlsx
- self.file_cb['values'] = [f.name for f in files]
- self.file_var.set("")
- self.clear_tabs()
- self.status_var.set("Select file.")
- def upload_file(self):
- fp = filedialog.askopenfilename(filetypes=[("Excel/CSV", "*.xlsx *.csv"), ("Excel", "*.xlsx"), ("CSV", "*.csv"), ("All", "*.*")])
- if fp:
- try:
- shutil.copy(fp, self.data_dir / Path(fp).name)
- self.refresh_files()
- self.status_var.set(f"Uploaded: {Path(fp).name} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- except Exception as e:
- messagebox.showerror("Error", f"Upload failed: {str(e)}")
- logging.error(f"Upload failed: {str(e)}")
- def create_template(self):
- try:
- df = pd.DataFrame([[3.0, 100.0, 7.91, 16.81], [4.5, 98.5, 8.0, 17.0], [6.0, 97.0, 8.5, 17.5]],
- columns=["Depth (m)", "Elevation (m)", "P wave time (ms)", "S wave time (ms)"])
- p = self.data_dir / "template_seismic_data.xlsx"
- df.to_excel(p, index=False)
- self.refresh_files()
- self.status_var.set(f"Created template: template_seismic_data.xlsx at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Template created: template_seismic_data.xlsx")
- except Exception as e:
- messagebox.showerror("Error", f"Template failed: {str(e)}")
- logging.error(f"Template failed: {str(e)}")
- def find_header_row(self, df):
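- # Heuristic header detection: the template uses a two-row header ("Depth" on one row,
- # the unit "(m)" directly below it); the first row where that pattern appears becomes
- # the top header row for read_excel/read_csv(header=[hr, hr + 1]).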
- for i, r in df.iterrows():
- rv = [str(v).strip() if pd.notna(v) else "" for v in r.values]
- if "Depth" in rv:
- di = rv.index("Depth")
- if i + 1 < len(df) and str(df.iloc[i + 1, di]).strip() == "(m)":
- return i
- return None
- def load_file(self, e=None):
- sf = self.file_var.get()
- if not sf:
- return
- self.clear_tabs()
- self.cur_file = sf
- fp = self.data_dir / sf
- try:
- if sf.endswith('.xlsx'):
- xl = pd.ExcelFile(fp)
- for sn in xl.sheet_names:
- df = pd.read_excel(fp, sheet_name=sn, header=None)
- hr = self.find_header_row(df)
- if hr is not None:
- df = pd.read_excel(fp, sheet_name=sn, header=[hr, hr + 1])
- break
- else:
- raise ValueError("No 'Depth' followed by '(m)'")
- cn = []
- cc = {}
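- # Flatten the two-level header into single names ("Depth (m)", "P wave time (ms)", ...)
- # and suffix any duplicates with a counter so column names stay unique.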
- for c in df.columns:
- cn1 = f"{c[0]} {c[1]}".strip() if c[1] and pd.notna(c[1]) else c[0]
- cc[cn1] = cc.get(cn1, 0) + 1
- cn.append(f"{cn1} {cc[cn1]}" if cc[cn1] > 1 else cn1)
- df.columns = cn
- elif sf.endswith('.csv'):
- dt = pd.read_csv(fp, header=None)
- hr = self.find_header_row(dt)
- if hr is None:
- raise ValueError("No 'Depth' followed by '(m)'")
- df = pd.read_csv(fp, header=[hr, hr + 1])
- cn = []
- cc = {}
- for c in df.columns:
- cn1 = f"{c[0]} {c[1]}".strip() if c[1] and pd.notna(c[1]) else c[0]
- cc[cn1] = cc.get(cn1, 0) + 1
- cn.append(f"{cn1} {cc[cn1]}" if cc[cn1] > 1 else cn1)
- df.columns = cn
- else:
- raise ValueError("Unsupported format")
- df.columns = [c.replace('\n', ' ') for c in df.columns]
- df.columns = ['Depth (m)' if c.startswith('Depth') else c for c in df.columns]
- ec = [c for c in df.columns if c.startswith('Elevation')]
- if ec:
- ve = next((c for c in ec if df[c].notna().any()), None)
- if ve:
- df.rename(columns={ve: 'Elevation (m)'}, inplace=True)
- df.drop(columns=[c for c in ec if c != ve], inplace=True)
- df = df.dropna(axis=1, how='all')
- df = df.loc[:, (df.notna().any()) | (df.columns == 'Depth (m)')].dropna(subset=df.columns.difference(['Depth (m)']), how='all')
- self.raw_data = df[df['Depth (m)'].notna()].copy()
- self.cur_data = self.raw_data.copy()
- if not all(c in df.columns for c in ['Depth (m)', 'Elevation (m)']):
- raise ValueError("Missing Depth/Elevation")
- self.display_raw(self.raw_data)
- adf = self.analyze_data(self.cur_data)
- self.display_ana(adf)
- self.check_qual(self.cur_data)
- self.display_qual()
- self.display_sum(adf)
- self.identify_layers(adf)
- self.display_lay()
- self.plot_viz(self.cur_data, adf)
- self.status_var.set(f"Loaded: {sf} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- logging.info(f"Loaded file {sf} with {len(self.cur_data)} rows")
- except Exception as e:
- messagebox.showerror("Error", f"Load failed: {str(e)}")
- logging.error(f"Load failed: {str(e)}")
- self.clear_tabs()
- def clear_tabs(self):
- for t in [self.raw_tree, self.qual_tree, self.ana_tree, self.sum_tree, self.lay_tree, self.corr_tree, self.rt_tree]:
- if t:
- for i in t.get_children():
- t.delete(i)
- if hasattr(self, 'plot_canvas') and self.plot_canvas:
- self.plot_canvas.get_tk_widget().destroy()
- if hasattr(self, 'cur_fig') and self.cur_fig:
- plt.close(self.cur_fig)
- self.plot_label.pack(fill=tk.BOTH, expand=True)
- self.cur_data = self.raw_data = self.cur_file = self.deviations = self.qual_issues = self.layers = self.corr_data = self.corr_res = self.rt_data = self.cur_fig = None
- self.sel_rows = {}
- def on_select_row(self, e):
- i = self.raw_tree.identify_row(e.y)
- if not i:
- return
- idx = int(self.raw_tree.index(i))
- self.sel_rows[idx] = not self.sel_rows.get(idx, False)
- self.raw_tree.set(i, "Select", "✔" if self.sel_rows[idx] else "")
- def display_raw(self, df):
- for i in self.raw_tree.get_children():
- self.raw_tree.delete(i)
- c = ["Select"] + list(df.columns)
- self.raw_tree["columns"] = c
- for col in c:
- self.raw_tree.heading(col, text=col)
- self.raw_tree.column(col, anchor=tk.CENTER, stretch=True)
- self.sel_rows = {}
- for idx, r in df.iterrows():
- self.raw_tree.insert("", tk.END, values=[""] + [str(v) if pd.notna(v) else "" for v in r])
- self.sel_rows[idx] = False
- self.raw_tree.column("Select", width=50, minwidth=50)
- self.raw_tree.bind("<Button-1>", self.on_select_row)
- for col in c[1:]:
- dl = [len(str(r.get(col, ""))) for _, r in df.iterrows()][:100]
- self.raw_tree.column(col, width=max(dl, default=0) * 10, minwidth=150)
- def apply_sel(self):
- if self.raw_data is None:
- return
- df = self.raw_data.copy()
- try:
- md = float(self.min_d_var.get()) if self.min_d_var.get().strip() else df['Depth (m)'].min()
- xd = float(self.max_d_var.get()) if self.max_d_var.get().strip() else df['Depth (m)'].max()
- if md > xd:
- md, xd = xd, md
- df = df[(df['Depth (m)'] >= md) & (df['Depth (m)'] <= xd)]
- except ValueError:
- messagebox.showwarning("Invalid", "Enter valid depth.")
- return
- si = [i for i, s in self.sel_rows.items() if s]
- if si:
- df = df.iloc[si]
- self.cur_data = df
- adf = self.analyze_data(df)
- self.display_ana(adf)
- self.check_qual(df)
- self.display_qual()
- self.display_sum(adf)
- self.identify_layers(adf)
- self.display_lay()
- self.plot_viz(df, adf)
- self.status_var.set(f"Selected: {len(df)} rows at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- def reset_sel(self):
- if self.raw_data is None:
- return
- self.cur_data = self.raw_data.copy()
- self.min_d_var.set("")
- self.max_d_var.set("")
- self.sel_rows = {i: False for i in range(len(self.raw_data))}
- for i in self.raw_tree.get_children():
- self.raw_tree.set(i, "Select", "")
- adf = self.analyze_data(self.cur_data)
- self.display_ana(adf)
- self.check_qual(self.cur_data)
- self.display_qual()
- self.display_sum(adf)
- self.identify_layers(adf)
- self.display_lay()
- self.plot_viz(self.cur_data, adf)
- self.status_var.set(f"Reset: {len(self.cur_data)} rows at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- def check_qual(self, df):
- pc = [c for c in df.columns if "P wave time" in c]
- sc = [c for c in df.columns if "S wave time" in c]
- if not pc or not sc:
- self.qual_issues = pd.DataFrame(columns=['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)', 'Flag Reason'])
- logging.warning("No P/S wave columns found.")
- return
- df['First P-wave Time (µs)'] = df[pc].min(axis=1) * 1000
- df['First S-wave Time (µs)'] = df[sc].min(axis=1) * 1000
- pm = df['First P-wave Time (µs)'].median()
- ps = df['First P-wave Time (µs)'].std()
- sm = df['First S-wave Time (µs)'].median()
- ss = df['First S-wave Time (µs)'].std()
- dd = df['Depth (m)'].diff()
- nmi = dd[dd <= 0].index
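- # Flag rules: first-arrival times under 0.5 µs, values more than 2 standard deviations
- # away from the median (a simple outlier screen), and depths that do not increase monotonically.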
- qi = []
- for i, r in df.iterrows():
- f = []
- if pd.notna(r['First P-wave Time (µs)']):
- if r['First P-wave Time (µs)'] < 0.5:
- f.append("P-wave time too small")
- if abs(r['First P-wave Time (µs)'] - pm) > 2 * ps:
- f.append(f"P-wave outlier (value={r['First P-wave Time (µs)']:.2f}, median={pm:.2f})")
- if pd.notna(r['First S-wave Time (µs)']):
- if r['First S-wave Time (µs)'] < 0.5:
- f.append("S-wave time too small")
- if abs(r['First S-wave Time (µs)'] - sm) > 2 * ss:
- f.append(f"S-wave outlier (value={r['First S-wave Time (µs)']:.2f}, median={sm:.2f})")
- if i in nmi and i > 0:
- f.append("Depth not monotonic")
- if f:
- qi.append([r['Depth (m)'], r['First P-wave Time (µs)'], r['First S-wave Time (µs)'], "; ".join(f)])
- logging.info(f"Flagged depth {r['Depth (m)']}: {'; '.join(f)}")
- self.qual_issues = pd.DataFrame(qi, columns=['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)', 'Flag Reason']) if qi else pd.DataFrame(columns=['Depth (m)', 'First P-wave Time (µs)', 'First S-wave Time (µs)', 'Flag Reason'])
- logging.info(f"Quality check processed {len(df)} depths, found {len(qi)} issues.")
- def display_qual(self):
- for i in self.qual_tree.get_children():
- self.qual_tree.delete(i)
- c = list(self.qual_issues.columns)
- self.qual_tree["columns"] = c
- for col in c:
- self.qual_tree.heading(col, text=col)
- self.qual_tree.column(col, anchor=tk.CENTER)
- for _, r in self.qual_issues.iterrows():
- self.qual_tree.insert("", tk.END, values=[f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r])
- for col in c:
- dl = [len(str(r.get(col, ""))) for _, r in self.qual_issues.iterrows()][:100]
- self.qual_tree.column(col, width=max(dl, default=0) * 10, minwidth=300 if col == "Flag Reason" else 150)
- def run_mc(self):
- if self.cur_data is None:
- messagebox.showwarning("No Data", "Load file.")
- return
- try:
- df = self.cur_data.copy()
- ni = 1000
- tns = 0.01
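- # Monte Carlo: rerun the analysis ni times, each time scaling every P/S travel time by
- # (1 + N(0, tns)), i.e. ~1% Gaussian noise, then report the per-depth mean and the
- # 2.5th/97.5th percentiles (a 95% confidence band) for each derived parameter.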
- pc = [c for c in df.columns if "P wave time" in c]
- sc = [c for c in df.columns if "S wave time" in c]
- if not pc or not sc:
- raise ValueError("No P/S wave columns")
- if df[pc + sc].isna().all().all():
- raise ValueError("All P/S wave times are missing")
- res = {'P-wave Velocity (m/s)': [], 'S-wave Velocity (m/s)': [], 'Vp/Vs Ratio': []}
- for _ in range(ni):
- ds = df.copy()
- for c in pc + sc:
- ds[c] = ds[c] * (1 + np.random.normal(0, tns, len(ds)))
- as_ = self.analyze_data(ds)
- for p in res:
- res[p].append(as_[p].values)
- for p in res:
- res[p] = np.array(res[p])
- df[f'{p} Mean'] = np.nanmean(res[p], axis=0)
- df[f'{p} CI Lower'] = np.nanpercentile(res[p], 2.5, axis=0)
- df[f'{p} CI Upper'] = np.nanpercentile(res[p], 97.5, axis=0)
- self.cur_data = df
- self.display_ana(df)
- self.status_var.set(f"Monte Carlo done at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- logging.info(f"Monte Carlo processed {len(df)} depths with {ni} iterations.")
- except Exception as e:
- messagebox.showerror("Error", f"Monte Carlo failed: {str(e)}")
- logging.error(f"Monte Carlo failed: {str(e)}")
- def upload_corr(self):
- fp = filedialog.askopenfilename(filetypes=[("Excel/CSV", "*.xlsx *.csv"), ("Excel", "*.xlsx"), ("CSV", "*.csv"), ("All", "*.*")])
- if fp:
- try:
- self.corr_data = pd.read_excel(fp) if fp.endswith('.xlsx') else pd.read_csv(fp)
- if 'Depth (m)' not in self.corr_data.columns:
- raise ValueError("SPT/Sonic needs Depth (m)")
- self.status_var.set(f"Uploaded SPT/Sonic: {Path(fp).name} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- except Exception as e:
- messagebox.showerror("Error", f"Upload failed: {str(e)}")
- logging.error(f"Corr upload failed: {str(e)}")
- def run_corr(self):
- if self.cur_data is None or self.corr_data is None:
- messagebox.showwarning("No Data", "Load CHST and SPT/Sonic.")
- return
- try:
- sdf = self.analyze_data(self.cur_data)
- mdf = pd.merge(sdf, self.corr_data, on='Depth (m)', how='inner')
- if mdf.empty:
- raise ValueError("No matching depths")
- cr = []
- sp = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio']
- cp = [c for c in self.corr_data.columns if c != 'Depth (m)']
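- # For each seismic/geotechnical parameter pair, compute Pearson's r and its p-value on
- # the depth-matched rows (the merge above requires exact 'Depth (m)' matches).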
- for s in sp:
- for c in cp:
- vd = mdf[[s, c]].dropna()
- if len(vd) > 1:
- cr.append([s, c, *scipy.stats.pearsonr(vd[s], vd[c])])
- self.corr_res = pd.DataFrame(cr, columns=['Seismic Param', 'Corr Param', 'Pearson Corr', 'P-Value'])
- self.display_corr()
- self.plot_corr(mdf)
- self.status_var.set(f"Correlation done at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- except Exception as e:
- messagebox.showerror("Error", f"Correlation failed: {str(e)}")
- logging.error(f"Correlation failed: {str(e)}")
- def display_corr(self):
- for i in self.corr_tree.get_children():
- self.corr_tree.delete(i)
- c = list(self.corr_res.columns)
- self.corr_tree["columns"] = c
- for col in c:
- self.corr_tree.heading(col, text=col)
- self.corr_tree.column(col, anchor=tk.CENTER)
- for _, r in self.corr_res.iterrows():
- self.corr_tree.insert("", tk.END, values=[f"{v:.3f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r])
- for col in c:
- dl = [len(str(r.get(col, ""))) for _, r in self.corr_res.iterrows()][:100]
- self.corr_tree.column(col, width=max(dl, default=0) * 10, minwidth=150)
- def plot_corr(self, mdf):
- if hasattr(self, 'corr_canvas') and self.corr_canvas:
- self.corr_canvas.get_tk_widget().destroy()
- if hasattr(self, 'corr_fig') and self.corr_fig:
- plt.close(self.corr_fig)
- self.corr_plot_label.pack_forget()
- sp = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)']
- cp = [c for c in mdf.columns if c not in sp + ['Depth (m)']]
- if not cp:
- self.corr_plot_label = ttk.Label(self.corr_tree.master, text="No corr params.")
- self.corr_plot_label.pack(fill=tk.BOTH, expand=True)
- return
- n_plots = len(sp) * len(cp)  # renamed from "np" to avoid shadowing the numpy alias
- r = (n_plots + 2) // 3
- self.corr_fig, ax = plt.subplots(r, 3, figsize=(15, 4 * r))
- ax = ax.flatten()
- pi = 0
- for s in sp:
- for c in cp:
- vd = mdf[[s, c]].dropna()
- if len(vd) > 1:
- ax[pi].scatter(vd[s], vd[c], c='b', alpha=0.5)
- ax[pi].set_xlabel(s)
- ax[pi].set_ylabel(c)
- ax[pi].set_title(f'{s} vs {c}')
- ax[pi].grid(True)
- pi += 1
- for i in range(pi, len(ax)):
- ax[i].axis('off')
- plt.tight_layout()
- self.corr_canvas = FigureCanvasTkAgg(self.corr_fig, master=self.corr_tree.master)
- self.corr_canvas.draw()
- self.corr_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
- def start_rt(self):
- if self.rt_running:
- return
- self.rt_running = True
- self.rt_data = pd.DataFrame(columns=['Depth (m)', 'Elevation (m)', 'P wave time (ms)', 'S wave time (ms)'])
- self.display_rt()
- self.rt_thread = threading.Thread(target=self.rt_acq, daemon=True)
- self.rt_thread.start()
- self.status_var.set("Started real-time.")
- def stop_rt(self):
- self.rt_running = False
- self.status_var.set("Stopped real-time.")
- def rt_acq(self):
- rf = self.data_dir / "realtime_chst_data.csv"
- while self.rt_running:
- try:
- if rf.exists():
- dn = pd.read_csv(rf)
- if not dn.empty:
- self.rt_data = pd.concat([self.rt_data, dn]).drop_duplicates(subset=['Depth (m)']).reset_index(drop=True)
- self.root.after(0, self.update_rt_display)
- time.sleep(1)
- except Exception as e:
- logging.error(f"Real-time error: {str(e)}")
- def update_rt_display(self):
- self.display_rt()
- self.plot_rt()
- def display_rt(self):
- for i in self.rt_tree.get_children():
- self.rt_tree.delete(i)
- c = list(self.rt_data.columns)
- self.rt_tree["columns"] = c
- for col in c:
- self.rt_tree.heading(col, text=col)
- self.rt_tree.column(col, anchor=tk.CENTER)
- for _, r in self.rt_data.iterrows():
- self.rt_tree.insert("", tk.END, values=[f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r])
- for col in c:
- dl = [len(str(r.get(col, ""))) for _, r in self.rt_data.iterrows()][:100]
- self.rt_tree.column(col, width=max(dl, default=0) * 10, minwidth=150)
- def plot_rt(self):
- if hasattr(self, 'rt_canvas') and self.rt_canvas:
- self.rt_canvas.get_tk_widget().destroy()
- if hasattr(self, 'rt_fig') and self.rt_fig:
- plt.close(self.rt_fig)
- self.rt_plot_label.pack_forget()
- if self.rt_data.empty:
- self.rt_plot_label = ttk.Label(self.rt_tree.master, text="No real-time data.")
- self.rt_plot_label.pack(fill=tk.BOTH, expand=True)
- return
- adf = self.analyze_data(self.rt_data)
- self.rt_fig, ax = plt.subplots(figsize=(10, 5))
- ax.plot(adf['P-wave Velocity (m/s)'], adf['Depth (m)'], 'b-', label='P-wave')
- ax.plot(adf['S-wave Velocity (m/s)'], adf['Depth (m)'], 'r-', label='S-wave')
- ax.set_xlabel('Velocity (m/s)')
- ax.set_ylabel('Depth (m)')
- ax.set_title('Real-Time Velocities')
- ax.invert_yaxis()
- ax.legend()
- ax.grid(True)
- self.rt_canvas = FigureCanvasTkAgg(self.rt_fig, master=self.rt_tree.master)
- self.rt_canvas.draw()
- self.rt_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
- def analyze_data(self, df):
- df = df.copy()
- df['Depth (m)'] = pd.to_numeric(df['Depth (m)'], errors='coerce')
- pc = [c for c in df.columns if "P wave time" in c]
- sc = [c for c in df.columns if "S wave time" in c]
- if not pc or not sc:
- raise ValueError("No P/S wave columns")
- for c in pc + sc:
- df[c] = pd.to_numeric(df[c], errors='coerce')
- df['First P-wave Time (µs)'] = df[pc].min(axis=1) * 1000
- df['First S-wave Time (µs)'] = df[sc].min(axis=1) * 1000
- df['P-wave Time Diff'] = df['First P-wave Time (µs)'].diff()
- df['S-wave Time Diff'] = df['First S-wave Time (µs)'].diff()
- df['P-wave Velocity (m/s)'] = np.nan
- df['S-wave Velocity (m/s)'] = np.nan
- mt = 0.001
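- # Interval velocities: v = (depth difference) / (travel-time difference) between successive
- # depths, with times converted µs -> s; mt (0.001 s) rejects near-zero time differences and
- # the 10-10000 m/s bounds discard physically implausible values.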
- for i in range(1, len(df)):
- dd = df['Depth (m)'].iloc[i] - df['Depth (m)'].iloc[i-1]
- dpt = df['P-wave Time Diff'].iloc[i] / 10**6 if pd.notna(df['P-wave Time Diff'].iloc[i]) else np.nan
- dst = df['S-wave Time Diff'].iloc[i] / 10**6 if pd.notna(df['S-wave Time Diff'].iloc[i]) else np.nan
- if pd.notna(dpt) and abs(dpt) > mt and dd > 0:
- pv = abs(dd / dpt)
- if 10 <= pv <= 10000:
- df.loc[df.index[i], 'P-wave Velocity (m/s)'] = pv  # .loc avoids chained-assignment failures in recent pandas
- else:
- logging.warning(f"Invalid P-wave velocity {pv:.2f} at depth {df['Depth (m)'].iloc[i]}")
- if pd.notna(dst) and abs(dst) > mt and dd > 0:
- sv = abs(dd / dst)
- if 10 <= sv <= 10000:
- df.loc[df.index[i], 'S-wave Velocity (m/s)'] = sv  # .loc avoids chained-assignment failures in recent pandas
- else:
- logging.warning(f"Invalid S-wave velocity {sv:.2f} at depth {df['Depth (m)'].iloc[i]}")
- df['Vp/Vs Ratio'] = df['P-wave Velocity (m/s)'] / df['S-wave Velocity (m/s)']
- df['Poisson’s Ratio'] = (df['Vp/Vs Ratio']**2 - 2) / (2 * (df['Vp/Vs Ratio']**2 - 1))
- d = 2000
- df['Shear Modulus (MPa)'] = d * df['S-wave Velocity (m/s)']**2 / 10**6
- df['Bulk Modulus (MPa)'] = d * (df['P-wave Velocity (m/s)']**2 - (4/3) * df['S-wave Velocity (m/s)']**2) / 10**6
- df['Young’s Modulus (MPa)'] = 9 * df['Bulk Modulus (MPa)'] * df['Shear Modulus (MPa)'] / (3 * df['Bulk Modulus (MPa)'] + df['Shear Modulus (MPa)'])  # E = 9KG/(3K+G); the earlier expression used the Lamé-lambda form with K, which is incorrect
- logging.info(f"Analyzed {len(df)} depths, P-wave NaNs: {df['P-wave Velocity (m/s)'].isna().sum()}, S-wave NaNs: {df['S-wave Velocity (m/s)'].isna().sum()}")
- return df
- def display_ana(self, df):
- for i in self.ana_tree.get_children():
- self.ana_tree.delete(i)
- c = ['Depth (m)', 'P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio', 'Poisson’s Ratio', 'Shear Modulus (MPa)', 'Bulk Modulus (MPa)', 'Young’s Modulus (MPa)'] + \
- [c for c in df.columns if c.endswith('Mean') or c.endswith('CI Lower') or c.endswith('CI Upper')]
- self.ana_tree["columns"] = c
- for col in c:
- self.ana_tree.heading(col, text=col)
- self.ana_tree.column(col, anchor=tk.CENTER)
- for _, r in df.iterrows():
- self.ana_tree.insert("", tk.END, values=[f"{r.get(col, np.nan):.2f}" if isinstance(r.get(col), (int, float)) and pd.notna(r.get(col)) else "" for col in c])
- for col in c:
- dl = [len(str(r.get(col, ""))) for _, r in df.iterrows()][:100]
- self.ana_tree.column(col, width=max(dl, default=0) * 10, minwidth=150)
- def identify_layers(self, df):
- vt = 0.2
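- # Layer picking: walk down the profile and start a new layer whenever the relative change
- # in P-wave velocity to the next depth exceeds vt (20%); each layer reports its depth range
- # and average P/S velocities.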
- l = []
- if df.empty:
- self.layers = pd.DataFrame(columns=['Start Depth (m)', 'End Depth (m)', 'Avg P-wave Velocity (m/s)', 'Avg S-wave Velocity (m/s)'])
- return
- cl = {'Start Depth': df['Depth (m)'].iloc[0], 'End Depth': df['Depth (m)'].iloc[0], 'P-wave Velocity': [], 'S-wave Velocity': []}
- for i in range(1, len(df)):
- pv = df['P-wave Velocity (m/s)'].iloc[i]
- sv = df['S-wave Velocity (m/s)'].iloc[i]
- if pd.notna(pv) and pd.notna(sv) and 10 <= pv <= 10000 and 10 <= sv <= 10000:
- cl['P-wave Velocity'].append(pv)
- cl['S-wave Velocity'].append(sv)
- cl['End Depth'] = df['Depth (m)'].iloc[i]
- if i < len(df) - 1:
- npv = df['P-wave Velocity (m/s)'].iloc[i + 1]
- if pd.notna(npv) and abs(pv - npv) / pv > vt:
- ld = {
- 'Start Depth (m)': cl['Start Depth'],
- 'End Depth (m)': cl['End Depth'],
- 'Avg P-wave Velocity (m/s)': np.mean(cl['P-wave Velocity']) if cl['P-wave Velocity'] else np.nan,
- 'Avg S-wave Velocity (m/s)': np.mean(cl['S-wave Velocity']) if cl['S-wave Velocity'] else np.nan
- }
- l.append(ld)
- cl = {'Start Depth': df['Depth (m)'].iloc[i + 1], 'End Depth': df['Depth (m)'].iloc[i + 1], 'P-wave Velocity': [], 'S-wave Velocity': []}
- else:
- logging.warning(f"Skipped layer at depth {df['Depth (m)'].iloc[i]}: P-wave={pv}, S-wave={sv}")
- if cl['P-wave Velocity']:
- ld = {
- 'Start Depth (m)': cl['Start Depth'],
- 'End Depth (m)': cl['End Depth'],
- 'Avg P-wave Velocity (m/s)': np.mean(cl['P-wave Velocity']) if cl['P-wave Velocity'] else np.nan,
- 'Avg S-wave Velocity (m/s)': np.mean(cl['S-wave Velocity']) if cl['S-wave Velocity'] else np.nan
- }
- l.append(ld)
- self.layers = pd.DataFrame(l) if l else pd.DataFrame(columns=['Start Depth (m)', 'End Depth (m)', 'Avg P-wave Velocity (m/s)', 'Avg S-wave Velocity (m/s)'])
- logging.info(f"Identified {len(l)} layers, checked depths {df['Depth (m)'].min()} to {df['Depth (m)'].max()}")
- def display_lay(self):
- for i in self.lay_tree.get_children():
- self.lay_tree.delete(i)
- c = list(self.layers.columns)
- self.lay_tree["columns"] = c
- for col in c:
- self.lay_tree.heading(col, text=col)
- self.lay_tree.column(col, anchor=tk.CENTER)
- for _, r in self.layers.iterrows():
- self.lay_tree.insert("", tk.END, values=[f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r])
- for col in c:
- dl = [len(str(r.get(col, ""))) for _, r in self.layers.iterrows()][:100]
- self.lay_tree.column(col, width=max(dl, default=0) * 10, minwidth=200)
- def display_sum(self, df):
- for i in self.sum_tree.get_children():
- self.sum_tree.delete(i)
- p = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio', 'Poisson’s Ratio', 'Shear Modulus (MPa)', 'Bulk Modulus (MPa)', 'Young’s Modulus (MPa)']
- vd = df[p].dropna()
- sd = {
- 'Parameter': p,
- 'Mean': [vd[c].mean() if not vd[c].empty else np.nan for c in p],
- 'Median': [vd[c].median() if not vd[c].empty else np.nan for c in p],
- 'Std Dev': [vd[c].std() if not vd[c].empty else np.nan for c in p],
- 'Min': [vd[c].min() if not vd[c].empty else np.nan for c in p],
- 'Max': [vd[c].max() if not vd[c].empty else np.nan for c in p]
- }
- sdf = pd.DataFrame(sd)
- self.sum_tree["columns"] = list(sdf.columns)
- for col in sdf.columns:
- self.sum_tree.heading(col, text=col)
- self.sum_tree.column(col, anchor=tk.CENTER)
- for _, r in sdf.iterrows():
- self.sum_tree.insert("", tk.END, values=[f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r])
- for col in sdf.columns:
- dl = [len(str(r.get(col, ""))) for _, r in sdf.iterrows()][:100]
- self.sum_tree.column(col, width=max(dl, default=0) * 10, minwidth=150)
- logging.info(f"Summary stats for P-wave Velocity: Mean={sd['Mean'][0]:.2f}, Median={sd['Median'][0]:.2f}, Min={sd['Min'][0]:.2f}, Max={sd['Max'][0]:.2f}")
- def plot_viz(self, rdf, adf):
- if hasattr(self, 'plot_canvas') and self.plot_canvas:
- self.plot_canvas.get_tk_widget().destroy()
- if hasattr(self, 'cur_fig') and self.cur_fig:
- plt.close(self.cur_fig)
- self.plot_label.pack_forget()
- if adf.empty:
- self.plot_label = ttk.Label(self.scroll_f, text="No data for viz.")
- self.plot_label.pack(fill=tk.BOTH, expand=True)
- return
- try:
- md = float(self.min_d_var.get()) if self.min_d_var.get().strip() else adf['Depth (m)'].min()
- xd = float(self.max_d_var.get()) if self.max_d_var.get().strip() else adf['Depth (m)'].max()
- if md > xd:
- md, xd = xd, md
- pdf = adf[(adf['Depth (m)'] >= md) & (adf['Depth (m)'] <= xd)]
- except ValueError:
- pdf = adf
- ap = [p for p, v in self.plot_opts.items() if v.get()]
- if not ap:
- self.plot_label = ttk.Label(self.scroll_f, text="No params selected.")
- self.plot_label.pack(fill=tk.BOTH, expand=True)
- return
- n_plots = len(ap)  # renamed from "np" to avoid shadowing the numpy alias
- r = (n_plots + 2) // 3
- self.cur_fig, ax = plt.subplots(r, min(n_plots, 3), figsize=(15, 4 * r))
- ax = [ax] if n_plots == 1 else ax.flatten()
- pi = 0
- for p in ap:
- if p == "Travel Time Deviations":
- if self.qual_issues is not None and not self.qual_issues.empty:
- ax[pi].scatter(self.qual_issues['First P-wave Time (µs)'], self.qual_issues['Depth (m)'], c='r', label='P-wave Dev', alpha=0.5)
- ax[pi].scatter(self.qual_issues['First S-wave Time (µs)'], self.qual_issues['Depth (m)'], c='b', label='S-wave Dev', alpha=0.5)
- ax[pi].set_xlabel('Time (µs)')
- ax[pi].set_ylabel('Depth (m)')
- ax[pi].set_title('Travel Time Deviations')
- ax[pi].invert_yaxis()
- ax[pi].legend()
- ax[pi].grid(True)
- pi += 1
- else:
- if p in pdf.columns:
- ax[pi].plot(pdf[p], pdf['Depth (m)'], 'b-')
- ax[pi].set_xlabel(p)
- ax[pi].set_ylabel('Depth (m)')
- ax[pi].set_title(p)
- ax[pi].invert_yaxis()
- ax[pi].grid(True)
- pi += 1
- for i in range(pi, len(ax)):
- ax[i].axis('off')
- plt.tight_layout()
- self.plot_canvas = FigureCanvasTkAgg(self.cur_fig, master=self.scroll_f)
- self.plot_canvas.draw()
- self.plot_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
- def update_plots(self):
- if self.cur_data is not None:
- adf = self.analyze_data(self.cur_data)
- self.plot_viz(self.cur_data, adf)
- def save_plots(self):
- if self.cur_fig is None:
- messagebox.showwarning("No Plot", "No plot to save.")
- return
- fp = filedialog.asksaveasfilename(defaultextension=".png", filetypes=[("PNG", "*.png"), ("All", "*.*")])
- if fp:
- try:
- self.cur_fig.savefig(fp, dpi=300)
- self.status_var.set(f"Saved plot to {fp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- except Exception as e:
- messagebox.showerror("Error", f"Save failed: {str(e)}")
- logging.error(f"Plot save failed: {str(e)}")
- def export_raw(self):
- if self.raw_data is None:
- messagebox.showwarning("No Data", "Load file.")
- return
- try:
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- cp = od / f"{fn}_raw_data_{ts}.csv"
- self.raw_data.to_csv(cp, index=False)
- self.status_var.set(f"Exported raw to {cp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Raw data to {cp}")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Raw export failed: {str(e)}")
- def export_qual(self):
- if self.qual_issues is None:
- messagebox.showwarning("No Data", "Load file and check quality.")
- return
- try:
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- cp = od / f"{fn}_quality_check_{ts}.csv"
- self.qual_issues.to_csv(cp, index=False)
- self.status_var.set(f"Exported quality to {cp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Quality to {cp}")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Quality export failed: {str(e)}")
- def export_sum(self):
- if self.cur_data is None:
- messagebox.showwarning("No Data", "Load file.")
- return
- try:
- adf = self.analyze_data(self.cur_data)
- p = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio', 'Poisson’s Ratio', 'Shear Modulus (MPa)', 'Bulk Modulus (MPa)', 'Young’s Modulus (MPa)']
- vd = adf[p].dropna()
- sd = {
- 'Parameter': p,
- 'Mean': [vd[c].mean() if not vd[c].empty else np.nan for c in p],
- 'Median': [vd[c].median() if not vd[c].empty else np.nan for c in p],
- 'Std Dev': [vd[c].std() if not vd[c].empty else np.nan for c in p],
- 'Min': [vd[c].min() if not vd[c].empty else np.nan for c in p],
- 'Max': [vd[c].max() if not vd[c].empty else np.nan for c in p]
- }
- sdf = pd.DataFrame(sd)
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- cp = od / f"{fn}_summary_{ts}.csv"
- sdf.to_csv(cp, index=False)
- self.status_var.set(f"Exported summary to {cp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Summary to {cp}")
- logging.info(f"Exported summary: Mean P-wave={sd['Mean'][0]:.2f}, Median P-wave={sd['Median'][0]:.2f}")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Summary export failed: {str(e)}")
- def export_lay(self):
- if self.layers is None:
- messagebox.showwarning("No Data", "Load file and identify layers.")
- return
- try:
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- cp = od / f"{fn}_layers_{ts}.csv"
- self.layers.to_csv(cp, index=False)
- self.status_var.set(f"Exported layers to {cp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Layers to {cp}")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Layers export failed: {str(e)}")
- def export_corr(self):
- if self.corr_res is None:
- messagebox.showwarning("No Data", "Run correlation.")
- return
- try:
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- cp = od / f"{fn}_correlation_{ts}.csv"
- self.corr_res.to_csv(cp, index=False)
- self.status_var.set(f"Exported correlation to {cp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Correlation to {cp}")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Correlation export failed: {str(e)}")
- def export_report(self):
- if self.cur_data is None:
- messagebox.showwarning("No Data", "Load file.")
- return
- if not shutil.which('latexmk'):
- messagebox.showerror("Error", "LaTeX not installed.")
- return
- try:
- adf = self.analyze_data(self.cur_data)
- ts = datetime.now().strftime('%Y%m%d_%H%M')
- od = self.out_dir / ts
- od.mkdir(exist_ok=True)
- fn = Path(self.cur_file).stem
- tp = od / f"{fn}_detailed_report_{ts}.tex"
- lc = r"""\documentclass[a4paper,12pt]{article}
- \usepackage{booktabs,longtable,geometry,amsmath,amsfonts,noto}
- \geometry{margin=1in}
- \begin{document}
- \section*{Detailed Seismic Analysis Report}
- \subsection*{Analysis Data}
- \begin{longtable}{@{}""" + "c" * len(adf.columns) + r"""@{}}
- """
- \toprule " + r" & ".join([f"\\textbf{{{c}}}" for c in adf.columns]) + r"\\\midrule\endhead"
- for _, r in adf.iterrows():
- lc += r" & ".join([f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) if pd.notna(v) else "" for v in r]) + r"\\\midrule" + "\n"
- lc += r"\bottomrule\end{longtable}"
- if self.qual_issues is not None and not self.qual_issues.empty:
- lc += r"\section*{Quality Check Issues}\begin{longtable}{@{}" + "c" * len(self.qual_issues.columns) + r"@{}}\toprule" + r" & ".join([f"\\textbf{{{c}}}" for c in self.qual_issues.columns]) + r"\\\midrule\endhead"
- for _, r in self.qual_issues.iterrows():
- lc += r" & ".join([f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r]) + r"\\\midrule" + "\n"
- lc += r"\bottomrule\end{longtable}"
- p = ['P-wave Velocity (m/s)', 'S-wave Velocity (m/s)', 'Vp/Vs Ratio', 'Poisson’s Ratio', 'Shear Modulus (MPa)', 'Bulk Modulus (MPa)', 'Young’s Modulus (MPa)']
- vd = adf[p].dropna()
- sd = {
- 'Parameter': p,
- 'Mean': [vd[c].mean() if not vd[c].empty else np.nan for c in p],
- 'Median': [vd[c].median() if not vd[c].empty else np.nan for c in p],
- 'Std Dev': [vd[c].std() if not vd[c].empty else np.nan for c in p],
- 'Min': [vd[c].min() if not vd[c].empty else np.nan for c in p],
- 'Max': [vd[c].max() if not vd[c].empty else np.nan for c in p]
- }
- sdf = pd.DataFrame(sd)
- lc += r"\section*{Summary Statistics}\begin{longtable}{@{}" + "c" * len(sdf.columns) + r"@{}}\toprule" + r" & ".join([f"\\textbf{{{c}}}" for c in sdf.columns]) + r"\\\midrule\endhead"
- for _, r in sdf.iterrows():
- lc += r" & ".join([f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r]) + r"\\\midrule" + "\n"
- lc += r"\bottomrule\end{longtable}"
- if self.layers is not None and not self.layers.empty:
- lc += r"\section*{Identified Layers}\begin{longtable}{@{}" + "c" * len(self.layers.columns) + r"@{}}\toprule" + r" & ".join([f"\\textbf{{{c}}}" for c in self.layers.columns]) + r"\\\midrule\endhead"
- for _, r in self.layers.iterrows():
- lc += r" & ".join([f"{v:.2f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r]) + r"\\\midrule" + "\n"
- lc += r"\bottomrule\end{longtable}"
- if self.corr_res is not None and not self.corr_res.empty:
- lc += r"\section*{Correlation Results}\begin{longtable}{@{}" + "c" * len(self.corr_res.columns) + r"@{}}\toprule" + r" & ".join([f"\\textbf{{{c}}}" for c in self.corr_res.columns]) + r"\\\midrule\endhead"
- for _, r in self.corr_res.iterrows():
- lc += r" & ".join([f"{v:.3f}" if isinstance(v, (int, float)) and pd.notna(v) else str(v) for v in r]) + r"\\\midrule" + "\n"
- lc += r"\bottomrule\end{longtable}"
- lc += r"\end{document}"
- with open(tp, 'w', encoding='utf-8') as f:
- f.write(lc)
- subprocess.run(['latexmk', '-pdf', f'-outdir={od}', str(tp)], capture_output=True, text=True, check=True)
- pp = od / f"{fn}_detailed_report_{ts}.pdf"
- if pp.exists():
- self.status_var.set(f"Exported report to {pp} at {datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%I:%M %p IST')}")
- messagebox.showinfo("Success", f"Report to {pp}")
- else:
- raise Exception("PDF failed")
- except Exception as e:
- messagebox.showerror("Error", f"Export failed: {str(e)}")
- logging.error(f"Report export failed: {str(e)}")