众所周知,YARN可以扩展到数千个节点。YARN的可扩展性由资源管理器确定,并且与节点数,活动应用程序,活动容器和(节点和应用程序的)心跳频率成比例。降低心跳可以增加可伸缩性,但不利于利用率(请参阅旧的Hadoop 1.x经验)。该文档介绍了一种基于联合的方法,通过联合多个YARN子群集将单个YARN群集扩展到成千上万个节点。所提出的方法是将一个大型(10-100k个节点)集群划分为称为子集群的较小单元,每个子单元都有自己的YARN RM和计算节点。联合系统会将这些子集群缝合在一起,使它们在应用程序中看起来像是一个大型YARN集群。在此联合环境中运行的应用程序将看到单个大型YARN群集,并将能够在联合群集的任何节点上调度任务。在后台,联合系统将与子集群的资源管理器进行协商,并为应用程序提供资源。目标是允许单个作业无缝“扩展”子集群。
由于我们限制了每个RM负责的节点数量,并且采用适当的策略,该设计在结构上是可扩展的,将尝试确保大多数应用程序驻留在单个子集群中,因此每个RM可以看到的应用程序数量也有界。这意味着我们可以通过简单地添加子集群(几乎不需要它们之间的协调)来几乎线性地扩展规模。这种架构可以非常严格地强制执行每个子集群中的调度不变式(仅继承自YARN),而跨子集群的连续重新平衡将(不太严格地)强制要求在全局级别也要尊重这些属性(例如,群集丢失了大量节点,我们可以将队列重新映射到其他子群集,以确保在受损的子群集上运行的用户不会受到不公平的影响)。
联合身份验证被设计为现有YARN代码库之上的“层”,而核心YARN机制的更改有限。
假设:
已知OSS YARN可以扩展到大约数千个节点。提出的体系结构利用了将多个此类较小的YARN群集(称为子群集)联合到包含数万个节点的较大的联合YARN群集中的概念。在此联合环境中运行的应用程序可以看到一个统一的大型YARN群集,并且能够在群集中的任何节点上计划任务。在后台,联合系统将与子集群RM协商,并为应用程序提供资源。图1中的逻辑体系结构显示了组成联合集群的主要组件,下面将对其进行描述。
子集群是具有多达数千个节点的YARN集群。子群集的确切大小将根据部署/维护的简便性,与网络或可用性区域的对齐方式以及一般最佳实践来确定。
子群集YARN RM将在保持工作高可用性的情况下运行,即,我们应该能够以最小的中断容忍YARN RM,NM故障。如果整个子集群遭到破坏,则外部机制将确保作业在单独的子集群中重新提交(最终可能会包含在联合身份验证设计中)。
子集群还是联合环境中的可伸缩性单元。我们可以通过添加一个或多个子集群来扩展联盟环境。
注意:根据设计,每个子集群都是功能齐全的YARN RM,并且可以将其对联盟的贡献设置为仅占其总容量的一小部分,即子集群可以对联盟具有“部分”承诺,同时保留以完全本地化的方式发挥其部分能力的能力。
YARN应用程序被提交到其中一个路由器,后者又应用了路由策略(从策略存储中获取),在状态存储中查询子集群URL,并将应用程序提交请求重定向到适当的子集群RM。我们将作业开始的子集群称为“家庭子集群”,将“第二子集群”称为工作跨接的所有其他子集群。路由器将ApplicationClientProtocol暴露给外界,透明地隐藏了多个RM的存在。为此,路由器还将应用程序及其本地子群集之间的映射保持在状态存储中。这样可以使路由器处于软状态,同时廉价地支持用户请求,因为任何路由器都可以将该应用程序恢复到归属子群集映射,并将请求定向到正确的RM,而无需广播它们。对于性能缓存和会话粘性,建议使用。联盟的状态(包括应用程序和节点)通过Web UI公开。
AMRMProxy是允许应用程序跨子集群扩展和运行的关键组件。AMRMProxy在所有NM机器上运行,并通过实现ApplicationMasterProtocol充当AM的YARN RM的代理。应用程序将不允许直接与子集群RM通信。它们被系统强制仅连接到AMRMProxy端点,这将提供对多个YARN RM的透明访问(通过动态路由/拆分/合并通信)。在任何时候,一项作业都可以跨越一个家庭子集群和多个辅助子集群,但是AMRMProxy中运行的策略试图限制每个作业的占用空间,以最大程度地减少调度基础结构的开销(有关可伸缩性的更多信息/加载)。图中显示了ARMMProxy的拦截器链体系结构。
AMRMProxy的作用
全局策略生成器会忽略整个联合身份,并确保始终正确配置和调整系统。一个关键的设计点是,群集可用性不依赖于始终在线的GPG。GPG连续运行,但在所有集群操作之外都处于带外状态,并为我们提供了一个独特的优势,该优势使我们可以强制执行全局不变性,影响负载平衡,触发将要维护的子集群的排空等。 GPG将更新用户容量分配到子群集的映射,并且很少更改在路由器,AMRMProxy(以及可能的RM)中运行的策略。
万一GPG不可用,集群操作将自GPG上次发布政策起继续进行,而长期不可用可能意味着一些理想的平衡属性,最佳集群利用率和全局不变性可能会逐渐消失,计算和数据访问不会受到损害。
注意:在当前实现中,GPG是手动调整过程,只需通过CLI(YARN-3657)即可公开。
联邦系统的这一部分是YARN-5597未来工作的一部分。
提交应用程序后,系统将确定最合适的子集群来运行该应用程序,我们将其称为应用程序的主子集群。从AM到RM的所有通信都将通过AM计算机上本地运行的AMRMProxy进行代理。AMRMProxy公开与YARN RM相同的ApplicationMasterService协议终结点。AM可以使用存储层公开的位置信息来请求容器。在理想情况下,该应用程序将放置在一个子集群中,该应用程序将需要该应用程序所需的所有资源和数据,但是如果它确实需要其他子集群中的节点上的容器,则AMRMProxy将与这些子集群的RM协商。子集群透明地提供给应用程序,从而使应用程序可以将整个联合环境视为一个大型YARN群集。AMRMProxy,全局策略生成器(GPG)和路由器一起工作,以实现无缝连接。
该图显示了以下作业执行流程的序列图:
要将YARN配置为使用联合,请在conf / yarn-site.xml中设置以下属性:
这些是通用配置,应显示在联合身份验证中每台计算机的conf / yarn-site.xml中。
属性 | 例 | 描述 |
---|---|---|
启用yarn.federation | 真正 | 是否启用联盟 |
yarn.resourcemanager.cluster-id | <unique-subcluster-id> | 此RM的唯一子集群标识符(与用于HA的子集群标识符相同)。 |
当前,我们支持状态存储的ZooKeeper和基于SQL的实现。
注意:必须始终使用以下其中之一覆盖State-Store实现。
ZooKeeper:必须为Hadoop设置ZooKeeper设置:
属性 | 例 | 描述 |
---|---|---|
yarn.federation.state-store.class | org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore | 要使用的状态存储的类型。 |
hadoop.zk.address | 主机:端口 | ZooKeeper合奏的地址。 |
SQL:必须设置以下参数:
属性 | 例 | 描述 |
---|---|---|
yarn.federation.state-store.class | org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore | 要使用的状态存储的类型。 |
yarn.federation.state-store.sql.url | jdbc:mysql:// <主机>:<端口> / FederationStateStore | 对于SQLFederationStateStore,存储状态的数据库的名称。 |
yarn.federation.state-store.sql.jdbc-class | com.mysql.jdbc.jdbc2.optional.MysqlDataSource | 对于SQLFederationStateStore,可以使用jdbc类。 |
yarn.federation.state-store.sql.username | <dbuser> | 对于SQLFederationStateStore,用于数据库连接的用户名。 |
yarn.federation.state-store.sql.password | <dbpass> | 对于SQLFederationStateStore,用于数据库连接的密码。 |
我们提供用于MySQL和Microsoft SQL Server的脚本。
对于MySQL,必须从MVN Repository下载最新的jar版本5.x ,并将其添加到CLASSPATH。然后,通过在数据库中执行以下SQL脚本来创建数据库模式:
在同一目录中,我们提供脚本来删除存储过程,表,用户和数据库。
注意: FederationStateStoreUser.sql为数据库定义了默认的用户名/密码,强烈建议您将其设置为正确的强密码。
对于SQL Server,该过程类似,但是已经包含了jdbc驱动程序。SQL Server脚本位于sbin / FederationStateStore / SQLServer /中。
属性 | 例 | 描述 |
---|---|---|
yarn.federation.failover.enabled | 真正 | 是否应该重试考虑每个子群集中的RM故障转移。 |
yarn.federation.blacklist-subclusters | <subcluster-id> | 列入黑名单的子集群列表,可用于禁用子集群 |
纱线联合会政策经理 | org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager | 策略管理器的选择确定如何通过系统路由应用程序和资源请求。 |
yarn.federation.policy-manager-params | <二进制> | 配置策略的有效负载。在我们的示例中,路由器和amrmproxy策略的一组权重。这通常是通过序列化以编程方式配置的策略管理器,或通过以.json序列化形式填充状态存储来生成的。 |
yarn.federation.subcluster-resolver.class | org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl | 用于解析节点属于哪个子集群以及机架属于哪个子集群的类。 |
纱线联合机械列表 | <机器列表文件的路径> | SubClusterResolver使用的计算机列表文件的路径。文件的每一行都是一个带有子集群和机架信息的节点。下面是示例: node1,subcluster1,机架1 node2,subcluster2,rack1 node3, subcluster3,rack2 node4,subcluster3,rack2 |
这些是额外的配置,应显示在每个ResourceManager 的conf / yarn-site.xml中。
属性 | 例 | 描述 |
---|---|---|
yarn.resourcemanager.epoch | <独特时代> | The seed value for the epoch. This is used to guarantee uniqueness of container-IDs generate by different RMs. It must therefore be unique among sub-clusters and well-spaced to allow for failures which increment epoch. Increments of 1000 allow for a large number of sub-clusters and practically ensure near-zero chance of collisions (a clash will only happen if a container is still alive for 1000 restarts of one RM, while the next RM never restarted, and an app requests more containers). |
Optional:
Property | Example | Description |
---|---|---|
yarn.federation.state-store.heartbeat-interval-secs | 60 | The rate at which RMs report their membership to the federation to the central state-store. |
These are extra configurations that should appear in the conf/yarn-site.xml at each Router.
Property | Example | Description |
---|---|---|
yarn.router.bind-host | 0.0.0.0 | Host IP to bind the router to. The actual address the server will bind to. If this optional address is set, the RPC and webapp servers will bind to this address and the port specified in yarn.router.*.address respectively. This is most useful for making Router listen to all interfaces by setting to 0.0.0.0. |
yarn.router.clientrm.interceptor-class.pipeline | org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor | A comma-separated list of interceptor classes to be run at the router when interfacing with the client. The last step of this pipeline must be the Federation Client Interceptor. |
Optional:
Property | Example | Description |
---|---|---|
yarn.router.hostname | 0.0.0.0 | Router host name. |
yarn.router.clientrm.address | 0.0.0.0:8050 | Router client address. |
yarn.router.webapp.address | 0.0.0.0:8089 | Webapp address at the router. |
yarn.router.admin.address | 0.0.0.0:8052 | Admin address at the router. |
yarn.router.webapp.https.address | 0.0.0.0:8091 | Secure webapp address at the router. |
yarn.router.submit.retry | 3 | The number of retries in the router before we give up. |
yarn.federation.statestore.max-connections | 10 | This is the maximum number of parallel connections each Router makes to the state-store. |
yarn.federation.cache-ttl.secs | 60 | The Router caches informations, and this is the time to leave before the cache is invalidated. |
yarn.router.webapp.interceptor-class.pipeline | org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST | A comma-seperated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. |
These are extra configurations that should appear in the conf/yarn-site.xml at each NodeManager.
Property | Example | Description |
---|---|---|
yarn.nodemanager.amrmproxy.enabled | true | Whether or not the AMRMProxy is enabled. |
yarn.nodemanager.amrmproxy.interceptor-class.pipeline | org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. |
Optional:
Property | Example | Description |
---|---|---|
yarn.nodemanager.amrmproxy.ha.enable | true | Whether or not the AMRMProxy HA is enabled for multiple application attempt support. |
yarn.federation.statestore.max-connections | 1 | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. |
yarn.federation.cache-ttl.secs | 300 | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. |
In order to submit jobs to a Federation cluster one must create a separate set of configs for the client from which jobs will be submitted. In these, the conf/yarn-site.xml should have the following additional configurations:
Property | Example | Description |
---|---|---|
yarn.resourcemanager.address | <router_host>:8050 | Redirects jobs launched at the client to the router’s client RM port. |
yarn.resourcemanager.scheduler.address | localhost:8049 | Redirects jobs to the federation AMRMProxy port. |
Any YARN jobs for the cluster can be submitted from the client configurations described above. In order to launch a job through federation, first start up all the clusters involved in the federation as described here. Next, start up the router on the router machine with the following command:
$HADOOP_HOME/bin/yarn --daemon start router
Now with $HADOOP_CONF_DIR pointing to the client configurations folder that is described above, run your job the usual way. The configurations in the client configurations folder described above will direct the job to the router’s client RM port where the router should be listening after being started. Here is an example run of a Pi job on a federation cluster from the client:
$HADOOP_HOME/bin/yarn jar hadoop-mapreduce-examples-3.0.0.jar pi 16 1000
This job is submitted to the router which as described above, uses a generated policy from the GPG to pick a home RM for the job to which it is submitted.
The output from this particular example job should be something like:
2017-07-13 16:29:25,055 INFO mapreduce.Job: Job job_1499988226739_0001 running in uber mode : false 2017-07-13 16:29:25,056 INFO mapreduce.Job: map 0% reduce 0% 2017-07-13 16:29:33,131 INFO mapreduce.Job: map 38% reduce 0% 2017-07-13 16:29:39,176 INFO mapreduce.Job: map 75% reduce 0% 2017-07-13 16:29:45,217 INFO mapreduce.Job: map 94% reduce 0% 2017-07-13 16:29:46,228 INFO mapreduce.Job: map 100% reduce 100% 2017-07-13 16:29:46,235 INFO mapreduce.Job: Job job_1499988226739_0001 completed successfully . . . Job Finished in 30.586 seconds Estimated value of Pi is 3.14250000......
也可以在RouterWeb UI上的routerhost:8089上跟踪作业的状态。请注意,使用联合身份验证无需更改代码或重新编译输入jar。同样,此作业的输出与不进行联合运行时的输出完全相同。另外,为了获得联合的全部好处,请使用足够数量的映射器,以便需要多个群集。在上面的示例中,该数字恰好是16。