Python Archives - 77 Interactive http://77interactive.com/?cat=2 Rudy's Code snippets Fri, 06 Sep 2024 13:53:46 +0000 en-US hourly 1 https://wordpress.org/?v=6.9.4 http://77interactive.com/wp-content/uploads/2023/05/cropped-77-32x32.png Python Archives - 77 Interactive http://77interactive.com/?cat=2 32 32 python loops http://77interactive.com/?p=392 Fri, 06 Sep 2024 13:53:35 +0000 http://77interactive.com/?p=392 The post python loops appeared first on 77 Interactive.

]]>
import psycopg2 import time # Database connection details db_config = { 'dbname': 'users', # Name of the database 'user': 'your_username', # Replace with your PostgreSQL username 'password': 'your_password', # Replace with your PostgreSQL password 'host': 'localhost', # Replace with your host if needed 'port': '5432' # PostgreSQL default port } # Function to connect to the PostgreSQL database def connect_to_db(config): try: conn = psycopg2.connect(**config) print("Connection established.") return conn except Exception as error: print(f"Error connecting to the database: {error}") return None # Function to perform the updates def perform_updates(conn): try: cursor = conn.cursor() for i in range(10): print(f"Iteration {i+1}/10") # Step a.i: Append 'ccc' to plain_text append_query = """ UPDATE dbo.paragraph SET plain_text = plain_text || 'ccc' WHERE id IS NOT NULL; """ cursor.execute(append_query) conn.commit() # Commit the change print("Appended 'ccc' to plain_text") # Wait for 5 seconds time.sleep(5) # Step c.i: Remove the last 3 characters from plain_text remove_query = """ UPDATE dbo.paragraph SET plain_text = substring(plain_text FROM 1 FOR length(plain_text) - 3) WHERE id IS NOT NULL; """ cursor.execute(remove_query) conn.commit() # Commit the change print("Removed last 3 characters from plain_text") except Exception as error: print(f"Error performing updates: {error}") conn.rollback() # Rollback in case of an error finally: cursor.close() # Main execution if __name__ == "__main__": # Connect to the database connection = connect_to_db(db_config) if connection is not None: # Perform the updates in a loop perform_updates(connection) # Close the connection connection.close() print("Connection closed.")

The post python loops appeared first on 77 Interactive.

]]>
PostgreSQL pg_dump backups via Python http://77interactive.com/?p=305 Wed, 21 Feb 2024 15:21:02 +0000 http://77interactive.com/?p=305 The following scripts backup your PostgreSQL databases via Python using pg_dump. The scripts will then compress the backup via zip. These backup scripts are linux specific. You’ll have to manage secrets either using a .pgpass file or environment variables. This is the first pass before using any type of password security. Alternative version that I’ve […]

The post PostgreSQL pg_dump backups via Python appeared first on 77 Interactive.

]]>
The following scripts backup your PostgreSQL databases via Python using pg_dump. The scripts will then compress the backup via zip. These backup scripts are linux specific.

You’ll have to manage secrets either using a .pgpass file or environment variables. This is the first pass before using any type of password security.

import subprocess
import psycopg2
from datetime import datetime
import os

# PostgreSQL connection parameters
db_params = {
    "dbname": "postgres",  # change to your database name if different
    "user": "your_username",
    "password": "your_password",
    "host": "localhost"
}

# Directory to store the backups
backup_dir = "/mnt/backups/"
os.makedirs(backup_dir, exist_ok=True)

def get_databases(connection_parameters):
    """Retrieve a list of database names."""
    conn = None
    databases = []
    try:
        conn = psycopg2.connect(**connection_parameters)
        cur = conn.cursor()
        cur.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
        databases = [db[0] for db in cur.fetchall()]
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not closed:
            conn.close()
    return databases

def backup_database(db_name, backup_directory):
    """Backup a single database and zip the backup file."""
    current_date = datetime.now().strftime('%Y%m%d')
    backup_file = os.path.join(backup_directory, f"{db_name}.sql")
    zip_file = f"{current_date}-{db_name}.zip"
    
    # Backup database to SQL file
    subprocess.run(f"pg_dump -U {db_params['user']} -w {db_name} > {backup_file}", shell=True, check=True)
    
    # Compress the SQL file
    subprocess.run(f"zip -j {backup_dir}{zip_file} {backup_file}", shell=True, check=True)
    
    # Remove the original SQL file
    os.remove(backup_file)

if __name__ == "__main__":
    databases = get_databases(db_params)
    for db_name in databases:
        print(f"Backing up database: {db_name}")
        backup_database(db_name, backup_dir)
    print("Backup process completed.")

Alternative version that I’ve used before

