OpenStack Compute Node资源使用情况。
compute_nodes表解析
表compute_nodes
保存了compute node的资源使用情况,主要包括几个方面:
- hypervisor信息
- hypervisor_type (xen)
- hypervisor_version (4001000)
- 计算
- cpu_info (例如: {"arch": "x86_64", "features": [], "topology": {}} )
- 物理vcpus数量 (os.sysconf('SC_NPROCESSORS_ONLN'))
- vcpus_used (利用libvirt connection找出所有的domain,累加vcpus)
- 内存
- host_memory_total (libvirt connection getInfo()获得)
- memory_mb_used (利用libvirt connection找出所有的domain,累加已使用mem。如果是xen,还算上domain0的内存)
- 磁盘
- FLAGS.instances_path的总空间 (os.statvfs(path))
- FLAGS.instances_path的已使用空间
- FLAGS.instances_path的剩余空间
-- compute_nodes: stores the resource usage of compute nodes
-- (hypervisor information plus CPU, memory and disk figures).
CREATE TABLE `compute_nodes` (
-- NOTE(review): the next four columns look like row bookkeeping /
-- soft-delete markers; confirm against the model definition.
`created_at` datetime default NULL,
`updated_at` datetime default NULL,
`deleted_at` datetime default NULL,
`deleted` tinyint(1) default NULL,
`id` int(11) NOT NULL auto_increment,
-- NOTE(review): presumably references the owning row in a services
-- table; no foreign key is declared in this statement.
`service_id` int(11) NOT NULL,
-- Resource figures; judging by the column names, the first three are
-- totals and the *_used columns are the amounts currently consumed.
`vcpus` int(11) NOT NULL,
`memory_mb` int(11) NOT NULL,
`local_gb` int(11) NOT NULL,
`vcpus_used` int(11) NOT NULL,
`memory_mb_used` int(11) NOT NULL,
`local_gb_used` int(11) NOT NULL,
-- Hypervisor identification and the CPU description.
`hypervisor_type` mediumtext NOT NULL,
`hypervisor_version` int(11) NOT NULL,
`cpu_info` mediumtext NOT NULL,
-- Nullable columns; the names suggest derived utilization values.
`disk_available_least` int(11) default NULL,
`free_ram_mb` int(11) default NULL,
`free_disk_gb` int(11) default NULL,
`current_workload` int(11) default NULL,
`running_vms` int(11) default NULL,
`hypervisor_hostname` varchar(255) default NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8
定时任务
ComputeManager定期将node资源使用情况发给scheduler:ComputeManager定期(默认间隔为host_state_interval=120秒,即2分钟)通过LibvirtConnection.get_host_stats获取compute节点真实的资源使用情况,并通过rpc发送给scheduler,调用其update_service_capabilities方法;scheduler的update_service_capabilities方法被调用后,立即将数据存入内存中的service_states。
ComputeManager._report_driver_status,注意其中的refresh=True,含义是从libvirt获取真实的资源使用情况
@manager.periodic_task
def _report_driver_status(self, context):
    """Refresh the host statistics and queue them for the schedulers.

    The method is registered as a periodic task, but it only does work
    when more than ``FLAGS.host_state_interval`` seconds have elapsed
    since the previous refresh recorded in ``self._last_host_check``.

    :param context: request context passed in by the caller; it is not
                    used by this method.
    """
    curr_time = time.time()
    if curr_time - self._last_host_check > FLAGS.host_state_interval:
        self._last_host_check = curr_time
        LOG.info(_("Updating host status"))
        # This will grab info about the host and queue it
        # to be sent to the Schedulers. refresh=True asks the driver
        # for freshly collected stats.
        self.update_service_capabilities(
            self.driver.get_host_stats(refresh=True))
# Integer option 'host_state_interval'. Per its help text the value is
# expressed in seconds, so the default of 120 means two minutes.
cfg.IntOpt('host_state_interval',
default=120,
help='Interval in seconds for querying the host status'),
libvirt/connection.py
class HostState(object):
    """Manages information about the compute node through libvirt.

    The most recently collected statistics are cached in ``self._stats``
    and are recomputed by :meth:`update_status`.
    """

    def __init__(self, read_only):
        """Create the state holder and collect an initial set of stats.

        :param read_only: forwarded to ``get_connection`` when the
                          connection is first opened.
        """
        super(HostState, self).__init__()
        self.read_only = read_only
        self._stats = {}
        # The connection is opened lazily by update_status().
        self.connection = None
        self.update_status()

    def get_host_stats(self, refresh=False):
        """Return the current state of the host.

        :param refresh: if True, update the stats first; otherwise the
                        cached values are returned.
        :returns: the dict built by :meth:`update_status`.
        """
        if refresh:
            self.update_status()
        return self._stats

    def update_status(self):
        """Retrieve status info from libvirt.

        Opens the connection on first use, queries it for CPU, disk,
        memory and hypervisor figures, stores the result as the cached
        stats and returns it.

        :returns: dict with the keys ``vcpus``, ``vcpus_used``,
                  ``cpu_info``, ``disk_total``, ``disk_used``,
                  ``disk_available``, ``host_memory_total``,
                  ``host_memory_free``, ``hypervisor_type`` and
                  ``hypervisor_version``.
        """
        LOG.debug(_("Updating host stats"))
        if self.connection is None:
            self.connection = get_connection(self.read_only)
        data = {}
        data["vcpus"] = self.connection.get_vcpu_total()
        data["vcpus_used"] = self.connection.get_vcpu_used()
        data["cpu_info"] = utils.loads(self.connection.get_cpu_info())
        data["disk_total"] = self.connection.get_local_gb_total()
        data["disk_used"] = self.connection.get_local_gb_used()
        # Available/free figures are derived, not queried.
        data["disk_available"] = data["disk_total"] - data["disk_used"]
        data["host_memory_total"] = self.connection.get_memory_mb_total()
        data["host_memory_free"] = (data["host_memory_total"] -
                                    self.connection.get_memory_mb_used())
        data["hypervisor_type"] = self.connection.get_hypervisor_type()
        data["hypervisor_version"] = self.connection.get_hypervisor_version()
        self._stats = data
        return data
数据更新
ComputeManager在run_instance、terminate_instance、ha_migrate_instance时,会调用LibvirtConnection.update_available_resource来创建或修改compute_nodes记录
FilterScheduler._schedule,当Scheduler开始schedule时,就更新host state数据
def _schedule(self, context, topic, request_spec, *args, **kwargs):
    """Returns a list of hosts that meet the required specs,
    ordered by their fitness.

    :param context: request context; an elevated copy is used to fetch
                    the host states.
    :param topic: must be "compute".
    :param request_spec: dict providing 'instance_properties' and,
                         optionally, 'instance_type' and 'num_instances'
                         (defaults to 1).
    :param kwargs: only 'filter_properties' is read; the dict is updated
                   in place with the context, request spec, configuration
                   options and instance type.
    :returns: the selected weighted hosts sorted by their ``weight``
              attribute, at most ``num_instances`` of them.
    :raises NotImplementedError: if ``topic`` is not "compute".
    """
    elevated = context.elevated()
    if topic != "compute":
        msg = _("Scheduler only understands Compute nodes (for now)")
        raise NotImplementedError(msg)

    instance_properties = request_spec['instance_properties']
    instance_type = request_spec.get("instance_type", None)

    cost_functions = self.get_cost_functions()
    config_options = self._get_configuration_options()

    filter_properties = kwargs.get('filter_properties', {})
    filter_properties.update({'context': context,
                              'request_spec': request_spec,
                              'config_options': config_options,
                              'instance_type': instance_type})

    self.populate_filter_properties(request_spec,
                                    filter_properties)

    # Find our local list of acceptable hosts by repeatedly
    # filtering and weighing our options. Each time we choose a
    # host, we virtually consume resources on it so subsequent
    # selections can adjust accordingly.

    # unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
    unfiltered_hosts_dict = self.host_manager.get_all_host_states(
        elevated, topic)

    # Note: remember, we are using an iterator here. So only
    # traverse this list once. This can bite you if the hosts
    # are being scanned in a filter or weighing function.
    hosts = unfiltered_hosts_dict.itervalues()

    num_instances = request_spec.get('num_instances', 1)
    selected_hosts = []
    # NOTE: the locals `hosts` and `weighted_host` are referenced by name
    # through `% locals()` in the log messages below, so they must not
    # be renamed. The loop variable must not be `_`, which is the
    # translation function used in this scope.
    for num in xrange(num_instances):
        # Filter local hosts based on requirements ...
        hosts = self.host_manager.filter_hosts(hosts,
                                               filter_properties)
        if not hosts:
            # Can't get any more locally.
            break

        LOG.debug(_("Filtered %(hosts)s") % locals())

        # weighted_host = WeightedHost() ... the best
        # host for the job.
        # TODO(comstud): filter_properties will also be used for
        # weighing and I plan fold weighing into the host manager
        # in a future patch. I'll address the naming of this
        # variable at that time.
        weighted_host = least_cost.weighted_sum(cost_functions,
                                                hosts, filter_properties)
        LOG.debug(_("Weighted %(weighted_host)s") % locals())
        selected_hosts.append(weighted_host)

        # Now consume the resources so the filter/weights
        # will change for the next instance.
        weighted_host.host_state.consume_from_instance(
            instance_properties)

    selected_hosts.sort(key=operator.attrgetter('weight'))
    return selected_hosts[:num_instances]
def notify(message):
    """Look for specific compute manager events and interpret them
    so as to keep the Capacity table up to date.

    NOTE: the True/False return codes are only for testing.

    :param message: the notification to inspect.
    """
    # NOTE(review): this is an excerpt. `host`, `free_ram_mb`,
    # `free_disk_gb`, `work` and `vms` are not defined in the visible
    # code; presumably they are derived from `message` in the part that
    # was omitted. Confirm against the full function before reuse.
    db.api.compute_node_utilization_update(
        context.get_admin_context(), host,
        free_ram_mb_delta=free_ram_mb, free_disk_gb_delta=free_disk_gb,
        work_delta=work, vm_delta=vms)
# Integer option 'host_state_interval'. Per its help text the value is
# expressed in seconds, so the default of 120 means two minutes.
cfg.IntOpt('host_state_interval',
default=120,
help='Interval in seconds for querying the host status'),
定时任务将last_capabilities发给Scheduler,让Scheduler知道最新的资源情况
class SchedulerDependentManager(Manager):
    """Periodically send capability updates to the Scheduler services.

    Services that need to update the Scheduler of their capabilities
    should derive from this class. Otherwise they can derive from
    manager.Manager directly. Updates are only sent after
    update_service_capabilities is called with non-None values.
    """

    def __init__(self, host=None, db_driver=None, service_name='undefined'):
        """Initialise with no capabilities recorded yet.

        :param host: forwarded to the ``Manager`` constructor.
        :param db_driver: forwarded to the ``Manager`` constructor.
        :param service_name: name reported to the schedulers together
                             with the capabilities.
        """
        self.last_capabilities = None
        self.service_name = service_name
        super(SchedulerDependentManager, self).__init__(host, db_driver)

    def update_service_capabilities(self, capabilities):
        """Remember these capabilities to send on next periodic update."""
        self.last_capabilities = capabilities

    @periodic_task
    def _publish_service_capabilities(self, context):
        """Pass data back to the scheduler at a periodic interval.

        Nothing is sent while ``self.last_capabilities`` is falsy, which
        covers both the initial None and an empty capabilities value.
        """
        if self.last_capabilities:
            LOG.debug(_('Notifying Schedulers of capabilities ...'))
            api.update_service_capabilities(context, self.service_name,
                                            self.host,
                                            self.last_capabilities)
def update_service_capabilities(context, service_name, host, capabilities):
    """Send an update to all the scheduler services informing them
    of the capabilities of this service.

    :param context: request context passed through to the RPC layer.
    :param service_name: name of the reporting service.
    :param host: host the reporting service runs on.
    :param capabilities: capabilities payload to deliver.
    :returns: whatever ``rpc.fanout_cast`` returns.
    """
    # The message names the remote method and carries its keyword
    # arguments; it is fanned out on the 'scheduler' topic.
    kwargs = dict(method='update_service_capabilities',
                  args=dict(service_name=service_name, host=host,
                            capabilities=capabilities))
    return rpc.fanout_cast(context, 'scheduler', kwargs)
SchedulerManager:
def update_service_capabilities(self, context, service_name=None,
                                host=None, capabilities=None, **kwargs):
    """Process a capability update from a service node.

    :param context: request context; not used by this method.
    :param service_name: name of the reporting service.
    :param host: host the reporting service runs on.
    :param capabilities: reported capabilities; None is replaced by an
                         empty dict before being handed to the driver.
    :param kwargs: accepted and ignored.
    """
    if capabilities is None:
        capabilities = {}
    self.driver.update_service_capabilities(service_name, host,
                                            capabilities)
def update_service_capabilities(self, service_name, host, capabilities):
    """Update the per-service capabilities based on this notification.

    The capabilities are stored in ``self.service_states`` keyed first
    by host and then by service name, with a "timestamp" entry added.

    :param service_name: name of the reporting service.
    :param host: host the reporting service runs on.
    :param capabilities: mapping of reported capabilities; it is copied,
                         so the caller's object is left untouched.
    """
    # NOTE: `service_name` and `host` are looked up by name through
    # `% locals()` in the log message below.
    LOG.debug(_("Received %(service_name)s service update from "
                "%(host)s.") % locals())
    service_caps = self.service_states.get(host, {})
    # Copy the capabilities, so we don't modify the original dict
    capab_copy = dict(capabilities)
    capab_copy["timestamp"] = utils.utcnow()  # Reported time
    service_caps[service_name] = capab_copy
    self.service_states[host] = service_caps
scheduler接收后存入service_states内存中。