Module nepse_tools.utils.notification_mediums.email

Expand source code
import os
import smtplib
import ssl
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from decouple import config


class EmailManager:
    def __init__(self):
        self._sender_email: str = config("MAIL_EMAIL")
        self._mail_password: str = config("MAIL_PASSWORD")

        self.smtp_server_address: str = config("MAIL_SMTP_SERVER_ADDRESS")
        self.smtp_server_port: int = int(config("MAIL_SMTP_SERVER_PORT"))
        self.email_context = ssl.create_default_context()

    @staticmethod
    def get_send_mail_kwargs(
            subject: str,
            plain_message: str,
            html_message: str,
            receiver_emails: list[str],
            attachment_file_paths: list[str] = None,
    ) -> dict:
        return {
            "subject": subject,
            "plain_message": plain_message,
            "html_message": html_message,
            "receiver_emails": receiver_emails,
            "attachment_file_paths": attachment_file_paths
        }

    @staticmethod
    def attach_documents_to_email(message: MIMEMultipart, attachment_file_paths: list[str]):
        for attachment_file_path in attachment_file_paths:
            with open(attachment_file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())

            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename= {os.path.basename(attachment_file_path)}",
            )
            message.attach(part)

    def _send_email(
            self,
            subject: str,
            plain_message: str,
            receiver_email: list[str],
            html_message: str = None,
            attachment_file_paths: list[str] = None,
            server: smtplib.SMTP_SSL = None,
    ):
        server.login(self._sender_email, self._mail_password)

        message = MIMEMultipart("alternative")

        if attachment_file_paths is not None:
            self.attach_documents_to_email(message, attachment_file_paths)

        message["Subject"] = subject
        message["From"] = self._sender_email
        message["To"] = ",".join(receiver_email)

        message.attach(MIMEText(plain_message, "plain"))
        if html_message:
            # Second one will try to render first in client side.
            message.attach(MIMEText(html_message, "html"))

        server.sendmail(
            self._sender_email, receiver_email, message.as_string()
        )

    def send_email(
            self,
            subject: str,
            plain_message: str,
            receiver_email: str | list[str],
            html_message: str = None,
            attachment_file_paths: list[str] = None,
            server: smtplib.SMTP_SSL = None
    ) -> bool:
        if type(receiver_email) is str:
            receiver_email = [receiver_email]

        if server is None:
            with smtplib.SMTP_SSL(
                    self.smtp_server_address,
                    self.smtp_server_port,
                    context=self.email_context
            ) as server:
                self._send_email(
                    subject,
                    plain_message,
                    receiver_email,
                    html_message,
                    attachment_file_paths,
                    server
                )
        else:
            self._send_email(
                subject,
                plain_message,
                receiver_email,
                html_message,
                attachment_file_paths,
                server
            )

        return True

Classes