import os
import datetime
import subprocess

# Database connection details

HOST = "localhost"
PORT = 5432
USER = "postgres"
PASSWORD = "your_password"  # Replace with your actual password

# Backup directory

BACKUP_DIR = "/mnt/backups/"

# Get current date in YYYYMMDD format

current_date = datetime.datetime.now().strftime("%Y%m%d")


def main():
    # Connect to PostgreSQL server

    try:
        conn = psycopg2.connect(host=HOST, port=PORT, user=USER, password=PASSWORD)
    except psycopg2.Error as e:
        print("Error connecting to database:", e)
        return
    # Get all databases except system ones

    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT datname FROM pg_database WHERE datistemplate = false AND datname NOT IN ('postgres', 'template')"
        )
        databases = [row[0] for row in cur.fetchall()]
    except psycopg2.Error as e:
        print("Error fetching databases:", e)
        return
    # Backup each database

    for db in databases:
        backup_database(db)
    # Close connection

    conn.close()


def backup_database(db_name):
    # Dump database

    dump_file = os.path.join(BACKUP_DIR, f"{db_name}.sql")
    command = [
        "pg_dump",
        "-h",
        HOST,
        "-p",
        str(PORT),
        "-U",
        USER,
        "-d",
        db_name,
        ">",
        dump_file,
    ]
    subprocess.run(command, check=True)

    # Zip the dump

    zip_file = os.path.join(BACKUP_DIR, f"{current_date}-{db_name}.zip")
    subprocess.run(["zip", "-r", zip_file, dump_file], check=True)

    # Remove the uncompressed dump

    os.remove(dump_file)


if __name__ == "__main__":
    main()

The post PostgreSQL pg_dump backups via Python appeared first on 77 Interactive.

]]>
Get Databases from a Server http://77interactive.com/?p=298 Fri, 15 Dec 2023 19:37:02 +0000 http://77interactive.com/?p=298 The following python script gets all databases from a PostgreSQL server.

The post Get Databases from a Server appeared first on 77 Interactive.

]]>
The following python script gets all databases from a PostgreSQL server.

import psycopg2

def get_databases(server_name, user, password, db_name='postgres'):
    try:
        conn = psycopg2.connect(
            host=server_name,
            user=user,
            password=password,
            dbname=db_name
        )
        cursor = conn.cursor()

        # Query to get the list of databases
        cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
        databases = [db[0] for db in cursor.fetchall()]

        return databases

    except Exception as e:
        print(f"An error occurred: {e}")
        return []

    finally:
        cursor.close()
        conn.close()

# Example usage (uncomment and modify with your details for testing)
# print(get_databases('your_server_name', 'your_username', 'your_password'))

The post Get Databases from a Server appeared first on 77 Interactive.

]]>
Read Postgres Configuration File http://77interactive.com/?p=266 Tue, 19 Sep 2023 21:52:53 +0000 http://77interactive.com/?p=266 Read and Display Postgres configuration information The following script reads the Postgresql configuration file and displays key settings. Results are sent to the console and an outfile. You can also specify the postresql.conf file location via parameter.

The post Read Postgres Configuration File appeared first on 77 Interactive.

]]>
Read and Display Postgres configuration information

The following script reads the Postgresql configuration file and displays key settings. Results are sent to the console and an outfile. You can also specify the postresql.conf file location via parameter.

import re
import sys
import subprocess
import json

def get_node_name():
    # Try to get the cluster name using the `pg_lsclusters` command
    try:
        output = subprocess.check_output(['pg_lsclusters', '-h'], universal_newlines=True)
        # Extract the cluster name (which is typically "main" in default installations)
        cluster_name = output.splitlines()[1].split()[-1]
        return cluster_name
    except:
        return "Unknown"

def read_postgres_conf(file_path):
    # Define the settings we're interested in
    settings = [
        "listen_addresses", "port", "max_connections", "shared_buffers", 
        "work_mem", "maintenance_work_mem", "wal_level", "max_wal_size", "min_wal_size"
        "archive_mode", "log_statement", "log_duration", "hot_standby","synchronous_commit",
        "autovacuum", "log_timezone", "timezone", "ssl", "Password_encryption",
        "effective_cache_size","random_page_cost", "seq_page_cost"
    ]
    
    # Initialize a dictionary with "Not Set" as default for all settings and empty comments
    values = {setting: {"value": "Not Set", "comment": ""} for setting in settings}

    try:
        with open(file_path, 'r') as file:
            for line in file:
                # Remove leading and trailing whitespaces
                line = line.strip()

                # Skip empty lines
                if not line:
                    continue

                for setting in settings:
                    if line.startswith(setting):
                        # Extract the value and possible comment using a regular expression
                        match = re.search(r'^\s*{}\s*=\s*([^#\s]*)\s*(?:#\s*(.*))?'.format(setting), line)
                        if match:
                            values[setting]["value"] = match.group(1).strip("'\"")
                            if match.group(2):
                                values[setting]["comment"] = match.group(2).strip()
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
        return

    return values

