A look at the storm client's nimbus.seeds parameter

2023-09-21, Thursday

Preface

This article takes a look at the storm client's nimbus.seeds parameter.

NIMBUS_SEEDS

storm-core-1.1.0-sources.jar!/org/apache/storm/Config.java

    /**
     * The host that the master server is running on, added only for backward compatibility,
     * the usage deprecated in favor of nimbus.seeds config.
     */
    @Deprecated
    @isString
    public static final String NIMBUS_HOST = "nimbus.host";

    /**
     * List of seed nimbus hosts to use for leader nimbus discovery.
     */
    @isStringList
    public static final String NIMBUS_SEEDS = "nimbus.seeds";
  • As shown here, nimbus.host is deprecated (kept only for backward compatibility), while nimbus.seeds is a list of hosts used to discover the nimbus leader
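
As a rough illustration of what the @isStringList annotation implies for client code, the sketch below builds a conf map with nimbus.seeds as a list of host names and checks its shape. NimbusSeedsConfExample, buildConf and isStringList are hypothetical names made up for this example and are not Storm APIs; the host names are placeholders, and the check only approximates the intent of Storm's own validator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class for illustration only; not part of Storm.
class NimbusSeedsConfExample {

    // Approximates the intent of @isStringList: the value must be a List whose items are all Strings.
    static boolean isStringList(Object value) {
        if (!(value instanceof List)) {
            return false;
        }
        for (Object item : (List<?>) value) {
            if (!(item instanceof String)) {
                return false;
            }
        }
        return true;
    }

    // Builds a client-side conf map using the literal key behind Config.NIMBUS_SEEDS.
    static Map<String, Object> buildConf(String... seedHosts) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("nimbus.seeds", Arrays.asList(seedHosts));
        return conf;
    }

    public static void main(String[] args) {
        // Placeholder host names.
        Map<String, Object> conf = buildConf("nimbus1.example.com", "nimbus2.example.com");
        System.out.println(conf.get("nimbus.seeds"));
        System.out.println(isStringList(conf.get("nimbus.seeds")));
    }
}
```

The point of the sketch is that nimbus.seeds holds several candidate hosts, whereas the deprecated nimbus.host holds a single string.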

StormSubmitter

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        stormConf.putAll(prepareZookeeperAuthentication(conf));

        validateConfs(conf, topology);

        Map<String,String> passedCreds = new HashMap<>();
        if (opts != null) {
            Credentials tmpCreds = opts.get_creds();
            if (tmpCreds != null) {
                passedCreds = tmpCreds.get_creds();
            }
        }
        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
        if (!fullCreds.isEmpty()) {
            if (opts == null) {
                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            opts.set_creds(new Credentials(fullCreds));
        }
        try {
            if (localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                if (opts!=null) {
                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    localNimbus.submitTopology(name, stormConf, topology);
                }
                LOG.info("Finished submitting topology: " +  name);
            } else {
                String serConf = JSONValue.toJSONString(stormConf);
                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                    if (topologyNameExists(name, client)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }

                    // Dependency uploading only makes sense for distributed mode
                    List<String> jarsBlobKeys = Collections.emptyList();
                    List<String> artifactsBlobKeys;

                    DependencyUploader uploader = new DependencyUploader();
                    try {
                        uploader.init();
                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                    } catch (Throwable e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        uploader.deleteBlobs(jarsBlobKeys);
                        uploader.shutdown();
                        throw e;
                    }

                    try {
                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        // Note that we don't handle TException to delete jars blobs
                        // because it's safer to leave some blobs instead of topology not running
                        uploader.deleteBlobs(jarsBlobKeys);
                        throw e;
                    } finally {
                        uploader.shutdown();
                    }
                }
            }
        } catch(TException e) {
            throw new RuntimeException(e);
        }
        invokeSubmitterHook(name, asUser, conf, topology);
    }
  • In distributed mode (when localNimbus is null), StormSubmitter's submitTopologyAs creates the NimbusClient through NimbusClient.getConfiguredClientAs(conf, asUser)
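
The conf passed to getConfiguredClientAs is assembled at the top of submitTopologyAs, so a nimbus.seeds value in the topology conf or in the command-line options overrides the one loaded by Utils.readStormConfig(). The sketch below reproduces only that merge order with plain maps. SubmitConfMergeExample and effectiveClientConf are hypothetical names, and the three input maps merely stand in for the values the real method obtains from Storm's Utils class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class for illustration only; not part of Storm.
class SubmitConfMergeExample {

    // Mirrors the merge order at the top of submitTopologyAs:
    //   stormConf = topology conf, overridden by command-line options
    //   conf      = storm config, overridden by stormConf
    // clusterDefaults stands in for Utils.readStormConfig(),
    // commandLineOpts stands in for Utils.readCommandLineOpts().
    static Map<String, Object> effectiveClientConf(Map<String, Object> clusterDefaults,
                                                   Map<String, Object> topologyConf,
                                                   Map<String, Object> commandLineOpts) {
        Map<String, Object> stormConf = new HashMap<>(topologyConf);
        stormConf.putAll(commandLineOpts);
        Map<String, Object> conf = new HashMap<>(clusterDefaults);
        conf.putAll(stormConf);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> clusterDefaults = new HashMap<>();
        clusterDefaults.put("nimbus.seeds", Arrays.asList("localhost"));

        Map<String, Object> topologyConf = new HashMap<>();
        // Placeholder host names.
        topologyConf.put("nimbus.seeds", Arrays.asList("nimbus1.example.com", "nimbus2.example.com"));

        Map<String, Object> merged =
                effectiveClientConf(clusterDefaults, topologyConf, new HashMap<String, Object>());
        // The topology conf value replaces the cluster default.
        System.out.println(merged.get("nimbus.seeds"));
    }
}
```

Keys that appear only in the storm config (for example the thrift port) survive the merge unchanged, because putAll only replaces keys present in the overriding map.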

NimbusClient

storm-core-1.1.0-sources.jar!/org/apache/storm/utils/NimbusClient.java

    public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
        if (conf.containsKey(Config.STORM_DO_AS_USER)) {
            if (asUser != null && !asUser.isEmpty()) {
                LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
                        , asUser, conf.get(Config.STORM_DO_AS_USER));
            }
            asUser = (String) conf.get(Config.STORM_DO_AS_USER);
        }

        List<String> seeds;
        if(conf.containsKey(Config.NIMBUS_HOST)) {
            LOG.warn("Using deprecated config {} for backward compatibility. Please update your storm.yaml so it only has config {}",
                     Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
            seeds = Lists.newArrayList(conf.get(Config.NIMBUS_HOST).toString());
        } else {
            seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
        }

        for (String host : seeds) {
            int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
            NimbusSummary nimbusSummary;
            NimbusClient client = null;
            try {
                client = new NimbusClient(conf, host, port, null, asUser);
                nimbusSummary = client.getClient().getLeader();
                if (nimbusSummary != null) {
                    String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
                    LOG.info("Found leader nimbus : {}", leaderNimbus);
                    if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) {
                        NimbusClient ret = client;
                        client = null;
                        return ret;
                    }
                    try {
                        return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
                    } catch (TTransportException e) {
                        throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
                                 + ". will retry with a different seed host.", e);
                continue;
            } finally {
                if (client != null) {
                    client.close();
                }
            }
            throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " +
                                                            "again after some time.");
        }
        throw new NimbusLeaderNotFoundException(
                "Could not find leader nimbus from seed hosts " + seeds + ". " +
                        "Did you specify a valid list of nimbus hosts for config " +
                        Config.NIMBUS_SEEDS + "?");
    }
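  • NIMBUS_HOST is still supported for backward compatibility: if the conf contains nimbus.host, a deprecation warning is logged and the seeds list is built from that single value; otherwise the seeds are read from nimbus.seeds
  • The method then iterates over the seeds. For each seed it creates a NimbusClient and calls client.getClient().getLeader() to fetch the leader information. If a leader is returned, it checks whether the leader is the seed it is currently connected to: if so, the existing client is returned directly; if not, a new NimbusClient is created from the leader's host and port and returned
  • If nimbusSummary is null, the method immediately throws NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time."), without trying the remaining seeds
  • If an exception occurs while connecting to a seed, fetching the leader information, or connecting to the leader, it is logged as a warning and the next seed is tried. If every seed fails this way, the loop ends and the method throws NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?")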
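
The control flow described above can be sketched without any Storm dependency. In the sketch below, LeaderDiscoverySketch, LeaderLookup, resolveSeeds and findLeader are hypothetical names; LeaderLookup stands in for creating a NimbusClient and calling getLeader(), the leader is represented as a plain "host:port" string, and IllegalStateException stands in for NimbusLeaderNotFoundException. It is a simplified model of the loop, not Storm's implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the discovery loop; not part of Storm.
class LeaderDiscoverySketch {

    // Stand-in for "connect to this seed and ask it for the leader".
    // Returns "host:port" of the leader, or null if the seed knows of no leader.
    interface LeaderLookup {
        String leaderSeenBy(String seedHost) throws Exception;
    }

    // The deprecated nimbus.host wins over nimbus.seeds when both are present.
    @SuppressWarnings("unchecked")
    static List<String> resolveSeeds(Map<String, Object> conf) {
        if (conf.containsKey("nimbus.host")) {
            return Arrays.asList(conf.get("nimbus.host").toString());
        }
        return (List<String>) conf.get("nimbus.seeds");
    }

    static String findLeader(Map<String, Object> conf, LeaderLookup lookup) {
        List<String> seeds = resolveSeeds(conf);
        for (String host : seeds) {
            String leader;
            try {
                leader = lookup.leaderSeenBy(host);
            } catch (Exception e) {
                // Seed unreachable or lookup failed: move on to the next seed.
                continue;
            }
            if (leader != null) {
                return leader;
            }
            // The seed answered but reported no leader: fail right away.
            throw new IllegalStateException(
                    "Could not find a nimbus leader, please try again after some time.");
        }
        // Every seed raised an exception.
        throw new IllegalStateException("Could not find leader nimbus from seed hosts " + seeds);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        // Placeholder host names.
        conf.put("nimbus.seeds", Arrays.asList("down.example.com", "nimbus2.example.com"));

        String leader = findLeader(conf, seed -> {
            if (seed.startsWith("down")) {
                throw new java.io.IOException("connection refused");
            }
            return "nimbus1.example.com:6627";
        });
        System.out.println("leader = " + leader);
    }
}
```

One consequence worth noting is the asymmetry between the two failure paths: an unreachable seed only delays discovery, while a reachable seed that reports no leader ends it.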

Summary

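  • For the storm client, nimbus.seeds is the parameter used to locate the nimbus leader; nimbus.host is deprecated and kept only for backward compatibility
  • Leader discovery walks through the hosts configured in seeds one by one, connects to each, and asks it for the leader. If the call succeeds but nimbusSummary is null, it throws NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.") without trying further seeds
  • If an exception occurs, the next seed is tried. If none succeeds, the loop ends and it throws NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?")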

doc

  • Setting-up-a-Storm-cluster