class EmailManager
Expand source code
class EmailManager:
    def __init__(self):
        self._sender_email: str = config("MAIL_EMAIL")
        self._mail_password: str = config("MAIL_PASSWORD")

        self.smtp_server_address: str = config("MAIL_SMTP_SERVER_ADDRESS")
        self.smtp_server_port: int = int(config("MAIL_SMTP_SERVER_PORT"))
        self.email_context = ssl.create_default_context()

    @staticmethod
    def get_send_mail_kwargs(
            subject: str,
            plain_message: str,
            html_message: str,
            receiver_emails: list[str],
            attachment_file_paths: list[str] = None,
    ) -> dict:
        return {
            "subject": subject,
            "plain_message": plain_message,
            "html_message": html_message,
            "receiver_emails": receiver_emails,
            "attachment_file_paths": attachment_file_paths
        }

    @staticmethod
    def attach_documents_to_email(message: MIMEMultipart, attachment_file_paths: list[str]):
        for attachment_file_path in attachment_file_paths:
            with open(attachment_file_path, "rb") as attachment:
                part = MIMEBase("application", "octet-stream")
                part.set_payload(attachment.read())

            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                f"attachment; filename= {os.path.basename(attachment_file_path)}",
            )
            message.attach(part)

    def _send_email(
            self,
            subject: str,
            plain_message: str,
            receiver_email: list[str],
            html_message: str = None,
            attachment_file_paths: list[str] = None,
            server: smtplib.SMTP_SSL = None,
    ):
        server.login(self._sender_email, self._mail_password)

        message = MIMEMultipart("alternative")

        if attachment_file_paths is not None:
            self.attach_documents_to_email(message, attachment_file_paths)

        message["Subject"] = subject
        message["From"] = self._sender_email
        message["To"] = ",".join(receiver_email)

        message.attach(MIMEText(plain_message, "plain"))
        if html_message:
            # Second one will try to render first in client side.
            message.attach(MIMEText(html_message, "html"))

        server.sendmail(
            self._sender_email, receiver_email, message.as_string()
        )

    def send_email(
            self,
            subject: str,
            plain_message: str,
            receiver_email: str | list[str],
            html_message: str = None,
            attachment_file_paths: list[str] = None,
            server: smtplib.SMTP_SSL = None
    ) -> bool:
        if type(receiver_email) is str:
            receiver_email = [receiver_email]

        if server is None:
            with smtplib.SMTP_SSL(
                    self.smtp_server_address,
                    self.smtp_server_port,
                    context=self.email_context
            ) as server:
                self._send_email(
                    subject,
                    plain_message,
                    receiver_email,
                    html_message,
                    attachment_file_paths,
                    server
                )
        else:
            self._send_email(
                subject,
                plain_message,
                receiver_email,
                html_message,
                attachment_file_paths,
                server
            )

        return True

Static methods

def attach_documents_to_email(message: email.mime.multipart.MIMEMultipart, attachment_file_paths: list)
Expand source code
@staticmethod
def attach_documents_to_email(message: MIMEMultipart, attachment_file_paths: list[str]):
    for attachment_file_path in attachment_file_paths:
        with open(attachment_file_path, "rb") as attachment:
            part = MIMEBase("application", "octet-stream")
            part.set_payload(attachment.read())

        encoders.encode_base64(part)
        part.add_header(
            "Content-Disposition",
            f"attachment; filename= {os.path.basename(attachment_file_path)}",
        )
        message.attach(part)
def get_send_mail_kwargs(subject: str, plain_message: str, html_message: str, receiver_emails: list, attachment_file_paths: list = None) ‑> dict
Expand source code
@staticmethod
def get_send_mail_kwargs(
        subject: str,
        plain_message: str,
        html_message: str,
        receiver_emails: list[str],
        attachment_file_paths: list[str] = None,
) -> dict:
    return {
        "subject": subject,
        "plain_message": plain_message,
        "html_message": html_message,
        "receiver_emails": receiver_emails,
        "attachment_file_paths": attachment_file_paths
    }

Methods

def send_email(self, subject: str, plain_message: str, receiver_email: list[str] | str, html_message: str = None, attachment_file_paths: list = None, server: smtplib.SMTP_SSL = None) ‑> bool
Expand source code
def send_email(
        self,
        subject: str,
        plain_message: str,
        receiver_email: str | list[str],
        html_message: str = None,
        attachment_file_paths: list[str] = None,
        server: smtplib.SMTP_SSL = None
) -> bool:
    if type(receiver_email) is str:
        receiver_email = [receiver_email]

    if server is None:
        with smtplib.SMTP_SSL(
                self.smtp_server_address,
                self.smtp_server_port,
                context=self.email_context
        ) as server:
            self._send_email(
                subject,
                plain_message,
                receiver_email,
                html_message,
                attachment_file_paths,
                server
            )
    else:
        self._send_email(
            subject,
            plain_message,
            receiver_email,
            html_message,
            attachment_file_paths,
            server
        )

    return True