if __name__ == "__main__":
    default_path = "/etc/postgresql/15/main/postgresql.conf"
    output_file = "/tmp/linuxverification.json"
    
    if len(sys.argv) == 2 and sys.argv[1] in ["-h", "--help"]:
        print("Usage: python postgresverification.py [path_to_postgresql.conf]")
        print(f"Default path if not specified: {default_path}")
        print("Example: python postgresverification.py /path/to/your/postgresql.conf")
    else:
        file_path = sys.argv[1] if len(sys.argv) > 1 else default_path
        values = read_postgres_conf(file_path)

        if values:
            # Get the node name
            node_name = get_node_name()
            output_data = {
                "Postgres Node Name": node_name,
                "Settings": values
            }

           # Output to the console
            print(f"Postgres Node Name: {node_name}\n")
            print("{:<20} | {:<15} | {}".format("Setting", "Current Value", "Comment"))
            print("-" * 65)
            for setting, data in values.items():
                print("{:<20} | {:<15} | {}".format(setting, data["value"], data["comment"]))
       

            # Append the results to the file in JSON format
            with open(output_file, 'a') as json_file:
                json_file.write(json.dumps(output_data, indent=4))
                json_file.write('\n')  # Separate entries by a newline

            print(f"Results appended to: {output_file}")

The post Read Postgres Configuration File appeared first on 77 Interactive.

]]>
Read MongoDB Configuration File http://77interactive.com/?p=262 Wed, 13 Sep 2023 17:14:51 +0000 http://77interactive.com/?p=262 Find and read the file using Python The following python script reads mongod.conf file and returns some settings.

The post Read MongoDB Configuration File appeared first on 77 Interactive.

]]>
Find and read the file using Python

The following python script reads mongod.conf file and returns some settings.


import sys
import yaml
import json

def read_mongodb_conf(file_path):
    # Define the settings we're interested in reading from mongod.conf
    # If a setting is not set or commented out, then "Not Set" is displayed
    settings = [
        "net.bindIp", "net.tls.mode","replication.replSetName", "replication.oplogSizeMB", "net.port",
        "security.authorization", "security.keyFile", "storage.dbPath", "systemLog.path", "systemLog.logAppend",
        "storage.engine", "storage.journal.enabled", "storage.journal.commitIntervalMs",
        "operationProfiling.mode", "operationProfiling.slowOpThresholdMs", "operationProfiling.slowOpSampleRate",
        "storage.wiredTiger.engineConfig.cacheSizeGB", "storage.wiredTiger.engineConfig.directoryForIndexes"
    ]

    # Initialize a dictionary with "Not Set" as default for all settings and empty comments
    # These will be updated with the actual values, if they exist
    values = {setting: "Not Set" for setting in settings}

    try:
        # Open the file and load the YAML data
        with open(file_path, 'r') as file:
            config = yaml.safe_load(file)
            
            # Loop through the settings in the file
            for setting in settings:
                keys = setting.split('.')
                value = config
                
                # Navigate through the nested keys
                for key in keys:
                    if key in value:
                        value = value[key]
                    else:
                        value = None
                        break
                # If the value is not None, then update the dictionary
                # with the actual value.
                if value is not None:
                    values[setting] = value
    # Handle exceptions
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
        return
    except yaml.YAMLError:
        print(f"Error: Invalid YAML format in '{file_path}'.")
        return

    return values

if __name__ == "__main__":
    # Set the default path to the MongoDB configuration file
    # and the output file.
    default_path = "/etc/mongod.conf"
    output_file = "/tmp/mongoverification.json"
    
    # If the user passes -h or --help, then display the help message
    if len(sys.argv) == 2 and sys.argv[1] in ["-h", "--help"]:
        print("Usage: python script_name.py [path_to_mongod.conf]")
        print(f"Default path if not specified: {default_path}")
        print("Example: python script_name.py /path/to/your/mongod.conf")
    else:
        file_path = sys.argv[1] if len(sys.argv) > 1 else default_path
        values = read_mongodb_conf(file_path)

        if values:
            # Output to the console
            print("{:<30} | {}".format("Setting", "Current Value"))
            print("-" * 65)
            for setting, value in values.items():
                print("{:<30} | {}".format(setting, value))

            # Append the results to the file in JSON format
            with open(output_file, 'a') as json_file:
                json.dump(values, json_file, indent=4)
                json_file.write('\n')  # Separate entries by a newline

            print(f"\nResults appended to: {output_file}")

