目录

python-在进程中动态加载模块

python 在进程中动态加载模块

背景

在web 运维平台, 我们有时候希望创建部署一个运维脚本, 此时,传统方式: 新建脚本 》部署脚本》重启服务

很多时刻我们不想重启服务, 那么有没有不重启服务依旧可以加载新建的脚本呢? 接下来就是解决方案:

原理

  1. 获取脚本code
  2. 通过compile 加载code 到进程中
  3. 通过python 的反射机制获取脚本中的方法并执行

结构

  • base_handler.py – 通过反射获取function
  • schedul.py – 加载code 并调度
  • tese.py – 测试脚本

base_handler.py

import inspect
import sys


class BaseHandler(object):
    """
    所有脚本的基类BaseHandler
    """

    # 继承类必须实现的方法
    callbacks = ['execute', 'callback']

    @classmethod
    def _run_func(cls, function, *arguments):
        """
        执行指定函数
        """
        args, varargs, keywords, defaults = inspect.getargspec(function)
        ret = function(*arguments[:len(args) - 1])
        return ret

    @classmethod
    def _run_task(cls, task, response, callback):
        """
        找对对应函数
        """
        if not hasattr(cls, callback):
            raise NotImplementedError("self.%s() not implemented!" % callback)

        function = getattr(cls, callback)
        return cls._run_func(function, task, response)

    @classmethod
    def run_task(cls, module, task, response, callback):
        """
        舆情脚本运行当前任务,获取异常日志,返回ProcessorResult对象
        """
        exception = None
        stdout = sys.stdout
        results = []
        sub_tasks = []
        try:
            results, sub_tasks = cls._run_task(task, response, callback)

        except Exception as e:
            exception = e
        finally:
            sys.stdout = stdout
            # logs = list(module.log_buffer)
            # module.log_buffer[:] = []
        return results, sub_tasks

schedul.py

# coding: utf-8

import imp
import inspect
import linecache
import sys
import six


class ProjectLoader(object):
    """
    加载并执行字符串类型的python 代码
    project is dict
        示例:
        project = { 'script_name': 'xxx', 'code': code}
    m = ProjectLoader(project)
    """

    def __init__(self, project, mod=None):
        self.project = project
        self.name = project.get('script_name')
        self.mod = mod

    def load_module(self):
        if self.mod is None:
            self.mod = mod = imp.new_module(self.name)
        else:
            mod = self.mod
        mod.__file__ = '<%s>' % self.name
        mod.__loader__ = self
        mod.__project__ = self.project
        mod.__package__ = ''
        # 获取code
        code = self.get_code()
        # 执行
        six.exec_(code, mod.__dict__)
        # 清除缓存
        linecache.clearcache()
        if sys.version_info[:2] == (3, 3):
            sys.modules[self.name] = mod
        return mod

    def get_code(self):
        return compile(self.get_source(), '<%s>' % self.name, 'exec')

    def get_source(self):
        script = self.project['code']
        if isinstance(script, six.text_type):
            return script.encode('utf8')
        return script


if __name__ == '__main__':
    from celery_app import base_handler
    
    # 这里测试, 我直接从文件中读, 你可以试着通过API 从数据库中获取脚本code
    with open('test.py', 'r', encoding="utf-8") as f:
        code = f.read()
    # 测试code info
    obj = {
        'script_name': 'test',
        'code': code
    }
    # 加载code 到进程
    instance = ProjectLoader(obj)
    module = instance.load_module()
    # 获取脚本类
    if '__handler_cls__' not in module.__dict__:
        BaseHandler = module.__dict__.get('BaseHandler',
                                          base_handler.BaseHandler)
        for each in list(six.itervalues(module.__dict__)):
            if inspect.isclass(each) and each is not BaseHandler \
                    and issubclass(each, BaseHandler):
                module.__dict__['__handler_cls__'] = each
    instance = module.__dict__.get('__handler_cls__')
    # 把meta 信息放到脚本实例中
    instance.project_name = obj['script_name']
    instance.project = obj
    assert instance is not None, "need BaseHandler in project module"

    # 开始调用脚本中的方法
    instance.run_task(module, 456, 123456, 'callback')
    print(module)

test.py

# coding: utf-8

from base_handler import BaseHandler


class TestC(BaseHandler):

    def __init__(self, config):
        self.config = config

    @classmethod
    def execute(self, v, v2):
        print(v, v2)

    @classmethod
    def callback(self, v, v2):
        print(v, v2)