
Long-running remote commands over SSH

Background

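While rebuilding MySQL replication, a database dump has to be imported, which takes about 4 hours. When the rebuild was automated with a Fabric script, the long stretch without any response caused a timeout and the connection was dropped.
The MySQL import then failed without reporting any error, and the Master/Slave relationship was still established successfully. The failure only surfaces by chance, when replication touches a table that was not completely imported.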

Automatic rebuild script for MySQL replication

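import csv
import os
from contextlib import closing
from datetime import datetime
from StringIO import StringIO  # Python 2, which Fabric 1.x runs on

from fabric.api import env, execute, puts, run, task
from fabric.colors import green


@task
def rebuild_slave(slave_host, slave_user, slave_password,
        # master credentials default to None so the fallbacks below can apply
        master_host, master_user=None, master_password=None,
        slave_host_user=None, slave_host_password="",
        slave_tmp="/tmp/", master_tmp="/tmp/"):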
    """Rebuild the MySQL replication


    :slave_host: Slave host
    :slave_user: Slave mysql user name
    :slave_password: Slave mysql user password
    :master_host: Master host
    :master_user: Master mysql user name
    :master_password: Master mysql user password
    :slave_host_user: Slave host user name, for rsync the dump file from the master to slave
    :slave_host_password: Slave host user password
    :slave_tmp: MySQL DB dump file placeholder in the slave host
    :master_tmp: MySQL db dump file placeholder in the master host
    :returns: None


    example:


        $ fab db.rebuild_slave:slave_host=rpm-etl02.stg,slave_user=root,slave_password=XXX,master_host=oltpdb01.stg,slave_host_user=root,master_tmp=/export/dump/,slave_tmp=/export/


    """
    if master_user is None:
        master_user = slave_user
    if master_password is None:
        master_password = slave_password
    if slave_host_user is None:
        slave_host_user = env.user
    env.prompts["{user}@{host}'s password: ".format(user=slave_host_user, host=slave_host)] = slave_host_password


    result = execute(run, 'mysql -u{slave_user} -p{slave_password} --batch -e "SHOW SLAVE STATUS"'.format(
            slave_user=slave_user,
            slave_password=slave_password,
        ), host=slave_host)[slave_host]
    with closing(StringIO(result)) as result_io:
        slave_info = dict(zip(*csv.reader(result_io, delimiter="\t")))
    databases = slave_info["Replicate_Do_DB"].split(",")
    SQL_FILE_NAME = "{master_host}_{slave_host}_{timestamp}.sql".format(
            master_host=master_host.upper(),
            slave_host=slave_host.upper(),
            timestamp=datetime.now().strftime("%Y%m%d%H%M%S")
            )
    # combine the slave hostname and export folder
    slave_path = slave_host
    if slave_host_user:
        # if special username (default is same to master)
        slave_path = "%s@%s" % (slave_host_user, slave_path)
    slave_path = "%s:%s" % (slave_path, slave_tmp)

    MYSQL_DUMP_ARGUMENTS = "--single-transaction --comments --no-autocommit --master-data=2 --add-drop-table --quick --routines"
    execute(run, 'mysqldump -u{master_user} -p{master_password} {MYSQL_DUMP_ARGUMENTS} --databases {databases} > {export_file_path}'.format(
        master_user=master_user,
        master_password=master_password,
        MYSQL_DUMP_ARGUMENTS=MYSQL_DUMP_ARGUMENTS,
        databases=" ".join(databases),
        export_file_path=os.path.join(master_tmp, SQL_FILE_NAME)
        ), host=master_host)

    execute(run, "rsync -ave 'ssh -o StrictHostKeyChecking=no' --progress --partial {export_file_path} {slave_path}".format(
        export_file_path=os.path.join(master_tmp, SQL_FILE_NAME),
        slave_path=slave_path
        ), host=master_host)

    # make sure slave is stopped
    if slave_info["Slave_IO_Running"] != "No" or slave_info["Slave_SQL_Running"] != "No":
        execute(run, 'mysql -u{slave_user} -p{slave_password} -e "STOP SLAVE"'.format(
            slave_user=slave_user,
            slave_password=slave_password,
            ), host=slave_host)
    else:
        puts(green("Slave already stopped"))


    # grep "MASTER_LOG_POS"
    master_mark = execute(run, 'grep -m 1 "CHANGE" {slave_tmp}'.format(
            slave_tmp=os.path.join(slave_tmp, SQL_FILE_NAME)
        ), host=slave_host)[slave_host].lstrip("-- ")
    # need long time executing
    execute(run, 'mysql -u{slave_user} -p{slave_password} < {slave_tmp}'.format(
        slave_user=slave_user,
        slave_password=slave_password,
        slave_tmp=os.path.join(slave_tmp, SQL_FILE_NAME)
        ), host=slave_host)
    execute(run, 'mysql -u{slave_user} -p{slave_password} -e "RESET SLAVE;{master_mark};START SLAVE;SHOW SLAVE STATUS"'.format(
        slave_user=slave_user,
        slave_password=slave_password,
        master_mark=master_mark,
        ), host=slave_host)
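
The parsing step near the top of rebuild_slave can be tried in isolation. The sketch below is plain Python 3 rather than the Python 2 of the Fabric script, and the sample text is made up for illustration: real `SHOW SLAVE STATUS` output has many more columns. The function name `parse_slave_status` is likewise hypothetical.

```python
import csv
import io

# Hypothetical sample of `mysql --batch -e "SHOW SLAVE STATUS"` output:
# one tab-separated header row followed by one tab-separated value row.
SAMPLE = (
    "Slave_IO_Running\tSlave_SQL_Running\tReplicate_Do_DB\n"
    "Yes\tNo\tshop,billing\n"
)


def parse_slave_status(text):
    """Map each column name of the header row to the value in the data row."""
    rows = list(csv.reader(io.StringIO(text), delimiter="\t"))
    if len(rows) < 2:
        # No data row: the server returned an empty result set.
        return {}
    header, values = rows[0], rows[1]
    return dict(zip(header, values))


info = parse_slave_status(SAMPLE)
databases = info["Replicate_Do_DB"].split(",")
print(databases)
```

The script's `dict(zip(*rows))` builds the same mapping, but it only works when the output has exactly two rows; the explicit check above makes the empty case visible instead.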

Running long remote commands over SSH

A long-running remote command may be cut off automatically because no SSH keepalive (heartbeat) has been configured.

Host *
    # If the network disappears your connection will hang, but if it then re-appears within 3 minutes it will resume working
    TCPKeepAlive no
    ServerAliveInterval 60
    ServerAliveCountMax 3
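
The "3 minutes" in the comment follows from the two ServerAlive settings: ssh gives up after roughly ServerAliveInterval × ServerAliveCountMax seconds without a reply. A minimal sketch of that arithmetic, with a deliberately simplified parser (the function names are hypothetical, and Host blocks and the `Keyword=value` spelling are not handled):

```python
SSH_CONFIG = """\
Host *
    TCPKeepAlive no
    ServerAliveInterval 60
    ServerAliveCountMax 3
"""


def parse_options(text):
    """Collect `Keyword value` pairs, ignoring blank lines and comments."""
    options = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        keyword, _, value = line.partition(" ")
        options[keyword.lower()] = value.strip()
    return options


def max_silence_seconds(options):
    """Approximate time ssh tolerates an unresponsive server before disconnecting."""
    interval = int(options["serveraliveinterval"])
    count_max = int(options["serveralivecountmax"])
    return interval * count_max


print(max_silence_seconds(parse_options(SSH_CONFIG)))  # 180
```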

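The connection can also be broken by a sudden network incident. In short, relying on a single SSH connection to carry the command is very fragile. So why not keep the running process (the task) alive on its own?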

How Bash treats its jobs when the session exits

The Bash manual contains this passage:

If the huponexit shell option has been set with shopt, bash sends a SIGHUP to all jobs when an interactive login shell exits.

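So Bash sends SIGHUP to all of its jobs when the interactive session ends, which makes them exit. On a clean exit this requires huponexit to be set; when the shell itself receives SIGHUP because the connection dropped, an interactive shell resends the signal to its jobs by default. The command therefore has to run as a daemon, or inside a screen/tmux session that is isolated from the SSH session.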

The system also provides other, more suitable tools.

nohup

nohup runs the given COMMAND with hangup signals ignored, so that the command can continue running in the background after you log out.

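Different shells may come with their own version of nohup, but in general each of them ignores the hangup signal and then runs the given command.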
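
What nohup arranges can be reproduced in a few lines: set SIGHUP to "ignore" and then exec the command, since an ignored signal stays ignored across exec. The following Python 3 sketch is an illustration under that assumption, not the nohup implementation itself; `spawn_sleep` is a hypothetical helper, and the example assumes a POSIX system with a `sleep` binary.

```python
import signal
import subprocess
import time


def spawn_sleep(ignore_hup):
    """Start `sleep 30`, optionally with SIGHUP ignored as nohup would arrange."""
    def setup():
        # Runs in the child between fork and exec.
        disposition = signal.SIG_IGN if ignore_hup else signal.SIG_DFL
        signal.signal(signal.SIGHUP, disposition)
    return subprocess.Popen(["sleep", "30"], preexec_fn=setup)


plain = spawn_sleep(ignore_hup=False)
protected = spawn_sleep(ignore_hup=True)

# Simulate the hangup that a closing session would deliver.
plain.send_signal(signal.SIGHUP)
protected.send_signal(signal.SIGHUP)

plain_status = plain.wait(timeout=5)        # negative value: killed by that signal
time.sleep(0.2)
protected_alive = protected.poll() is None  # the protected child is still running

# Clean up the surviving child.
protected.kill()
protected.wait()
print(plain_status, protected_alive)
```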

trap

trap [-lp] [[arg] sigspec …] The command arg is to be read and executed when the shell receives signal(s) sigspec.

Sets up handling for specific signals within the current Bash session.

$ bash -c 'SECONDS=0; sleep 4; echo $SECONDS'
^C
$ bash -c 'trap : INT; SECONDS=0; sleep 4; echo $SECONDS'
^C3

$ bash -c 'trap "" INT; SECONDS=0; sleep 4; echo $SECONDS'
^C^C^C4

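In the example above, trap "" INT; ignores SIGINT altogether, so nothing runs when it arrives; because an ignored signal is inherited by child processes, sleep is not interrupted either and runs for its full 4 seconds. trap : INT; installs a no-op handler instead: Ctrl-C still ends the running sleep, but it no longer aborts the whole command line, so the echo that follows is still executed.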
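
The same three dispositions exist in Python's signal module, which may make the distinction easier to experiment with. This is a Python 3 sketch of the analogy, not a translation of the shell session; `note_signal` and `outcome_with` are hypothetical names, and the code has to run in the main thread.

```python
import os
import signal
import time

received = []


def note_signal(signum, frame):
    # Like `trap : INT`: a handler that does nothing beyond recording the signal.
    received.append(signum)


def outcome_with(handler):
    """Install `handler` for SIGINT, send one SIGINT to this process, report the effect."""
    previous = signal.signal(signal.SIGINT, handler)
    try:
        os.kill(os.getpid(), signal.SIGINT)  # stands in for pressing Ctrl-C
        time.sleep(0.1)  # give the interpreter a moment to run the handler
        return "continued"
    except KeyboardInterrupt:
        return "interrupted"
    finally:
        if previous is not None:
            signal.signal(signal.SIGINT, previous)


default_outcome = outcome_with(signal.default_int_handler)  # no trap at all
noop_outcome = outcome_with(note_signal)                    # trap : INT
ignored_outcome = outcome_with(signal.SIG_IGN)              # trap "" INT
print(default_outcome, noop_outcome, ignored_outcome, len(received))
```

Only the no-op handler records the signal; with SIG_IGN the signal never reaches Python code at all.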

Implementation

In the end I decided to simply wrap the command in nohup:

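import sys
import time
from hashlib import sha1

from fabric.api import env, hide, puts, run, settings

# time_measure and padding_label are not Fabric functions;
# they are helpers that this post does not show.


def long_time_execute(command, *args, **kws):
    """Run `command` under `nohup` so that SIGHUP is ignored and the command
    does not stop when the shell connection breaks


    :command: shell command; combine multiple commands with `&&`
    :*args: passed to the `run` function
    :**kws: passed to the `run` function
    :returns: the output of the command


    """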
    output_file = sha1(command).hexdigest()
    with time_measure(command):
        with settings(hide('running', 'output', 'warnings'), warn_only=True):
            pid = run("nohup {shell} '{command}' >> {output_file} 2>&1 &\necho $! && sleep .01".format(
                    shell=env.shell,
                    command=command,
                    output_file=output_file,
                ), *args, **kws)


        executing_label = padding_label("waiting")
        with settings(hide('running', 'output', 'warnings'), warn_only=True):
            while run("kill -s 0 {pid}".format(
                    pid=pid,
                    )) == "":
                time.sleep(1)
                sys.stdout.write("%s\r" % executing_label.next())
                sys.stdout.flush()
            sys.stdout.write("\r")


        with hide('everything'):
            output = run("cat {output_file}; rm {output_file}".format(
                    output_file=output_file,
                ))
        puts(output)


        return output

Then simply replace the long-running call in the first example with this function.

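execute(long_time_execute, 'mysql -u{slave_user} -p{slave_password} < {slave_tmp}'.format(
    slave_user=slave_user,
    slave_password=slave_password,
    slave_tmp=os.path.join(slave_tmp, SQL_FILE_NAME)
    ), host=slave_host)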

pty/tty=False (default)

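In the example, a short delay, && sleep .01, is appended to the command. Without it, the command that was pushed into the background may be killed before it has properly started, because run returns and SIGHUP arrives before nohup has begun ignoring it. See Fabric#395 for the details.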
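
The launch, poll and collect pattern can also be tried on a local machine without Fabric. The sketch below is Python 3 with the standard library only. `run_detached` and `is_alive` are hypothetical names, `os.kill(pid, 0)` plays the role of `kill -s 0`, and the `timeout` parameter is an added safety net for environments where nothing reaps the finished process. No pty is involved here, so the sleep tail is not needed.

```python
import os
import shlex
import subprocess
import tempfile
import time


def is_alive(pid):
    """Python counterpart of `kill -s 0 PID`: probe a process without signalling it."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def run_detached(command, poll_interval=0.1, timeout=60.0):
    """Start `command` under nohup, wait by polling its PID, then return its output."""
    fd, output_file = tempfile.mkstemp(prefix="nohup-", suffix=".log")
    os.close(fd)
    # Same shape as the remote command line: background the job, then print $!.
    launcher = "nohup sh -c {command} >> {output} 2>&1 &\necho $!".format(
        command=shlex.quote(command), output=shlex.quote(output_file))
    pid = int(subprocess.check_output(["sh", "-c", launcher],
                                      stdin=subprocess.DEVNULL))
    deadline = time.monotonic() + timeout
    while is_alive(pid) and time.monotonic() < deadline:
        time.sleep(poll_interval)
    try:
        with open(output_file) as handle:
            return handle.read()
    finally:
        os.remove(output_file)


print(run_detached("echo started; sleep 1; echo finished", timeout=10))
```

Quoting the command with `shlex.quote` avoids the limitation of the single-quote wrapping in the Fabric version, which breaks if the command itself contains a single quote.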

Conclusion

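It pays to look beneath the script: a problem you might never think of from the script's point of view becomes obvious at the system level, in the life cycle of a process.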