The post Read MongoDB Configuration File appeared first on 77 Interactive.

]]>
Linux Configuration Script in Python http://77interactive.com/?p=245 Wed, 23 Aug 2023 16:45:09 +0000 http://77interactive.com/?p=245 Verifying Linux Setting in Python The following Python script checks several Linux settings. Results of the check are written to the console. If the results are within the desired ranges and values, the results are displayed in green. If the values are outside of the desired ragnes, results are displayed in red. All results are […]

The post Linux Configuration Script in Python appeared first on 77 Interactive.

]]>
Verifying Linux Setting in Python

The following Python script checks several Linux settings. Results of the check are written to the console. If the results are within the desired ranges and values, the results are displayed in green. If the values are outside of the desired ragnes, results are displayed in red.

All results are written to a json outfile located at /tmp/linuxverification.json

Python code

import subprocess
import socket
import os
import json
import platform

def get_value_from_sys_path(path):
    try:
        with open(path, 'r') as f:
            return f.read().strip()
    except Exception:
        return None

# Check function for exact values
def check_value_exact(value, target):
    return str(value) == str(target)

# Check function for in_range column
# If the value is in range, then return True
def check_value_in_range(value, check_func):
    if value is None:
        return False
    return check_func(value)

# Get system total memory
def get_memory_info():
    with open("/proc/meminfo", "r") as f:
        for line in f.readlines():
            if "MemTotal" in line:
                memory_kb = int(line.split()[1])
                return memory_kb / (1024 * 1024)  # Convert KB to GB

# Checks the Transparent Huge Pages status
# The status can be one of the following:
# always, madvise, never
def check_thp_status():
    path = "/sys/kernel/mm/transparent_hugepage/enabled"
    thp_value = get_value_from_sys_path(path)
    if "[never]" in thp_value:
        return (True, "[never]")
    elif "[always]" in thp_value:
        return (False, "[always]")
    elif "[madvise]" in thp_value:
        return (False, "[madvise]")
    else:
        return (None, "Unknown")

# Get network adapters and their IP addresses
def get_ip_addresses():
    try:
        output = subprocess.check_output(["hostname", "--all-ip-addresses"]).decode('utf-8').strip()
        return output.split()
    except Exception as e:
        print(f"Error fetching IP addresses: {e}")
        return []

# Print colored text to console based acceptable ranges
def print_colored(text, color):
    endc = '\033[0m'
    print(f"{color}{text}{endc}")

def main():
    settings = {
        "vm.swappiness": {
            "path": "/proc/sys/vm/swappiness",
            "acceptable": "10 or less",
            "check_func": lambda x: int(x) <= 10
        },
        "vm.dirty_ratio": {
            "path": "/proc/sys/vm/dirty_ratio",
            "acceptable": "less than 15",
            "check_func": lambda x: int(x) < 15
        },
        "vm.dirty_background_ratio": {
            "path": "/proc/sys/vm/dirty_background_ratio",
            "acceptable": "5 or less",
            "check_func": lambda x: int(x) <= 5
        },
        "net.core.somaxconn": {
            "path": "/proc/sys/net/core/somaxconn",
            "acceptable": "4096",
            "check_func": lambda x: check_value_exact(x, 4096)
        },
        'net.ipv4.tcp_fin_timeout': {
            'path': '/proc/sys/net/ipv4/tcp_fin_timeout',
            'acceptable': 30,
            'check_func': lambda x: check_value_exact(x, 30)
        },
        'net.ipv4.tcp_keepalive_intvl': {
            'path': "/proc/sys/net/ipv4/tcp_keepalive_intvl",
            'acceptable': 30,
            'check_func': lambda x: check_value_exact(x, 30)
        },
        'net.ipv4.tcp_keepalive_time': {
            'path': "/proc/sys/net/ipv4/tcp_keepalive_time",
            'acceptable': 120,
            'check_func': lambda x: check_value_exact(x, 120)
        },
        'net.ipv4.tcp_max_syn_backlog': {
            'path': "/proc/sys/net/ipv4/tcp_max_syn_backlog",
            'acceptable': 4096,
            'check_func': lambda x: check_value_exact(x, 4096)
        },
        'net.ipv4.tcp_keepalive_probes': {
            'path': "/proc/sys/net/ipv4/tcp_keepalive_probes",
            'acceptable': 6,
            'check_func': lambda x: check_value_exact(x, 6)
        }
    }

    # Add the Server specs to the results dictionary
    # which will be used in the output file.
    results = {
        "Server Name": socket.gethostname(),
        "Processor Information": platform.processor(),
        "Number of Processor Cores": os.cpu_count(),
        "Total Memory (GB)": get_memory_info(),
        "IP Addresses": get_ip_addresses()
    }

    # Display the server specs on the terminal
    print(f"Server Name: {results['Server Name']}")
    print(f"Processor Information: {results['Processor Information']}")
    print(f"Number of Processor Cores: {results['Number of Processor Cores']}")
    print(f"Total Memory (GB): {results['Total Memory (GB)']:.2f}")
    for ip in results['IP Addresses']:
        print(f"IP Address: {ip}")
    print("-" * 91)

    # Print settings table header
    print("{:<30} {:<30} {:<20} {:<10}".format("Setting", "Acceptable Ranges", "Current Value", "In Range"))
    print("-" * 91)

    # Build the output dictionary
    output = {}

    # Loop over the settings dictionary
    # Determine the current value of the setting and if the
    # value is in the acceptable range. If so, then print
    # the setting in green, otherwise print it in red.
    # We also add the setting to the output dictionary
    for setting_name, setting_details in settings.items():
        value = get_value_from_sys_path(setting_details["path"])
        in_range = check_value_in_range(value, setting_details["check_func"])
        
        print_colored(
            "{:<30} {:<30} {:<20} {:<10}".format(
                setting_name, 
                str(setting_details["acceptable"]), 
                str(value),
                str(in_range)
            ),
            "\033[92m" if in_range else "\033[91m"
        )

        # Add the setting to the output dictionary
        # which will be used in the out file
        output[setting_name] = {
            "current_value": value,
            "acceptable_values": setting_details["acceptable"],
            "in_range": in_range
        }

    # Add the settings to the results dictionary
    results["Settings"] = output

    # Save results to a JSON file
    with open("/tmp/linuxverification.json", "w") as file:
        json.dump(results, file, indent=4)

