import pymongo
from pymongo import MongoClient
from datetime import datetime
# Replace the credentials and host with your own
source_uri = "mongodb://source_username:source_password@source_host:source_port/"
dest_uri = "mongodb://dest_username:dest_password@dest_host:dest_port/"

# Databases that are skipped when collecting counts.
excluded_databases = ("admin", "local", "config")

# Open both clients as context managers so their connections are closed when
# the script finishes, including when an error interrupts it.
with MongoClient(source_uri) as source_client, MongoClient(dest_uri) as dest_client:
    # Connect to the DBA database and CollectionCounts collection in the destination MongoDB server
    dba = dest_client["DBA"]
    collection_counts = dba["CollectionCounts"]

    # Current local time, made timezone-aware. PyMongo treats naive datetimes
    # as UTC, so a bare datetime.now() would be stored shifted by the local
    # UTC offset; an aware value is converted to the correct instant.
    timestamp = datetime.now().astimezone()

    # Gather every count from the source BEFORE touching the destination, so
    # a failure while reading the source leaves the previous snapshot intact.
    documents = []
    for db_name in source_client.list_database_names():
        if db_name in excluded_databases:
            continue
        db = source_client[db_name]
        for collection_name in db.list_collection_names():
            count = db[collection_name].count_documents({})
            documents.append(
                {
                    "database": db_name,
                    "collection": collection_name,
                    "count": count,
                    "lastUpdated": timestamp,
                }
            )

    # Remove all existing documents in the CollectionCounts collection and
    # write the new snapshot in a single batch. insert_many rejects an empty
    # list, so it is skipped when the source has no user collections.
    collection_counts.delete_many({})
    if documents:
        collection_counts.insert_many(documents)

print("Collection counts with the lastUpdated field have been stored in the DBA.CollectionCounts collection.")
# Version 2: same script, but views are excluded from the counts.
import pymongo
from pymongo import MongoClient
from datetime import datetime
# Replace the credentials and host with your own
source_uri = "mongodb://source_username:source_password@source_host:source_port/"
dest_uri = "mongodb://dest_username:dest_password@dest_host:dest_port/"

# Databases that are skipped when collecting counts.
excluded_databases = ("admin", "local", "config")

# Open both clients as context managers so their connections are closed when
# the script finishes, including when an error interrupts it.
with MongoClient(source_uri) as source_client, MongoClient(dest_uri) as dest_client:
    # Connect to the DBA database and CollectionCounts collection in the destination MongoDB server
    dba = dest_client["DBA"]
    collection_counts = dba["CollectionCounts"]

    # Current local time, made timezone-aware. PyMongo treats naive datetimes
    # as UTC, so a bare datetime.now() would be stored shifted by the local
    # UTC offset; an aware value is converted to the correct instant.
    timestamp = datetime.now().astimezone()

    # Gather every count from the source BEFORE touching the destination, so
    # a failure while reading the source leaves the previous snapshot intact.
    documents = []
    for db_name in source_client.list_database_names():
        if db_name in excluded_databases:
            continue
        db = source_client[db_name]
        # Filter collections to exclude views: an entry is treated as a view
        # when its options contain "viewOn". .get() tolerates an entry that
        # has no "options" key at all.
        collections = [
            coll_info["name"]
            for coll_info in db.list_collections()
            if "viewOn" not in coll_info.get("options", {})
        ]
        for collection_name in collections:
            count = db[collection_name].count_documents({})
            documents.append(
                {
                    "database": db_name,
                    "collection": collection_name,
                    "count": count,
                    "lastUpdated": timestamp,
                }
            )

    # Remove all existing documents in the CollectionCounts collection and
    # write the new snapshot in a single batch. insert_many rejects an empty
    # list, so it is skipped when the source has no user collections.
    collection_counts.delete_many({})
    if documents:
        collection_counts.insert_many(documents)

print("Collection counts with the lastUpdated field have been stored in the DBA.CollectionCounts collection.")