#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
mysqlpdump is a wrapper around mysqldump that runs it in parallel.

It uses several worker threads, each driving its own mysqldump process, so
the dump can finish much faster than a single sequential mysqldump run.

mysqlpdump tries to implement some of the suggestions that appeared in the
MySQL Performance Blog:
http://www.mysqlperformanceblog.com/2007/05/22/wishes-for-mysqldump/

Changelog:
 - 0.1
   - First version
 - 0.2
   - Store dumps to files directly instead of stdout
   - Can compress files
   - Dump each table in its own file
   - Can pass parameters directly to mysqldump
"""

__title__ = "mysqlpdump"
__version__ = "0.2"
__author__ = "Carles Amigo"
__email__ = "fr3nd at fr3nd dot net"
__website__ = " http://www.fr3nd.net/projects/mysqlpdump"

import getpass
import os
import shlex
import subprocess
import sys
import threading
from optparse import OptionParser

try:
    import queue as Queue  # Python 3
except ImportError:
    import Queue  # Python 2

try:
    import MySQLdb
except ImportError:
    # Reported with a readable message by main() / Database().
    MySQLdb = None


class Log(object):
    """Minimal stderr logger shared by all mysqlpdump components."""

    def __init__(self, verbose):
        # verbose: when false, log() is silent; error() always writes.
        self.verbose = verbose

    def log(self, line):
        """Write an informational line to stderr, only in verbose mode."""
        if self.verbose:
            sys.stderr.write(" - " + str(line) + "\n")

    def error(self, line):
        """Write an error line to stderr regardless of verbosity.

        Failures must never be silent, unlike progress messages.
        """
        sys.stderr.write(" ! " + str(line) + "\n")


class Database(object):
    """Control connection used to lock the server and enumerate tables.

    The table dumps themselves are produced by separate mysqldump processes
    (see mysqldump()); main() holds the lock taken through this connection
    while they run.
    """

    def __init__(self, log, mysqluser, mysqlpass, mysqlhost):
        """Open the control connection.

        Raises ImportError when MySQLdb is unavailable and MySQLdb.Error when
        the connection fails.
        """
        if MySQLdb is None:
            raise ImportError("The MySQLdb module is required to connect to MySQL")
        self.user = mysqluser
        self.password = mysqlpass
        self.host = mysqlhost
        self.log = log
        self.log.log("Connecting to database")
        self.db = MySQLdb.connect(user=mysqluser, passwd=mysqlpass, host=mysqlhost)
        self.cursor = self.db.cursor()

    def close(self):
        """Close the control connection."""
        self.log.log("Closing database connection")
        self.db.close()

    def lock(self):
        """Take a global read lock (FLUSH TABLES WITH READ LOCK)."""
        self.log.log("Locking all tables")
        self.cursor.execute("FLUSH TABLES WITH READ LOCK;")

    def unlock(self):
        """Release the lock taken by lock()."""
        self.log.log("Unlocking all tables")
        self.cursor.execute("UNLOCK TABLES")

    def get_databases(self):
        """Return the names of all databases visible to this user."""
        self.cursor.execute("show databases;")
        return [row[0] for row in self.cursor.fetchall()]

    def get_tables(self, database):
        """Return the names of all tables in *database*."""
        # Quote the identifier so names such as "my-db" are valid SQL.
        self.cursor.execute("show tables from " + self._quote_identifier(database) + ";")
        return [row[0] for row in self.cursor.fetchall()]

    def get_slave_status(self):
        """Return the raw rows of SHOW SLAVE STATUS."""
        self.cursor.execute("show slave status;")
        return self.cursor.fetchall()

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier (backticks doubled)."""
        return "`" + str(name).replace("`", "``") + "`"

    def _dump_command(self, database, table, custom_parameters="",
                      mysqldump="/usr/bin/mysqldump"):
        """Build the mysqldump argv list for one table.

        custom_parameters is split with shell-like quoting rules. Credentials
        are only added when non-empty. A bare "-p" would make mysqldump prompt
        for a password, and a bare "-u"/"-h" would swallow the next argument.
        """
        args = [mysqldump]
        if custom_parameters:
            args.extend(shlex.split(custom_parameters))
        if self.user:
            args.append("-u" + self.user)
        if self.password:
            # NOTE(review): a password on the command line may be visible to
            # other local users in the process list; an option file is safer.
            args.append("-p" + self.password)
        if self.host:
            args.append("-h" + self.host)
        args.extend([database, table])
        return args

    def mysqldump(self, database, table, destination, custom_parameters="",
                  stdout=False, gzip=False, mysqldump="/usr/bin/mysqldump"):
        """Dump one table with an external mysqldump process.

        With stdout=True the whole dump is captured in memory and returned as
        (exit_status, output_bytes). This can use a lot of memory for big
        tables. mysqldump's stderr is not captured, so its warnings never end
        up inside the dump.

        Otherwise the dump is written to "<destination>/<database>-<table>.sql"
        (".sql.gz", compressed by an external gzip process, when gzip=True).
        Failures are reported on stderr, and (None, None) is returned.

        No shell is involved, so names containing spaces or shell
        metacharacters are passed to mysqldump intact.
        """
        args = self._dump_command(database, table, custom_parameters, mysqldump)
        if stdout:
            proc = subprocess.Popen(args, stdout=subprocess.PIPE)
            output = proc.communicate()[0]
            return (proc.returncode, output)

        path = destination + "/" + database + "-" + table + ".sql"
        if gzip:
            path = path + ".gz"
        out = open(path, "wb")
        try:
            if gzip:
                status = self._dump_gzipped(args, out)
            else:
                status = subprocess.call(args, stdout=out)
        finally:
            out.close()
        if status != 0:
            self.log.error("mysqldump of " + database + " " + table
                           + " failed (exit status " + str(status) + ")")
        return (None, None)

    @staticmethod
    def _dump_gzipped(args, out):
        """Run the equivalent of `args | gzip -c > out`.

        Return 0 only if both processes succeeded.
        """
        dump = subprocess.Popen(args, stdout=subprocess.PIPE)
        try:
            compressor = subprocess.Popen(["gzip", "-c"], stdin=dump.stdout, stdout=out)
        except OSError:
            # gzip could not be started: close the pipe and reap mysqldump.
            dump.stdout.close()
            dump.wait()
            raise
        # gzip holds its own copy of the pipe. Closing ours means mysqldump
        # sees a broken pipe if gzip exits early.
        dump.stdout.close()
        gzip_status = compressor.wait()
        dump_status = dump.wait()
        return dump_status or gzip_status