if __name__ == "__main__":
    main()

The post Linux Configuration Script in Python appeared first on 77 Interactive.

]]>
Copy Directory and Contents in Python http://77interactive.com/?p=90 Fri, 09 Jun 2023 03:02:07 +0000 http://77interactive.com/?p=90 The following script copies a directory and all of its contents to another directory Call the script like this

The post Copy Directory and Contents in Python appeared first on 77 Interactive.

]]>
The following script copies a directory and all of its contents to another directory

import shutil
import os
import argparse

def copy_directory(src, dst):
    """
    This function copies a directory, including all of its content and hidden files, to another directory.

    Args:
        src (str): The source directory.
        dst (str): The destination directory.

    Returns:
        None
    """
    # Create the destination directory path including the source directory name
    dst = os.path.join(dst, os.path.basename(src))
    
    # Ensure the destination directory exists
    os.makedirs(dst, exist_ok=True)

    # Iterate over all the files in source directory
    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        
        # If item is a directory, recurse into it
        if os.path.isdir(s):
            print(f"Copying directory {s} to {d}")
            copy_directory(s, d)
        else:
            # Otherwise, copy it
            print(f"Copying file {s} to {d}")
            shutil.copy2(s, d)

def main():
    parser = argparse.ArgumentParser(description='Copy a directory including all of its content and hidden files to another directory.')
    parser.add_argument('src', type=str, help='The source directory')
    parser.add_argument('dst', type=str, help='The destination directory')

    args = parser.parse_args()

    copy_directory(args.src, args.dst)


if __name__ == "__main__":
    main()


Call the script like this

python3 copy_dir.py /path/to/source/directory /path/to/destination/directory

The post Copy Directory and Contents in Python appeared first on 77 Interactive.

]]>
Python – Execute command as of 4 weeks ago http://77interactive.com/?p=59 Tue, 16 May 2023 20:09:04 +0000 http://77interactive.com/?p=59 Delete backups older than 4 weeks ago The script uses pbm delete-backup and delete-pitr to clean up backups older than 4 weeks. The script then sends an email when complete Secrets are stored in a .env file Updated version with more parameters and a comment section. The .env file The environment file is of the […]

The post Python – Execute command as of 4 weeks ago appeared first on 77 Interactive.

]]>
Delete backups older than 4 weeks ago

The script uses pbm delete-backup and delete-pitr to clean up backups older than 4 weeks. The script then sends an email when complete

Secrets are stored in a .env file

Updated version with more parameters and a comment section.

#!/usr/bin/env python3

