OpenStack Nova Compute Node资源使用情况

发布时间: 更新时间: 总字数:1737 阅读时间:4m 作者: IP上海 分享 网址

OpenStack Compute Node资源使用情况。


数据库表compute_nodes保存了各compute node的资源使用情况,主要包括以下几个方面:

  • hypervisor信息
    • hypervisor_type (xen)
    • hypervisor_version (4001000)
  • 计算
    • cpu_info (例如: {"arch": "x86_64", "features": [], "topology": {}} )
    • 物理vcpus数量 (os.sysconf('SC_NPROCESSORS_ONLN'))
    • vcpus_used (利用libvirt connection找出所有的domain,累加vcpus)
  • 内存
    • host_memory_total (libvirt connection getInfo()获得)
    • memory_mb_used (利用libvirt connection找出所有的domain,累加已使用的内存;如果是xen,还要加上domain0的内存)
  • 磁盘
    • FLAGS.instances_path的总空间 (os.statvfs(path))
    • FLAGS.instances_path的已使用空间
    • FLAGS.instances_path的剩余空间
-- compute_nodes: one row per compute node, holding its capacity and current usage.
CREATE TABLE `compute_nodes` (
  `created_at` datetime default NULL,
  `updated_at` datetime default NULL,
  `deleted_at` datetime default NULL,
  `deleted` tinyint(1) default NULL,
  `id` int(11) NOT NULL auto_increment,
  `service_id` int(11) NOT NULL,
  `vcpus` int(11) NOT NULL,
  `memory_mb` int(11) NOT NULL,
  `local_gb` int(11) NOT NULL,
  `vcpus_used` int(11) NOT NULL,
  `memory_mb_used` int(11) NOT NULL,
  `local_gb_used` int(11) NOT NULL,
  `hypervisor_type` mediumtext NOT NULL,
  `hypervisor_version` int(11) NOT NULL,
  `cpu_info` mediumtext NOT NULL,
  `disk_available_least` int(11) default NULL,
  `free_ram_mb` int(11) default NULL,
  `free_disk_gb` int(11) default NULL,
  `current_workload` int(11) default NULL,
  `running_vms` int(11) default NULL,
  `hypervisor_hostname` varchar(255) default NULL,
  PRIMARY KEY  (`id`)
);


ComputeManager会定期把本节点的资源使用情况上报给scheduler。它按固定间隔运行,间隔由host_state_interval配置,单位为秒,默认120秒。每次运行时,它先通过LibvirtConnection.get_host_stats获取compute节点真实的资源情况,再通过rpc(fanout_cast)广播给所有scheduler,触发其update_service_capabilities方法。scheduler的update_service_capabilities被调用后,会立即把数据存入内存中的service_states。


def _report_driver_status(self, context):
    """Refresh host stats from the driver and queue them for the schedulers.

    Does real work at most once per ``FLAGS.host_state_interval`` seconds;
    calls made before the interval has elapsed return without doing anything.

    :param context: request context (not used by this method).
    """
    curr_time = time.time()
    if curr_time - self._last_host_check > FLAGS.host_state_interval:
        self._last_host_check = curr_time
        LOG.info(_("Updating host status"))
        # This will grab info about the host and queue it
        # to be sent to the Schedulers.
        # NOTE(review): these two lines were lost from the excerpt; they follow
        # the get_host_stats -> update_service_capabilities flow the article
        # describes. Confirm against the Nova release being documented.
        capabilities = self.driver.get_host_stats(refresh=True)
        self.update_service_capabilities(capabilities)


# The interval is the ``host_state_interval`` option, declared with:
#     help='Interval in seconds for querying the host status'


