Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import base64
- import pickle
- from google.auth.transport.requests import Request
- from googleapiclient.discovery import build
def get_email_service():
    """Build an authenticated Gmail API client from the stored token.

    Reads pickled credentials from ``token.pkl`` in the current working
    directory, refreshes them when they are expired and a refresh token is
    available, and returns a Gmail ``v1`` service object.

    Returns:
        The service object produced by ``googleapiclient.discovery.build``.
    """
    # NOTE(review): unpickling can execute arbitrary code; this presumes
    # token.pkl is a trusted, locally generated file -- confirm.
    with open("token.pkl", "rb") as token_file:
        credentials = pickle.load(token_file)

    needs_refresh = (
        credentials and credentials.expired and credentials.refresh_token
    )
    if needs_refresh:
        credentials.refresh(Request())

    return build("gmail", "v1", credentials=credentials)
def fetch_unread_emails_from_label(model, label_name="Test"):
    """Classify unread messages under a Gmail label and relabel them.

    Lists up to 100 unread messages carrying ``label_name``, runs each
    message's subject and body through ``predict(model, text)``, then adds
    the "spam2" label (when the prediction equals ``'spam'``) or the "ham"
    label (any other prediction) and removes ``label_name`` from the message.
    One status line is printed per processed message.

    Args:
        model: Opaque classifier object, forwarded unchanged to ``predict``.
        label_name: Name of the Gmail label whose unread messages are
            processed. Defaults to "Test".

    Returns:
        None.

    Raises:
        ValueError: If ``label_name``, "ham" or "spam2" does not exist in the
            mailbox. Checked up front so that no message is modified with an
            unresolved (``None``) label id.
    """
    service = get_email_service()

    label_test = get_label_id(service, label_name)
    label_ham = get_label_id(service, "ham")
    label_spam = get_label_id(service, "spam2")

    # get_label_id returns None for an unknown label; fail early with a clear
    # message instead of sending None to the Gmail API.
    resolved = ((label_name, label_test), ("ham", label_ham), ("spam2", label_spam))
    missing = [name for name, label_id in resolved if label_id is None]
    if missing:
        raise ValueError(f"Gmail label(s) not found: {', '.join(missing)}")

    messages_api = service.users().messages()
    response = messages_api.list(
        userId='me',
        labelIds=[label_test, 'UNREAD'],
        maxResults=100,
    ).execute()

    for msg in response.get('messages', []):
        msg_id = msg['id']
        message = messages_api.get(userId='me', id=msg_id, format='full').execute()
        payload = message.get('payload', {})
        headers = payload.get('headers', [])

        # Header names are case-insensitive, so accept "subject", "SUBJECT", ...
        subject = next(
            (h['value'] for h in headers if h['name'].lower() == 'subject'),
            '',
        )
        body = get_message_body(payload)
        full_text = f"{subject} {body.strip()}"

        prediction = predict(model, full_text)
        target_label = label_spam if prediction == 'spam' else label_ham

        messages_api.modify(
            userId='me',
            id=msg_id,
            body={
                'addLabelIds': [target_label],
                'removeLabelIds': [label_test],
            },
        ).execute()
        print(f"[ZMIANA] Wiadomość '{subject[:40]}...' → {prediction.upper()} (etykieta zmieniona)")
def get_label_id(service, label_name):
    """Look up a Gmail label id by name, ignoring case.

    Args:
        service: Gmail API service object exposing
            ``users().labels().list(userId=...)``.
        label_name: Label name to search for.

    Returns:
        The ``id`` of the first label whose lower-cased name equals the
        lower-cased ``label_name``, or ``None`` when no label matches.
    """
    listing = service.users().labels().list(userId='me').execute()
    matching_ids = (
        entry['id']
        for entry in listing.get('labels', [])
        if entry['name'].lower() == label_name.lower()
    )
    return next(matching_ids, None)
def get_message_body(payload):
    """Extract the plain-text body from a Gmail message payload.

    For a multipart payload the direct parts are checked first for a
    ``text/plain`` part with data; if none is found, nested multipart
    containers are searched the same way (for example a
    ``multipart/alternative`` inside a ``multipart/mixed``). For a payload
    without parts, the payload's own body data is decoded regardless of its
    MIME type.

    Args:
        payload: Message payload dict with optional ``parts`` and ``body``
            entries; ``body['data']`` is base64url-encoded text.

    Returns:
        The decoded text (UTF-8, undecodable bytes dropped), or the
        placeholder "(brak treści)" when no usable body data is found.
    """
    parts = payload.get('parts')
    if parts:
        text = _find_plain_text(parts)
        if text is not None:
            return text
    else:
        body_data = payload.get('body', {}).get('data')
        if body_data:
            return _decode_body_data(body_data)
    return "(brak treści)"


def _decode_body_data(data):
    """Decode base64url body data into text, dropping invalid UTF-8 bytes."""
    return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')


def _find_plain_text(parts):
    """Return the first text/plain body under ``parts``, or None if absent."""
    # Direct children take priority so flat messages resolve exactly as they
    # did before nested parts were searched.
    for part in parts:
        if part.get('mimeType') == 'text/plain':
            data = part.get('body', {}).get('data')
            if data:
                return _decode_body_data(data)

    # Nothing at this level: descend into nested multipart containers.
    for part in parts:
        nested = part.get('parts')
        if nested:
            text = _find_plain_text(nested)
            if text is not None:
                return text
    return None
Add Comment
Please, Sign In to add comment