"""
purge_mongodb_backups.py

This script does the following:
1. Calculates the date based on the number of weeks to keep (specified by the 'weeks_to_keep' argument).
2. Connects to MongoDB and gets the replica set name.
3. Executes the 'pbm delete-pitr' command with a '--older-than' parameter set to the calculated date, if the 'pitr' argument is set to 1.
4. Executes the 'pbm delete-backup' command with a '--older-than' parameter set to the calculated date, if the 'pitr' argument is set to 0.
5. Sends an email notifying that the chosen command has been executed, the number of weeks specified to keep, and includes the replica set name in the email subject and body.

This script reads the MongoDB username and password from an external .env file.

Before running this script, make sure you have the necessary permissions to execute the commands and send emails.
Also, replace the placeholders in the email details section with your actual SMTP server details, email address, email password, and recipient's email address.

Usage: python3 purge_mongodb_backups.py --pitr {0,1} --weeks_to_keep {1,2,3,4}
"""

import datetime
import subprocess
import shlex
import argparse
from dotenv import load_dotenv
import os
import smtplib
from email.mime.text import MIMEText
from pymongo import MongoClient

# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('--pitr', default=0, type=int, choices=[0, 1], help='Set to 1 to delete point-in-time recovery backups, or 0 to delete regular backups.')
parser.add_argument('--weeks_to_keep', default=4, type=int, choices=[1, 2, 3, 4], help='The number of weeks to keep. Valid values are 1, 2, 3, and 4.')
args = parser.parse_args()

# Load values from .env file
load_dotenv()

# Get the values from the .env file
user_name = os.getenv("USERNAME")
pass_word = os.getenv("PASSWORD")

# Calculate the date based on the number of weeks to keep
weeks_to_keep = args.weeks_to_keep
date_to_keep = (datetime.datetime.now() - datetime.timedelta(weeks=weeks_to_keep)).strftime('%Y-%m-%d')

# Prepare the commands
cmd1 = f"pbm delete-pitr -f --older-than {date_to_keep} --mongodb-uri='mongodb://{user_name}:{pass_word}@localhost:27017/admin?authSource=admin'"
cmd2 = f"pbm delete-backup -f --older-than {date_to_keep} --mongodb-uri='mongodb://{user_name}:{pass_word}@localhost:27017/admin?authSource=admin'"
cmd1 = shlex.split(cmd1)
cmd2 = shlex.split(cmd2)

# Execute the chosen command
if args.pitr == 1:
    subprocess.run(cmd1, check=True)
elif args.pitr == 0:
    subprocess.run(cmd2, check=True)

# Connect to MongoDB and get replica set name
client = MongoClient(f'mongodb://{user_name}:{pass_word}@localhost:27017/admin?authSource=admin')
replica_set_name = client.admin.command("isMaster")["setName"]

# Email details
smtp_server = 'smtp.yourserver.com'
smtp_port = 587  # or 465 if your server uses SSL
email_address = 'your_email_address'
email_password = 'your_email_password'
recipient_email = 'recipient_email_address'

# Email content
msg = MIMEText(f'The chosen command has been executed successfully on replica set: {replica_set_name}. The number of weeks to keep is: {weeks_to_keep}.')
msg['Subject'] = f'Command Execution Notification for {replica_set_name}'
msg['From'] = email_address
msg['To'] = recipient_email

# Send the email
s = smtplib.SMTP(smtp_server, smtp_port)
s.starttls()  # comment this line if your server uses SSL
s.login(email_address, email_password)
s.sendmail(email_address, [recipient_email], msg.as_string())
s.quit()

The .env file

The environment file is of the following format

USERNAME=yourusername
PASSWORD=yourpassword

Original version:

import datetime
import subprocess
import shlex
from dotenv import load_dotenv
import os
import smtplib
from email.mime.text import MIMEText
from pymongo import MongoClient

# Load values from .env file
load_dotenv()

# Get the values from the .env file
user_name = os.getenv("USERNAME")
pass_word = os.getenv("PASSWORD")

# Calculate the date four weeks ago
four_weeks_ago = (datetime.datetime.now() - datetime.timedelta(weeks=4)).strftime('%Y-%m-%d')

# Prepare the first command
cmd1 = f"pbm delete-pitr -f --older-than {four_weeks_ago} --mongodb-uri='mongodb://{user_name}:{pass_word}@localhost:27017/admin?authSource=admin'"

# Prepare the second command
cmd2 = f"pbm delete-backup -f --older-than {four_weeks_ago} --mongodb-uri='mongodb://{user_name}:{pass_word}@localhost:27017/admin?authSource=admin'"

# Use shlex to handle spaces in commands properly
cmd1 = shlex.split(cmd1)
cmd2 = shlex.split(cmd2)

# Execute the first command
subprocess.run(cmd1, check=True)