class HostState(object):
    """Manages information about the compute node through libvirt.

    The most recent stats snapshot is cached in ``self._stats``. The libvirt
    connection is opened lazily the first time the stats are refreshed.
    """

    def __init__(self, read_only):
        """Create an empty cache.

        :param read_only: forwarded to ``get_connection`` when the libvirt
            connection is first opened.
        """
        super(HostState, self).__init__()
        self.read_only = read_only
        self._stats = {}
        self.connection = None

    def get_host_stats(self, refresh=False):
        """Return the current state of the host.

        If 'refresh' is True, update the stats first. Otherwise the cached
        snapshot is returned (an empty dict until the first refresh).
        """
        if refresh:
            self.update_status()
        return self._stats

    def update_status(self):
        """Retrieve status info from libvirt, cache it and return it.

        :returns: dict with keys vcpus, vcpus_used, cpu_info, disk_total,
            disk_used, disk_available, host_memory_total, host_memory_free,
            hypervisor_type and hypervisor_version. Going by the connection
            method names, disk values are in GB and memory values in MB.
        """
        LOG.debug(_("Updating host stats"))
        if self.connection is None:
            # Connect only when stats are actually needed.
            self.connection = get_connection(self.read_only)
        data = {}
        data["vcpus"] = self.connection.get_vcpu_total()
        data["vcpus_used"] = self.connection.get_vcpu_used()
        data["cpu_info"] = utils.loads(self.connection.get_cpu_info())
        data["disk_total"] = self.connection.get_local_gb_total()
        data["disk_used"] = self.connection.get_local_gb_used()
        data["disk_available"] = data["disk_total"] - data["disk_used"]
        data["host_memory_total"] = self.connection.get_memory_mb_total()
        data["host_memory_free"] = (data["host_memory_total"] -
                                    self.connection.get_memory_mb_used())
        data["hypervisor_type"] = self.connection.get_hypervisor_type()
        data["hypervisor_version"] = self.connection.get_hypervisor_version()

        self._stats = data

        return data



FilterScheduler._schedule:Scheduler每次开始调度时,都会通过host_manager.get_all_host_states获取最新的host state数据。

def _schedule(self, context, topic, request_spec, *args, **kwargs):
    """Returns a list of hosts that meet the required specs,
    ordered by their fitness.

    Only the "compute" topic is supported; any other topic raises
    NotImplementedError. At most ``request_spec['num_instances']`` (default
    1) weighted hosts are returned, fewer if filtering runs out of hosts.
    """
    elevated = context.elevated()
    if topic != "compute":
        msg = _("Scheduler only understands Compute nodes (for now)")
        raise NotImplementedError(msg)

    instance_properties = request_spec['instance_properties']
    instance_type = request_spec.get("instance_type", None)

    cost_functions = self.get_cost_functions()
    config_options = self._get_configuration_options()

    filter_properties = kwargs.get('filter_properties', {})
    filter_properties.update({'context': context,
                              'request_spec': request_spec,
                              'config_options': config_options,
                              'instance_type': instance_type})

    # Find our local list of acceptable hosts by repeatedly
    # filtering and weighing our options. Each time we choose a
    # host, we virtually consume resources on it so subsequent
    # selections can adjust accordingly.

    # unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
    unfiltered_hosts_dict = self.host_manager.get_all_host_states(
            elevated, topic)

    # Note: remember, we are using an iterator here. So only
    # traverse this list once. This can bite you if the hosts
    # are being scanned in a filter or weighing function.
    hosts = unfiltered_hosts_dict.itervalues()

    num_instances = request_spec.get('num_instances', 1)
    selected_hosts = []

    # NOTE: the loop variable must not be named ``_`` -- that would shadow
    # the gettext ``_`` used for log/error messages in this function.
    for num in xrange(num_instances):
        # Filter local hosts based on requirements ...
        hosts = self.host_manager.filter_hosts(hosts,
                filter_properties)
        if not hosts:
            # Can't get any more locally.
            break

        LOG.debug(_("Filtered %(hosts)s") % locals())

        # weighted_host = WeightedHost() ... the best
        # host for the job.
        # TODO(comstud): filter_properties will also be used for
        # weighing and I plan fold weighing into the host manager
        # in a future patch.  I'll address the naming of this
        # variable at that time.
        weighted_host = least_cost.weighted_sum(cost_functions,
                hosts, filter_properties)
        LOG.debug(_("Weighted %(weighted_host)s") % locals())
        # Without this append the function could only ever return [].
        selected_hosts.append(weighted_host)

        # Now consume the resources so the filter/weights
        # will change for the next instance.
        # NOTE(review): assumes the weighted host exposes its HostState as
        # ``host_state`` with ``consume_from_instance`` -- confirm.
        weighted_host.host_state.consume_from_instance(
                instance_properties)

    return selected_hosts[:num_instances]
