neutron openvswitch agent的启动命令
#/var/lib/kolla/venv/bin/python /var/lib/kolla/venv/bin/neutron-openvswitch-agent --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini
import neutron.plugins.ml2.drivers.openvswitch.agent.main as agent_main
def main():
    """Entry point of the agent launcher script.

    Hands control straight to ``main()`` of the OVS agent ``main`` module,
    which this file imports under the name ``agent_main``.
    """
    agent_main.main()
这里初始化了配置，并根据 OVS.of_interface 的取值（native 或 ovs-ofctl）import 对应的 module，然后调用该 module 的 main()
# Pull the [OVS] option group into cfg.CONF so that main() can read
# cfg.CONF.OVS.of_interface.
cfg.CONF.import_group(
    'OVS',
    'neutron.plugins.ml2.drivers.openvswitch.agent.common.config',
)

# Maps every supported OVS.of_interface value to the dotted path of the
# module that implements the agent for that OpenFlow driver.
_main_modules = {
    'ovs-ofctl': (
        'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
        'ovs_ofctl.main'
    ),
    'native': (
        'neutron.plugins.ml2.drivers.openvswitch.agent.openflow.'
        'native.main'
    ),
}
def main():
    """Initialise configuration and run the configured OpenFlow driver.

    The command-line arguments (without the program name) are fed to the
    common config initialiser, then the module registered in
    ``_main_modules`` for ``cfg.CONF.OVS.of_interface`` is imported.  That
    module's ``init_config()`` runs before logging and the profiler are
    set up, and its ``main()`` is called last.

    :raises KeyError: if ``of_interface`` is not a key of ``_main_modules``.
    """
    common_config.init(sys.argv[1:])

    # Resolve and import the driver module selected in the configuration.
    of_module = importutils.import_module(
        _main_modules[cfg.CONF.OVS.of_interface])
    of_module.init_config()

    common_config.setup_logging()
    profiler.setup("neutron-ovs-agent", cfg.CONF.host)

    of_module.main()
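native 模块的 main() 通过 AppManager.run_apps 加载 ovs_oskenapp，进而调用了 OVSNeutronAgentOSKenApp 的 start 方法；start 方法再用 hub.spawn 启动 agent_main_wrapper，最终进入 ovs_agent.main()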
def main():
    """Run the native OpenFlow driver through the os-ken AppManager.

    The only application handed to ``AppManager.run_apps`` is the
    ``ovs_oskenapp`` module of the native driver package.
    """
    oskenapp_module = ('neutron.plugins.ml2.drivers.openvswitch.agent.'
                       'openflow.native.ovs_oskenapp')
    app_manager.AppManager.run_apps([oskenapp_module])
def agent_main_wrapper(bridge_classes):
    """Run the OVS agent main function and always close the AppManager.

    :param bridge_classes: dict of bridge class factories, passed unchanged
                           to ``ovs_agent.main``.

    Any exception escaping ``ovs_agent.main`` is logged and then re-raised
    by ``save_and_reraise_exception``.  Whether or not an exception
    occurred, the AppManager's ``close`` is scheduled afterwards.
    """
    try:
        ovs_agent.main(bridge_classes)
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception("Agent main thread died of an exception")
    finally:
        # Closing the AppManager is what makes AppManager.run_apps()
        # return, which a clean shutdown of the agent process requires.
        # close() has to run in a different thread: invoked from this one
        # it would kill itself and finish prematurely.
        manager = app_manager.AppManager.get_instance()
        hub.spawn(manager.close)
class OVSNeutronAgentOSKenApp(app_manager.OSKenApp):
    """os-ken application that hosts the OVS agent main loop."""

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def start(self):
        """Start the os-ken event loop, then spawn the agent main thread.

        Every bridge class is wrapped in a ``functools.partial`` that
        pre-binds ``os_ken_app`` to this application instance.

        :returns: whatever ``hub.spawn`` returns for the thread running
                  ``agent_main_wrapper``.
        """
        # Start os-ken event loop thread
        super(OVSNeutronAgentOSKenApp, self).start()

        # Start agent main loop thread
        bridge_classes = {
            bridge_key: functools.partial(bridge_cls, os_ken_app=self)
            for bridge_key, bridge_cls in (
                ('br_int', br_int.OVSIntegrationBridge),
                ('br_phys', br_phys.OVSPhysicalBridge),
                ('br_tun', br_tun.OVSTunnelBridge),
            )
        }
        return hub.spawn(agent_main_wrapper, bridge_classes, raise_error=True)
校验 tunnel 类型和 local ip，然后创建 OVSNeutronAgent 实例并进入 daemon_loop()
def main(bridge_classes):
    """Register options, validate tunnel settings and run the OVS agent.

    :param bridge_classes: dict of bridge class factories handed to the
                           ``OVSNeutronAgent`` constructor.

    If building the agent or notifying the init event raises
    ``RuntimeError`` or ``ValueError``, the error is logged and the process
    exits with status 1.  Otherwise the agent's ``daemon_loop()`` is
    entered.
    """
    prepare_xen_compute()
    ovs_capabilities.register()
    ext_manager.register_opts(cfg.CONF)
    agent_config.setup_privsep()
    service_conf.register_service_opts(service_conf.RPC_EXTRA_OPTS, cfg.CONF)

    extension_manager = ext_manager.L2AgentExtensionsManager(cfg.CONF)

    # Every extension has registered its options by now, so the complete
    # set of option values can be logged.
    n_utils.log_opt_values(LOG)

    validate_tunnel_config(cfg.CONF.AGENT.tunnel_types, cfg.CONF.OVS.local_ip)

    try:
        agent = OVSNeutronAgent(bridge_classes, extension_manager, cfg.CONF)
        capabilities.notify_init_event(n_const.AGENT_TYPE_OVS, agent)
    except (RuntimeError, ValueError) as exc:
        LOG.error("%s Agent terminated!", exc)
        sys.exit(1)
    else:
        # Runs outside the except clauses above, exactly as a statement
        # placed after the whole try block would.
        agent.daemon_loop()
初始化 neutron agent：创建 br-int 交换机；如果启用了 tunnel，则创建 br-tun 交换机以及流表；agent 侧开启 rpc；如果启用了 dvr，则下发 dvr 流表；初始化 firewall（目前是基于 iptables 的 neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver）
# OVS L2 agent class (excerpt). NOTE(review): the leading indentation of this
# paste was lost, so every line sits at column 0; the code tokens below are
# left exactly as found and only comments have been added.
class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
dvr_rpc.DVRAgentRpcCallbackMixin):
# Constructor. In order it: reads the AGENT and OVS config groups, creates and
# sets up the integration bridge, parses bridge mappings and sets up the
# physical bridges, optionally sets up the tunnel bridge and its flows, sets up
# RPC, builds the DVR agent, initialises extensions and the security-group
# agent, builds the agent_state dict, starts the heartbeat and finally starts
# consuming RPC messages.
def __init__(self, bridge_classes, ext_manager, conf=None):
'''Constructor.
:param bridge_classes: a dict for bridge classes.
:param conf: an instance of ConfigOpts
'''
super(OVSNeutronAgent, self).__init__()
# Fall back to the global cfg.CONF when no (or a falsy) conf is given.
self.conf = conf or cfg.CONF
self.ovs = ovs_lib.BaseOVS()
self.ext_manager = ext_manager
agent_conf = self.conf.AGENT
ovs_conf = self.conf.OVS
# rpc_loop() checks this flag at the top of each iteration and, when set,
# forces a sync and clears it again.
self.fullsync = False
# init bridge classes with configured datapath type.
# The generator yields three partials (int, phys, tun order) that are
# unpacked into the three attributes.
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
functools.partial(bridge_classes[b],
datapath_type=ovs_conf.datapath_type)
for b in ('br_int', 'br_phys', 'br_tun'))
self.use_veth_interconnection = ovs_conf.use_veth_interconnection
self.veth_mtu = agent_conf.veth_mtu
# All tags from MIN_VLAN_TAG to MAX_VLAN_TAG inclusive (hence the + 1).
self.available_local_vlans = set(six.moves.range(
n_const.MIN_VLAN_TAG, n_const.MAX_VLAN_TAG + 1))
self.tunnel_types = agent_conf.tunnel_types or []
self.l2_pop = agent_conf.l2_population
# TODO(ethuleau): Change ARP responder so it's not dependent on the
# ML2 l2 population mechanism driver.
self.enable_distributed_routing = agent_conf.enable_distributed_routing
# The ARP responder is only considered enabled when l2_population is on too.
self.arp_responder_enabled = agent_conf.arp_responder and self.l2_pop
host = self.conf.host
self.agent_id = 'ovs-agent-%s' % host
# Tunneling is enabled exactly when at least one tunnel type is configured.
self.enable_tunneling = bool(self.tunnel_types)
# Validate agent configurations
self._check_agent_configurations()
# Keep track of int_br's device count for use by _report_state()
self.int_br_device_count = 0
self.int_br = self.br_int_cls(ovs_conf.integration_bridge)
self.setup_integration_br()
# Stores port update notifications for processing in main rpc loop
self.updated_ports = set()
# Stores port delete notifications
self.deleted_ports = set()
# Stores the port IDs whose binding has been deactivated
self.deactivated_bindings = set()
# Stores the port IDs whose binding has been activated
self.activated_bindings = set()
self.network_ports = collections.defaultdict(set)
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
# Stores newly created bridges
self.added_bridges = list()
self.bridge_mappings = self._parse_bridge_mappings(
ovs_conf.bridge_mappings)
self.rp_bandwidths = place_utils.parse_rp_bandwidths(
ovs_conf.resource_provider_bandwidths)
# The resource-provider bandwidth config is validated against the set of
# values of bridge_mappings.
br_set = set(six.itervalues(self.bridge_mappings))
n_utils.validate_rp_bandwidth(self.rp_bandwidths,
br_set)
self.rp_inventory_defaults = place_utils.parse_rp_inventory_defaults(
ovs_conf.resource_provider_inventory_defaults)
self.rp_hypervisors = utils.default_rp_hypervisors(
ovs_conf.resource_provider_hypervisors,
{k: [v] for k, v in self.bridge_mappings.items()}
)
# NOTE(review): self.phys_brs, self.int_ofports and self.phys_ofports are
# read further down but never assigned in this method; presumably
# setup_physical_bridges() sets them - confirm.
self.setup_physical_bridges(self.bridge_mappings)
self.vlan_manager = vlanmanager.LocalVlanManager()
self._reset_tunnel_ofports()
self.polling_interval = agent_conf.polling_interval
self.minimize_polling = agent_conf.minimize_polling
# Use DEFAULT_OVSDBMON_RESPAWN when the option is unset or falsy.
self.ovsdb_monitor_respawn_interval = (
agent_conf.ovsdb_monitor_respawn_interval or
constants.DEFAULT_OVSDBMON_RESPAWN)
self.local_ip = ovs_conf.local_ip
self.tunnel_count = 0
self.vxlan_udp_port = agent_conf.vxlan_udp_port
self.dont_fragment = agent_conf.dont_fragment
self.tunnel_csum = agent_conf.tunnel_csum
# tos is 'inherit' when dscp_inherit is set; otherwise the configured dscp
# shifted left by two bits; otherwise None. (The shift presumably places the
# DSCP value in the upper bits of the ToS byte - confirm.)
self.tos = ('inherit'
if agent_conf.dscp_inherit
else (int(agent_conf.dscp) << 2
if agent_conf.dscp
else None))
# tun_br stays None and both patch ofports stay OFPORT_INVALID unless
# tunneling is enabled.
self.tun_br = None
self.patch_int_ofport = constants.OFPORT_INVALID
self.patch_tun_ofport = constants.OFPORT_INVALID
if self.enable_tunneling:
# The patch_int_ofport and patch_tun_ofport are updated
# here inside the call to setup_tunnel_br()
self.setup_tunnel_br(ovs_conf.tunnel_bridge)
self.setup_tunnel_br_flows()
# Defaults for the VIP feature; both may be overwritten at the end of this
# method.
self.enable_vip = False
self.groups = None
# NOTE(review): self.context, self.connection, self.dvr_plugin_rpc and
# self.sg_plugin_rpc are used below but not assigned here; presumably
# setup_rpc() creates them - confirm.
self.setup_rpc()
self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
self.context,
self.dvr_plugin_rpc,
self.int_br,
self.tun_br,
self.bridge_mappings,
self.phys_brs,
self.int_ofports,
self.phys_ofports,
self.patch_int_ofport,
self.patch_tun_ofport,
host,
self.enable_tunneling,
self.enable_distributed_routing,
self.arp_responder_enabled)
# DVR flows are only installed when distributed routing is enabled.
if self.enable_distributed_routing:
self.dvr_agent.setup_dvr_flows(self.bridge_mappings)
# Collect additional bridges to monitor
self.ancillary_brs = self.setup_ancillary_bridges(
ovs_conf.integration_bridge, ovs_conf.tunnel_bridge)
# Extensions get access to the integration, tunnel and physical bridges
# through this API object.
agent_api = ovs_ext_api.OVSAgentExtensionAPI(self.int_br,
self.tun_br,
self.phys_brs)
self.ext_manager.initialize(
self.connection, constants.EXTENSION_DRIVER_TYPE, agent_api)
# In order to keep existed device's local vlan unchanged,
# restore local vlan mapping at start
self._restore_local_vlan_map()
# Security group agent support
self.sg_agent = agent_sg_rpc.SecurityGroupAgentRpc(
self.context, self.sg_plugin_rpc, defer_refresh_firewall=True,
integration_bridge=self.int_br)
self.sg_plugin_rpc.register_legacy_sg_notification_callbacks(
self.sg_agent)
self.sg_agent.init_ovs_dvr_firewall(self.dvr_agent)
# we default to False to provide backward compat with out of tree
# firewall drivers that expect the logic that existed on the Neutron
# server which only enabled hybrid plugging based on the use of the
# hybrid driver.
hybrid_plug = getattr(self.sg_agent.firewall,
'OVS_HYBRID_PLUG_REQUIRED', False)
# ARP spoofing prevention is switched on here only when the firewall driver
# does not report providing that protection itself.
self.prevent_arp_spoofing = (
not self.sg_agent.firewall.provides_arp_spoofing_protection)
self.failed_report_state = False
# TODO(mangelajo): optimize resource_versions to only report
# versions about resources which are common,
# or which are used by specific extensions.
# State dictionary describing this agent; presumably what _report_state()
# sends to the server - confirm.
self.agent_state = {
'binary': 'neutron-openvswitch-agent',
'host': host,
'topic': n_const.L2_AGENT_TOPIC,
'configurations': {'bridge_mappings': self.bridge_mappings,
c_const.RP_BANDWIDTHS: self.rp_bandwidths,
c_const.RP_INVENTORY_DEFAULTS:
self.rp_inventory_defaults,
'resource_provider_hypervisors':
self.rp_hypervisors,
'integration_bridge':
ovs_conf.integration_bridge,
'tunnel_types': self.tunnel_types,
'tunneling_ip': self.local_ip,
'l2_population': self.l2_pop,
'arp_responder_enabled':
self.arp_responder_enabled,
'enable_distributed_routing':
self.enable_distributed_routing,
'log_agent_heartbeats':
agent_conf.log_agent_heartbeats,
'extensions': self.ext_manager.names(),
'datapath_type': ovs_conf.datapath_type,
'ovs_capabilities': self.ovs.capabilities,
'vhostuser_socket_dir':
ovs_conf.vhostuser_socket_dir,
portbindings.OVS_HYBRID_PLUG: hybrid_plug},
'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
'agent_type': agent_conf.agent_type,
'start_flag': True}
# The periodic _report_state heartbeat is only started when report_interval
# is truthy.
report_interval = agent_conf.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
# Initialize iteration counter
self.iter_num = 0
self.run_daemon_loop = True
self.catch_sigterm = False
self.catch_sighup = False
# The initialization is complete; we can start receiving messages
self.connection.consume_in_threads()
self.quitting_rpc_timeout = agent_conf.quitting_rpc_timeout
self.install_ingress_direct_goto_flows()
self._register_rpc_consumers()
# VIP support: enabled only when the ARP responder and tunneling are both on
# and idc_lb_vxlan_remote_ips is set. For each comma-separated remote IP a
# VXLAN tunnel port is set up on the tunnel bridge and BFD is configured on
# it.
# NOTE(review): because indentation was lost, it cannot be told from this
# paste which of the statements below are nested under the `if`, the `for`
# or the inner `if` - check against the real source before relying on it.
self.idc_lb_vxlan_remote_ips = ovs_conf.idc_lb_vxlan_remote_ips
self.to_idc_lb_hash_fields = ovs_conf.to_idc_lb_hash_fields
if self.arp_responder_enabled and self.enable_tunneling and self.idc_lb_vxlan_remote_ips:
self.enable_vip = True
# key is net_uuid, value is the list of that network's vip ports
self.network_vip = dict()
# key is vip port_id, value is a dict of
# {mac:xxx, net_uuid:xxx, vlan: xxx, fixed_ips:xxx}
self.vip_detail = dict()
# key is vxlan remote ip, value is ovs ofport
self.vip_vxlan_ofport = dict()
# Counter incremented in the loop below and passed to dump_groups().
bucket_num = 0
for ip in self.idc_lb_vxlan_remote_ips.split(','):
port_name = self.get_tunnel_name(
n_const.TYPE_VXLAN, self.local_ip, ip)
if port_name is not None:
ofport = self._setup_tunnel_port(self.tun_br,
port_name,
ip,
n_const.TYPE_VXLAN)
self.vip_vxlan_ofport[ip] = ofport
self.tun_br.set_port_bfd(port_name, True, self.local_ip, ip)
bucket_num = bucket_num + 1
self.groups = self.tun_br.dump_groups(bucket_num)
LOG.debug("group table is %s", self.groups)
def daemon_loop(self):
    """Install signal handlers, start bridge monitoring and run rpc_loop.

    SIGTERM is always routed to ``_handle_sigterm``; SIGHUP is routed to
    ``_handle_sighup`` only when the ``signal`` module defines it.  The
    names of all physical bridges are handed to the OVSDB IDL bridge
    monitor, and ``rpc_loop`` then runs inside the polling manager
    context.
    """
    # Start everything.
    LOG.info("Agent initialized successfully, now running... ")

    signal.signal(signal.SIGTERM, self._handle_sigterm)
    sighup = getattr(signal, 'SIGHUP', None)
    if sighup is not None:
        signal.signal(sighup, self._handle_sighup)

    physical_bridge_names = [
        bridge.br_name for bridge in self.phys_brs.values()]
    self.ovs.ovsdb.idl_monitor.start_bridge_monitor(physical_bridge_names)

    polling_context = polling.get_polling_manager(
        self.minimize_polling, self.ovsdb_monitor_respawn_interval)
    with polling_context as manager:
        self.rpc_loop(polling_manager=manager)
重点关注 tunnel_sync 和 process_network_ports，其中 process_network_ports 主要处理本机相关的 devices。只要 self._check_and_handle_signal() 返回真，while 循环就会一直执行，不断检查 bridge、tunnel、port 的变更等
# Main processing loop of the agent. As long as _check_and_handle_signal()
# returns a true value, each iteration checks the OVS status, reconfigures
# physical bridges, syncs tunnels when needed, and - when there is something
# to do - collects port information and processes added/updated/removed ports.
#
# :param polling_manager: the polling manager created in daemon_loop(); it is
#     passed to _handle_ovs_restart(), _agent_has_updates() and
#     process_port_info(), and its polling_completed() is called after ports
#     have been processed.
#
# NOTE(review): the leading indentation of this paste was lost, so the exact
# nesting of statements cannot be read off the text; code tokens are left
# untouched and only comments have been added.
def rpc_loop(self, polling_manager):
idl_monitor = self.ovs.ovsdb.idl_monitor
# Loop state carried from one iteration to the next.
sync = False
ports = set()
updated_ports_copy = set()
activated_bindings_copy = set()
ancillary_ports = set()
# Starts as True so the first iteration attempts a tunnel sync (when
# tunneling is enabled).
tunnel_sync = True
ovs_restarted = False
bridges_recreated = False
consecutive_resyncs = 0
# Starts as True so stale flows are cleaned once after the first successful
# port processing.
need_clean_stale_flow = True
ports_not_ready_yet = set()
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {'added': set(), 'removed': set()}
failed_devices_retries_map = {}
while self._check_and_handle_signal():
# A pending full-sync request is turned into sync=True and then cleared.
if self.fullsync:
LOG.info("rpc_loop doing a full sync.")
sync = True
self.fullsync = False
port_info = {}
ancillary_port_info = {}
# Start time of this iteration; used for the "Elapsed" log values and
# passed to loop_count_and_wait().
start = time.time()
LOG.info("Agent rpc_loop - iteration:%d started",
self.iter_num)
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self._handle_ovs_restart(polling_manager)
# After an OVS restart a tunnel sync is requested whenever tunneling is
# enabled (or one was already pending).
tunnel_sync = self.enable_tunneling or tunnel_sync
# setup port forward flows
self.setup_pf_fip_flows()
# setup snat ip flows
self.setup_snat_ip_flows()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
# loop in which ovs status will be checked periodically.
port_stats = self.get_port_stats({}, {})
self.loop_count_and_wait(start, port_stats)
continue
# Check if any physical bridge wasn't recreated recently
added_bridges = idl_monitor.bridges_added + self.added_bridges
bridges_recreated |= self._reconfigure_physical_bridges(
added_bridges)
if bridges_recreated:
# In case when any bridge was "re-created", we need to ensure
# that there is no any stale flows in bridges left
need_clean_stale_flow = True
sync |= bridges_recreated
# Notify the plugin of tunnel IP
if self.enable_tunneling and tunnel_sync:
try:
# tunnel_sync() returns the new value of the flag, i.e. whether another
# sync is still needed.
tunnel_sync = self.tunnel_sync()
except Exception:
LOG.exception("Error while configuring tunnel endpoints")
# Retry the tunnel sync on the next iteration.
tunnel_sync = True
# ovs_restarted is sticky: it stays set until an iteration finishes port
# processing without an exception and resets it below.
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
# Anything that failed or was not ready in a previous iteration forces
# another processing pass.
devices_need_retry = (any(failed_devices.values()) or
any(failed_ancillary_devices.values()) or
ports_not_ready_yet)
if (self._agent_has_updates(polling_manager) or sync or
devices_need_retry or ovs_restarted):
try:
LOG.info("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Save updated ports dict to perform rollback in
# case resync would be needed, and then clear
# self.updated_ports. As the greenthread should not yield
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
# Same snapshot-and-reset pattern for the activated bindings.
activated_bindings_copy = self.activated_bindings
self.activated_bindings = set()
(port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet) = (self.process_port_info(
start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs, ports_not_ready_yet,
failed_devices, failed_ancillary_devices))
sync = False
self.process_deleted_ports(port_info)
self.process_deactivated_bindings(port_info)
self.process_activated_bindings(port_info,
activated_bindings_copy)
# Ports reported by update_stale_ofport_rules() are added to the
# 'updated' set of port_info.
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
port_info.setdefault('updated', set()).update(
ofport_changed_ports)
LOG.info("Agent rpc_loop - iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
self.sg_agent.firewall_refresh_needed() or
ovs_restarted or bridges_recreated):
LOG.debug("Starting to process devices in:%s",
port_info)
# The provisioning flag passed to process_network_ports() is set after an
# OVS restart or a bridge re-creation.
provisioning_needed = (
ovs_restarted or bridges_recreated)
failed_devices = self.process_network_ports(
port_info, provisioning_needed)
if need_clean_stale_flow:
self.cleanup_stale_flows()
need_clean_stale_flow = False
LOG.info("Agent rpc_loop - iteration:%(iter_num)d - "
"ports processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Remember the current port set; it is passed back to process_port_info()
# on the next pass.
ports = port_info['current']
if self.ancillary_brs:
failed_ancillary_devices = (
self.process_ancillary_network_ports(
ancillary_port_info))
LOG.info("Agent rpc_loop - iteration: "
"%(iter_num)d - ancillary ports "
"processed. Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
ancillary_ports = ancillary_port_info['current']
polling_manager.polling_completed()
failed_devices_retries_map = (
self.update_retries_map_and_remove_devs_not_to_retry(
failed_devices, failed_ancillary_devices,
failed_devices_retries_map))
# Keep this flag in the last line of "try" block,
# so we can sure that no other Exception occurred.
ovs_restarted = False
bridges_recreated = False
self._dispose_local_vlan_hints()
except Exception:
LOG.exception("Error while processing VIF ports")
# Put the ports back in self.updated_port
self.updated_ports |= updated_ports_copy
self.activated_bindings |= activated_bindings_copy
# Force a resync on the next iteration.
sync = True
# End of iteration: collect port statistics and hand them, with the start
# time, to loop_count_and_wait().
port_stats = self.get_port_stats(port_info, ancillary_port_info)
self.loop_count_and_wait(start, port_stats)