# Execute the second command
subprocess.run(cmd2, check=True)

# Connect to MongoDB and get replica set name
client = MongoClient(f'mongodb://{user_name}:{pass_word}@localhost:27017/admin?authSource=admin')
replica_set_name = client.admin.command("isMaster")["setName"]

# Email details
smtp_server = 'smtp.yourserver.com'
smtp_port = 587  # or 465 if your server uses SSL
email_address = 'your_email_address'
email_password = 'your_email_password'
recipient_email = 'recipient_email_address'

# Email content
msg = MIMEText(f'Both commands have been executed successfully on replica set: {replica_set_name}.')
msg['Subject'] = f'Command Execution Notification for {replica_set_name}'
msg['From'] = email_address
msg['To'] = recipient_email

# Send the email
s = smtplib.SMTP(smtp_server, smtp_port)
s.starttls()  # comment this line if your server uses SSL
s.login(email_address, email_password)
s.sendmail(email_address, [recipient_email], msg.as_string())
s.quit()

The post Python – Execute command as of 4 weeks ago appeared first on 77 Interactive.

]]>
Python – Upload Sunday Files via S3 http://77interactive.com/?p=55 Tue, 16 May 2023 15:14:45 +0000 http://77interactive.com/?p=55 Upload Python Script The following code uploads files and directories via S3. This version gets files and directories created on a Sunday How to call from Command line We can then call the function from the command line like this: First of the month version The following version returns values for the first of the […]

The post Python – Upload Sunday Files via S3 appeared first on 77 Interactive.

]]>
Upload Python Script

The following code uploads files and directories via S3. This version gets files and directories created on a Sunday

import os
import datetime
import re
import subprocess
import argparse

def get_newest_in_sunday(path, s3_target_path):
    """
    This function scans a given directory for files and directories that were created on the last Sunday.
    It then uploads the newest file and directory to a specified S3 target path using the s3cmd tool.

    Args:
        path (str): The path of the directory to scan.
        s3_target_path (str): The S3 path to upload the file and directory to.

    Returns:
        Tuple[str, str]: The names of the newest file and directory, if they exist.
    """
    sunday_files = []
    sunday_dirs = []

    today = datetime.datetime.now()

    # Calculate the last Sunday
    days_since_sunday = (today.weekday() - 6) % 7
    last_sunday = today - datetime.timedelta(days=days_since_sunday)

    pattern = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z(\.pbm\.json)?$")

    for name in os.listdir(path):
        # Skip names that do not match the pattern
        if not pattern.match(name):
            continue

        full_path = os.path.join(path, name)
        timestamp_str = name.split('T')[0]  # Extract the date part from the name
        timestamp = datetime.datetime.strptime(timestamp_str, "%Y-%m-%d")

        # Check if it's the last Sunday
        if timestamp.date() == last_sunday.date():
            if os.path.isfile(full_path):
                sunday_files.append((timestamp, name))
            elif os.path.isdir(full_path):
                sunday_dirs.append((timestamp, name))

    # Sort by timestamp (newest first) and get the first name
    sunday_files.sort(reverse=True)
    sunday_dirs.sort(reverse=True)

    newest_file = sunday_files[0][1] if sunday_files else None
    newest_dir = sunday_dirs[0][1] if sunday_dirs else None

    # If both the file and directory exist, upload them
    if newest_file and newest_dir:
        s3_config_path = "./s3config.txt"
        subprocess.call(["s3cmd", "put", "--config", s3_config_path, os.path.join(path, newest_file), s3_target_path])
        subprocess.call(["s3cmd", "put", "--recursive", "--config", s3_config_path, os.path.join(path, newest_dir), s3_target_path])
        print(f"Uploaded file: {newest_file}")
        print(f"Uploaded directory: {newest_dir}")
    else:
        print("Nothing was uploaded.")

    return newest_file, newest_dir

def main():
    parser = argparse.ArgumentParser(description='Process some paths.')
    parser.add_argument('path', type=str, help='The path of the directory to scan')
    parser.add_argument('s3_target_path', type=str, help='The S3 path to upload the file and directory to')

    args = parser.parse_args()

    get_newest_in_sunday(args.path, args.s3_target_path)


if __name__ == "__main__":
    main()

How to call from Command line

We can then call the function from the command line like this:

python3 scan_upload.py /mnt/backups/server/rs /weekly-backup-folder/

First of the month version

The following version returns values for the first of the month.

import os
import datetime
import re
import subprocess
import argparse

