gandalfbialy

Untitled

May 31st, 2025
30
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.48 KB | None | 0 0
  1. import base64
  2. import pickle
  3. from google.auth.transport.requests import Request
  4. from googleapiclient.discovery import build
  5.  
  6. def get_email_service():
  7. with open("token.pkl", "rb") as token:
  8. creds = pickle.load(token)
  9.  
  10. if creds and creds.expired and creds.refresh_token:
  11. creds.refresh(Request())
  12.  
  13. service = build("gmail", "v1", credentials = creds)
  14. return service
  15.  
  16. def fetch_unread_emails_from_label(model, label_name="Test"):
  17. service = get_email_service()
  18.  
  19. label_test = get_label_id(service, label_name)
  20. label_ham = get_label_id(service, "ham")
  21. label_spam = get_label_id(service, "spam2")
  22.  
  23. response = service.users().messages().list(
  24. userId='me',
  25. labelIds=[label_test, 'UNREAD'],
  26. maxResults=100
  27. ).execute()
  28.  
  29. messages = response.get('messages', [])
  30. email_list = []
  31.  
  32. for msg in messages:
  33. msg_id = msg['id']
  34. message = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
  35. payload = message.get('payload', {})
  36. headers = payload.get('headers', [])
  37.  
  38. subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '')
  39. body = get_message_body(payload)
  40. full_text = f"{subject} {body.strip()}"
  41.  
  42. prediction = predict(model, full_text)
  43.  
  44. add_labels = [label_spam if prediction == 'spam' else label_ham]
  45. remove_labels = [label_test]
  46.  
  47. service.users().messages().modify(
  48. userId='me',
  49. id=msg_id,
  50. body={
  51. 'addLabelIds': add_labels,
  52. 'removeLabelIds': remove_labels
  53. }
  54. ).execute()
  55.  
  56. print(f"[ZMIANA] Wiadomość '{subject[:40]}...' → {prediction.upper()} (etykieta zmieniona)")
  57.  
  58. def get_label_id(service, label_name):
  59. labels = service.users().labels().list(userId='me').execute().get('labels', [])
  60. for label in labels:
  61. if label['name'].lower() == label_name.lower():
  62. return label['id']
  63. return None
  64.  
  65.  
  66. def get_message_body(payload):
  67. parts = payload.get('parts')
  68. if parts:
  69. for part in parts:
  70. if part['mimeType'] == 'text/plain':
  71. data = part['body'].get('data')
  72. if data:
  73. return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
  74. else:
  75. body_data = payload['body'].get('data')
  76. if body_data:
  77. return base64.urlsafe_b64decode(body_data).decode('utf-8', errors='ignore')
  78. return "(brak treści)"
  79.  
  80.  
Add Comment
Please, Sign In to add comment