#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""
mysqlpdump is a wrapper around mysqldump that runs the dumps in parallel.

Every table of every database is dumped by a pool of worker threads while a
global read lock is held; the dumps are written to standard output in the
same order in which the tables were enumerated.
"""

__title__ = "mysqlpdump"
__version__ = "0.1"
__author__= " Carles Amigo"
__email__= "fr3nd at fr3nd dot net"
__website__= " http://www.fr3nd.net/projects/mysqlpdump"

import getpass
import os
import shlex
import subprocess
import sys
import threading
from optparse import OptionParser

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3

try:
    import commands  # Python 2 only; no longer used, kept as a module name
except ImportError:
    commands = None

try:
    import MySQLdb
except ImportError:
    # Deferred: Database.__init__ raises ImportError when the driver is
    # missing, so --help / --version keep working without it.
    MySQLdb = None


def _default_user():
    """Return the default login name for the --user option.

    os.getlogin() raises OSError when the process has no controlling
    terminal (for example under cron), so fall back to getpass.getuser().
    """
    try:
        return os.getlogin()
    except OSError:
        return getpass.getuser()


def _write_stdout(data):
    """Write one dump followed by a newline to standard output.

    Byte strings go to the binary layer of stdout when there is one
    (Python 3) so the dump is emitted without any re-encoding.
    """
    if isinstance(data, bytes):
        stream = getattr(sys.stdout, "buffer", sys.stdout)
        stream.write(data + b"\n")
    else:
        stream = sys.stdout
        stream.write(data + "\n")
    stream.flush()


class Log(object):
    """Simple class for logging to standard error."""

    def __init__(self, verbose):
        """Store the verbosity flag; nothing is logged unless it is true."""
        self.verbose = verbose

    def log(self, line):
        """Write ``line`` to stderr, prefixed with " - ", when verbose."""
        if self.verbose:
            sys.stderr.write(" - " + str(line) + "\n")


class Database(object):
    """Class to handle the database connection and the mysqldump calls."""

    def __init__(self, log, mysqluser, mysqlpass, mysqlhost):
        """Connect to the server and open a cursor.

        Raises ImportError when the MySQLdb driver is not installed.
        """
        if MySQLdb is None:
            raise ImportError(
                "The MySQLdb module is required to connect to the database")
        self.user = mysqluser
        self.password = mysqlpass
        self.host = mysqlhost
        self.log = log
        self.log.log("Connecting to database")
        self.db = MySQLdb.connect(
            user=mysqluser, passwd=mysqlpass, host=mysqlhost)
        self.cursor = self.db.cursor()

    def close(self):
        """Close the database connection."""
        self.log.log("Closing database connection")
        self.db.close()

    def lock(self):
        """Issue FLUSH TABLES WITH READ LOCK on the connection."""
        self.log.log("Locking all tables")
        self.cursor.execute("FLUSH TABLES WITH READ LOCK;")

    def unlock(self):
        """Issue UNLOCK TABLES on the connection."""
        self.log.log("Unlocking all tables")
        self.cursor.execute("UNLOCK TABLES")

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a backtick-quoted SQL identifier.

        Embedded backticks are doubled, so names containing hyphens,
        spaces or reserved words can be used safely in a statement.
        """
        return "`" + str(name).replace("`", "``") + "`"

    def get_databases(self):
        """Return the names of all the databases as a list."""
        self.cursor.execute("show databases;")
        return [row[0] for row in self.cursor.fetchall()]

    def get_tables(self, database):
        """Return the names of all tables of ``database`` as a list."""
        self.cursor.execute(
            "show tables from " + self._quote_identifier(database) + ";")
        return [row[0] for row in self.cursor.fetchall()]

    def _mysqldump_command(self, database, table, custom_parameters=None):
        """Build the mysqldump argument list for one table.

        The command is a list (never a shell string), so credentials and
        names containing spaces or shell metacharacters are passed through
        verbatim. ``custom_parameters`` is a string of extra options that
        is split with shell-like quoting rules.
        """
        cmd = ["mysqldump"]
        if custom_parameters:
            cmd.extend(shlex.split(custom_parameters))
        cmd.append("-u" + self.user)
        if self.password:
            # A bare "-p" would make mysqldump prompt for a password, so
            # the option is left out entirely for an empty password.
            cmd.append("-p" + self.password)
        cmd.append("-h" + self.host)
        cmd.append(database)
        cmd.append(table)
        return cmd

    def mysqldump(self, database, table, custom_parameters=None):
        """Run mysqldump for one table and return ``(status, output)``.

        ``status`` is the exit code of mysqldump (0 on success, 127 when
        the program could not be started). ``output`` is what mysqldump
        wrote to its standard output, as a byte string with one trailing
        newline removed. Anything mysqldump writes to its standard error is
        forwarded to this process's stderr instead of being mixed into the
        dump.
        """
        cmd = self._mysqldump_command(database, table, custom_parameters)
        try:
            process = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as error:
            sys.stderr.write("Unable to run mysqldump: " + str(error) + "\n")
            return 127, b""
        output, errors = process.communicate()
        if errors:
            if not isinstance(errors, str):
                errors = errors.decode("utf-8", "replace")
            sys.stderr.write(errors)
        if output.endswith(b"\n"):
            output = output[:-1]
        return process.returncode, output


