A Look at carrera's GroovyScriptAction

2022-06-11 (Saturday)

Preface

This article takes a look at carrera's GroovyScriptAction.

Action

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/Action.java

public interface Action {

    enum Status {
        FAIL, CONTINUE, FINISH, ASYNCHRONIZED
    }

    class UnsupportedDataType extends RuntimeException {
    }

    default Status act(UpstreamJob job) {
        Object data = job.getData();
        if (data instanceof byte[]) {
            return act(job, (byte[]) data);
        } else if (data instanceof JSONObject) {
            return act(job, (JSONObject) data);
        } else {
            throw new UnsupportedDataType();
        }
    }

    default Status act(UpstreamJob job, byte[] bytes) {
        throw new UnsupportedDataType();
    }

    default Status act(UpstreamJob job, JSONObject jsonObject) {
        throw new UnsupportedDataType();
    }

    default void shutdown() {
        // DO NOTHING BY DEFAULT
    }

    default void logMetrics() {
        // DO NOTHING BY DEFAULT
    }
}
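  • The Action interface defines the Status enum along with the act, shutdown, and logMetrics methods. The single-argument act dispatches on the type of job.getData() to the byte[] or JSONObject overload; each overload throws UnsupportedDataType unless an implementation overrides it.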
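
To illustrate the default-method dispatch described above, here is a minimal, self-contained sketch. It is not carrera code: Job, SimpleAction, and ByteLengthAction are hypothetical stand-ins, and a plain Map stands in for JSONObject. The point is only that an implementation overrides the overload it supports, while every other payload type ends in UnsupportedDataType.

```java
import java.util.Map;

class ActionDispatchSketch {
    enum Status { FAIL, CONTINUE, FINISH, ASYNCHRONIZED }

    static class UnsupportedDataType extends RuntimeException { }

    // Hypothetical stand-in for UpstreamJob: it only carries a payload.
    static class Job {
        private final Object data;
        Job(Object data) { this.data = data; }
        Object getData() { return data; }
    }

    // Simplified copy of the dispatch pattern; Map stands in for JSONObject.
    interface SimpleAction {
        default Status act(Job job) {
            Object data = job.getData();
            if (data instanceof byte[]) {
                return act(job, (byte[]) data);
            } else if (data instanceof Map) {
                return act(job, (Map<?, ?>) data);
            } else {
                throw new UnsupportedDataType();
            }
        }

        default Status act(Job job, byte[] bytes) { throw new UnsupportedDataType(); }

        default Status act(Job job, Map<?, ?> json) { throw new UnsupportedDataType(); }
    }

    // Supports only byte[] payloads: empty payloads finish, others continue.
    static class ByteLengthAction implements SimpleAction {
        @Override
        public Status act(Job job, byte[] bytes) {
            return bytes.length == 0 ? Status.FINISH : Status.CONTINUE;
        }
    }

    // Runs the action and reports the outcome as text.
    static String describe(SimpleAction action, Object payload) {
        try {
            return action.act(new Job(payload)).name();
        } catch (UnsupportedDataType e) {
            return "UNSUPPORTED";
        }
    }

    public static void main(String[] args) {
        SimpleAction action = new ByteLengthAction();
        System.out.println(describe(action, new byte[] {1, 2})); // CONTINUE
        System.out.println(describe(action, Map.of("k", "v")));  // UNSUPPORTED
        System.out.println(describe(action, "text"));            // UNSUPPORTED
    }
}
```

This mirrors why GroovyScriptAction overrides only the JSONObject overload: a job whose data is a byte[] would reach the default overload and throw.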

GroovyScriptAction

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/GroovyScriptAction.java

public class GroovyScriptAction implements Action {

    private final static String CARRERA_GROOVY_CONTEXT = "carreraContext";

    @SuppressWarnings("rawtypes")
    private final static LoadingCache<String, Class> cache = CacheBuilder
        .newBuilder()
        .expireAfterAccess(1, TimeUnit.HOURS)
        .build(new CacheLoader<String, Class>() {
            private final AtomicLong al = new AtomicLong(0);

            @Override
            public Class load(String key) throws Exception {
                try (GroovyClassLoader groovyLoader = new GroovyClassLoader()) {
                    GroovyCodeSource gcs = AccessController.doPrivileged((PrivilegedAction<GroovyCodeSource>) () -> new GroovyCodeSource(key, "Script" + al.getAndIncrement() + ".groovy", "/groovy/shell"));
                    Class clazz = groovyLoader.parseClass(gcs, false);
                    return clazz;
                } catch (Throwable e) {
                    LogUtils.logErrorInfo("GroovyScript_error", "[GroovyErr]", e);
                    return null;
                }
            }
        });

    @Override
    public Status act(UpstreamJob job, JSONObject jsonObject) {
        String groovyText = job.getUpstreamTopic().getGroovyScript();
        if (StringUtils.isBlank(groovyText)) {
            return Status.FINISH;
        }
        try {
            @SuppressWarnings("rawtypes")
            Class groovyScript = cache.get(groovyText);
            if (groovyScript == null) {
                MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);
                return Status.FINISH;
            }
            jsonObject.put(CARRERA_GROOVY_CONTEXT, new GroovyContext(job));
            Script script = InvokerHelper.createScript(groovyScript, new Binding(jsonObject));
            Object scriptRet = script.run();
            if (scriptRet instanceof Boolean) {
                if ((Boolean) scriptRet) {
                    jsonObject.remove(CARRERA_GROOVY_CONTEXT);
                    return Status.CONTINUE;
                }
            }
        } catch (MissingPropertyException e) {
            LogUtils.logErrorInfo("GroovyScript_error", "missing property exception, jsonObject:{}, job:{}, e.msg:{}",
                    JsonUtils.toJsonString(jsonObject), job.info(), e.getMessageWithoutLocationText());
        } catch (Throwable e) {
            LogUtils.logErrorInfo("GroovyScript_error", "error when running groovy script, job={}, e={}", job, e.getMessage());
        }
        MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);
        return Status.FINISH;
    }
}
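  • GroovyScriptAction implements the Action interface. It uses a Guava LoadingCache, keyed by the script text and expiring entries one hour after their last access, to cache compiled Groovy classes; the CacheLoader's load method creates a GroovyClassLoader and parses the class from a GroovyCodeSource built from that text. Its act method reads groovyText from job.getUpstreamTopic().getGroovyScript() and returns Status.FINISH immediately if it is blank; otherwise it fetches the Class from the cache, creates a Script via InvokerHelper.createScript(groovyScript, new Binding(jsonObject)), and calls script.run(). Only a Boolean true return value yields Status.CONTINUE; any other result, or any exception, is recorded as ConsumeResult.INVALID and yields Status.FINISH.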
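
The cache-then-run flow described above can be sketched with the JDK alone. This is a sketch under loud assumptions, not carrera's implementation: a ConcurrentHashMap replaces Guava's LoadingCache (so there is no expireAfterAccess eviction), and a toy "field==value" expression compiled into a Predicate replaces the Groovy class and Script. All names here (ScriptFilterSketch, compile, filter, compileCount) are hypothetical. It keeps two ideas from the original: the script text itself is the cache key, so each distinct script is compiled once, and only a true result lets the message continue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

class ScriptFilterSketch {
    enum Status { CONTINUE, FINISH }

    // Counts compilations so the caching effect is observable.
    static final AtomicLong compileCount = new AtomicLong();

    // Script text -> compiled form; plays the role of the LoadingCache.
    static final Map<String, Predicate<Map<String, Object>>> cache = new ConcurrentHashMap<>();

    // Toy "compiler" (assumption): accepts only "field==value" expressions.
    static Predicate<Map<String, Object>> compile(String scriptText) {
        compileCount.incrementAndGet();
        String[] parts = scriptText.split("==", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("unsupported script: " + scriptText);
        }
        String field = parts[0].trim();
        String expected = parts[1].trim();
        return msg -> expected.equals(String.valueOf(msg.get(field)));
    }

    static Status filter(String scriptText, Map<String, Object> message) {
        if (scriptText == null || scriptText.trim().isEmpty()) {
            return Status.FINISH; // mirrors the blank-script branch of the original
        }
        try {
            Predicate<Map<String, Object>> compiled =
                    cache.computeIfAbsent(scriptText, ScriptFilterSketch::compile);
            return compiled.test(message) ? Status.CONTINUE : Status.FINISH;
        } catch (RuntimeException e) {
            return Status.FINISH; // a script that fails to compile or run drops the message
        }
    }

    public static void main(String[] args) {
        Map<String, Object> msg = Map.of("type", "order", "id", 7);
        System.out.println(filter("type==order", msg));  // CONTINUE
        System.out.println(filter("type==refund", msg)); // FINISH
        System.out.println(filter("type==order", msg));  // CONTINUE, served from the cache
        System.out.println("compilations: " + compileCount.get()); // compilations: 2
    }
}
```

In this sketch a script that fails to compile is never cached, so it is recompiled on every message; whether that trade-off is acceptable depends on how often invalid scripts are expected.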

Summary

GroovyScriptAction implements the Action interface and caches compiled Groovy classes in a Guava LoadingCache, whose CacheLoader creates a GroovyClassLoader and parses the class from the given GroovyCodeSource. Its act method takes groovyText from job.getUpstreamTopic().getGroovyScript(), looks up the corresponding Class in the cache, creates a Script with InvokerHelper.createScript(groovyScript, new Binding(jsonObject)), and runs it with script.run() to obtain the return value.
