
ceph-deploy Source Code Analysis (Part 4): The osd Module

The osd.py module of ceph-deploy manages OSD daemons; its main job is creating and activating OSDs.

The osd subcommand has the following format:

ceph-deploy osd [-h] {list,create,prepare,activate} ...

  • list: show information about existing OSDs
  • create: create an OSD; runs both prepare and activate
  • prepare: prepare an OSD by formatting/partitioning the disk
  • activate: activate a previously prepared OSD

OSD Management

The make function

  • priority is 50
  • the default handler for the osd subcommand is the osd function
@priority(50)
def make(parser):
    """
    Prepare a data disk on remote host.
    """
    sub_command_help = dedent("""
    Manage OSDs by preparing a data disk on remote host.

    For paths, first prepare and then activate:

        ceph-deploy osd prepare {osd-node-name}:/path/to/osd
        ceph-deploy osd activate {osd-node-name}:/path/to/osd

    For disks or journals the `create` command will do prepare and activate
    for you.
    """
    )
    parser.formatter_class = argparse.RawDescriptionHelpFormatter
    parser.description = sub_command_help

    osd_parser = parser.add_subparsers(dest='subcommand')
    osd_parser.required = True

    osd_list = osd_parser.add_parser(
        'list',
        help='List OSD info from remote host(s)'
    )
    osd_list.add_argument(
        'disk',
        nargs='+',
        metavar='HOST:DISK[:JOURNAL]',
        type=colon_separated,
        help='remote host to list OSDs from'
    )

    osd_create = osd_parser.add_parser(
        'create',
        help='Create new Ceph OSD daemon by preparing and activating disk'
    )
    osd_create.add_argument(
        '--zap-disk',
        action='store_true',
        help='destroy existing partition table and content for DISK',
    )
    osd_create.add_argument(
        '--fs-type',
        metavar='FS_TYPE',
        choices=['xfs', 'btrfs'],
        default='xfs',
        help='filesystem to use to format DISK (xfs, btrfs)',
    )
    osd_create.add_argument(
        '--dmcrypt',
        action='store_true',
        help='use dm-crypt on DISK',
    )
    osd_create.add_argument(
        '--dmcrypt-key-dir',
        metavar='KEYDIR',
        default='/etc/ceph/dmcrypt-keys',
        help='directory where dm-crypt keys are stored',
    )
    osd_create.add_argument(
        '--bluestore',
        action='store_true', default=None,
        help='bluestore objectstore',
    )
    osd_create.add_argument(
        'disk',
        nargs='+',
        metavar='HOST:DISK[:JOURNAL]',
        type=colon_separated,
        help='host and disk to prepare',
    )

    osd_prepare = osd_parser.add_parser(
        'prepare',
        help='Prepare a disk for use as Ceph OSD by formatting/partitioning disk'
    )
    osd_prepare.add_argument(
        '--zap-disk',
        action='store_true',
        help='destroy existing partition table and content for DISK',
    )
    osd_prepare.add_argument(
        '--fs-type',
        metavar='FS_TYPE',
        choices=['xfs', 'btrfs'],
        default='xfs',
        help='filesystem to use to format DISK (xfs, btrfs)',
    )
    osd_prepare.add_argument(
        '--dmcrypt',
        action='store_true',
        help='use dm-crypt on DISK',
    )
    osd_prepare.add_argument(
        '--dmcrypt-key-dir',
        metavar='KEYDIR',
        default='/etc/ceph/dmcrypt-keys',
        help='directory where dm-crypt keys are stored',
    )
    osd_prepare.add_argument(
        '--bluestore',
        action='store_true', default=None,
        help='bluestore objectstore',
    )
    osd_prepare.add_argument(
        'disk',
        nargs='+',
        metavar='HOST:DISK[:JOURNAL]',
        type=colon_separated,
        help='host and disk to prepare',
    )

    osd_activate = osd_parser.add_parser(
        'activate',
        help='Start (activate) Ceph OSD from disk that was previously prepared'
    )
    osd_activate.add_argument(
        'disk',
        nargs='+',
        metavar='HOST:DISK[:JOURNAL]',
        type=colon_separated,
        help='host and disk to activate',
    )

    parser.set_defaults(
        func=osd,
    )

The osd function dispatches on the subcommand: list, create, prepare, and activate are handled by osd_list, prepare (with activate_prepared_disk=True), prepare (with activate_prepared_disk=False), and activate respectively.

def osd(args):
    cfg = conf.ceph.load(args)

    if args.subcommand == 'list':
        osd_list(args, cfg)
    elif args.subcommand == 'prepare':
        prepare(args, cfg, activate_prepared_disk=False)
    elif args.subcommand == 'create':
        prepare(args, cfg, activate_prepared_disk=True)
    elif args.subcommand == 'activate':
        activate(args, cfg)
    else:
        LOG.error('subcommand %s not implemented', args.subcommand)
        sys.exit(1)

Listing OSDs

Command-line format: ceph-deploy osd list [-h] HOST:DISK[:JOURNAL] [HOST:DISK[:JOURNAL] …]

The osd_list function

  • runs ceph --cluster=ceph osd tree --format=json to obtain OSD information
  • runs ceph-disk list to obtain disk and partition information
  • combines the results of those two commands with the files under each OSD directory to assemble and print the OSD list
def osd_list(args, cfg):
    monitors = mon.get_mon_initial_members(args, error_on_empty=True, _cfg=cfg)

    # get the osd tree from a monitor host
    mon_host = monitors[0]
    distro = hosts.get(
        mon_host,
        username=args.username,
        callbacks=[packages.ceph_is_installed]
    )
    # run `ceph --cluster=ceph osd tree --format=json` to get OSD info
    tree = osd_tree(distro.conn, args.cluster)
    distro.conn.exit()

    interesting_files = ['active', 'magic', 'whoami', 'journal_uuid']

    for hostname, disk, journal in args.disk:
        distro = hosts.get(hostname, username=args.username)
        remote_module = distro.conn.remote_module
        # list the OSD directory names under /var/lib/ceph/osd
        osds = distro.conn.remote_module.listdir(constants.osd_path)

        # run `ceph-disk list` to get disk and partition info
        ceph_disk_executable = system.executable_path(distro.conn, 'ceph-disk')
        output, err, exit_code = remoto.process.check(
            distro.conn,
            [
                ceph_disk_executable,
                'list',
            ]
        )

        # iterate over the OSDs
        for _osd in osds:
            # OSD path, e.g. /var/lib/ceph/osd/ceph-0
            osd_path = os.path.join(constants.osd_path, _osd)
            # journal path
            journal_path = os.path.join(osd_path, 'journal')
            # OSD id
            _id = int(_osd.split('-')[-1])  # split on dash, get the id
            osd_name = 'osd.%s' % _id
            metadata = {}
            json_blob = {}

            # piggy back from ceph-disk and get the mount point
            # match the `ceph-disk list` output against the OSD name to find the device
            device = get_osd_mount_point(output, osd_name)
            if device:
                metadata['device'] = device

            # read interesting metadata from files
            # i.e. the active, magic, whoami, journal_uuid files under the OSD dir
            for f in interesting_files:
                osd_f_path = os.path.join(osd_path, f)
                if remote_module.path_exists(osd_f_path):
                    metadata[f] = remote_module.readline(osd_f_path)

            # do we have a journal path?
            if remote_module.path_exists(journal_path):
                metadata['journal path'] = remote_module.get_realpath(journal_path)

            # is this OSD in osd tree?
            for blob in tree['nodes']:
                if blob.get('id') == _id:  # matches our OSD
                    json_blob = blob

            # print the OSD info
            print_osd(
                distro.conn.logger,
                hostname,
                osd_path,
                json_blob,
                metadata,
            )

        distro.conn.exit()
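
The code above leans on get_osd_mount_point to recover the data device from the ceph-disk list output. That helper is not shown in this post; roughly, ceph-disk list prints one line per partition such as "/dev/sdb1 ceph data, active, cluster ceph, osd.0, journal /dev/sdb2", so it is enough to find the line that mentions the OSD name. A minimal sketch (the real parsing may be stricter):

def get_osd_mount_point(output, osd_name):
    # Sketch only: scan the `ceph-disk list` output lines for the one that
    # mentions this OSD (e.g. 'osd.0') and return the device at the start
    # of that line. The real ceph-deploy helper may parse differently.
    for line in output:
        line = line.strip()
        if osd_name in line:
            return line.split()[0]      # e.g. '/dev/sdb1'
    return None

# illustrative input line, not captured from a real host:
sample = ['/dev/sdb1 ceph data, active, cluster ceph, osd.0, journal /dev/sdb2']
print(get_osd_mount_point(sample, 'osd.0'))  # -> /dev/sdb1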

Creating and Preparing OSDs

Command-line format for creating OSDs: ceph-deploy osd create [-h] [--zap-disk] [--fs-type FS_TYPE] [--dmcrypt] [--dmcrypt-key-dir KEYDIR] [--bluestore] HOST:DISK[:JOURNAL] [HOST:DISK[:JOURNAL] …]

Command-line format for preparing OSDs: ceph-deploy osd prepare [-h] [--zap-disk] [--fs-type FS_TYPE] [--dmcrypt] [--dmcrypt-key-dir KEYDIR] [--bluestore] HOST:DISK[:JOURNAL] [HOST:DISK[:JOURNAL] …]

The prepare function: when activate_prepared_disk is True it creates an OSD; when False it only prepares one.

  • calls exceeds_max_osds; if a single host ends up with more than 20 OSDs, a warning is logged (a sketch of this check follows the code below)
  • calls get_bootstrap_osd_key to read ceph.bootstrap-osd.keyring from the current directory
  • loops over the disks:
    • writes the configuration to /etc/ceph/ceph.conf
    • creates and writes /var/lib/ceph/bootstrap-osd/ceph.keyring
    • calls prepare_disk to prepare the OSD
    • checks the OSD status and logs a warning for anything unhealthy
def prepare(args, cfg, activate_prepared_disk):
    LOG.debug(
        'Preparing cluster %s disks %s',
        args.cluster,
        ' '.join(':'.join(x or '' for x in t) for t in args.disk),
    )
    # warn if a single host would have more than 20 OSDs
    hosts_in_danger = exceeds_max_osds(args)
    if hosts_in_danger:
        LOG.warning('if ``kernel.pid_max`` is not increased to a high enough value')
        LOG.warning('the following hosts will encounter issues:')
        for host, count in hosts_in_danger.items():
            LOG.warning('Host: %8s, OSDs: %s' % (host, count))

    # read ceph.bootstrap-osd.keyring from the current directory
    key = get_bootstrap_osd_key(cluster=args.cluster)

    bootstrapped = set()
    errors = 0
    for hostname, disk, journal in args.disk:
        try:
            if disk is None:
                raise exc.NeedDiskError(hostname)

            distro = hosts.get(
                hostname,
                username=args.username,
                callbacks=[packages.ceph_is_installed]
            )
            LOG.info(
                'Distro info: %s %s %s',
                distro.name,
                distro.release,
                distro.codename
            )

            if hostname not in bootstrapped:
                bootstrapped.add(hostname)
                LOG.debug('Deploying osd to %s', hostname)

                conf_data = conf.ceph.load_raw(args)
                # write the configuration to /etc/ceph/ceph.conf
                distro.conn.remote_module.write_conf(
                    args.cluster,
                    conf_data,
                    args.overwrite_conf
                )

                # create and write /var/lib/ceph/bootstrap-osd/ceph.keyring
                create_osd_keyring(distro.conn, args.cluster, key)

            LOG.debug('Preparing host %s disk %s journal %s activate %s',
                      hostname, disk, journal, activate_prepared_disk)

            storetype = None
            if args.bluestore:
                storetype = 'bluestore'

            # prepare the OSD
            prepare_disk(
                distro.conn,
                cluster=args.cluster,
                disk=disk,
                journal=journal,
                activate_prepared_disk=activate_prepared_disk,
                init=distro.init,
                zap=args.zap_disk,
                fs_type=args.fs_type,
                dmcrypt=args.dmcrypt,
                dmcrypt_dir=args.dmcrypt_key_dir,
                storetype=storetype,
            )

            # give the OSD a few seconds to start
            time.sleep(5)
            # check the OSD status and log a warning for anything unhealthy
            catch_osd_errors(distro.conn, distro.conn.logger, args)
            LOG.debug('Host %s is now ready for osd use.', hostname)
            distro.conn.exit()

        except RuntimeError as e:
            LOG.error(e)
            errors += 1

    if errors:
        raise exc.GenericError('Failed to create %d OSDs' % errors)
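
As noted above, exceeds_max_osds is only a safety check: it counts how many OSDs each host would get in this invocation and returns the hosts above the threshold so they can be warned about kernel.pid_max. A minimal sketch, assuming a 20-OSD threshold and the (hostname, disk, journal) tuples produced by colon_separated (the real helper in ceph-deploy may count differently):

from collections import Counter

def exceeds_max_osds(args, max_osds=20):
    # Sketch only: tally the disks requested per host in this run and
    # return a {hostname: count} dict for hosts above the threshold.
    per_host = Counter(hostname for hostname, disk, journal in args.disk)
    return {host: count
            for host, count in per_host.items()
            if count > max_osds}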

The prepare_disk function

  • runs ceph-disk -v prepare to prepare the OSD
  • if activate_prepared_disk is True, enables the ceph service to start on boot (see the enable_service sketch after the code)
def prepare_disk(
        conn,
        cluster,
        disk,
        journal,
        activate_prepared_disk,
        init,
        zap,
        fs_type,
        dmcrypt,
        dmcrypt_dir,
        storetype):
    """
    Run on osd node, prepares a data disk for use.
    """
    ceph_disk_executable = system.executable_path(conn, 'ceph-disk')
    args = [
        ceph_disk_executable,
        '-v',
        'prepare',
    ]
    if zap:
        args.append('--zap-disk')
    if dmcrypt:
        args.append('--dmcrypt')
        if dmcrypt_dir is not None:
            args.append('--dmcrypt-key-dir')
            args.append(dmcrypt_dir)
    if storetype:
        args.append('--' + storetype)
    args.extend([
        '--cluster',
        cluster,
        '--fs-type',
        fs_type,
        '--',
        disk,
    ])
    if journal is not None:
        args.append(journal)
    # run the `ceph-disk -v prepare` command
    remoto.process.run(
        conn,
        args
    )

    # when activating, enable the ceph service so it starts on boot
    if activate_prepared_disk:
        # we don't simply run activate here because we don't know
        # which partition ceph-disk prepare created as the data
        # volume. instead, we rely on udev to do the activation and
        # just give it a kick to ensure it wakes up. we also enable
        # ceph.target, the other key piece of activate.
        if init == 'systemd':
            system.enable_service(conn, "ceph.target")
        elif init == 'sysvinit':
            system.enable_service(conn, "ceph")
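
Both prepare_disk and activate delegate the "start on boot" step to system.enable_service from ceph_deploy.util.system, which is not shown in this post. Conceptually it just runs the init system's enable command over the remote connection; the following is only a sketch under that assumption (the real helper's signature and init detection may differ):

import remoto.process

def enable_service(conn, service='ceph', init='systemd'):
    # Sketch only: enable a service on the remote host so it starts at boot.
    if init == 'systemd':
        remoto.process.run(conn, ['systemctl', 'enable', service])
    else:
        # sysvinit-style hosts
        remoto.process.run(conn, ['chkconfig', service, 'on'])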

Activating OSDs

Command-line format: ceph-deploy osd activate [-h] HOST:DISK[:JOURNAL] [HOST:DISK[:JOURNAL] …]

The activate function

  • runs ceph-disk -v activate to activate the OSD
  • checks the OSD status and logs a warning for anything unhealthy (a sketch of this check follows the code below)
  • enables the ceph service to start on boot
def activate(args, cfg):
    LOG.debug(
        'Activating cluster %s disks %s',
        args.cluster,
        # join elements of t with ':', t's with ' '
        # allow None in elements of t; print as empty
        ' '.join(':'.join((s or '') for s in t) for t in args.disk),
    )

    for hostname, disk, journal in args.disk:
        distro = hosts.get(
            hostname,
            username=args.username,
            callbacks=[packages.ceph_is_installed]
        )
        LOG.info(
            'Distro info: %s %s %s',
            distro.name,
            distro.release,
            distro.codename
        )

        LOG.debug('activating host %s disk %s', hostname, disk)
        LOG.debug('will use init type: %s', distro.init)

        ceph_disk_executable = system.executable_path(distro.conn, 'ceph-disk')
        # run `ceph-disk -v activate` to activate the OSD
        remoto.process.run(
            distro.conn,
            [
                ceph_disk_executable,
                '-v',
                'activate',
                '--mark-init',
                distro.init,
                '--mount',
                disk,
            ],
        )
        # give the OSD a few seconds to start
        time.sleep(5)

        # check the OSD status and log a warning for anything unhealthy
        catch_osd_errors(distro.conn, distro.conn.logger, args)

        # enable the ceph service so it starts on boot
        if distro.init == 'systemd':
            system.enable_service(distro.conn, "ceph.target")
        elif distro.init == 'sysvinit':
            system.enable_service(distro.conn, "ceph")

        distro.conn.exit()
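
catch_osd_errors, used by both prepare and activate above, is the post-start health check; its implementation is not reproduced here. In essence it asks the cluster for its OSD stat summary and warns when OSDs are missing or not up. The sketch below shows the idea as a local script; the key names are what ceph osd stat --format=json is expected to return, and the real helper runs the command through the ceph-deploy remote connection instead of subprocess:

import json
import subprocess

def check_osd_status(cluster='ceph'):
    # Sketch only: compare total vs. up/in OSD counts and warn on mismatch.
    out = subprocess.check_output(
        ['ceph', '--cluster=%s' % cluster, 'osd', 'stat', '--format=json'])
    stat = json.loads(out)
    osds = stat.get('num_osds', 0)
    up = stat.get('num_up_osds', 0)
    in_ = stat.get('num_in_osds', 0)
    if osds > up:
        print('warning: %d OSD(s) are not up' % (osds - up))
    if osds > in_:
        print('warning: %d OSD(s) are not in' % (osds - in_))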

Managing OSDs Manually

Using disk sdb on host ceph-231 as an example, create an OSD.

Creating and Preparing OSDs

Prepare the OSD:

[root@ceph-231 ~]# ceph-disk -v prepare --zap-disk --cluster ceph --fs-type xfs -- /dev/sdb

Creating an OSD involves one extra step: enable the ceph service to start on boot.

[root@ceph-231 ~]# systemctl enable ceph.target

Activating the OSD

Check the init system:

[root@ceph-231 ~]# cat /proc/1/comm
systemd

Activate the OSD:

[root@ceph-231 ~]# ceph-disk -v activate --mark-init systemd --mount /dev/sdb1

Enable the ceph service to start on boot:

[root@ceph-231 ~]# systemctl enable ceph.target