且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

ansible2.x 版本api 调用(适用于web开发使用)

更新时间:2022-08-23 22:46:23

摘自jumpserver 中  ansible模块 重写runner.    ansible2.3版本  pip3 install ansible


目录结构如下,  

新建以下 三个文件。 例子在 runner 最后。


├── callback.py

├── __init__.py

├── inventory.py

└── runner.py


callback.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# ~*~ coding: utf-8 ~*~
 
from collections import defaultdict
from ansible.plugins.callback import CallbackBase
 
 
class CommandResultCallback(CallbackBase):
    def __init__(self, display=None):
        self.result_q = dict(contacted={}, dark={})
        super(CommandResultCallback, self).__init__(display)
 
    def gather_result(self, n, res):
        self.result_q[n][res._host.name] = {}
        self.result_q[n][res._host.name]['cmd'= res._result.get('cmd')
        self.result_q[n][res._host.name]['stderr'= res._result.get('stderr')
        self.result_q[n][res._host.name]['stdout'= res._result.get('stdout')
        self.result_q[n][res._host.name]['rc'= res._result.get('rc')
 
    def v2_runner_on_ok(self, result):
        self.gather_result("contacted", result)
 
    def v2_runner_on_failed(self, result, ignore_errors=False):
        self.gather_result("dark", result)
 
    def v2_runner_on_unreachable(self, result):
        self.gather_result("dark", result)
 
    def v2_runner_on_skipped(self, result):
        self.gather_result("dark", result)
 
 
class AdHocResultCallback(CallbackBase):
    """
    AdHoc result Callback
    """
    def __init__(self, display=None):
        self.result_q = dict(contacted={}, dark={})
        super(AdHocResultCallback, self).__init__(display)
 
    def gather_result(self, n, res):
        if res._host.name in self.result_q[n]:
            self.result_q[n][res._host.name].append(res._result)
        else:
            self.result_q[n][res._host.name] = [res._result]
 
    def v2_runner_on_ok(self, result):
        self.gather_result("contacted", result)
 
    def v2_runner_on_failed(self, result, ignore_errors=False):
        self.gather_result("dark", result)
 
    def v2_runner_on_unreachable(self, result):
        self.gather_result("dark", result)
 
    def v2_runner_on_skipped(self, result):
        self.gather_result("dark", result)
 
    def v2_playbook_on_task_start(self, task, is_conditional):
        pass
 
    def v2_playbook_on_play_start(self, play):
        pass
 
 
class PlaybookResultCallBack(CallbackBase):
    """
    Custom callback model for handlering the output data of
    execute playbook file,
    Base on the build-in callback plugins of ansible which named `json`.
    """
 
    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'stdout'
    CALLBACK_NAME = 'Dict'
 
    def __init__(self, display=None):
        super(PlaybookResultCallBack, self).__init__(display)
        self.results = []
        self.output = ""
        self.item_results = {}  # {"host": []}
 
    def _new_play(self, play):
        return {
            'play': {
                'name': play.name,
                'id'str(play._uuid)
            },
            'tasks': []
        }
 
    def _new_task(self, task):
        return {
            'task': {
                'name': task.get_name(),
            },
            'hosts': {}
        }
 
    def v2_playbook_on_no_hosts_matched(self):
        self.output = "skipping: No match hosts."
 
    def v2_playbook_on_no_hosts_remaining(self):
        pass
 
    def v2_playbook_on_task_start(self, task, is_conditional):
        self.results[-1]['tasks'].append(self._new_task(task))
 
    def v2_playbook_on_play_start(self, play):
        self.results.append(self._new_play(play))
 
    def v2_playbook_on_stats(self, stats):
        hosts = sorted(stats.processed.keys())
        summary = {}
        for in hosts:
            = stats.summarize(h)
            summary[h] = s
 
        if self.output:
            pass
        else:
            self.output = {
                'plays'self.results,
                'stats': summary
            }
 
    def gather_result(self, res):
        if res._task.loop and "results" in res._result and res._host.name in self.item_results:
            res._result.update({"results"self.item_results[res._host.name]})
            del self.item_results[res._host.name]
 
        self.results[-1]['tasks'][-1]['hosts'][res._host.name] = res._result
 
    def v2_runner_on_ok(self, res, **kwargs):
        if "ansible_facts" in res._result:
            del res._result["ansible_facts"]
 
        self.gather_result(res)
 
    def v2_runner_on_failed(self, res, **kwargs):
        self.gather_result(res)
 
    def v2_runner_on_unreachable(self, res, **kwargs):
        self.gather_result(res)
 
    def v2_runner_on_skipped(self, res, **kwargs):
        self.gather_result(res)
 
    def gather_item_result(self, res):
        self.item_results.setdefault(res._host.name, []).append(res._result)
 
    def v2_runner_item_on_ok(self, res):
        self.gather_item_result(res)
 
    def v2_runner_item_on_failed(self, res):
        self.gather_item_result(res)
 
    def v2_runner_item_on_skipped(self, res):
        self.gather_item_result(res)

inventory.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# ~*~ coding: utf-8 ~*~
from ansible.inventory import Inventory, Host, Group
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader
 
 
class JMSHost(Host):
    def __init__(self, asset):
        self.asset = asset
        self.name = name = asset.get('hostname'or asset.get('ip')
        self.port = port = asset.get('port'or 22
        super(JMSHost, self).__init__(name, port)
        self.set_all_variable()
 
    def set_all_variable(self):
        asset = self.asset
        self.set_variable('ansible_host', asset['ip'])
        self.set_variable('ansible_port', asset['port'])
        self.set_variable('ansible_user', asset['username'])
 
        # 添加密码和秘钥
        if asset.get('password'):
            self.set_variable('ansible_ssh_pass', asset['password'])
        if asset.get('private_key'):
            self.set_variable('ansible_ssh_private_key_file', asset['private_key'])
 
        # 添加become支持
        become = asset.get("become"False)
        if become:
            self.set_variable("ansible_become"True)
            self.set_variable("ansible_become_method", become.get('method''sudo'))
            self.set_variable("ansible_become_user", become.get('user''root'))
            self.set_variable("ansible_become_pass", become.get('pass', ''))
        else:
            self.set_variable("ansible_become"False)
 
 
class JMSInventory(Inventory):
    """
    提供生成Ansible inventory对象的方法
    """
 
    def __init__(self, host_list=None):
        if host_list is None:
            host_list = []
        assert isinstance(host_list, list)
        self.host_list = host_list
        self.loader = DataLoader()
        self.variable_manager = VariableManager()
        super(JMSInventory, self).__init__(self.loader, self.variable_manager,
                                           host_list=host_list)
 
    def parse_inventory(self, host_list):
        """用于生成动态构建Ansible Inventory.
        self.host_list: [
            {"name": "asset_name",
             "ip": <ip>,
             "port": <port>,
             "user": <user>,
             "pass": <pass>,
             "key": <sshKey>,
             "groups": ['group1', 'group2'],
             "other_host_var": <other>},
             {...},
        ]
 
        :return: 返回一个Ansible的inventory对象
        """
 
        # TODO: 验证输入
        # 创建Ansible Group,如果没有则创建default组
        ungrouped = Group('ungrouped')
        all = Group('all')
        all.add_child_group(ungrouped)
        self.groups = dict(all=all, ungrouped=ungrouped)
 
        for asset in host_list:
            host = JMSHost(asset=asset)
            asset_groups = asset.get('groups')
            if asset_groups:
                for group_name in asset_groups:
                    if group_name not in self.groups:
                        group = Group(group_name)
                        self.groups[group_name] = group
                    else:
                        group = self.groups[group_name]
                    group.add_host(host)
            else:
                ungrouped.add_host(host)
            all.add_host(host)

runner.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# ~*~ coding: utf-8 ~*~
from __future__ import unicode_literals
 
import os
from collections import namedtuple, defaultdict
import sys
sys.path.append('hostinfo/ansible_runner/')
 
 
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.playbook.play import Play
import ansible.constants as C
from ansible.utils.vars import load_extra_vars
from ansible.utils.vars import load_options_vars
 
from inventory import JMSInventory
from callback import AdHocResultCallback, PlaybookResultCallBack, \
    CommandResultCallback
#from common.utils import get_logger
 
 
__all__ = ["AdHocRunner""PlayBookRunner"]
 
C.HOST_KEY_CHECKING = False
 
#logger = get_logger(__name__)
 
 
# Jumpserver not use playbook
class PlayBookRunner(object):
    """
    用于执行AnsiblePlaybook的接口.简化Playbook对象的使用.
    """
    Options = namedtuple('Options', [
        'listtags''listtasks''listhosts''syntax''connection',
        'module_path''forks''remote_user''private_key_file''timeout',
        'ssh_common_args''ssh_extra_args''sftp_extra_args',
        'scp_extra_args''become''become_method''become_user',
        'verbosity''check''extra_vars'])
 
    def __init__(self,
                 hosts=None,
                 playbook_path=None,
                 forks=C.DEFAULT_FORKS,
                 listtags=False,
                 listtasks=False,
                 listhosts=False,
                 syntax=False,
                 module_path=None,
                 remote_user='root',
                 timeout=C.DEFAULT_TIMEOUT,
                 ssh_common_args=None,
                 ssh_extra_args=None,
                 sftp_extra_args=None,
                 scp_extra_args=None,
                 become=True,
                 become_method=None,
                 become_user="root",
                 verbosity=None,
                 extra_vars=None,
                 connection_type="ssh",
                 passwords=None,
                 private_key_file=None,
                 check=False):
 
        C.RETRY_FILES_ENABLED = False
        self.callbackmodule = PlaybookResultCallBack()
        if playbook_path is None or not os.path.exists(playbook_path):
            raise AnsibleError(
                "Not Found the playbook file: %s." % playbook_path)
        self.playbook_path = playbook_path
        self.loader = DataLoader()
        self.variable_manager = VariableManager()
        self.passwords = passwords or {}
        self.inventory = JMSInventory(hosts)
 
        self.options = self.Options(
            listtags=listtags,
            listtasks=listtasks,
            listhosts=listhosts,
            syntax=syntax,
            timeout=timeout,
            connection=connection_type,
            module_path=module_path,
            forks=forks,
            remote_user=remote_user,
            private_key_file=private_key_file,
            ssh_common_args=ssh_common_args or "",
            ssh_extra_args=ssh_extra_args or "",
            sftp_extra_args=sftp_extra_args,
            scp_extra_args=scp_extra_args,
            become=become,
            become_method=become_method,
            become_user=become_user,
            verbosity=verbosity,
            extra_vars=extra_vars or [],
            check=check
        )
 
        self.variable_manager.extra_vars = load_extra_vars(loader=self.loader,
                                                           options=self.options)
        self.variable_manager.options_vars = load_options_vars(self.options)
 
        self.variable_manager.set_inventory(self.inventory)
 
        # 初始化playbook的executor
        self.runner = PlaybookExecutor(
            playbooks=[self.playbook_path],
            inventory=self.inventory,
            variable_manager=self.variable_manager,
            loader=self.loader,
            options=self.options,
            passwords=self.passwords)
 
        if self.runner._tqm:
            self.runner._tqm._stdout_callback = self.callbackmodule
 
    def run(self):
        if not self.inventory.list_hosts('all'):
            raise AnsibleError('Inventory is empty')
        self.runner.run()
        self.runner._tqm.cleanup()
        return self.callbackmodule.output
 
 
class AdHocRunner(object):
    """
    ADHoc接口
    """
    Options = namedtuple("Options", [
        'connection''module_path''private_key_file'"remote_user",
        'timeout''forks''become''become_method''become_user',
        'check''extra_vars',
        ]
    )
 
    results_callback_class = AdHocResultCallback
 
    def __init__(self,
                 hosts=C.DEFAULT_HOST_LIST,
                 forks=C.DEFAULT_FORKS,  # 5
                 timeout=C.DEFAULT_TIMEOUT,  # SSH timeout = 10s
                 remote_user=C.DEFAULT_REMOTE_USER,  # root
                 module_path=None,  # dirs of custome modules
                 connection_type="smart",
                 become=None,
                 become_method=None,
                 become_user=None,
                 check=False,
                 passwords=None,
                 extra_vars=None,
                 private_key_file=None,
                 gather_facts='no'):
 
        self.pattern = ''
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.gather_facts = gather_facts
        self.results_callback = AdHocRunner.results_callback_class()
        self.options = self.Options(
            connection=connection_type,
            timeout=timeout,
            module_path=module_path,
            forks=forks,
            become=become,
            become_method=become_method,
            become_user=become_user,
            check=check,
            remote_user=remote_user,
            extra_vars=extra_vars or [],
            private_key_file=private_key_file,
        )
 
        self.variable_manager.extra_vars = load_extra_vars(self.loader,
                                                           options=self.options)
        self.variable_manager.options_vars = load_options_vars(self.options)
        self.passwords = passwords or {}
        self.inventory = JMSInventory(hosts)
        self.variable_manager.set_inventory(self.inventory)
        self.tasks = []
        self.play_source = None
        self.play = None
        self.runner = None
 
    @staticmethod
    def check_module_args(module_name, module_args=''):
        if module_name in C.MODULE_REQUIRE_ARGS and not module_args:
            err = "No argument passed to '%s' module." % module_name
            print(err)
            return False
        return True
 
    def run(self, task_tuple, pattern='all', task_name='Ansible Ad-hoc'):
        """
        :param task_tuple:  (('shell', 'ls'), ('ping', ''))
        :param pattern:
        :param task_name:
        :return:
        """
        for module, args in task_tuple:
            if not self.check_module_args(module, args):
                return
            self.tasks.append(
                dict(action=dict(
                    module=module,
                    args=args,
                ))
            )
 
        self.play_source = dict(
            name=task_name,
            hosts=pattern,
            gather_facts=self.gather_facts,
            tasks=self.tasks
        )
 
        self.play = Play().load(
            self.play_source,
            variable_manager=self.variable_manager,
            loader=self.loader,
        )
 
        self.runner = TaskQueueManager(
            inventory=self.inventory,
            variable_manager=self.variable_manager,
            loader=self.loader,
            options=self.options,
            passwords=self.passwords,
            stdout_callback=self.results_callback,
        )
 
        if not self.inventory.list_hosts("all"):
            raise AnsibleError("Inventory is empty.")
 
        if not self.inventory.list_hosts(self.pattern):
            raise AnsibleError(
                "pattern: %s  dose not match any hosts." % self.pattern)
 
        try:
            self.runner.run(self.play)
        except Exception as e:
            logger.warning(e)
        else:
            #logger.debug(self.results_callback.result_q)
            return self.results_callback.result_q
        finally:
            if self.runner:
                self.runner.cleanup()
            if self.loader:
                self.loader.cleanup_all_tmp_files()
 
    def clean_result(self):
        """
        :return: {
            "success": ['hostname',],
            "failed": [('hostname', 'msg'), {}],
        }
        """
        result = {'success': [], 'failed': []}
        for host in self.results_callback.result_q['contacted']:
            result['success'].append(host)
 
        for host, msgs in self.results_callback.result_q['dark'].items():
            msg = '\n'.join(['{} {}: {}'.format(
                msg.get('module_stdout', ''),
                msg.get('invocation', {}).get('module_name'),
                msg.get('msg', '')) for msg in msgs])
            result['failed'].append((host, msg))
        return result
 
 
def test_run():
    assets = [
        {
                "hostname""192.168.244.129",
                "ip""192.168.244.129",
                "port"22,
                "username""root",
                "password""redhat",
        },
    ]
    task_tuple = (('shell''ls'),)  ##例子,调用普通的模块命令
    hoc = AdHocRunner(hosts=assets)
    hoc.results_callback = CommandResultCallback()
    ret = hoc.run(task_tuple)
    print(ret)
 
    task_tuple = (('setup',''),)  ##例子,调用setup,获取资产信息
    runner = AdHocRunner(assets)
    result = runner.run(task_tuple=task_tuple,pattern='all', task_name='Ansible Ad-hoc')
    print(result)
 
    #play = PlayBookRunner(assets, playbook_path='/tmp/some.yml')  ##yml
    """
    # /tmp/some.yml
    ---
    - name: Test the plabybook API.
      hosts: all
      remote_user: root
      gather_facts: yes
      tasks:
       - name: exec uptime
         shell: uptime
    """
    #play.run()
 
 
if __name__ == "__main__":
    test_run()









本文转自 295631788 51CTO博客,原文链接:http://blog.51cto.com/hequan/1948209,如需转载请自行联系原作者