def get_newest_from_first_of_month(path, s3_target_path):
    """
    This function scans a given directory for files and directories that were created on the first day of the current month.
    It then uploads the newest file and directory to a specified S3 target path using the s3cmd tool.

    Args:
        path (str): The path of the directory to scan.
        s3_target_path (str): The S3 path to upload the file and directory to.

    Returns:
        Tuple[str, str]: The names of the newest file and directory, if they exist.
    """
    monthly_files = []
    monthly_dirs = []

    today = datetime.datetime.now()

    # Calculate the first day of the current month
    first_of_month = today.replace(day=1)

    pattern = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z(\.pbm\.json)?$")

    for name in os.listdir(path):
        # Skip names that do not match the pattern
        if not pattern.match(name):
            continue

        full_path = os.path.join(path, name)
        timestamp_str = name.split('T')[0]  # Extract the date part from the name
        timestamp = datetime.datetime.strptime(timestamp_str, "%Y-%m-%d")

        # Check if it's the first day of the current month
        if timestamp.date() == first_of_month.date():
            if os.path.isfile(full_path):
                monthly_files.append((timestamp, name))
            elif os.path.isdir(full_path):
                monthly_dirs.append((timestamp, name))

    # Sort by timestamp (newest first) and get the first name
    monthly_files.sort(reverse=True)
    monthly_dirs.sort(reverse=True)

    newest_file = monthly_files[0][1] if monthly_files else None
    newest_dir = monthly_dirs[0][1] if monthly_dirs else None

    # If both the file and directory exist, upload them
    if newest_file and newest_dir:
        s3_config_path = "./s3config.txt"
        subprocess.call(["s3cmd", "put", "--config", s3_config_path, os.path.join(path, newest_file), s3_target_path])
        subprocess.call(["s3cmd", "put", "--recursive", "--config", s3_config_path, os.path.join(path, newest_dir), s3_target_path])
        print(f"Uploaded file: {newest_file}")
        print(f"Uploaded directory: {newest_dir}")
    else:
        print("Nothing was uploaded.")

    return newest_file, newest_dir

def main():
    parser = argparse.ArgumentParser(description='Process some paths.')
    parser.add_argument('path', type=str, help='The path of the directory to scan')
    parser.add_argument('s3_target_path', type=str, help='The S3 path to upload the file and directory to')

    args = parser.parse_args()

    get_newest_from_first_of_month(args.path, args.s3_target_path)

if __name__ == "__main__":
    main()

The post Python – Upload Sunday Files via S3 appeared first on 77 Interactive.

]]>
Python – Upload latest via S3 http://77interactive.com/?p=53 Mon, 15 May 2023 19:41:21 +0000 http://77interactive.com/?p=53 The following python script uploads a file and directory via S3. The file and directory must match a certain pattern. If the file or folder do not exist, an error is returned.

The post Python – Upload latest via S3 appeared first on 77 Interactive.

]]>
The following python script uploads a file and directory via S3. The file and directory must match a certain pattern.

If the file or folder do not exist, an error is returned.

import os
import datetime
import re
import subprocess

def get_newest_in_current_day(path):
    current_day_files = []
    current_day_dirs = []

    current_day = datetime.datetime.now()
    pattern = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z(\.pbm\.json)?$")

    for name in os.listdir(path):
        # Skip names that do not match the pattern
        if not pattern.match(name):
            continue

        full_path = os.path.join(path, name)
        timestamp_str = name.split('T')[0]  # Extract the date part from the name
        timestamp = datetime.datetime.strptime(timestamp_str, "%Y-%m-%d")

        # Check if it's the current day
        if timestamp.date() == current_day.date():
            if os.path.isfile(full_path):
                current_day_files.append((timestamp, name))
            elif os.path.isdir(full_path):
                current_day_dirs.append((timestamp, name))

    # Sort by timestamp (newest first) and get the first name
    current_day_files.sort(reverse=True)
    current_day_dirs.sort(reverse=True)

    newest_file = current_day_files[0][1] if current_day_files else None
    newest_dir = current_day_dirs[0][1] if current_day_dirs else None

    # If both the file and directory exist, upload them
    if newest_file and newest_dir:
        s3_target_path = "/weekly-prd/database/"
        s3_config_path = "./s3config.txt"
        subprocess.call(["s3cmd", "put", "--config", s3_config_path, os.path.join(path, newest_file), s3_target_path])
        subprocess.call(["s3cmd", "put", "--recursive", "--config", s3_config_path, os.path.join(path, newest_dir), s3_target_path])
        print("File and directory were uploaded.")
    else:
        print("Nothing was uploaded.")

    return newest_file, newest_dir


path = "/mnt/backups/0300/rs"
get_newest_in_current_day(path)

The post Python – Upload latest via S3 appeared first on 77 Interactive.

]]>