Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import hassapi as hass
- from influxdb import InfluxDBClient
- import datetime
- import pandas as pd
- from langchain_openai import OpenAI
class InfluxDBTelemetry(hass.Hass):
    """AppDaemon app that sends the result of an InfluxDB query to OpenAI.

    When the app starts it runs the configured query once, drops rows that
    contain nothing but a timestamp, and asks the OpenAI model to comment on
    the remaining data. The model's answer is written to the app log.

    App arguments read from ``self.args``: ``influx_host``, ``influx_port``,
    ``influx_user``, ``influx_password``, ``database``, ``openai_api_key``,
    ``role``, ``instructions`` and ``query``.
    """

    def initialize(self):
        """Read the configuration, build both clients and run the query once.

        A missing connection or OpenAI argument raises ``KeyError``. A client
        that cannot be constructed is reported through ``self.error`` and
        start-up stops there without running the query.
        """
        args = self.args

        # InfluxDB connection settings
        self.influx_host = args["influx_host"]
        self.influx_port = args["influx_port"]
        self.influx_user = args["influx_user"]
        self.influx_password = args["influx_password"]
        self.database = args["database"]

        # OpenAI settings; role and instructions form the head of the prompt
        self.openai_api_key = args["openai_api_key"]
        self.role = args["role"]
        self.instructions = args["instructions"]

        try:
            # temperature=0 keeps the model output as repeatable as possible
            self.openai = OpenAI(api_key=self.openai_api_key, temperature=0)
        except Exception as e:
            self.error(f"Failed to initialize OpenAI client: {e}")
            return
        self.log("Successfully initialized OpenAI client")

        # Keep the try limited to client construction so that a failure of
        # the query run below is never reported as a connection failure.
        # NOTE(review): constructing the client presumably does not contact
        # the server; unreachable hosts would then surface in the query step.
        try:
            self.client = InfluxDBClient(
                host=self.influx_host,
                port=self.influx_port,
                username=self.influx_user,
                password=self.influx_password,
                database=self.database,
            )
        except Exception as e:
            self.error(f"Failed to connect to InfluxDB: {e}")
            return
        self.log("Successfully connected to InfluxDB")

        self.query_and_process_data()

    def terminate(self):
        """Close the InfluxDB client when AppDaemon stops or reloads the app."""
        # getattr: initialize() may have stopped before the client was created
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

    def query_and_process_data(self):
        """Query InfluxDB, filter the rows with pandas and send them to OpenAI.

        Returns:
            The model's response, or ``None`` when the query yields no usable
            rows or when any step fails (failures are logged, not raised).
        """
        try:
            filtered_df = self._fetch_filtered_data()
            if filtered_df is None:
                return None
            prompt = (
                f"{self.role}\n\n{self.instructions}\n\n"
                f"Data:\n{filtered_df.to_string()}"
            )
        except Exception as e:
            self.error(f"Query or data processing failed: {e}")
            return None

        try:
            # invoke() replaces predict(), which LangChain has deprecated
            response = self.openai.invoke(prompt)
        except Exception as e:
            self.error(f"OpenAI API call failed: {e}")
            return None

        self.log("Successfully received response from OpenAI")
        self.log(f"OpenAI Response: {response}")
        return response

    def _fetch_filtered_data(self):
        """Run the configured query and return the rows that carry values.

        Returns:
            A DataFrame without timestamp-only rows, or ``None`` when the
            query returns nothing or no row has a value besides ``time``.
        """
        self.log("Querying data from InfluxDB")
        result = self.client.query(self.args["query"])
        df = pd.DataFrame(result.get_points())
        if df.empty:
            self.log("Query returned no data")
            return None

        # A row is useful when at least one column other than 'time' is set.
        # The mask stays separate from the frame so the shapes logged below
        # describe the real data rather than a frame with a helper column.
        value_columns = [col for col in df.columns if col != 'time']
        has_values = df[value_columns].notna().any(axis=1)
        filtered_df = df[has_values]

        self.log(f"Original dataset shape: {df.shape}")
        self.log(f"Filtered dataset shape: {filtered_df.shape}")
        self.log(
            f"Removed {df.shape[0] - filtered_df.shape[0]} rows with only timestamp values"
        )

        if filtered_df.empty:
            # Nothing to analyse; avoid spending an OpenAI request on it
            self.log("No rows with values remain after filtering")
            return None
        return filtered_df
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement