windows 11
ElasticSearch 5.6.2
Idea 2020
请注意，5.6.2 是已经停止维护的版本，官方文档的很多页面已经 404。transport client 客户端在 6.x 时期已被官方预告弃用（7.0 正式标记为弃用，8.0 移除），但现在仍有老服务在使用。较新的 Elasticsearch 逐步放弃了对 JDK 1.8 的支持（7.x 已标记弃用，8.x 不再支持），而我的服务器在 2023 年仍不能升级到更高版本的 Java，所以只能学习老版本。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ES_Client</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- The examples use try-with-resources and target a JDK 1.8 server; without these,
             older maven-compiler-plugin versions default to -source 1.5/1.6 and reject the code. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Sources contain Chinese comments; do not rely on the Windows platform encoding. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Transport client, kept at the same version as the 5.6.2 cluster. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.2</version>
        </dependency>
        <!-- <dependency>-->
        <!-- <groupId>org.elasticsearch.client</groupId>-->
        <!-- <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
        <!-- <version>5.6.2</version>-->
        <!-- </dependency>-->
        <!-- CSV parsing for the bulk-import example (handles quoted fields containing commas). -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- Logging backend for the client. 2.8.2 is affected by CVE-2021-44228 (Log4Shell) and
             its follow-up CVEs; 2.17.1 is a patched release that still runs on Java 8. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.1</version>
        </dependency>
        <!-- Test-only dependency: none of the example programs use JUnit at runtime. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class IndexCreated {
public static void main(String[] args) {
try {
// 设置集群名称
Settings settings = Settings.builder()
.put("cluster.name", "elasticsearch")
.build();
// 创建 TransportClient
try (TransportClient client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {
// 准备索引数据
String jsonDocument = "{" +
"\"user\":\"Marry Doe\"," +
"\"postDate\":\"2023-12-01\"," +
"\"message\":\"Hello, Index!\"" +
"}";
// 创建索引请求
IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonDocument)
.get();
// 输出索引结果
System.out.println("Index created: " + response.getId());
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
}
2 删除索引
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Deletes the "geo_index" index via a transport client on localhost:9300 and prints whether the
 * cluster acknowledged the deletion.
 */
public class DeleteIndexExample {
    public static void main(String[] args) {
        try (TransportClient es = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {
            DeleteIndexResponse ack = es.admin()
                    .indices()
                    .delete(new DeleteIndexRequest("geo_index"))
                    .actionGet();
            System.out.println("Index deleted: " + ack.isAcknowledged());
        } catch (UnknownHostException unknownHost) {
            unknownHost.printStackTrace();
        }
    }
}
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
public class ListAllIndicesExample {
public static void main(String[] args) {
try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {
// 获取所有索引请求
GetIndexRequest request = new GetIndexRequest();
// 执行获取所有索引请求
GetIndexResponse response = client.admin().indices().getIndex(request).actionGet();
// 获取所有索引的名称
String[] indices = response.getIndices();
// 打印所有索引的名称
System.out.println("All Indices:");
for (String index : indices) {
System.out.println(index);
// 获取索引的设置
try {
Settings indexSettings = client.admin().indices().prepareGetSettings(index).get().getIndexToSettings().get(index);
System.out.println("Settings: " + indexSettings);
} catch (Exception e) {
if (e.getMessage().contains("Index not exist")) {
System.out.println("Index not found: " + index);
} else {
e.printStackTrace();
}
}
// 获取索引的映射
try {
GetMappingsResponse mappingsResponse = client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).get();
ImmutableOpenMap<String, MappingMetaData> indexMappings = mappingsResponse.mappings().get(index);
System.out.println("Mappings: " + indexMappings);
} catch (Exception e) {
if (e.getMessage().contains("Index not exist")) {
System.out.println("Index not found: " + index);
} else {
e.printStackTrace();
}
}
System.out.println();
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Runs a match_all search against the "twitter" index on localhost:9300 and prints the raw
 * search response.
 */
public class QueryIndexExample {
    public static void main(String[] args) {
        try (TransportClient node = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {
            SearchResponse hits = node.prepareSearch("twitter")
                    .setQuery(QueryBuilders.matchAllQuery())
                    .get();
            System.out.println("Search results:");
            System.out.println(hits.toString());
        } catch (UnknownHostException badHost) {
            badHost.printStackTrace();
        }
    }
}
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Deletes the "geo_index" index through a transport client on localhost:9300 and prints the
 * acknowledgement flag.
 *
 * NOTE(review): this listing repeats the DeleteIndexExample class shown earlier in this file;
 * two classes with the same name cannot coexist in one package, so keep a single copy.
 */
public class DeleteIndexExample {
    public static void main(String[] args) {
        try (TransportClient transport = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {
            String indexName = "geo_index";
            DeleteIndexRequest deletion = new DeleteIndexRequest(indexName);
            boolean acknowledged = transport.admin().indices().delete(deletion).actionGet().isAcknowledged();
            System.out.println("Index deleted: " + acknowledged);
        } catch (UnknownHostException ex) {
            ex.printStackTrace();
        }
    }
}
将以下数据存入前文创建的索引 geo_index，CSV 文件一共 20 万行。
以下为数据示例，第二列为地理坐标数据。每行第二列（loc 字段）被双引号 " 包围，并且字段内部还包含逗号 ,，因此不能简单地按逗号切分，需要更完善的逻辑来解析 CSV 文件。下面使用 Apache Commons CSV 库来处理包含引号和逗号的 CSV 数据：
12345678901234567890,"23.123456,113.123456"
23456789012345678901,"24.234567,114.234567"
34567890123456789012,"25.345678,115.345678"
45678901234567890123,"26.456789,116.456789"
代码假设 CSV 文件位于 D:\geo_clean.csv。
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.FileReader;
import java.net.InetAddress;
/**
 * Bulk-loads a two-column CSV file (did, loc) into index "geo_index", type "doc", through a
 * transport client on localhost:9300.
 *
 * <p>Rows are sent in batches of {@link #BATCH_SIZE} rather than as one bulk request holding the
 * whole file, so client memory and request size stay bounded for large inputs.
 */
public class BulkInsertCsvToElasticsearch {
    /** Maximum number of index requests sent in one bulk call. */
    private static final int BATCH_SIZE = 5000;

    public static void main(String[] args) {
        String csvFile = "D:\\geo_clean.csv";
        try {
            // Resolve first so a lookup failure cannot leave a constructed-but-unclosed client.
            InetSocketTransportAddress node =
                    new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300);
            // The parser owns the underlying file reader, so closing it closes the file as well.
            // The charset is explicit instead of the platform default.
            try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(node);
                 CSVParser csvParser = CSVParser.parse(
                         new java.io.File(csvFile), java.nio.charset.StandardCharsets.UTF_8, CSVFormat.DEFAULT)) {
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                int pending = 0;
                boolean allSucceeded = true;
                for (CSVRecord csvRecord : csvParser) {
                    // A row without both columns is skipped instead of aborting the whole import.
                    if (csvRecord.size() < 2) {
                        System.out.println("Skipping malformed row " + csvRecord.getRecordNumber() + ": " + csvRecord);
                        continue;
                    }
                    String did = csvRecord.get(0);
                    // The quoted "lat,lon" value arrives as one field; the parser removes the quotes.
                    String loc = csvRecord.get(1);
                    IndexRequest indexRequest = client.prepareIndex("geo_index", "doc")
                            .setSource(XContentFactory.jsonBuilder()
                                    .startObject()
                                    .field("did", did)
                                    .field("loc", loc)
                                    .endObject())
                            .request();
                    bulkRequest.add(indexRequest);
                    if (++pending >= BATCH_SIZE) {
                        allSucceeded &= executeBatch(bulkRequest);
                        bulkRequest = client.prepareBulk();
                        pending = 0;
                    }
                }
                // Flush the last partial batch; skip the call entirely when nothing is pending.
                if (pending > 0) {
                    allSucceeded &= executeBatch(bulkRequest);
                }
                if (allSucceeded) {
                    System.out.println("Bulk request successfully processed.");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes one bulk batch and prints the failure summary if any item in it failed.
     *
     * @param bulkRequest the batch to send
     * @return {@code true} when every item in the batch succeeded
     */
    private static boolean executeBatch(BulkRequestBuilder bulkRequest) {
        BulkResponse bulkResponse = bulkRequest.get();
        if (bulkResponse.hasFailures()) {
            System.out.println("Bulk request failed with failures: " + bulkResponse.buildFailureMessage());
            return false;
        }
        return true;
    }
}
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Runs a match_all search against "geo_index" on localhost:9300, asking for at most 10 hits,
 * and prints the raw search response.
 */
public class QueryGeoIndexExample {
    public static void main(String[] args) {
        try (TransportClient esClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))) {
            // Match every document, capping the number of returned hits at 10.
            SearchSourceBuilder body = new SearchSourceBuilder()
                    .query(QueryBuilders.matchAllQuery())
                    .size(10);
            SearchResponse result = esClient.search(new SearchRequest("geo_index").source(body)).actionGet();
            System.out.println("Search Results:");
            System.out.println(result.toString());
        } catch (Exception e) {
            // Handles UnknownHostException from address resolution and any search failure alike.
            e.printStackTrace();
        }
    }
}