def notify(message):
    """Look for specific compute manager events and interpret them
    so as to keep the Capacity table up to date.

    NOTE: the True/False return codes are only for testing.
    """
    # NOTE(review): this excerpt omits the part of the function that parses
    # ``message`` and binds ``host``, ``free_ram_mb``, ``free_disk_gb``,
    # ``work`` and ``vms``; those names must be defined before this call.
    db.api.compute_node_utilization_update(context.get_admin_context(), host,
        free_ram_mb_delta=free_ram_mb, free_disk_gb_delta=free_disk_gb,
        work_delta=work, vm_delta=vms)
    # Signal (for tests) that the utilization row was updated.
    return True


# host_state_interval option help text:
#     help='Interval in seconds for querying the host status'


class SchedulerDependentManager(Manager):
    """Periodically send capability updates to the Scheduler services.

    Services that need to update the Scheduler of their capabilities
    should derive from this class. Otherwise they can derive from
    manager.Manager directly. Updates are only sent after
    update_service_capabilities is called with a non-empty value.
    """

    def __init__(self, host=None, db_driver=None, service_name='undefined'):
        # Nothing is published until update_service_capabilities() stores
        # a truthy capabilities value.
        self.last_capabilities = None
        self.service_name = service_name
        super(SchedulerDependentManager, self).__init__(host, db_driver)

    def update_service_capabilities(self, capabilities):
        """Remember these capabilities to send on next periodic update."""
        self.last_capabilities = capabilities

    def _publish_service_capabilities(self, context):
        """Pass data back to the scheduler at a periodic interval.

        Does nothing when no (or only empty) capabilities have been recorded.
        """
        if self.last_capabilities:
            LOG.debug(_('Notifying Schedulers of capabilities ...'))
            # The API call requires (context, service_name, host,
            # capabilities). ``self.host`` is presumably set by the Manager
            # base class from the ``host`` passed to __init__ -- confirm.
            api.update_service_capabilities(context, self.service_name,
                                            self.host, self.last_capabilities)
def update_service_capabilities(context, service_name, host, capabilities):
    """Send an update to all the scheduler services informing them
       of the capabilities of this service.

    :param context: request context passed through to rpc.
    :param service_name: name of the reporting service.
    :param host: host the reporting service runs on.
    :param capabilities: capabilities mapping to publish.
    :returns: whatever ``rpc.fanout_cast`` returns.
    """
    kwargs = dict(method='update_service_capabilities',
                  args=dict(service_name=service_name, host=host,
                            capabilities=capabilities))
    # fanout: every scheduler listening on the 'scheduler' topic gets a copy.
    return rpc.fanout_cast(context, 'scheduler', kwargs)


def update_service_capabilities(self, context, service_name=None,
        host=None, capabilities=None, **kwargs):
    """Process a capability update from a service node.

    A missing ``capabilities`` value is replaced with an empty dict before
    the update is handed to the scheduler driver. ``context`` and any extra
    keyword arguments are accepted but not used here.
    """
    if capabilities is None:
        capabilities = {}
    self.driver.update_service_capabilities(service_name, host,
            capabilities)
def update_service_capabilities(self, service_name, host, capabilities):
    """Store a service's latest capability report for ``host``.

    A shallow copy of ``capabilities`` is stamped with the time the report
    was received (``utils.utcnow()``) and saved as
    ``self.service_states[host][service_name]``. The caller's mapping is
    never modified.
    """
    LOG.debug(_("Received %(service_name)s service update from "
                "%(host)s.") % {'service_name': service_name, 'host': host})
    snapshot = dict(capabilities)
    snapshot["timestamp"] = utils.utcnow()  # Reported time
    self.service_states.setdefault(host, {})[service_name] = snapshot



Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数