def _event(event_dict, num):
    """Return the completion Event for work item *num*, creating it if needed.

    dict.setdefault is a single call, so two threads asking for the same
    number (the item's own worker and its successor) share one Event.
    """
    return event_dict.setdefault(num, threading.Event())


class Worker(threading.Thread):
    """Thread that dumps [num, database, table] items taken from a queue.

    Items are numbered in enqueue order. In stdout mode a worker waits for
    item num-1 to finish before writing, so dumps appear on stdout in
    enqueue order even though they are produced concurrently.
    """

    def __init__(self, queue, log, db, event_dict, destination,
                 custom_parameters="", stdout=False, gzip=False, ):
        threading.Thread.__init__(self)
        self.queue = queue
        self.log = log
        self.db = db
        self.event_dict = event_dict  # item number -> threading.Event
        self.stdout = stdout
        self.gzip = gzip
        self.destination = destination
        self.custom_parameters = custom_parameters

    def run(self):
        """Process queue items forever (the thread is meant to be a daemon)."""
        self.log.log("Worker " + self.name + " started")
        while True:
            num, database, table = self.queue.get()
            try:
                self._process(num, database, table)
            except Exception as err:
                # Keep the worker alive. A dead worker would leave items
                # unprocessed and queue.join() in main() would block forever.
                self.log.error(self.name + " failed dumping " + database + " "
                               + table + ": " + str(err))
            finally:
                # Always release the next item's writer and the queue counter,
                # even when this dump failed.
                _event(self.event_dict, num).set()
                self.queue.task_done()

    def _process(self, num, database, table):
        """Dump one table and, in stdout mode, write it out in order."""
        self.log.log(self.name + " dumping " + database + " " + table)
        try:
            status, output = self.db.mysqldump(database, table,
                                               custom_parameters=self.custom_parameters,
                                               stdout=self.stdout, gzip=self.gzip,
                                               destination=self.destination)
        finally:
            if self.stdout and num > 0:
                # Wait for the predecessor even if our dump failed. Otherwise
                # run() would mark this item done and a later item could
                # write before earlier ones.
                _event(self.event_dict, num - 1).wait()
        if status:
            self.log.error("mysqldump of " + database + " " + table
                           + " failed (exit status " + str(status) + ")")
        self.log.log(self.name + " dumped " + database + " " + table)
        if output:
            # mysqldump output is bytes; write it undecoded (Python 2 and 3).
            stream = getattr(sys.stdout, "buffer", sys.stdout)
            stream.write(output)
            stream.flush()


