PeaceSheep's blog PeaceSheep's blog
首页
  • 分类
  • 标签
  • 归档
相关链接
提建议&咨询&赞赏
GitHub (opens new window)

PeaceSheep

以最简洁、易懂的话解决问题
首页
  • 分类
  • 标签
  • 归档
相关链接
提建议&咨询&赞赏
GitHub (opens new window)
  • 01 说明
  • 环境安装与配置

    • ubuntu换源
    • centos安装配置docker
    • k8s介绍及安装教程
    • github actions自动部署前端项目
    • 服务器创建git仓库,部署自己的博客
    • 使用python批量管理linux设备
      • 代码
        • 准备
        • 使用
    • 关于如何使用机房电脑批量烧录U盘的奇思妙想
    • 自建Zerotier节点
    • ubuntu安装g++
    • conda从古老版本升级
    • openkylin国产麒麟操作系统安装zerotier
    • nodejs、yarn安装
  • 常用命令与配置文件

  • 常用代码

  • 常用操作
  • 环境安装与配置
PeaceSheep
2023-05-01
目录

使用python批量管理linux设备

为了省赛比赛过程中批量管理选手机,写了一个python文件,记录一下方便以后使用。

# 代码

from fabric import Connection
from typing import List
from threading import Thread,Lock
from datetime import datetime
import time
import os
PRIVATE_KEY = "id_rsa"
ADMIN_USER = "liyaning"
NORMAL_USER = "jscpc"
SUDO_PASSWORD = "123"
UNLOCK_COMMAND = {'command':"export DISPLAY=:0; gnome-screensaver; gnome-screensaver-command -d;",'user':NORMAL_USER}
LOCK_COMMAND = {'command':"export DISPLAY=:0; gnome-screensaver; gnome-screensaver-command -l;",'user':ADMIN_USER}
LOGOUT_COMMAND = {'command':"gnome-session-quit --force",'user':NORMAL_USER}
CLEAR_COMMAND = {'command':["rm -rf /tmp/*", "rm -rf /var/tmp/*", "rsync -av --delete /opt/backup/jscpc-backup/jscpc /home"],'user':ADMIN_USER}


COMMAND = CLEAR_COMMAND


class Result:
    def __init__(self,server,success,msg):
        self.server = server
        self.success = success
        self.msg = msg


def generage_servers()->List[Connection]:
    return [f'192.168.{x}.{y}' for x in range(1,2) for y in range(103,104)]


def fun(server:str):
    conn = Connection(server,user = COMMAND['user'])
    try:
        if COMMAND['user'] == ADMIN_USER:
            for comm in COMMAND['command']:
                msg:str = str(conn.sudo(comm,hide="both",password=SUDO_PASSWORD))
        else:
            msg:str = str(conn.run(COMMAND['command'],hide="both"))
        msg = msg.replace("\n","").replace("\r","")
        result = Result(server,True,msg)
        success_lock.acquire()
        successes.append(result)
        success_lock.release()
    except Exception as err:
        msg = str(err)
        msg = msg.replace("\n","").replace("\r","")
        result = Result(server,False,msg)
        fail_lock.acquire()
        fails.append(result)
        fail_lock.release()

success_lock = Lock()
fail_lock = Lock()

successes:List[Result] = list()
fails:List[Result] = list()



if __name__ == "__main__":

    start_time = time.time()
    threads:List[Thread] = list()
    servers = generage_servers()
    for server in servers:
        t = Thread(target=fun,args=(server,))
        t.start()
        threads.append(t)
    
    for  t in threads:
        t.join()

    print(f"总数量:{len(servers)},成功数量:{len(successes)},失败数量:{len(fails)}")


    end_time = time.time()
    print(f"程序运行时间为:{end_time - start_time}秒")

    fmt_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S.txt")

    successes.sort(key=lambda x: x.server)
    fails.sort(key=lambda x: x.server)

    if not os.path.exists("log"):
        os.mkdir("log")
    with open(f"log/success-{fmt_time}","w",encoding="utf8") as f:
        f.write(f"command:{COMMAND}\n")
        for suc in successes:
            content = f"{suc.server}\n"
            f.write(content)
    
    with open(f"log/fail-{fmt_time}","w",encoding="utf8") as f:
        f.write(f"command:{COMMAND}\n")
        for fail in fails:
            content = f"{fail.server}:{fail.msg}\n"
            f.write(content)
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

# 准备

  1. 修改一些变量:

PRIVATE_KEY = 私钥文件的路径 ADMIN_USER = 具有root权限的用户名,注意只要具有root权限,其命令就会以sudo执行 NORMAL_USER = 普通用户的用户名 SUDO_PASSWORD = sudo用户的密码

  1. 保与私钥对应的公钥已被添加进目标主机。

  2. 修改generage_servers()函数,生成需要控制的ip地址。

# 使用

修改COMMAND=XX,为你要执行的命令,直接运行,main.py即可。

编辑 (opens new window)
上次更新: 2025/04/15, 10:52:45
服务器创建git仓库,部署自己的博客
关于如何使用机房电脑批量烧录U盘的奇思妙想

← 服务器创建git仓库,部署自己的博客 关于如何使用机房电脑批量烧录U盘的奇思妙想→

最近更新
01
ubuntu安装g++显示已有但是输入g++又找不到命令
04-15
02
使用cloudflare-r2搭建webdav
04-08
03
LLM聚合平台客户端对比
03-29
更多文章>
Theme by Vdoing | Copyright © 2022-2025 PeaceSheep
冀ICP备2022004632号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式