OpenStack Nova Compute Node资源使用情况

发布时间: 更新时间: 总字数:1737 阅读时间:4m 作者: IP上海 分享 网址

OpenStack Compute Node资源使用情况。

compute_nodes表解析

compute_nodes保存了compute node的资源使用情况,主要包括几个方面:

  • hypervisor信息
    • hypervisor_type (xen)
    • hypervisor_version (4001000)
  • 计算
    • cpu_info (例如: {"arch": "x86_64", "features": [], "topology": {}} )
    • 物理vcpus数量 (os.sysconf('SC_NPROCESSORS_ONLN'))
    • vcpus_used (利用libvirt connection找出所有的domain,累加vcpus)
  • 内存
    • host_memory_total (libvirt connection getInfo()获得)
    • memory_mb_used (利用libvirt connection找出所有的domain,累加已使用mem。如果是xen,还算上domain0的内存)
  • 磁盘
    • FLAGS.instances_path的总空间 (os.statvfs(path))
    • FLAGS.instances_path的已使用空间
    • FLAGS.instances_path的剩余空间
-- compute_nodes: stores the resource usage reported for compute nodes
-- (hypervisor, CPU, memory and disk figures).
CREATE TABLE `compute_nodes` (
  -- Row bookkeeping columns (presumably maintained by the ORM layer; confirm).
  `created_at` datetime default NULL,
  `updated_at` datetime default NULL,
  `deleted_at` datetime default NULL,
  `deleted` tinyint(1) default NULL,
  `id` int(11) NOT NULL auto_increment,
  -- Presumably references the services table row of the nova-compute
  -- service; no foreign key is declared here, so verify.
  `service_id` int(11) NOT NULL,
  -- Totals: CPU count, memory in MB, local disk in GB (units taken from
  -- the column names).
  `vcpus` int(11) NOT NULL,
  `memory_mb` int(11) NOT NULL,
  `local_gb` int(11) NOT NULL,
  -- Amounts currently in use, same units as the totals above.
  `vcpus_used` int(11) NOT NULL,
  `memory_mb_used` int(11) NOT NULL,
  `local_gb_used` int(11) NOT NULL,
  -- Hypervisor identification, e.g. type "xen", version 4001000.
  `hypervisor_type` mediumtext NOT NULL,
  `hypervisor_version` int(11) NOT NULL,
  -- Serialized CPU description (arch / features / topology).
  `cpu_info` mediumtext NOT NULL,
  -- Nullable utilization columns; exact semantics are not shown in this
  -- file, so check the code that writes them.
  `disk_available_least` int(11) default NULL,
  `free_ram_mb` int(11) default NULL,
  `free_disk_gb` int(11) default NULL,
  `current_workload` int(11) default NULL,
  `running_vms` int(11) default NULL,
  `hypervisor_hostname` varchar(255) default NULL,
  PRIMARY KEY  (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8

定时任务

ComputeManager定期将node资源使用情况发给scheduler:ComputeManager定期(间隔由host_state_interval控制,默认120秒,即2分钟)通过LibvirtConnection.get_host_stats获取compute节点真实的资源情况,并通过rpc发送给scheduler,让其调用update_service_capabilities方法。scheduler的update_service_capabilities方法被调用后,立即将数据存入内存中的service_states。

ComputeManager._report_driver_status,注意其中的refresh=True,含义是从libvirt获取真实的资源使用情况

@manager.periodic_task
def _report_driver_status(self, context):
    """Refresh the host statistics and queue them for the schedulers.

    Invoked as a periodic task, but it only does work when more than
    FLAGS.host_state_interval has elapsed since the previous refresh;
    otherwise the call returns without doing anything.
    """
    now = time.time()
    elapsed = now - self._last_host_check
    if not (elapsed > FLAGS.host_state_interval):
        # The last refresh is still recent enough; skip this tick.
        return

    self._last_host_check = now
    LOG.info(_("Updating host status"))
    # Ask the driver for refreshed figures (refresh=True) and hand them to
    # update_service_capabilities so they get sent to the schedulers.
    fresh_stats = self.driver.get_host_stats(refresh=True)
    self.update_service_capabilities(fresh_stats)
# Option read as FLAGS.host_state_interval by _report_driver_status: the
# minimum interval, in seconds, between two host-status refreshes.
# The default of 120 means two minutes.
cfg.IntOpt('host_state_interval',
             default=120,
             help='Interval in seconds for querying the host status'),

libvirt/connection.py

class HostState(object):
    """Cached snapshot of compute-node resource figures read via libvirt."""

    def __init__(self, read_only):
        super(HostState, self).__init__()
        self.connection = None
        self._stats = {}
        self.read_only = read_only
        # Take an initial snapshot at construction time.
        self.update_status()

    def get_host_stats(self, refresh=False):
        """Return the host statistics dictionary.

        When 'refresh' is true the statistics are re-collected first;
        otherwise the previously collected snapshot is returned as is.
        """
        if not refresh:
            return self._stats
        self.update_status()
        return self._stats

    def update_status(self):
        """Query the libvirt connection and rebuild the statistics.

        The connection is created on first use. The cached snapshot is
        replaced only after every query has completed, and the new
        dictionary is also returned.
        """
        LOG.debug(_("Updating host stats"))
        if self.connection is None:
            self.connection = get_connection(self.read_only)
        conn = self.connection

        # The queries are issued in a fixed order; derived values are
        # computed as soon as their inputs are available.
        vcpu_total = conn.get_vcpu_total()
        vcpu_used = conn.get_vcpu_used()
        cpu_info = utils.loads(conn.get_cpu_info())
        disk_total = conn.get_local_gb_total()
        disk_used = conn.get_local_gb_used()
        disk_available = disk_total - disk_used
        memory_total = conn.get_memory_mb_total()
        memory_free = memory_total - conn.get_memory_mb_used()
        hypervisor_type = conn.get_hypervisor_type()
        hypervisor_version = conn.get_hypervisor_version()

        snapshot = {
            "vcpus": vcpu_total,
            "vcpus_used": vcpu_used,
            "cpu_info": cpu_info,
            "disk_total": disk_total,
            "disk_used": disk_used,
            "disk_available": disk_available,
            "host_memory_total": memory_total,
            "host_memory_free": memory_free,
            "hypervisor_type": hypervisor_type,
            "hypervisor_version": hypervisor_version,
        }
        self._stats = snapshot
        return snapshot

数据更新

ComputeManager在run_instance、terminate_instance、ha_migrate_instance时,会调用LibvirtConnection.update_available_resource来创建或修改compute_nodes记录

FilterScheduler._schedule,当Scheduler开始schedule时,就更新host state数据

def _schedule(self, context, topic, request_spec, *args, **kwargs):
	"""Returns a list of hosts that meet the required specs,
	ordered by their fitness.

	Only the "compute" topic is supported; any other topic raises
	NotImplementedError.

	request_spec must contain 'instance_properties'; 'instance_type'
	(default None) and 'num_instances' (default 1) are optional.
	A 'filter_properties' dict may be passed through kwargs; it is
	updated in place with the context, request spec, configuration
	options and instance type.

	For each requested instance the candidate hosts are filtered and
	weighed, the chosen host is recorded, and the instance's resources
	are consumed from that host's state so later picks see the change.

	The return value is the list of selected weighted hosts sorted by
	their 'weight' attribute, holding at most num_instances entries.
	"""
	# The elevated context is used only for the host-state lookup below;
	# filter_properties receives the original context.
	elevated = context.elevated()
	if topic != "compute":
		msg = _("Scheduler only understands Compute nodes (for now)")
		raise NotImplementedError(msg)

	instance_properties = request_spec['instance_properties']
	instance_type = request_spec.get("instance_type", None)

	cost_functions = self.get_cost_functions()
	config_options = self._get_configuration_options()

	# NOTE: when the caller supplies filter_properties, that same dict
	# object is mutated by the update() below.
	filter_properties = kwargs.get('filter_properties', {})
	filter_properties.update({'context': context,
							'request_spec': request_spec,
							'config_options': config_options,
							'instance_type': instance_type})

	self.populate_filter_properties(request_spec,
									filter_properties)

	# Find our local list of acceptable hosts by repeatedly
	# filtering and weighing our options. Each time we choose a
	# host, we virtually consume resources on it so subsequent
	# selections can adjust accordingly.

	# unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
	unfiltered_hosts_dict = self.host_manager.get_all_host_states(
			elevated, topic)

	# Note: remember, we are using an iterator here. So only
	# traverse this list once. This can bite you if the hosts
	# are being scanned in a filter or weighing function.
	hosts = unfiltered_hosts_dict.itervalues()

	num_instances = request_spec.get('num_instances', 1)
	selected_hosts = []

	# 'num' is only a loop counter; each pass selects one host.
	for num in xrange(num_instances):
		# Filter local hosts based on requirements ...
		# The filtered result replaces 'hosts', so every pass filters
		# the survivors of the previous pass.
		hosts = self.host_manager.filter_hosts(hosts,
				filter_properties)
		# NOTE(review): this emptiness test assumes filter_hosts
		# returns a sized container (a bare iterator would always be
		# truthy) -- confirm against the host manager.
		if not hosts:
			# Can't get any more locally.
			break

		# The message is formatted from locals(), so the local name
		# 'hosts' is part of the log format; do not rename it.
		LOG.debug(_("Filtered %(hosts)s") % locals())

		# weighted_host = WeightedHost() ... the best
		# host for the job.
		# TODO(comstud): filter_properties will also be used for
		# weighing and I plan fold weighing into the host manager
		# in a future patch.  I'll address the naming of this
		# variable at that time.
		weighted_host = least_cost.weighted_sum(cost_functions,
				hosts, filter_properties)
		# Same locals() dependency as above, on 'weighted_host'.
		LOG.debug(_("Weighted %(weighted_host)s") % locals())
		selected_hosts.append(weighted_host)

		# Now consume the resources so the filter/weights
		# will change for the next instance.
		weighted_host.host_state.consume_from_instance(
				instance_properties)

	# Ascending sort on 'weight'; the slice caps the result at the
	# requested number of instances.
	selected_hosts.sort(key=operator.attrgetter('weight'))
	return selected_hosts[:num_instances]
def notify(message):
    """Look for specific compute manager events and interpret them
    so as to keep the Capacity table up to date.

    NOTE: the True/False return codes are only for testing.
    """

	# NOTE(review): this is an abridged excerpt. The code that derives
	# host, free_ram_mb, free_disk_gb, work and vms from 'message' is
	# not shown here, and neither are the return statements the
	# docstring mentions.
	# The values are passed as *_delta keyword arguments to
	# compute_node_utilization_update under an admin context.
	db.api.compute_node_utilization_update(context.get_admin_context(), host,
        free_ram_mb_delta=free_ram_mb, free_disk_gb_delta=free_disk_gb,
        work_delta=work, vm_delta=vms)
		# NOTE(review): the option definition below repeats the
		# host_state_interval option shown after _report_driver_status
		# and does not appear to belong to notify() -- confirm.
		cfg.IntOpt('host_state_interval',
	               default=120,
	               help='Interval in seconds for querying the host status'),

定时任务将last_capabilities发给Scheduler,让Scheduler知道最新的资源情况

class SchedulerDependentManager(Manager):
    """Manager base class that periodically reports capabilities.

    Derive from this class when a service has to keep the Scheduler
    services informed of its capabilities; services without that need
    can derive from manager.Manager directly. Nothing is sent while the
    remembered capabilities are None or otherwise empty.

    """

    def __init__(self, host=None, db_driver=None, service_name='undefined'):
        self.service_name = service_name
        self.last_capabilities = None
        super(SchedulerDependentManager, self).__init__(host, db_driver)

    def update_service_capabilities(self, capabilities):
        """Store capabilities so the next periodic run can publish them."""
        self.last_capabilities = capabilities

    @periodic_task
    def _publish_service_capabilities(self, context):
        """Send the remembered capabilities to the schedulers, if any."""
        if not self.last_capabilities:
            # Nothing has been reported yet, so there is nothing to send.
            return
        LOG.debug(_('Notifying Schedulers of capabilities ...'))
        api.update_service_capabilities(
            context, self.service_name, self.host, self.last_capabilities)
def update_service_capabilities(context, service_name, host, capabilities):
    """Inform all scheduler services of this service's capabilities.

    Builds an 'update_service_capabilities' message carrying the service
    name, host and capabilities, passes it to rpc.fanout_cast together
    with 'scheduler', and returns whatever fanout_cast returns.
    """
    payload = {
        'service_name': service_name,
        'host': host,
        'capabilities': capabilities,
    }
    message = {'method': 'update_service_capabilities', 'args': payload}
    return rpc.fanout_cast(context, 'scheduler', message)

SchedulerManager:

def update_service_capabilities(self, context, service_name=None,
                                host=None, capabilities=None, **kwargs):
    """Forward a capability update from a service node to the driver.

    A capabilities value of None is replaced by an empty dict before the
    call. 'context' and any extra keyword arguments are accepted but
    not used.
    """
    payload = {} if capabilities is None else capabilities
    self.driver.update_service_capabilities(service_name, host, payload)
def update_service_capabilities(self, service_name, host, capabilities):
    """Record a service's reported capabilities in self.service_states.

    The capabilities are copied, stamped with the time of the report
    under the "timestamp" key, and stored under
    self.service_states[host][service_name].
    """
    LOG.debug(_("Received %(service_name)s service update from "
                "%(host)s.") % locals())
    # Work on a copy so the dict passed in by the caller stays untouched.
    stamped = dict(capabilities)
    stamped["timestamp"] = utils.utcnow()  # Reported time
    per_host = self.service_states.get(host, {})
    per_host[service_name] = stamped
    self.service_states[host] = per_host

scheduler接收后存入service_states内存中。

参考

  1. http://zhouyaguo.github.io/openstack/2015/09/02/compute-node/
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数