class Worker(threading.Thread):
    """Thread that dumps queued tables and prints them in queue order.

    Queue items are ``[num, database, table]``. ``event_dict`` maps each
    ``num`` to a threading.Event that is set once the item has been dealt
    with; the item ``num`` is only printed after the event of ``num - 1``
    is set. ``failures`` counts the items this worker could not dump.
    """

    # Guards the creation of entries in the event dictionaries: the event
    # of an item may be requested first by its own worker or by the worker
    # of the following item.
    _events_lock = threading.Lock()

    def __init__(self, queue, log, db, event_dict):
        threading.Thread.__init__(self)
        self.queue = queue
        self.log = log
        self.db = db
        self.event_dict = event_dict
        self.failures = 0

    def _event_for(self, num):
        """Return the completion event of item ``num``, creating it once."""
        with Worker._events_lock:
            event = self.event_dict.get(num)
            if event is None:
                event = threading.Event()
                self.event_dict[num] = event
            return event

    def _process(self, num, database, table):
        """Dump one table and print it once the previous item is done."""
        status, output = self.db.mysqldump(database, table)
        if num > 0:
            self._event_for(num - 1).wait()
        self.log.log(database + " " + table)
        _write_stdout(output)
        if status != 0:
            self.failures += 1
            sys.stderr.write(
                "mysqldump failed for %s.%s (exit status %s)\n"
                % (database, table, status))

    def run(self):
        """Process queue items forever (the thread is meant to be a daemon)."""
        self.log.log("Worker " + self.name + " started")
        while True:
            num, database, table = self.queue.get()
            done = self._event_for(num)
            try:
                self._process(num, database, table)
            except Exception as error:
                # Thread boundary: report and carry on, otherwise the item
                # would never be marked done and queue.join() would hang.
                self.failures += 1
                sys.stderr.write(
                    "Error dumping %s.%s: %s\n" % (database, table, error))
            finally:
                # Keep the output ordered even when this item failed early:
                # never release the successor before the predecessor is done.
                if num > 0:
                    self._event_for(num - 1).wait()
                done.set()
                self.queue.task_done()


def main():
    """Parse the command line and dump every table of every database.

    Returns 0 when every table was dumped successfully and 1 otherwise.
    """
    usage = "usage: %prog [options]\n Run mysqldump in paralel"
    parser = OptionParser(usage, version=__version__)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      default=False, help="verbose output.")
    parser.add_option("-u", "--user", action="store", dest="user",
                      type="string", default=_default_user(),
                      help="User for login.")
    parser.add_option("-p", "--password", action="store", dest="password",
                      type="string", default='', help="Password for login.")
    parser.add_option("-H", "--host", action="store", dest="host",
                      type="string", default='localhost',
                      help="Connect to host.")
    parser.add_option("-t", "--threads", action="store", dest="threads",
                      type="int", default=5, help="Threads used. Default = 5")
    (options, args) = parser.parse_args()

    if options.threads < 1:
        # Without workers nothing would consume the queue and join() would
        # block forever.
        parser.error("the number of threads must be at least 1")

    log = Log(options.verbose)
    db = Database(log, options.user, options.password, options.host)
    workers = []
    try:
        db.lock()
        try:
            queue = Queue.Queue()
            event_dict = {}
            for _ in range(options.threads):
                worker = Worker(queue, log, db, event_dict)
                worker.daemon = True
                worker.start()
                workers.append(worker)

            position = 0
            for database in db.get_databases():
                for table in db.get_tables(database):
                    queue.put([position, database, table])
                    position += 1

            queue.join()
        finally:
            # Release the global read lock even when something went wrong.
            db.unlock()
    finally:
        db.close()

    failures = sum(worker.failures for worker in workers)
    if failures:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())