Preface
This article takes a look at High Availability for the Flink JobManager.
Configuration
flink-conf.yaml
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /cluster_one # important: customize per cluster
    high-availability.storageDir: file:///share
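- high-availability accepts NONE or zookeeper (or, per the HighAvailabilityOptions source, the fully qualified name of a factory class); high-availability.zookeeper.quorum lists the ZooKeeper peers; high-availability.zookeeper.path.root sets the root node path in ZooKeeper; high-availability.cluster-id names the node for the current cluster, which sits under the root node; high-availability.storageDir sets the path where JobManager metadata is stored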
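To make the relationship between high-availability.zookeeper.path.root and high-availability.cluster-id concrete, here is a minimal plain-Java sketch that joins the two values into one znode path. The class and method names (HaPathSketch, clusterPath) are hypothetical, and the slash normalization is an assumption made for illustration rather than Flink's own code.

```java
/** Illustrative only: joins a ZooKeeper root path and a cluster id into one znode path. */
final class HaPathSketch {

    /** Hypothetical helper: ensures exactly one '/' between the root and the cluster id. */
    static String clusterPath(String root, String clusterId) {
        // Drop a trailing slash from the root, e.g. "/flink/" -> "/flink".
        String normalizedRoot = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        // Make sure the cluster id starts with a slash, e.g. "cluster_one" -> "/cluster_one".
        String normalizedId = clusterId.startsWith("/") ? clusterId : "/" + clusterId;
        return normalizedRoot + normalizedId;
    }

    public static void main(String[] args) {
        // Values taken from the flink-conf.yaml snippet above.
        System.out.println(clusterPath("/flink", "/cluster_one"));
    }
}
```

With the values from the configuration above, the cluster node would be /flink/cluster_one, so two clusters that share a ZooKeeper quorum stay apart as long as their cluster-id values differ.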
The masters file
    localhost:8081
    localhost:8082
- The masters file lists the addresses of the JobManagers
HighAvailabilityMode
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
    public enum HighAvailabilityMode {
        NONE(false),
        ZOOKEEPER(true),
        FACTORY_CLASS(true);

        private final boolean haActive;

        HighAvailabilityMode(boolean haActive) {
            this.haActive = haActive;
        }

        /**
         * Return the configured {@link HighAvailabilityMode}.
         *
         * @param config The config to parse
         * @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
         * configured.
         */
        public static HighAvailabilityMode fromConfig(Configuration config) {
            String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);

            if (haMode == null) {
                return HighAvailabilityMode.NONE;
            } else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
                // Map old default to new default
                return HighAvailabilityMode.NONE;
            } else {
                try {
                    return HighAvailabilityMode.valueOf(haMode.toUpperCase());
                } catch (IllegalArgumentException e) {
                    return FACTORY_CLASS;
                }
            }
        }

        /**
         * Returns true if the defined recovery mode supports high availability.
         *
         * @param configuration Configuration which contains the recovery mode
         * @return true if high availability is supported by the recovery mode, otherwise false
         */
        public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
            HighAvailabilityMode mode = fromConfig(configuration);
            return mode.haActive;
        }
    }
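- HighAvailabilityMode has three constants: NONE, ZOOKEEPER, and FACTORY_CLASS; each carries a haActive flag that indicates whether the mode supports high availability
- fromConfig returns NONE when the option is unset or equals ConfigConstants.DEFAULT_RECOVERY_MODE, and falls back to FACTORY_CLASS for any value that is not the name of an enum constant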
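The fallback order in fromConfig can be tried without a Flink dependency. The sketch below reimplements it over a plain String; HaModeSketch, Mode, and parse are hypothetical names, the value "standalone" is assumed to stand in for ConfigConstants.DEFAULT_RECOVERY_MODE, and com.example.MyHaFactory is a made-up class name. Unlike the original, it upper-cases with Locale.ROOT to avoid locale surprises.

```java
import java.util.Locale;

/** Standalone sketch of the fallback logic in HighAvailabilityMode.fromConfig. */
final class HaModeSketch {

    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    // Assumption: stands in for ConfigConstants.DEFAULT_RECOVERY_MODE.
    static final String OLD_DEFAULT = "standalone";

    static Mode parse(String haMode) {
        // Unset, or the old default value: high availability is off.
        if (haMode == null || haMode.equalsIgnoreCase(OLD_DEFAULT)) {
            return Mode.NONE;
        }
        try {
            return Mode.valueOf(haMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Anything that is not an enum constant is treated as a factory class name.
            return Mode.FACTORY_CLASS;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(null));
        System.out.println(parse("zookeeper"));
        System.out.println(parse("com.example.MyHaFactory"));
    }
}
```

One consequence of this design is that a misspelled mode such as "zookeper" is not rejected at parse time; it is interpreted as a factory class name and only fails later when that class cannot be loaded.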
HighAvailabilityOptions
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java
    @PublicEvolving
    @ConfigGroups(groups = {
        @ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
    })
    public class HighAvailabilityOptions {

        // ------------------------------------------------------------------------
        //  Required High Availability Options
        // ------------------------------------------------------------------------

        /**
         * Defines high-availability mode used for the cluster execution.
         * A value of "NONE" signals no highly available setup.
         * To enable high-availability, set this mode to "ZOOKEEPER".
         * Can also be set to FQN of HighAvailability factory class.
         */
        @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
        public static final ConfigOption<String> HA_MODE =
                key("high-availability")
                .defaultValue("NONE")
                .withDeprecatedKeys("recovery.mode")
                .withDescription("Defines high-availability mode used for the cluster execution." +
                    " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");

        /**
         * The ID of the Flink cluster, used to separate multiple Flink clusters
         * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
         */
        public static final ConfigOption<String> HA_CLUSTER_ID =
                key("high-availability.cluster-id")
                .defaultValue("/default")
                .withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
                .withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
                    " Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");

        /**
         * File system path (URI) where Flink persists metadata in high-availability setups.
         */
        @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
        public static final ConfigOption<String> HA_STORAGE_PATH =
                key("high-availability.storageDir")
                .noDefaultValue()
                .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
                .withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");

        // ------------------------------------------------------------------------
        //  Recovery Options
        // ------------------------------------------------------------------------

        /**
         * Optional port (range) used by the job manager in high-availability mode.
         */
        public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
                key("high-availability.jobmanager.port")
                .defaultValue("0")
                .withDeprecatedKeys("recovery.jobmanager.port")
                .withDescription("Optional port (range) used by the job manager in high-availability mode.");

        /**
         * The time before a JobManager after a fail over recovers the current jobs.
         */
        public static final ConfigOption<String> HA_JOB_DELAY =
                key("high-availability.job.delay")
                .noDefaultValue()
                .withDeprecatedKeys("recovery.job.delay")
                .withDescription("The time before a JobManager after a fail over recovers the current jobs.");

        // ------------------------------------------------------------------------
        //  ZooKeeper Options
        // ------------------------------------------------------------------------

        /**
         * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
         */
        public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
                key("high-availability.zookeeper.quorum")
                .noDefaultValue()
                .withDeprecatedKeys("recovery.zookeeper.quorum")
                .withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");

        /**
         * The root path under which Flink stores its entries in ZooKeeper.
         */
        public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
                key("high-availability.zookeeper.path.root")
                .defaultValue("/flink")
                .withDeprecatedKeys("recovery.zookeeper.path.root")
                .withDescription("The root path under which Flink stores its entries in ZooKeeper.");

        public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
                key("high-availability.zookeeper.path.latch")
                .defaultValue("/leaderlatch")
                .withDeprecatedKeys("recovery.zookeeper.path.latch")
                .withDescription("Defines the znode of the leader latch which is used to elect the leader.");

        /** ZooKeeper root path (ZNode) for job graphs. */
        public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
                key("high-availability.zookeeper.path.jobgraphs")
                .defaultValue("/jobgraphs")
                .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
                .withDescription("ZooKeeper root path (ZNode) for job graphs");

        public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
                key("high-availability.zookeeper.path.leader")
                .defaultValue("/leader")
                .withDeprecatedKeys("recovery.zookeeper.path.leader")
                .withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
                    " leader session ID.");

        /** ZooKeeper root path (ZNode) for completed checkpoints. */
        public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
                key("high-availability.zookeeper.path.checkpoints")
                .defaultValue("/checkpoints")
                .withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
                .withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

        /** ZooKeeper root path (ZNode) for checkpoint counters. */
        public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
                key("high-availability.zookeeper.path.checkpoint-counter")
                .defaultValue("/checkpoint-counter")
                .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
                .withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

        /** ZooKeeper root path (ZNode) for Mesos workers. */
        @PublicEvolving
        public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
                key("high-availability.zookeeper.path.mesos-workers")
                .defaultValue("/mesos-workers")
                .withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
                .withDescription(Description.builder()
                    .text("The ZooKeeper root path for persisting the Mesos worker information.")
                    .build());

        // ------------------------------------------------------------------------
        //  ZooKeeper Client Settings
        // ------------------------------------------------------------------------

        public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
                key("high-availability.zookeeper.client.session-timeout")
                .defaultValue(60000)
                .withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
                .withDescription("Defines the session timeout for the ZooKeeper session in ms.");

        public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
                key("high-availability.zookeeper.client.connection-timeout")
                .defaultValue(15000)
                .withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
                .withDescription("Defines the connection timeout for ZooKeeper in ms.");

        public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
                key("high-availability.zookeeper.client.retry-wait")
                .defaultValue(5000)
                .withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
                .withDescription("Defines the pause between consecutive retries in ms.");

        public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
                key("high-availability.zookeeper.client.max-retry-attempts")
                .defaultValue(3)
                .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
                .withDescription("Defines the number of connection retries before the client gives up.");

        public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
                key("high-availability.zookeeper.path.running-registry")
                .defaultValue("/running_job_registry/");

        public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
                key("high-availability.zookeeper.client.acl")
                .defaultValue("open")
                .withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
                    " set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
                    " SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");

        // ------------------------------------------------------------------------

        /** Not intended to be instantiated. */
        private HighAvailabilityOptions() {}
    }
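- HighAvailabilityOptions defines the high-availability.* configuration options, including the group prefixed with high-availability.zookeeper; most of them also list older recovery.* keys through withDeprecatedKeys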
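Most options above pair a current key with deprecated keys and a default value. The following sketch shows one plausible lookup order (current key, then deprecated keys, then the default) over a plain Map. DeprecatedKeySketch and resolve are hypothetical names, and the order is an assumption about how such an option is typically resolved, not a copy of Flink's Configuration class.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of resolving an option via its current key, then deprecated keys, then a default. */
final class DeprecatedKeySketch {

    static String resolve(Map<String, String> conf, String defaultValue, String key, String... deprecatedKeys) {
        // The current key always wins when it is present.
        String value = conf.get(key);
        if (value != null) {
            return value;
        }
        // Otherwise try the deprecated keys in the order they were declared.
        for (String oldKey : deprecatedKeys) {
            value = conf.get(oldKey);
            if (value != null) {
                return value;
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("recovery.mode", "zookeeper"); // only the deprecated key is set
        System.out.println(resolve(conf, "NONE", "high-availability", "recovery.mode"));
    }
}
```

Under this assumption, an old configuration file that still uses recovery.mode keeps working, while a file that sets both keys follows the newer one.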
HighAvailabilityServicesUtils
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
    public class HighAvailabilityServicesUtils {

        public static HighAvailabilityServices createAvailableOrEmbeddedServices(
            Configuration config,
            Executor executor) throws Exception {
            HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);

            switch (highAvailabilityMode) {
                case NONE:
                    return new EmbeddedHaServices(executor);

                case ZOOKEEPER:
                    BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

                    return new ZooKeeperHaServices(
                        ZooKeeperUtils.startCuratorFramework(config),
                        executor,
                        config,
                        blobStoreService);

                case FACTORY_CLASS:
                    return createCustomHAServices(config, executor);

                default:
                    throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
            }
        }

        public static HighAvailabilityServices createHighAvailabilityServices(
            Configuration configuration,
            Executor executor,
            AddressResolution addressResolution) throws Exception {

            HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

            switch (highAvailabilityMode) {
                case NONE:
                    final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

                    final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                        hostnamePort.f0,
                        hostnamePort.f1,
                        JobMaster.JOB_MANAGER_NAME,
                        addressResolution,
                        configuration);
                    final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                        hostnamePort.f0,
                        hostnamePort.f1,
                        ResourceManager.RESOURCE_MANAGER_NAME,
                        addressResolution,
                        configuration);
                    final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                        hostnamePort.f0,
                        hostnamePort.f1,
                        Dispatcher.DISPATCHER_NAME,
                        addressResolution,
                        configuration);

                    final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                        "%s must be set",
                        RestOptions.ADDRESS.key());
                    final int port = configuration.getInteger(RestOptions.PORT);
                    final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
                    final String protocol = enableSSL ? "https://" : "http://";

                    return new StandaloneHaServices(
                        resourceManagerRpcUrl,
                        dispatcherRpcUrl,
                        jobManagerRpcUrl,
                        String.format("%s%s:%s", protocol, address, port));

                case ZOOKEEPER:
                    BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

                    return new ZooKeeperHaServices(
                        ZooKeeperUtils.startCuratorFramework(configuration),
                        executor,
                        configuration,
                        blobStoreService);

                case FACTORY_CLASS:
                    return createCustomHAServices(configuration, executor);

                default:
                    throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
            }
        }

        /**
         * Returns the JobManager's hostname and port extracted from the given
         * {@link Configuration}.
         *
         * @param configuration Configuration to extract the JobManager's address from
         * @return The JobManager's hostname and port
         * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
         */
        public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {

            final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
            final int port = configuration.getInteger(JobManagerOptions.PORT);

            if (hostname == null) {
                throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
                    "' is missing (hostname/address of JobManager to connect to).");
            }

            if (port <= 0 || port >= 65536) {
                throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
                    "' (port of the JobManager actor system) : " + port +
                    ". it must be greater than 0 and less than 65536.");
            }

            return Tuple2.of(hostname, port);
        }

        private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
            final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);

            final HighAvailabilityServicesFactory highAvailabilityServicesFactory;

            try {
                highAvailabilityServicesFactory = InstantiationUtil.instantiate(
                    haServicesClassName,
                    HighAvailabilityServicesFactory.class,
                    classLoader);
            } catch (Exception e) {
                throw new FlinkException(
                    String.format(
                        "Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
                        haServicesClassName),
                    e);
            }

            try {
                return highAvailabilityServicesFactory.createHAServices(config, executor);
            } catch (Exception e) {
                throw new FlinkException(
                    String.format(
                        "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
                        haServicesClassName),
                    e);
            }
        }

        /**
         * Enum specifying whether address resolution should be tried or not when creating the
         * {@link HighAvailabilityServices}.
         */
        public enum AddressResolution {
            TRY_ADDRESS_RESOLUTION,
            NO_ADDRESS_RESOLUTION
        }
    }
- HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices
- createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster; createHighAvailabilityServices is mainly used by ClusterEntrypoint: it creates StandaloneHaServices when highAvailabilityMode is NONE, ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS
- HighAvailabilityServicesUtils also provides the static getJobManagerAddress method, which reads the JobManager's hostname and port from the configuration and validates them
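For the FACTORY_CLASS branch, the value of high-availability is treated as a class name and instantiated reflectively. The sketch below shows that pattern with plain reflection. ServicesFactory, DemoFactory, and instantiate are hypothetical stand-ins (the real code uses HighAvailabilityServicesFactory and InstantiationUtil), and the sketch uses the defining class loader where the Flink code above uses the thread context class loader.

```java
/** Sketch of loading a factory by class name, loosely following createCustomHAServices. */
final class CustomFactorySketch {

    /** Hypothetical stand-in for HighAvailabilityServicesFactory. */
    interface ServicesFactory {
        String createServices();
    }

    /** Hypothetical implementation that a user would name in the configuration. */
    static final class DemoFactory implements ServicesFactory {
        @Override
        public String createServices() {
            return "demo-ha-services";
        }
    }

    static ServicesFactory instantiate(String className, ClassLoader classLoader) throws Exception {
        try {
            // Load the class by name and check that it implements the expected interface.
            Class<? extends ServicesFactory> clazz =
                    Class.forName(className, true, classLoader).asSubclass(ServicesFactory.class);
            // Requires an accessible no-argument constructor.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new Exception("Could not instantiate the factory '" + className + "'.", e);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader classLoader = CustomFactorySketch.class.getClassLoader();
        ServicesFactory factory = instantiate(DemoFactory.class.getName(), classLoader);
        System.out.println(factory.createServices());
    }
}
```

Wrapping the reflective failure in one exception with the class name in the message mirrors the intent of the original error text: the most likely cause is a class that is missing from the class path.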
HighAvailabilityServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
    /**
     * The HighAvailabilityServices give access to all services needed for a highly-available
     * setup. In particular, the services provide access to highly available storage and
     * registries, as well as distributed counters and leader election.
     *
     * <ul>
     *     <li>ResourceManager leader election and leader retrieval</li>
     *     <li>JobManager leader election and leader retrieval</li>
     *     <li>Persistence for checkpoint metadata</li>
     *     <li>Registering the latest completed checkpoint(s)</li>
     *     <li>Persistence for the BLOB store</li>
     *     <li>Registry that marks a job's status</li>
     *     <li>Naming of RPC endpoints</li>
     * </ul>
     */
    public interface HighAvailabilityServices extends AutoCloseable {

        //......

        /**
         * Closes the high availability services, releasing all resources.
         *
         * <p>This method does not delete or clean up any data stored in external stores
         * (file systems, ZooKeeper, etc). Another instance of the high availability
         * services will be able to recover the job.
         *
         * <p>If an exception occurs during closing services, this method will attempt to
         * continue closing other services and report exceptions only after all services
         * have been attempted to be closed.
         *
         * @throws Exception Thrown, if an exception occurred while closing these services.
         */
        @Override
        void close() throws Exception;

        /**
         * Closes the high availability services (releasing all resources) and deletes
         * all data stored by these services in external stores.
         *
         * <p>After this method was called, any job or session that was managed by
         * these high availability services will be unrecoverable.
         *
         * <p>If an exception occurs during cleanup, this method will attempt to
         * continue the cleanup and report exceptions only after all cleanup steps have
         * been attempted.
         *
         * @throws Exception Thrown, if an exception occurred while closing these services
         *                   or cleaning up data stored by them.
         */
        void closeAndCleanupAllData() throws Exception;
    }
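- HighAvailabilityServices declares the getters for the various services a highly available setup needs, plus close (which keeps externally stored data so another instance can recover the job) and closeAndCleanupAllData (which deletes it)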
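The Javadoc of close describes a "keep closing, report afterwards" contract. A minimal way to implement such a contract in plain Java is sketched below; CloseAllSketch and closeAll are hypothetical names, and attaching later failures as suppressed exceptions is one design choice among several, not necessarily what any Flink implementation does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of "close everything, report failures afterwards" as described in the Javadoc. */
final class CloseAllSketch {

    static void closeAll(List<AutoCloseable> services) throws Exception {
        Exception first = null;
        for (AutoCloseable service : services) {
            try {
                service.close();
            } catch (Exception e) {
                // Remember the first failure and attach later ones, but keep closing.
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        // Report only after every service has had its close() attempted.
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        List<AutoCloseable> services = Arrays.asList(
                () -> closed.add("blob-store"),
                () -> { throw new IllegalStateException("zookeeper client failed"); },
                () -> closed.add("registry"));
        try {
            closeAll(services);
        } catch (Exception e) {
            System.out.println("closed=" + closed + ", error=" + e.getMessage());
        }
    }
}
```

In this sketch the third service is still closed even though the second one throws, which is the behaviour the interface documentation asks for.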
ZooKeeperHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
    /**
     * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
     * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
     *
     * <pre>
     * /flink
     *      +/cluster_id_1/resource_manager_lock
     *      |            |
     *      |            +/job-id-1/job_manager_lock
     *      |            |         /checkpoints/latest
     *      |            |                     /latest-1
     *      |            |                     /latest-2
     *      |            |
     *      |            +/job-id-2/job_manager_lock
     *      |
     *      +/cluster_id_2/resource_manager_lock
     *                   |
     *                   +/job-id-1/job_manager_lock
     *                            |/checkpoints/latest
     *                            |            /latest-1
     *                            |/persisted_job_graph
     * </pre>
     *
     * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
     * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
     * accommodate specific permission.
     *
     * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
     * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
     * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
     *
     * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
     * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
     *
     * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
     * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
     * cluster and participate in the execution of the same set of jobs.
     */
    public class ZooKeeperHaServices implements HighAvailabilityServices {

        private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);

        private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";

        private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";

        private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

        private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

        // ------------------------------------------------------------------------

        /** The ZooKeeper client to use */
        private final CuratorFramework client;

        /** The executor to run ZooKeeper callbacks on */
        private final Executor executor;

        /** The runtime configuration */
        private final Configuration configuration;

        /** The zookeeper based running jobs registry */
        private final RunningJobsRegistry runningJobsRegistry;

        /** Store for arbitrary blobs */
        private final BlobStoreService blobStoreService;

        public ZooKeeperHaServices(
                CuratorFramework client,
                Executor executor,
                Configuration configuration,
                BlobStoreService blobStoreService) {
            this.client = checkNotNull(client);
            this.executor = checkNotNull(executor);
            this.configuration = checkNotNull(configuration);
            this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);

            this.blobStoreService = checkNotNull(blobStoreService);
        }

        // ------------------------------------------------------------------------
        //  Services
        // ------------------------------------------------------------------------

        @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
            return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
        }

        @Override
        public LeaderRetrievalService getDispatcherLeaderRetriever() {
            return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
        }

        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
            return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
        }

        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
            return getJobManagerLeaderRetriever(jobID);
        }

        @Override
        public LeaderRetrievalService getWebMonitorLeaderRetriever() {
            return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
        }

        @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
            return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
        }

        @Override
        public LeaderElectionService getDispatcherLeaderElectionService() {
            return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
        }

        @Override
        public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
            return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
        }

        @Override
        public LeaderElectionService getWebMonitorLeaderElectionService() {
            return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
        }

        @Override
        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
            return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
        }

        @Override
        public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
            return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
        }

        @Override
        public RunningJobsRegistry getRunningJobsRegistry() {
            return runningJobsRegistry;
        }

        @Override
        public BlobStore createBlobStore() throws IOException {
            return blobStoreService;
        }

        // ------------------------------------------------------------------------
        //  Shutdown
        // ------------------------------------------------------------------------

        @Override
        public void close() throws Exception {
            Throwable exception = null;

            try {
                blobStoreService.close();
            } catch (Throwable t) {
                exception = t;
            }

            internalClose();

            if (exception != null) {
                ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
            }
        }

        @Override
        public void closeAndCleanupAllData() throws Exception {
            LOG.info("Close and clean up all data for ZooKeeperHaServices.");

            Throwable exception = null;

            try {
                blobStoreService.closeAndCleanupAllData();
            } catch (Throwable t) {
                exception = t;
            }

            internalClose();

            if (exception != null) {
                ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
            }
        }

        /**
         * Closes components which don't distinguish between close and closeAndCleanupAllData
         */
        private void internalClose() {
            client.close();
        }

        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------

        private static String getPathForJobManager(final JobID jobID) {
            return "/" + jobID + JOB_MANAGER_LEADER_PATH;
        }
    }
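- ZooKeeperHaServices implements the HighAvailabilityServices interface; it builds the required services through the create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. Cluster-wide components use fixed lock paths (for example /resource_manager_lock), while each JobManager gets a per-job path from getPathForJobManager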
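The per-job lock path is plain string concatenation, so it can be reproduced in a few lines. The sketch below mirrors getPathForJobManager with a String in place of Flink's JobID. JobManagerPathSketch, pathForJobManager, and znodeInTree are hypothetical names, "job-id-1" is the placeholder id from the class Javadoc, and znodeInTree follows the simplified tree in that Javadoc, so the znodes in a real deployment may contain additional path segments.

```java
/** Sketch of the per-job leader path built by ZooKeeperHaServices.getPathForJobManager. */
final class JobManagerPathSketch {

    private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    /** Same concatenation as getPathForJobManager, with a String instead of a JobID. */
    static String pathForJobManager(String jobId) {
        return "/" + jobId + JOB_MANAGER_LEADER_PATH;
    }

    /** Assumption: places the lock under root and cluster id as drawn in the Javadoc tree. */
    static String znodeInTree(String root, String clusterId, String jobId) {
        return root + clusterId + pathForJobManager(jobId);
    }

    public static void main(String[] args) {
        System.out.println(pathForJobManager("job-id-1"));
        System.out.println(znodeInTree("/flink", "/cluster_id_1", "job-id-1"));
    }
}
```

Because the job id is part of the path, every job gets its own leader election and retrieval node, independent of the other jobs in the same cluster.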
JobClient.submitJob
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java
    public class JobClient {

        private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

        //......

        /**
         * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
         * passed to {@code awaitJobResult} to get the result of the submission.
         * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
         *             {@code awaitJobResult(JobListeningContext context)}.
         */
        public static JobListeningContext submitJob(
                ActorSystem actorSystem,
                Configuration config,
                HighAvailabilityServices highAvailabilityServices,
                JobGraph jobGraph,
                FiniteDuration timeout,
                boolean sysoutLogUpdates,
                ClassLoader classLoader) {

            checkNotNull(actorSystem, "The actorSystem must not be null.");
            checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
            checkNotNull(jobGraph, "The jobGraph must not be null.");
            checkNotNull(timeout, "The timeout must not be null.");

            // for this job, we create a proxy JobClientActor that deals with all communication with
            // the JobManager. It forwards the job submission, checks the success/failure responses, logs
            // update messages, watches for disconnect between client and JobManager, ...

            Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
                highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                timeout,
                sysoutLogUpdates,
                config);

            ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

            //......
        }

        //......
    }
- JobClient.submitJob, for example, calls highAvailabilityServices.getJobManagerLeaderRetriever to obtain the retrieval service for the current JobManager leader's address, which it then uses to submit the job
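The point of going through a leader retriever is that the client does not hard-code a JobManager address and can follow a failover. The toy model below illustrates that idea with an in-memory retriever. Everything in it (LeaderRetrievalSketch, InMemoryLeaderRetriever, start, publishLeader, and the addresses jm-a:6123 and jm-b:6123) is hypothetical and far simpler than Flink's LeaderRetrievalService, which is backed by ZooKeeper in the HA setup described here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of leader retrieval: a client learns the current leader address before submitting. */
final class LeaderRetrievalSketch {

    /** Hypothetical, simplified retrieval service; not Flink's LeaderRetrievalService. */
    static final class InMemoryLeaderRetriever {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        private String leaderAddress;

        /** Registers a listener and immediately tells it the current leader, if one is known. */
        void start(Consumer<String> listener) {
            listeners.add(listener);
            if (leaderAddress != null) {
                listener.accept(leaderAddress);
            }
        }

        /** Called when a new leader is elected; notifies every registered listener. */
        void publishLeader(String address) {
            leaderAddress = address;
            for (Consumer<String> listener : listeners) {
                listener.accept(address);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryLeaderRetriever retriever = new InMemoryLeaderRetriever();
        List<String> seenLeaders = new ArrayList<>();

        // The "client" records each leader it is told about; a real client would submit to it.
        retriever.start(seenLeaders::add);
        retriever.publishLeader("jm-a:6123");
        retriever.publishLeader("jm-b:6123"); // simulated failover to a standby JobManager

        System.out.println(seenLeaders);
    }
}
```

The listener-based shape is the design choice worth noting: the client reacts to leader changes instead of polling a fixed address.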
Summary
- HighAvailabilityMode has three constants, NONE, ZOOKEEPER, and FACTORY_CLASS, each with a haActive flag that indicates whether the mode supports high availability; HighAvailabilityOptions defines the high-availability.* configuration options, including the group prefixed with high-availability.zookeeper
- HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices. createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster; createHighAvailabilityServices is mainly used by ClusterEntrypoint: it creates StandaloneHaServices when highAvailabilityMode is NONE, ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS
- HighAvailabilityServices declares the getters for the various services a highly available setup needs; ZooKeeperHaServices implements this interface and builds the required services through the create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs; JobClient.submitJob, for example, calls highAvailabilityServices.getJobManagerLeaderRetriever to obtain the JobManager leader's address for submitting the job