def _default_user():
    """Return the invoking user's login name, or "" if it cannot be found."""
    try:
        return os.getlogin()
    except OSError:
        # os.getlogin() fails without a controlling terminal (e.g. cron).
        try:
            return getpass.getuser()
        except Exception:
            return ""


def main():
    """Parse options, lock the server and dump every table in parallel."""
    usage = "usage: %prog [options]\n Run mysqldump in paralel"
    parser = OptionParser(usage, version=__version__)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      default=False, help="verbose output.")
    parser.add_option("-u", "--user", action="store", dest="user", type="string",
                      default=_default_user(), help="User for login.")
    parser.add_option("-p", "--password", action="store", dest="password",
                      type="string", default='', help="Password for login.")
    parser.add_option("-H", "--host", action="store", dest="host", type="string",
                      default='localhost', help="Connect to host.")
    parser.add_option("-t", "--threads", action="store", dest="threads", type="int",
                      default=5, help="Threads used. Default = 5")
    parser.add_option("-s", "--stdout", action="store_true", dest="stdout",
                      default=False, help="Output dumps to stdout instead to files.")
    parser.add_option("-g", "--gzip", action="store_true", dest="gzip",
                      default=False, help="Add gzip compression to files.")
    parser.add_option("-d", "--destination", action="store", dest="destination",
                      type="string", default=".",
                      help="Path where to store generated dumps.")
    parser.add_option("-P", "--parameters", action="store", dest="parameters",
                      type="string", default="",
                      help="Pass parameters directly to mysqldump.")
    (options, args) = parser.parse_args()

    if MySQLdb is None:
        parser.error("The MySQLdb module is required")
    if options.threads < 1:
        # With no workers queue.join() below would never return.
        parser.error("The number of threads must be at least 1")
    if not options.stdout and not os.path.isdir(options.destination):
        parser.error("Destination directory does not exist: " + options.destination)

    log = Log(options.verbose)
    try:
        db = Database(log, options.user, options.password, options.host)
    except MySQLdb.Error as err:
        parser.error("Cannot connect to database: " + str(err))

    db.lock()
    try:
        queue = Queue.Queue()
        event_dict = {}
        for _ in range(options.threads):
            worker = Worker(queue, log, db, event_dict,
                            custom_parameters=options.parameters,
                            stdout=options.stdout, gzip=options.gzip,
                            destination=options.destination)
            # Workers loop forever; as daemons they do not keep the process
            # alive once queue.join() returns.
            worker.daemon = True
            worker.start()

        num = 0
        for database in db.get_databases():
            for table in db.get_tables(database):
                queue.put([num, database, table])
                num += 1
        queue.join()
    finally:
        try:
            db.unlock()
        finally:
            db.close()


if __name__ == "__main__":
    main()