把操作类注册在配置文件中,主程序通过java反射机制执行相应的操作类(Class),实现各类任务,便于在程序框架基本不动的前台下,按任务需要编写或修改相关服务,以满足敏捷化开发的需求。
datasource.DatasourceService Interface
datasource.GraphQLServiceImp graphql数据接口类
targetdb.targetdbService Interface
target.EsServiceImp ES库的操作类
task.taskService Interface
task.TracingAutoOrder 定时任务类
<!-- jackson库解析json文件 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
graphql处理用的是graphql-client
<dependency>
<groupId>org.mountcloud</groupId>
<artifactId>graphql-client</artifactId>
<version>1.2</version>
</dependency>
elasticsearch7.9
<!-- elasticsearch7.9.2 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.9.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.9.2</version>
</dependency>
<!-- elasticsearch 依赖 2.x 的 log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
{
"datasource" : {
"name": "datasource.GraphQLServiceImp",
"para": {
"url":"http://127.0.0.1:8090/graphql"
}
},
"targetdb" : {
"name": "target.EsServiceImp",
"para": {
"url":"http://127.0.0.1:9200"
}
},
{
"tasks" : [
{
"name" : "task.TracingAutoOrder"
},
...
]
}
即只要将节点name定义的类读入并执行即可,ConfigParser.java
public class ConfigParser {
private static Logger logger = LoggerFactory.getLogger(ConfigParser.class);
String rootPath = this.getClass().getResource("/").getPath(); //读入根目录
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonConfig;
public ConfigParser(String configFileName) throws IOException {
String jsonFile= rootPath+configFileName; //读入配置文件
File configFile = new File(jsonFile);
if (!configFile.exists()) {
logger.info("configFile: {} is not existed",jsonFile);
}else{
this.jsonConfig = this.objectMapper.readValue(configFile, JsonNode.class);
logger.info("configFile: {} is {}.",jsonFile,jsonConfig.toString());
}
}
public Object getClass(String className) throws Exception{ //反射,返回类的实例
Class<?> clazz=Class.forName(className);
return clazz.newInstance();
}
public DatasourceService getDatasource() throws Exception{
//获得graphql的操作类
String className=this.jsonConfig.get("datasource").get("name").asText();
//必须使用asText,不能用toString(),否则value会保留“”造成问题
return (DatasourceService)getClass(className);
}
public TargetdbService getTargetdb() throws Exception{
//获得目标数据库的操作类
String className=this.jsonConfig.get("targetdb").get("name").asText();
return (TargetdbService)getClass(className);
}
public String getGraphqlUrl(){
//获得graphqlUrl
return this.jsonConfig.get("datasource").get("para").get("url").asText()
}
DatasourceService接口主要是定义初始化连接以及其他操作方法
public interface DatasourceService {
public void initConnect(String url);
...
}
相关实现datasource.GraphQLServiceImp,除了初始化引用url连接之外,增加了一个查询
package datasource;
import...
public class GraphQLServiceImp implements DatasourceService{
@Override
public void initConnect(String url){
try{
GraphqlClient graphqlClient = GraphqlClient.buildGraphqlClient(url);
String queryMethodName = "searchService";
GraphqlQuery query = new DefaultGraphqlQuery(queryMethodName);
query.addParameter("serviceCode","cquant-trade-service");
query.addResultAttributes("id","name");
GraphqlResponse response = graphqlClient.doQuery(query);
Map result = response.getData();
System.out.println("result::"+result.toString());
}catch (Exception e){
e.printStackTrace();
}
...
TargetdbService接口也是如此,不赘述
public interface TargetdbService {
public void initConnect(String url);
...
}
target.EsServiceImp实现
public class EsServiceImp implements TargetdbService {
@Override
public void initConnect(String url) {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost(HttpHost.create(url)))
);
try {
String indexName = "test_sw_traces-2023-12-19";
GetIndexRequest request = new GetIndexRequest(indexName);
//boolean exists = esClient.indices().exists(request, RequestOptions.DEFAULT);
if (esClient.indices().exists(request, RequestOptions.DEFAULT)) {
//查询索引
GetIndexResponse getIndexResponse =
esClient.indices().get(request, RequestOptions.DEFAULT);
System.out.println(getIndexResponse.getAliases());
System.out.println(getIndexResponse.getMappings());
System.out.println(getIndexResponse.getSettings());
} else
System.out.println("index:" + indexName + " is ont existed");
}catch (Exception e) {
e.printStackTrace();
}
4 定时任务
用了ScheduledExecutorService,调度框架如下…
public class TaskManager {
private ScheduledExecutorService executorService;
public TaskManager() {
executorService = Executors.newScheduledThreadPool(5);
}
public void addTask(Runnable task, long delay, long period, TimeUnit timeUnit) {
executorService.scheduleAtFixedRate(task, delay, period, timeUnit);
}
public void shutdown() {
executorService.shutdown();
}
}
同样需要一个接口类,描述一类定时任务的初始化
package task;
import com.fasterxml.jackson.databind.JsonNode;
public interface TaskService {
public void init(JsonNode paraData);
}
定时任务实例,必须同时实现任务接口和runnable接口
public class TracingAutoOrder implements TaskService,Runnable{
String serviceName,endpointName,es_index;
@Override
public void run() {
System.out.println("Task executed at " + new Date()+" serviceName::"
+this.serviceName+" endpointName::"+ this.endpointName+" index::"+this.es_index);
}
@Override
public void init(JsonNode paraData) {
this.serviceName=paraData.get("serviceName").asText();
this.endpointName=paraData.get("endpointName").asText();
this.es_index=paraData.get("es_index").asText();
}
}
这里runnable就简化,仅仅打印变量
5 主程序
public class MyTaskProcess {
public static void main(String[] args) {
try{
// 读入配置文件
ConfigParser config=new ConfigParser("config.json");
// 连接SW Server 数据接口
DatasourceService datasourceInstance=config.getDatasource();
String datasourceUrl= config.getGraphqlUrl();
datasourceInstance.initConnect(datasourceUrl);
// 连接ES,获得可用的数据库
TargetdbService targetdbInstance=config.getTargetdb();
String targetdbUrl=config.getTargetDBUrl();
targetdbInstance.initConnect(targetdbUrl);
//执行定时任务,采集数据并入库
TaskManager taskManager = new TaskManager(); //任务管理器
System.out.println("start:: " + new Date());
//读入任务列表,并且遍历
ArrayNode taskList=config.getTaskList();
taskList.forEach(JsonNode->{
String taskName=JsonNode.get("name").asText();
System.out.println("taskName::"+taskName);
String switch_on=JsonNode.get("switch").asText();
System.out.println("switch_on::"+switch_on);
if(switch_on.equals("on")){
//判断开关是否打开
//String para=JsonNode.get("para").asText();
//System.out.println("para::"+para);
try {
TaskService task=(TaskService)config.getClass(taskName);
task.init(JsonNode.get("para"));
taskManager.addTask((Runnable) task, 1, 10, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}catch (Exception e){
e.printStackTrace();
}
//taskManager.shutdown();
}
}