Ignite提供了一个数据流API,可用于将大量连续的数据流注入Ignite集群,数据流API支持容错和线性扩展,并为注入Ignite的数据提供了至少一次保证,这意味着每个条目至少会被处理一次。
数据通过与缓存关联的数据流处理器流式注入到缓存中。数据流处理器自动缓冲数据并将其分组成批次以提高性能,并将其并行发送到多个节点。
数据流API提供以下功能:
数据流处理器与某个缓存关联,并提供用于将数据注入缓存的接口。
在典型场景中,用户拿到数据流处理器之后,会使用其中某个方法将数据流式注入缓存中,而Ignite根据分区规则对数据条目进行批处理,从而避免不必要的数据移动。
拿到某个缓存的数据流处理器的方法如下:
// Get the data streamer reference and stream data.
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myCache")) {
// Stream entries.
for (int i = 0; i < 100000; i++)
stmr.addData(i, Integer.toString(i));
}
System.out.println("dataStreamerExample output:" + cache.get(99999));
在Ignite的Java版本中,数据流处理器是IgniteDataStreamer
接口的实现,IgniteDataStreamer
提供了一组addData(…?)
方法来向缓存中添加键-值对,完整的方法列表,可以参见IgniteDataStreamer的javadoc。
数据流处理器默认不会覆盖已有的数据,通过将allowOverwrite
属性配置为true
,可以修改该行为。
stmr.allowOverwrite(true);
提示
如果allowOverwrite
配置为false
(默认),更新不会传播到外部存储(如果开启)。
如果需要在添加新数据之前执行自定义逻辑,则可以使用数据流接收器。在将数据存储到缓存之前,数据流接收器用于以并置方式处理数据,其中实现的逻辑会在存储数据的节点上执行。
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myCache")) {
stmr.allowOverwrite(true);
stmr.receiver((StreamReceiver<Integer, String>) (cache, entries) -> entries.forEach(entry -> {
// do something with the entry
cache.put(entry.getKey(), entry.getValue());
}));
}
提示
注意数据流接收器不会自动将数据注入缓存,需要显式地调用put(…?)
方法之一。
警告
要在远端节点执行的接收器类定义必须在该节点可用,这可通过2种方式实现:
StreamTransformer
是StreamReceiver
的简单实现,用于更新流中的数据。数据流转换器利用了并置的特性,并在将要存储数据的节点上更新数据。
在下面的示例中,使用StreamTransformer
为文本流中找到的每个不同单词增加一个计数:
String[] text = { "hello", "world", "hello", "Ignite" };
CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("wordCountCache");
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
// Allow data updates.
stmr.allowOverwrite(true);
// Configure data transformation to count instances of the same word.
stmr.receiver(StreamTransformer.from((e, arg) -> {
// Get current count.
Long val = e.getValue();
// Increment count by 1.
e.setValue(val == null ? 1L : val + 1);
return null;
}));
// Stream words into the streamer cache.
for (String word : text)
stmr.addData(word, 1L);
}
StreamVisitor
也是StreamReceiver
的一个方便实现,它会访问流中的每个键-值对,但不会更新缓存。如果键-值对需要存储在缓存内,那么需要显式地调用任意的put(...)
方法。
在下面的示例中,有两个缓存:marketData
和instruments
,收到market数据的瞬间就会将它们放入marketData
缓存的流处理器,映射到某market数据的集群节点上的marketData
的流处理器的StreamVisitor
就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument
缓存。
注意,根本不会更新marketData
缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。
static class Instrument {
final String symbol;
Double latest;
Double high;
Double low;
public Instrument(String symbol) {
this.symbol = symbol;
}
}
static Map<String, Double> getMarketData() {
//populate market data somehow
return new HashMap<>();
}
@Test
void streamVisitorExample() {
try (Ignite ignite = Ignition.start()) {
CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>("marketData");
CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instruments");
// Cache for market data ticks streamed into the system.
IgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);
// Cache for financial instruments.
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg);
try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer("marketData")) {
// Note that we do not populate the 'marketData' cache (it remains empty).
// Instead we update the 'instruments' cache based on the latest market price.
mktStmr.receiver(StreamVisitor.from((cache, e) -> {
String symbol = e.getKey();
Double tick = e.getValue();
Instrument inst = instCache.get(symbol);
if (inst == null)
inst = new Instrument(symbol);
// Update instrument price based on the latest market tick.
inst.high = Math.max(inst.high, tick);
inst.low = Math.min(inst.low, tick);
inst.latest = tick;
// Update the instrument cache.
instCache.put(symbol, inst);
}));
// Stream market data into the cluster.
Map<String, Double> marketData = getMarketData();
for (Map.Entry<String, Double> tick : marketData.entrySet())
mktStmr.addData(tick);
}
}
}
数据流处理器线程池专用于处理来自数据流处理器的消息。
默认池大小为max(8, CPU总核数)
,使用IgniteConfiguration.setDataStreamerThreadPoolSize(…?)
可以改变线程池的大小。
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDataStreamerThreadPoolSize(10);
Ignite ignite = Ignition.start(cfg);
在缓存上的所有操作都是通过IgniteCache
实例进行的,也可以在已有的缓存上拿到IgniteCache
,也可以动态创建。
Ignite ignite = Ignition.ignite();
// Obtain an instance of the cache named "myCache".
// Note that different caches may have different generics.
IgniteCache<Integer, String> cache = ignite.cache("myCache");
动态创建缓存方式如下:
Ignite ignite = Ignition.ignite();
CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>();
cfg.setName("myNewCache");
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
// Create a cache with the given name if it does not exist.
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);
关于缓存的配置参数,请参见缓存配置章节的内容。
在基线拓扑变更过程中调用创建缓存的方法,会抛出org.apache.ignite.IgniteCheckedException
异常:
javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: Failed to start/stop cache, cluster state change is in progress.
at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1323)
at org.apache.ignite.internal.IgniteKernal.createCache(IgniteKernal.java:3001)
at org.apache.ignite.internal.processors.platform.client.cache.ClientCacheCreateWithNameRequest.process(ClientCacheCreateWithNameRequest.java:48)
at org.apache.ignite.internal.processors.platform.client.ClientRequestHandler.handle(ClientRequestHandler.java:51)
at org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.onMessage(ClientListenerNioListener.java:173)
at org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.onMessage(ClientListenerNioListener.java:47)
at org.apache.ignite.internal.util.nio.GridNioFilterChain$TailFilter.onMessageReceived(GridNioFilterChain.java:278)
at org.apache.ignite.internal.util.nio.GridNioFilterAdapter.proceedMessageReceived(GridNioFilterAdapter.java:108)
at org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter$3.body(GridNioAsyncNotifyFilter.java:96)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:119)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
如果拿到这个异常,可以进行重试。
要在整个集群中删除一个缓存,需要调用destroy()
方法:
Ignite ignite = Ignition.ignite();
IgniteCache<Long, String> cache = ignite.cache("myCache");
cache.destroy();
拿到缓存实例后,就可以对其进行读写操作:
IgniteCache<Integer, String> cache = ignite.cache("myCache");
// Store keys in the cache (the values will end up on different cache nodes).
for (int i = 0; i < 10; i++)
cache.put(i, Integer.toString(i));
for (int i = 0; i < 10; i++)
System.out.println("Got [key=" + i + ", val=" + cache.get(i) + ']');
提示
putAll()
和putAll()
这样的批量操作方法,是以原子化的模式按顺序执行,可能部分失败。发生这种情况时,会抛出包含了更新失败数据列表的CachePartialUpdateException
异常。 如果希望在一个操作中更新条目的集合,建议考虑使用事务。
下面是更多基本缓存操作的示例:
// Put-if-absent which returns previous value.
String oldVal = cache.getAndPutIfAbsent(11, "Hello");
// Put-if-absent which returns boolean success flag.
boolean success = cache.putIfAbsent(22, "World");
// Replace-if-exists operation (opposite of getAndPutIfAbsent), returns previous
// value.
oldVal = cache.getAndReplace(11, "New value");
// Replace-if-exists operation (opposite of putIfAbsent), returns boolean
// success flag.
success = cache.replace(22, "Other new value");
// Replace-if-matches operation.
success = cache.replace(22, "Other new value", "Yet-another-new-value");
// Remove-if-matches operation.
success = cache.remove(11, "Hello");
大多数缓存操作方法都有对应的异步执行模式,方法名带有Async
后缀。
// a synchronous get
V get(K key);
// an asynchronous get
IgniteFuture<V> getAsync(K key);
异步操作会返回一个代表操作结果的对象,可以以阻塞或非阻塞的方式,等待操作的完成。
以非阻塞的方式等待结果,可以使用IgniteFuture.listen()
或IgniteFuture.chain()
方法注册一个闭包,其会在操作完成后被调用。
IgniteCompute compute = ignite.compute();
// Execute a closure asynchronously.
IgniteFuture<String> fut = compute.callAsync(() -> "Hello World");
// Listen for completion and print out the result.
fut.listen(f -> System.out.println("Job result: " + f.get()));
闭包执行和线程池
如果在将闭包传递给IgniteFuture.listen()
或IgniteFuture.chain()
方法时已完成异步操作,则该闭包由调用线程同步执行。否则当操作完成时,闭包将异步执行。
根据操作的类型,闭包将被系统线程池中的线程(异步缓存操作)或公共线程池中的线程(异步计算操作)调用。因此应避免在闭包内部调用同步缓存和计算操作,因为由于线程池不足,它可能导致死锁。
为了实现异步计算操作的嵌套执行,可以利用自定义线程池。
在Ignite中,数据以二进制格式存储,然后在每次读取时再反序列化为对象,不过可以直接操作二进制对象避免反序列化。
二进制对象是缓存数据的二进制表示的包装器,每个二进制对象都有field(name)
方法(返回对应字段的值)和type()
方法(提取对象的类型信息)。当只需要处理对象的部分字段而不需要反序列化整个对象时,二进制对象会很有用。
处理二进制对象时不需要具体的类定义,不重启集群就可以动态修改对象的结构。
在所有支持的平台上,二进制对象格式都是统一的,包括Java、.NET和C++。可以启动一个Java版Ignite集群,然后使用.NET和C++客户端接入集群,然后在这些客户端上使用二进制对象而不需要持有类定义。
限制
BinaryObject
格式在类的不同层次上也不允许有同样的属性名;Externalizable
接口,Ignite会使用OptimizedMarshaller
,OptimizedMarshaller
会使用writeExternal()
和readExternal()
来进行类对象的序列化和反序列化,这需要将实现Externalizable
的类加入服务端节点的类路径中。当从缓存中拿数据时,默认返回的是反序列化格式,要处理二进制格式,需要使用withKeepBinary()
方法拿到缓存的实例,这个实例会尽可能返回二进制格式的对象。
// Create a regular Person object and put it into the cache.
Person person = new Person(1, "FirstPerson");
ignite.cache("personCache").put(1, person);
// Get an instance of binary-enabled cache.
IgniteCache<Integer, BinaryObject> binaryCache = ignite.cache("personCache").withKeepBinary();
BinaryObject binaryPerson = binaryCache.get(1);
注意并不是所有的对象都会转为二进制对象格式,下面的类不会进行转换(即toBinary(Object)
方法返回原始对象,以及这些类的实例存储不会发生变化):
byte
、int
等)及其包装类(Byte
、Integer
等);byte[]
、int[]
等);String
及其数组;UUID
及其数组;Date
及其数组;Timestamp
及其数组;Enum
及其数组;二进制对象实例是不可变的,要更新字段或者创建新的二进制对象,需要使用二进制对象的建造器工具类,其可以在没有对象的类定义的前提下,修改二进制对象的字段。
限制
二进制对象建造器实例获取方式如下:
BinaryObjectBuilder builder = ignite.binary().builder("org.apache.ignite.snippets.Person");
builder.setField("id", 2L);
builder.setField("name", "SecondPerson");
binaryCache.put(2, builder.build());
通过这个方式创建的建造器没有任何字段,调用setField(…?)
方法可以添加字段:
通过调用toBuilder()
方法,也可以从一个已有的二进制对象上获得建造器实例,这时该二进制对象的所有字段都会复制到该建造器中。
在下面的示例中,会在服务端通过EntryProcessor
机制更新一个对象,而不需要在该节点部署该对象类定义,也不需要完整对象的反序列化。
// The EntryProcessor is to be executed for this key.
int key = 1;
ignite.cache("personCache").<Integer, BinaryObject>withKeepBinary().invoke(key, (entry, arguments) -> {
// Create a builder from the old value.
BinaryObjectBuilder bldr = entry.getValue().toBuilder();
//Update the field in the builder.
bldr.setField("name", "Ignite");
// Set new value to the entry.
entry.setValue(bldr.build());
return null;
});
二进制对象持有其表示的对象的类型信息,类型信息包括字段名、字段类型和关联字段名。
每个字段的类型通过一个BinaryField
对象来表示,获得BinaryField
对象后,如果需要从集合中的每个对象读取相同的字段,则可以多次重用该对象。重用BinaryField
对象比直接从每个二进制对象读取字段值要快,下面是使用二进制字段的示例:
Collection<BinaryObject> persons = getPersons();
BinaryField salary = null;
double total = 0;
int count = 0;
for (BinaryObject person : persons) {
if (salary == null) {
salary = person.type().field("salary");
}
total += (float) salary.value(person);
count++;
}
double avg = total / count;
Ignite为给定类型的每个二进制对象保留一个模式,该模式指定对象中的字段及其顺序和类型。模式在整个集群中复制,具有相同字段但顺序不同的二进制对象被认为具有不同的模式,因此建议以相同的顺序往二进制对象中添加字段。
空字段通常需要5个字节来存储,字段ID4个字节,字段长度1个字节。在内存方面,最好不要包含字段,也不要包含空字段。但是,如果不包括字段,则Ignite会为此对象创建一个新模式,该模式与包含该字段的对象的模式不同。如果有多个字段以随机组合设置为null
,那么Ignite会为每种组合维护一个不同的二进制对象模式,这样Java堆可能会被二进制对象模式耗尽。最好为二进制对象提供几个模式,并以相同的顺序设置相同类型的相同字段集。通过提供相同的字段集(即使具有空值)来创建二进制对象时,选择其中一个,这也是需要为空字段提供字段类型的原因。
如果有一个子集的字段是可选的,但要么全部不存在,要么全部存在,那么也可以嵌套二进制对象,可以将它们放在单独的二进制对象中,该对象存储在父对象的字段下,或者设置为null。
如果有大量字段,这些字段在任何组合中都是可选的,并且通常为空,则可以将其存储在映射字段中,值对象中将有几个固定字段,还有一个映射用于其他属性。
在绝大多数场景中,无需配置二进制对象。但是如果需要更改类型和字段ID的生成或插入自定义序列化器,则可以通过配置来实现。
二进制对象的类型和字段由其ID标识,该ID由相对应的字符串名计算为哈希值,并将其存储在每个二进制对象中,可以在配置中定义自己的ID生成实现。
名字到ID的转换分为两个步骤。首先,由名字映射器转换类型名(类名)或字段名,然后由ID映射器计算ID。可以指定全局名字映射器,全局ID映射器和全局二进制序列化器,以及每个类型的映射器和序列化器。每个类型的配置均支持通配符,这时所提供的配置将应用于与类型名字模板匹配的所有类型。
IgniteConfiguration igniteCfg = new IgniteConfiguration();
BinaryConfiguration binaryConf = new BinaryConfiguration();
binaryConf.setNameMapper(new MyBinaryNameMapper());
binaryConf.setIdMapper(new MyBinaryIdMapper());
BinaryTypeConfiguration binaryTypeCfg = new BinaryTypeConfiguration();
binaryTypeCfg.setTypeName("org.apache.ignite.snippets.*");
binaryTypeCfg.setSerializer(new ExampleSerializer());
binaryConf.setTypeConfigurations(Collections.singleton(binaryTypeCfg));
igniteCfg.setBinaryConfiguration(binaryConf);
IgniteCache
有几个查询方法,他们会接收Query
类的子类,然后返回一个QueryCursor
。
Query
表示在缓存上执行的分页查询的抽象,页面大小通过Query.setPageSize(…?)
进行配置,默认值为1024
。
QueryCursor
表示结果集,可以透明地按页迭代。当用户迭代到页尾时,QueryCursor
会自动在后台请求下一页。对于不需要分页的场景,可以使用QueryCursor.getAll()
方法,其会拿到所有的数据,并将其存储在一个集合中。
关闭游标
调用QueryCursor.getAll()
方法时,游标会自动关闭。如果在循环中迭代游标,或者显式拿到Iterator
,必须手动关闭游标,或者使用try-with-resources
语句。
扫描查询是以分布式的方式从缓存中获取数据的简单搜索查询,如果执行时没有参数,扫描查询会从缓存中获取所有数据。
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");
QueryCursor<Cache.Entry<Integer, Person>> cursor = cache.query(new ScanQuery<>());
如果指定了谓语,扫描查询会返回匹配谓语的数据,谓语应用于远端节点:
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");
// Find the persons who earn more than 1,000.
IgniteBiPredicate<Integer, Person> filter = (key, p) -> p.getSalary() > 1000;
try (QueryCursor<Cache.Entry<Integer, Person>> qryCursor = cache.query(new ScanQuery<>(filter))) {
qryCursor.forEach(
entry -> System.out.println("Key = " + entry.getKey() + ", Value = " + entry.getValue()));
}
扫描查询还支持可选的转换器闭包,可以在数据返回之前在服务端转换数据,比如,当只想从大对象中获取少量字段,以最小化网络传输时,这个功能就很有用。下面的示例显示如何只返回键,而不返回值:
IgniteCache<Integer, Person> cache = ignite.getOrCreateCache("myCache");
// Get only keys for persons earning more than 1,000.
List<Integer> keys = cache.query(new ScanQuery<>(
// Remote filter
(IgniteBiPredicate<Integer, Person>) (k, p) -> p.getSalary() > 1000),
// Transformer
(IgniteClosure<Cache.Entry<Integer, Person>, Integer>) Cache.Entry::getKey).getAll();
扫描查询默认是分布到所有节点上的,不过也可以只在本地执行查询,这时查询只会处理本地节点(查询执行的节点)上存储的数据。
QueryCursor<Cache.Entry<Integer, Person>> cursor = cache
.query(new ScanQuery<Integer, Person>().setLocal(true));
警告
这是个试验性API。
读修复
是指在正常读取操作期间修复主备数据之间不一致的技术。当用户操作读取了某个或某些键时,Ignite会检查给定键在所有备份副本中的值。
读修复模式旨在保持一致性。不过由于检查了备份副本,因此读操作的成本增加了约2倍。通常不建议一直使用此模式,而应一次性使用。
要启用读修复模式,需要获取一个开启了读修复的缓存实例,如下所示:
IgniteCache<Object, Object> cache = ignite.cache("my_cache").withReadRepair();
Object value = cache.get(10);
一致性检查与下面的缓存配置不兼容:
拓扑中的值将替换为最新版本的值。
TransactionConcurrency.OPTIMISTIC
并发模型或TransactionIsolation.READ_COMMITTED
隔离级别的事务自动处理;TransactionConcurrency.PESSIMISTIC
并发模型和TransactionIsolation.READ_COMMITTED
隔离级别的事务,在commit()
阶段自动处理;当检测到备份不一致时,Ignite将生成一个违反一致性事件(如果在配置中启用了该事件),通过监听该事件可以获取有关不一致问题的通知。关于如果进行事件监听,请参见使用事件的介绍。
如果事务中已经缓存了值,则读修复不能保证检查所有副本
。例如,如果使用非TransactionIsolation.READ_COMMITTED
隔离级别,并且已经读取了该值或执行了写入操作,则将获得缓存的值。
如果发现差异,则抛出违反一致性异常。
由于原子化缓存的性质,可以观察到假阳性结果。比如在缓存加载中尝试检查一致性可能会触发违反一致性异常。读修复的实现会尝试检查给定键三次,尝试次数可以通过IGNITE_NEAR_GET_MAX_REMAPS
系统属性来修改。
注意不会为原子缓存记录违反一致性事件。
要为某个缓存开启事务支持,需要在缓存配置中将atomicityMode
设置为TRANSACTIONAL
,具体请参见原子化模式。
事务可以将多个缓存操作,可能对应一个或者多个键,组合为一个单个原子事务,这些操作在没有任何其他交叉操作的情况下执行,或全部成功或全部失败,没有部分成功的状态。
在缓存配置中可以为缓存开启事务支持:
CacheConfiguration cacheCfg = new CacheConfiguration();
cacheCfg.setName("cacheName");
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setCacheConfiguration(cacheCfg);
// Optional transaction configuration. Configure TM lookup here.
TransactionConfiguration txCfg = new TransactionConfiguration();
cfg.setTransactionConfiguration(txCfg);
// Start a node
Ignition.start(cfg);
键-值API为开启和完成事务以及获取和事务有关的指标,提供了一个接口,该接口可以通过Ignite
实例获得:
Ignite ignite = Ignition.ignite();
IgniteTransactions transactions = ignite.transactions();
try (Transaction tx = transactions.txStart()) {
Integer hello = cache.get("Hello");
if (hello == 1)
cache.put("Hello", 11);
cache.put("World", 22);
tx.commit();
}
当原子化模式配置为TRANSACTIONAL
时,Ignite对事务支持乐观
和悲观
的并发模型。并发模型决定了何时获得一个条目级的事务锁-在访问数据时或者在prepare
阶段。锁定可以防止对一个对象的并发访问。比如,当试图用悲观锁更新一个ToDo列表项时,服务端会在该对象上置一个锁,在提交或者回滚该事务之前,其它的事务或者操作都无法更新该条目。不管在一个事务中使用那种并发模型,在提交之前都存在事务中的所有条目被锁定的时刻。
隔离级别定义了并发事务如何"看"以及处理针对同一个键的操作。Ignite支持READ_COMMITTED
、REPEATABLE_READ
、SERIALIZABLE
隔离级别。
并发模型和隔离级别的所有组合都是可以同时使用的。下面是针对Ignite提供的每一个并发-隔离组合的行为和保证的描述。
在PESSIMISTIC
事务中,锁是在第一次读或者写访问期间获得(取决于隔离级别)然后被事务持有直到其被提交或者回滚。该模式中,锁首先在主节点获得然后在准备阶段提升至备份节点。下面的隔离级别可以配置为PESSIMISTIC
并发模型。
READ_COMMITTED
:数据被无锁地读取并且不会被事务本身缓存。如果缓存配置允许,数据是可能从一个备份节点中读取的。在这个隔离级别中,可以有所谓的非可重复读,因为当在自己的事务中读取数据两次时,一个并发事务可以改变该数据。锁只有在第一次写访问时才会获得(包括EntryProcessor
调用)。这意味着事务中已经读取的一个条目在该事务提交时可能有一个不同的值,这种情况是不会抛出异常的。REPEATABLE_READ
:获得条目锁以及第一次读或者写访问时从主节点获得数据,然后就存储在本地事务映射中。之后对同一数据的所有连续访问都是本地化的,并且返回最后一次读或者被更新的事务值。这意味着没有其它的并发事务可以改变锁定的数据,这样就获得了事务的可重复读。SERIALIZABLE
:在PESSIMISTIC
模式中,这个隔离级别与REPEATABLE_READ
是一样的工作方式。注意,在PESSIMISTIC
模式中,锁的顺序是很重要的。此外Ignite可以按照指定的顺序依次并且准确地获得锁。
拓扑变化约束
注意,如果至少获取了一个PESSIMISTIC
事务锁,则在提交或回滚事务之前,将无法更改缓存拓扑。因此应该避免长时间持有事务锁。
在OPTIMISTIC
事务中,条目锁是在二阶段提交的准备
阶段从主节点获得的,然后提升至备份节点,该锁在事务提交时被释放。如果回滚事务没有试图做提交,是不会获得锁的。下面的隔离级别可以与OPTIMISTIC
并发模型配置在一起。
READ_COMMITTED
:应该作用于缓存的改变是在源节点上收集的,然后事务提交后生效。事务数据无锁地读取并且不会在事务中缓存。如果缓存配置允许,该数据是可能从备份节点中读取的。在这个隔离级别中,可以有一个所谓的非可重复读,因为在自己的事务中读取数据两次时另一个事务可以修改数据。这个模式组合在第一次读或者写操作后如果条目值被修改是不会做校验的,并且不会抛出异常。REPEATABLE_READ
:这个隔离级别的事务的工作方式类似于OPTIMISTIC
的READ_COMMITTED
的事务,只有一个不同:读取值缓存于发起节点并且所有的后续读保证都是本地化的。这个模式组合在第一次读或者写操作后如果条目值被修改是不会做校验的,并且不会抛出异常。SERIALIZABLE
:在第一次读访问之后会存储一个条目的版本,如果Ignite引擎检测到发起事务中的条目只要有一个被修改,Ignite就会在提交阶段放弃该事务,这是在提交阶段对集群内的事务中记载的条目的版本进行内部检查实现的。简而言之,这意味着Ignite如果在一个事务的提交阶段检测到一个冲突,就会放弃这个事务并且抛出TransactionOptimisticException
异常以及回滚已经做出的任何改变,开发者应该处理这个异常并且重试该事务。CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>();
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cfg.setName("myCache");
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);
// Re-try the transaction a limited number of times.
int retryCount = 10;
int retries = 0;
// Start a transaction in the optimistic mode with the serializable isolation
// level.
while (retries < retryCount) {
retries++;
try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC,
TransactionIsolation.SERIALIZABLE)) {
// modify cache entries as part of this transaction.
cache.put(1, "foo");
cache.put(2, "bar");
// commit the transaction
tx.commit();
// the transaction succeeded. Leave the while loop.
break;
} catch (TransactionOptimisticException e) {
// Transaction has failed. Retry.
}
}
这里另外一个需要注意的重要的点是,即使一个条目只是简单地读取(没有改变,cache.put(...)
),一个事务仍然可能失败,因为该条目的值对于发起事务中的逻辑很重要。
注意,对于READ_COMMITTED
和REPEATABLE_READ
事务,键的顺序是很重要的,因为这些模式中锁也是按顺序获得的。
为了在悲观模式下实现完全的读一致性,需要获取读锁。这意味着只有在悲观的可重复读(或可序列化)事务中,悲观模式下的读之间的完全一致性才能够实现。
当使用乐观事务时,通过禁止读之间的潜在冲突,可以实现完全的读一致性。此行为由乐观的可序列化模式提供。但是,请注意,在提交发生之前,用户仍然可以读取部分事务状态,因此事务逻辑必须对其进行保护。只有在提交阶段,在任何冲突的情况下,才会抛出TransactionOptimisticException
,这将允许开发者重试事务。
警告
如果没有使用悲观的可重复读或者可序列化事务,或者也没有使用乐观的序列化事务,那么就可以看到部分的事务状态,这意味着如果一个事务更新对象A和B,那么另一个事务可能看到A的新值和B的旧值。
当处理分布式事务时必须要遵守的主要规则是参与一个事务的键的锁,必须按照同样的顺序获得,违反这个规则就可能导致分布式死锁。
Ignite无法避免分布式死锁,而是有一个内建的功能来使调试和解决这个问题更容易。
在下面的代码片段中,事务启动时带有超时限制,如果到期,死锁检测过程就会试图查找一个触发这个超时的可能的死锁。当超过超时时间时,无论死锁如何,都会向应用层抛出CacheException
,并将TransactionTimeoutException
作为其触发原因。不过如果检测到了一个死锁,返回的TransactionTimeoutException
的触发原因会是TransactionDeadlockException
(至少涉及死锁的一个事务)。
CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>();
cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
cfg.setName("myCache");
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);
try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
TransactionIsolation.READ_COMMITTED, 300, 0)) {
cache.put(1, "1");
cache.put(2, "1");
tx.commit();
} catch (CacheException e) {
if (e.getCause() instanceof TransactionTimeoutException
&& e.getCause().getCause() instanceof TransactionDeadlockException)
System.out.println(e.getCause().getCause().getMessage());
}
TransactionDeadlockException
里面包含了有用的信息,有助于找到导致死锁的原因。
Deadlock detected:
K1: TX1 holds lock, TX2 waits lock.
K2: TX2 holds lock, TX1 waits lock.
Transactions:
TX1 [txId=GridCacheVersion [topVer=74949328, time=1463469328421, order=1463469326211, nodeOrder=1], nodeId=ad68354d-07b8-4be5-85bb-f5f2362fbb88, threadId=73]
TX2 [txId=GridCacheVersion [topVer=74949328, time=1463469328421, order=1463469326210, nodeOrder=1], nodeId=ad68354d-07b8-4be5-85bb-f5f2362fbb88, threadId=74]
Keys:
K1 [key=1, cache=default]
K2 [key=2, cache=default]
死锁检测是一个多步过程,根据集群中节点的数量、键以及可能导致死锁涉及的事务数,可能需要做多次迭代。一个死锁检测的发起者是发起事务并且出现TransactionTimeoutException
错误的那个节点,这个节点会检查是否发生了死锁,通过与其它远程节点交换请求/响应,并且准备一个与死锁有关的、由TransactionDeadlockException
提供的报告,每个这样的消息(请求/响应)都会被称为一次迭代。
因为死锁检测过程不结束,事务就不会回滚,有时,如果希望对于事务回滚有一个可预测的时间,调整一下参数还是有意义的(下面会描述)。
IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS
:指定死锁检测过程迭代的最大数,如果这个属性的值小于等于0,死锁检测会被禁用(默认为1000);IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT
:指定死锁检测机制的超时时间(默认为1分钟)。注意如果迭代次数太少,可能获得一个不完整的死锁检测报告。
对于OPTIMISTIC``SERIALIZABLE
事务,锁不是按顺序获得的。该模式中键可以按照任何顺序访问,因为事务锁是通过一个额外的检查以并行的方式获得的,这使得Ignite可以避免死锁。
这里需要引入几个概念来描述SERIALIZABLE
的事务锁的工作方式。Ignite中的每个事务都会被赋予一个叫做XidVersion
的可比较的版本号,事务提交时该事务中修改的每个条目都会被赋予一个叫做EntryVersion
的新的版本号,一个版本号为XidVersionA
的OPTIMISTIC``SERIALIZABLE
事务在如下情况下会抛出TransactionOptimisticException
异常而失败:
PESSIMISTIC
或者非可序列化OPTIMISTIC
事务在SERIALIZABLE
事务中的一个条目上持有了一个锁;XidVersionB
的OPTIMISTIC``SERIALIZABLE
事务,在XidVersionB > XidVersionA
时以及这个事务在SERIALIZABLE
事务中的一个条目上持有了一个锁;OPTIMISTIC``SERIALIZABLE
事务获得所有必要的锁时,存在在提交之前的版本与当前版本不同的条目;注意
在一个高并发环境中,乐观锁可能出现高事务失败率,而悲观锁如果锁被事务以一个不同的顺序获得可能导致死锁。
不过在一个同质化的环境中,乐观可序列化锁对于大的事务可能提供更好的性能,因为网络交互的数量只取决于事务相关的节点的数量,而不取决于事务中的键的数量。
如下的异常可能导致事务失败:
异常名称 | 描述 | 解决办法 |
---|---|---|
由TransactionTimeoutException 触发的CacheException | 事务超时会触发TransactionTimeoutException 。 | 可以增加超时时间或者缩短事务执行时间 |
TransactionDeadlockException 触发TransactionTimeoutException ,再触发CacheException | 事务死锁会触发这个异常。 | 使用死锁检测机制调试和修正死锁,或者切换到乐观序列化事务(无死锁事务)。 |
TransactionOptimisticException | 某种原因的乐观事务失败会抛出这个异常,大多数情况下,该异常发生在事务试图并发更新数据的场景中。 | 重新执行事务。 |
TransactionRollbackException | 事务自动或者手动回滚时,可能抛出这个异常,这时,数据状态是一致的。 | 因为数据状态是一致的,所以可以对事务进行重试。 |
TransactionHeuristicException | 这是一个不太可能发生的异常,由Ignite中意想不到的内部错误或者通信错误导致,该异常存在于事务子系统无法预知的不确定场景中,目前没有被合理地处理。 | 如果出现该异常,数据可能不一致,这时需要对数据进行重新加载,或者报告给Ignite开发社区。 |
在Ignite集群中,部分事件会触发分区映射的交换过程以及数据的再平衡,来保证整个集群的数据分布,这个事件的一个例子就是集群拓扑变更事件,它会在新节点加入或者已有节点离开时触发,还有,新的缓存或者SQL表创建时,也会触发分区映射的交换。
当分区映射交换开始时,Ignite会在某个阶段拿到一个全局锁,在未完成的事务并行执行时无法获得锁,这些事务会阻止分区映射交换进程,从而阻断一些新节点加入进程这样的一些操作。
使用TransactionConfiguration.setTxTimeoutOnPartitionMapExchange(...)
方法,可以配置长期运行事务阻断分区映射交换的最大时间,时间一到,所有的未完成事务都会回滚,让分区映射交换进程先完成。
下面的示例显示如何配置超时时间:
// Create a configuration
IgniteConfiguration cfg = new IgniteConfiguration();
// Create a Transaction configuration
TransactionConfiguration txCfg = new TransactionConfiguration();
// Set the timeout to 20 seconds
txCfg.setTxTimeoutOnPartitionMapExchange(20000);
cfg.setTransactionConfiguration(txCfg);
// Start the node
Ignition.start(cfg);
和事务相关的指标信息,请参见事务监控章节的介绍。
关于如何追踪事务的信息,请参见追踪章节的介绍。
另外,还可以通过控制脚本获取事务的信息,或者取消集群中正在执行的事务。
Ignite是一个兼容于ANSI-99、可水平扩展和容错的分布式SQL数据库,根据使用场景,数据在整个集群中是以分区或者复制的模式进行分发。
作为SQL数据库,Ignite支持所有DML命令,包括SELECT、UPDATE、INSERT和DELETE语句,并且还实现了与分布式系统相关的DDL命令的子集。
在外部工具和应用中通过使用JDBC或ODBC驱动,可以像处理任何其他支持SQL的存储一样与Ignite交互。Java、.NET和C++开发者还可以使用原生SQL API。
在内部,SQL表具有与键-值缓存相同的数据结构,这意味着可以更改数据的分区分布,并利用关联并置技术获得更好的性能。
Ignite的SQL引擎使用H2数据库来解析和优化查询并生成执行计划。
对分区表的查询以分布式方式执行:
映射
查询和一个汇总
查询;也可以强制查询在本地进行处理,即在执行查询的节点上的数据子集上执行。
如果在复制表上执行查询,将会在本地数据上执行。
Ignite具有若干默认模式,并支持创建自定义模式。
默认两个模式可用:
SYS
模式:其中包含许多和集群各种信息有关的系统视图,不能在此模式中创建表,更多信息请参见系统视图章节的介绍;PUBLIC
模式:未指定模式时的默认模式。在以下场景中,可以创建自定义模式:
如果需要且未指定模式时,默认会使用PUBLIC
模式。例如,当通过JDBC接入集群而未显式设置模式时,就会使用PUBLIC
模式。
可以通过IgniteConfiguration
的sqlSchemas
属性设置自定义模式,启动集群之前在配置中指定模式列表,然后在运行时就可以在这些模式中创建对象。
下面是带有两个自定义模式的配置示例:
IgniteConfiguration cfg = new IgniteConfiguration();
SqlConfiguration sqlCfg = new SqlConfiguration();
sqlCfg.setSqlSchemas("MY_SCHEMA", "MY_SECOND_SCHEMA" );
cfg.setSqlConfiguration(sqlCfg);
要接入指定的模式,比如通过JDBC驱动,那么可以在连接串中指定模式名:
jdbc:ignite:thin://127.0.0.1/MY_SCHEMA
当创建带有可查询字段的缓存时,可以通过SQL API来对缓存的数据进行维护,在SQL层面,每个缓存对应一个独立的模式,模式的名字等同于缓存的名字。
简单来说,当通过SQL API创建了一个表,可以通过编程接口将其当做键-值缓存访问,而对应的缓存名,可以通过CREATE TABLE
语句的WITH
子句中的CACHE_NAME
参数进行指定:
CREATE TABLE City (
ID INT(11),
Name CHAR(35),
CountryCode CHAR(3),
District CHAR(20),
Population INT(11),
PRIMARY KEY (ID, CountryCode)
) WITH "backups=1, CACHE_NAME=City";
更多信息请参见CREATE TABLE章节的介绍。
如果未指定这个参数,缓存名定义为如下格式(大写格式):
SQL_<SCHEMA_NAME>_<TABLE_NAME>
除了常规的DDL命令,比如CREATE/DROP INDEX,开发者还可以使用SQL API来定义索引。
提示
索引的功能是通过ignite-indexing
模块提供的,所以如果通过Java代码启动Ignite,需要将这个模块加入类路径。
Ignite会自动为每个缓存的主键和关联键字段创建索引,当在值对象的字段上创建索引时,Ignite会创建一个由索引字段和主键字段组成的组合索引。在SQL的角度,该索引由2列组成:索引列和主键列。
具体请参见CREATE INDEX章节的内容。
索引和可查询字段,在代码上,可以通过@QuerySqlField
注解进行配置。在下面的示例中,Ignite的SQL引擎会在id
和salary
字段上创建索引:
public class Person implements Serializable {
/** Indexed field. Will be visible to the SQL engine. */
@QuerySqlField(index = true)
private long id;
/** Queryable field. Will be visible to the SQL engine. */
@QuerySqlField
private String name;
/** Will NOT be visible to the SQL engine. */
private int age;
/**
* Indexed field sorted in descending order. Will be visible to the SQL engine.
*/
@QuerySqlField(index = true, descending = true)
private float salary;
}
SQL查询中,类型名会被用作表名,这时,表名为Person
(使用的模式名和定义见模式章节的介绍)。
id
和salary
都是索引字段,id
为升序排列,而salary
为倒序排列。
如果不希望索引一个字段,但是希望在SQL查询中使用该列,那么该字段需要加上该注解,但是不需要index = true
参数,这样的字段叫做可查询字段,在上例中,name
定义为可查询字段。
age
字段既不是可查询字段,也不是一个索引字段,因此在SQL查询中是无法访问的。
定义索引字段后,还需要注册索引类型。
运行时更新索引和可查询字段
如果希望运行时管理索引或者对象字段的可见性,需要使用CREATE/DROP INDEX命令。
使用注解,对象的嵌套字段也可以被索引和查询。比如,考虑一个Person
对象内部有一个Address
对象:
public class Person {
/** Indexed field. Will be visible for SQL engine. */
@QuerySqlField(index = true)
private long id;
/** Queryable field. Will be visible for SQL engine. */
@QuerySqlField
private String name;
/** Will NOT be visible for SQL engine. */
private int age;
/** Indexed field. Will be visible for SQL engine. */
@QuerySqlField(index = true)
private Address address;
}
而Address
类的结构如下:
public class Address {
/** Indexed field. Will be visible for SQL engine. */
@QuerySqlField (index = true)
private String street;
/** Indexed field. Will be visible for SQL engine. */
@QuerySqlField(index = true)
private int zip;
}
在上面的示例中,Address
类的所有字段都加上了@QuerySqlField(index = true)
注解,Person
类的Address
对象,也加上了该注解。
这样就可以执行下面的SQL语句:
QueryCursor<List<?>> cursor = personCache.query(new SqlFieldsQuery( "select * from Person where street = 'street1'"));
注意在SQL语句的WHERE条件中不需要指定address.street
,这是因为Address
类的字段会被合并到Person
中,这样就可以简单地在查询中直接访问Address
中的字段。
警告
如果在嵌套对象上创建了索引,就不能在这个表上执行UPDATE或者INSERT语句。
定义索引和可查询字段之后,需要将它们及其所属的对象类型一起注册到SQL引擎中。
要指定应建立索引的类型,需要在CacheConfiguration.setIndexedTypes()
方法中传递相应的键-值对,如下例所示:
// Preparing configuration.
CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>();
// Registering indexed type.
ccfg.setIndexedTypes(Long.class, Person.class);
此方法仅接受成对的类型:一个键类,一个值类,基本类型需要用包装器类型传入。
预定义字段
除了用@QuerySqlField
注解标注的所有字段,每个表都有两个特别的预定义字段:_key
和_val
,它表示到整个键对象和值对象的引用。这很有用,比如当它们中的一个是基本类型并且希望用它的值进行过滤时,执行SELECT * FROM Person WHERE _key = 100
查询即可。
注意
因为有二进制编组器,不需要将索引类型类加入集群节点的类路径中,SQL引擎不需要对象反序列化就可以钻取索引和可查询字段的值。
当查询条件复杂时可以使用多字段索引来加快查询的速度,这时可以用@QuerySqlField.Group
注解。如果希望一个字段参与多个组合索引时也可以将多个@QuerySqlField.Group
注解加入orderedGroups
中。
比如,下面的Person
类中age
字段加入了名为age_salary_idx
的组合索引,它的分组序号是0并且降序排列,同一个组合索引中还有一个字段salary
,它的分组序号是3并且升序排列。最重要的是salary
字段还是一个单列索引(除了orderedGroups
声明之外,还加上了index = true
)。分组中的order
不需要是什么特别的数值,它只是用于分组内的字段排序。
public class Person implements Serializable {
/** Indexed in a group index with "salary". */
@QuerySqlField(orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 0, descending = true) })
private int age;
/** Indexed separately and in a group index with "age". */
@QuerySqlField(index = true, orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 3) })
private double salary;
}
注意
将@QuerySqlField.Group
放在@QuerySqlField(orderedGroups={...})
外面是无效的。
索引和字段也可以通过org.apache.ignite.cache.QueryEntity
进行配置,它便于利用Spring进行基于XML的配置。
在上面基于注解的配置中涉及的所有概念,对于基于QueryEntity
的方式也都有效,此外,如果类型的字段通过@QuerySqlField
进行了配置并且通过CacheConfiguration.setIndexedTypes
注册过的,在内部也会被转换为查询实体。
下面的示例显示的是如何定义单一字段索引、组合索引和可查询字段:
CacheConfiguration<Long, Person> cache = new CacheConfiguration<Long, Person>("myCache");
QueryEntity queryEntity = new QueryEntity();
queryEntity.setKeyFieldName("id").setKeyType(Long.class.getName()).setValueType(Person.class.getName());
LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("id", "java.lang.Long");
fields.put("name", "java.lang.String");
fields.put("salary", "java.lang.Long");
queryEntity.setFields(fields);
queryEntity.setIndexes(Arrays.asList(new QueryIndex("name"),
new QueryIndex(Arrays.asList("id", "salary"), QueryIndexType.SORTED)));
cache.setQueryEntities(Arrays.asList(queryEntity));
SQL查询中会使用valueType
的简称作为表名,这时,表名为Person
(模式名的用法和定义请参见理解模式章节的内容)。
QueryEntity定义之后,就可以执行下面的查询了:
SqlFieldsQuery qry = new SqlFieldsQuery("SELECT id, name FROM Person" + "WHERE id > 1500 LIMIT 10");
运行时更新索引和可查询字段
如果希望运行时管理索引或者对象字段的可见性,需要使用CREATE/DROP INDEX命令。
正确的索引内联值有助于增加索引字段上的查询速度,关于如何选择正确的内联值,请参见增加索引内联值章节的介绍。
大多数情况下,只需要为可变长度字段的索引设置内联值,比如字符串或者数组,默认值是10。
可通过如下方式修改默认值:
CacheConfiguration.sqlIndexMaxInlineSize
属性为缓存内的所有索引配置内联值;IGNITE_MAX_INDEX_PAYLOAD_SIZE
系统属性为集群内的所有索引配置内联值。配置将按照上面的顺序依次生效。
可以为每个索引单独配置内联值,这会覆盖默认值。如果要为开发者定义的索引设置内联值,可以用下面的方法之一,该值以字节数为单位。
注解方式
@QuerySqlField(index = true, inlineSize = 13)
private String country;
QueryEntity方式
QueryIndex idx = new QueryIndex("country");
idx.setInlineSize(13);
queryEntity.setIndexes(Arrays.asList(idx));
CREATE INDEX命令
如果使用的是CREATE INDEX
命令,那么可以使用INLINE_SIZE
选项来配置内联值:
create index country_idx on Person (country) INLINE_SIZE 13;
如果只使用预定义的SQL数据类型作为缓存键,那么就没必要对和DML相关的配置做额外的操作,这些数据类型在GridQueryProcessor#SQL_TYPES
常量中进行定义,列举如下:
char
和Character
;String
;BigDecimal
;byte[]
;java.util.Date
,?java.sql.Date
,?java.sql.Timestamp
;java.util.UUID
。不过如果决定引入复杂的自定义缓存键,那么在DML语句中要指向这些字段就需要:
QueryEntity
中定义这些字段,与在值对象中配置字段一样;QueryEntitty.setKeyFields(..)
来对键和值进行区分。下面的例子展示了如何实现:
// Preparing cache configuration.
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<Long, Person>("personCache");
// Creating the query entity.
QueryEntity entity = new QueryEntity("CustomKey", "Person");
// Listing all the queryable fields.
LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("intKeyField", Integer.class.getName());
fields.put("strKeyField", String.class.getName());
fields.put("firstName", String.class.getName());
fields.put("lastName", String.class.getName());
entity.setFields(fields);
// Listing a subset of the fields that belong to the key.
Set<String> keyFlds = new HashSet<>();
keyFlds.add("intKeyField");
keyFlds.add("strKeyField");
entity.setKeyFields(keyFlds);
// End of new settings, nothing else here is DML related
entity.setIndexes(Collections.<QueryIndex>emptyList());
cacheCfg.setQueryEntities(Collections.singletonList(entity));
ignite.createCache(cacheCfg);
哈希值自动计算和equals实现
如果自定义键可以被序列化为二进制形式,那么Ignite会自动进行哈希值的计算并且实现equals
方法。
但是,如果键类型是Externalizable
类型,那么就无法序列化为二进制形式,那么就需要自行实现hashCode
和equals
方法,具体请参见使用二进制对象章节的介绍。
除了使用JDBC驱动,Java开发者还可以使用Ignite的SQL API来访问和修改Ignite中存储的数据。
SqlFieldsQuery
类是执行SQL查询和处理结果集的接口,SqlFieldsQuery
通过IgniteCache.query(SqlFieldsQuery)
方法执行,然后会返回一个游标。
如果希望使用SQL语句来查询缓存,需要定义值对象的哪些字段是可查询的,可查询字段是数据模型中SQL引擎可以处理的字段。
提示
如果使用JDBC或者SQL工具建表,则不需要定义可查询字段。
提示
索引的功能是通过ignite-indexing
模块提供的,所以如果通过Java代码启动Ignite,需要将这个模块加入类路径。
在Java中,可查询字段可以通过两种方式来定义:
要让某个字段可查询,需要在值类定义的对应字段上加注@QuerySqlField
注解,然后调用CacheConfiguration.setIndexedTypes(…?)
方法。
class Person implements Serializable {
/** Indexed field. Will be visible to the SQL engine. */
@QuerySqlField(index = true)
private long id;
/** Queryable field. Will be visible to the SQL engine. */
@QuerySqlField
private String name;
/** Will NOT be visible to the SQL engine. */
private int age;
/**
* Indexed field sorted in descending order. Will be visible to the SQL engine.
*/
@QuerySqlField(index = true, descending = true)
private float salary;
}
public static void main(String[] args) {
Ignite ignite = Ignition.start();
CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
personCacheCfg.setName("Person");
personCacheCfg.setIndexedTypes(Long.class, Person.class);
IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}
可以通过QueryEntity
类来定义可查询字段,查询实体可以通过XML来配置:
class Person implements Serializable {
private long id;
private String name;
private int age;
private float salary;
}
public static void main(String[] args) {
Ignite ignite = Ignition.start();
CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
personCacheCfg.setName("Person");
QueryEntity queryEntity = new QueryEntity(Long.class, Person.class)
.addQueryField("id", Long.class.getName(), null).addQueryField("age", Integer.class.getName(), null)
.addQueryField("salary", Float.class.getName(), null)
.addQueryField("name", String.class.getName(), null);
queryEntity.setIndexes(Arrays.asList(new QueryIndex("id"), new QueryIndex("salary", false)));
personCacheCfg.setQueryEntities(Arrays.asList(queryEntity));
IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}
要在缓存上执行查询,简单地创建一个SqlFieldsQuery
对象,将查询字符串传给构造方法,然后执行cache.query(…?)
即可。注意在下面的示例中,Person缓存必须配置为对SQL引擎可见。
IgniteCache<Long, Person> cache = ignite.cache("Person");
SqlFieldsQuery sql = new SqlFieldsQuery(
"select concat(firstName, ' ', lastName) from Person");
// Iterate over the result set.
try (QueryCursor<List<?>> cursor = cache.query(sql)) {
for (List<?> row : cursor)
System.out.println("personName=" + row.get(0));
}
SqlFieldsQuery
会返回一个游标,然后可以用游标来迭代匹配SQL查询的结果集。
如果要强制一个查询在本地执行,可以使用SqlFieldsQuery.setLocal(true)
方法。这时,查询是在执行查询的节点的本地数据上执行,这意味着查询的结果集是不完整的,所以使用这个模式前要了解这个限制。
INSERT
、MERGE
语句中的SELECT
查询,以及由UPDATE
和DELETE
操作生成的SELECT
查询也是分布式的,可以以并置或非并置的模式执行。
但是,如果WHERE
子句中有一个子查询,那么其只能以并置的方式执行。
比如,考虑下面的查询:
DELETE FROM Person WHERE id IN
(SELECT personId FROM Salary s WHERE s.amount > 2000);
SQL引擎会生成一个SELECT
查询,来获取要删除的条目列表。该查询是分布式的,在整个集群中执行,大致如下:
SELECT _key, _val FROM Person WHERE id IN
(SELECT personId FROM Salary s WHERE s.amount > 2000);
但是,IN
子句中的子查询(SELECT personId FROM Salary …?
)并不是分布式的,只能在节点的本地可用数据集上执行。
使用SqlFieldsQuery
可以执行DML命令来修改数据:
IgniteCache<Long, Person> cache = ignite.cache("personCache");
cache.query(
new SqlFieldsQuery("INSERT INTO Person(id, firstName, lastName) VALUES(?, ?, ?)")
.setArgs(1L, "John", "Smith"))
.getAll();
当使用SqlFieldsQuery
来执行DDL语句时,必须调用query(…?)
方法返回的游标的getAll()
方法。
通过SqlFieldsQuery
执行的任何SELECT
语句,默认都是在PUBLIC
模式下解析的。但是如果表不在这个模式下,需要调用SqlFieldsQuery.setSchema(…?)
来指定模式,这样语句就在指定的模式下执行了。
SqlFieldsQuery sql = new SqlFieldsQuery("select name from City").setSchema("PERSON");
另外,也可以在语句中指定模式:
SqlFieldsQuery sql = new SqlFieldsQuery("select name from Person.City");
可以向SqlFieldsQuery
传递任何受支持的DDL语句,如下所示:
IgniteCache<Long, Person> cache = ignite
.getOrCreateCache(new CacheConfiguration<Long, Person>().setName("Person"));
// Creating City table.
cache.query(new SqlFieldsQuery(
"CREATE TABLE City (id int primary key, name varchar, region varchar)")).getAll();
在SQL模式方面,上述代码的执行结果,创建了下面的表:
Person
模式中的Person
表(如果之前未创建);Person
模式中的City
表。要查询City
表,可以使用两种方式:select * from Person.City
或new SqlFieldsQuery("select * from City").setSchema("PERSON")
(注意大写)。
有两种方式可以取消长时间运行的查询。
第一种方式是设置查询执行超时:
SqlFieldsQuery query = new SqlFieldsQuery("SELECT * from Person");
// Setting query execution timeout
query.setTimeout(10_000, TimeUnit.SECONDS);
第二个方式是调用QueryCursor.close()
来终止查询:
SqlFieldsQuery query = new SqlFieldsQuery("SELECT * FROM Person");
// Executing the query
QueryCursor<List<?>> cursor = cache.query(query);
// Halting the query that might be still in progress.
cursor.close();
Ignite的源代码中有一个直接可以运行的SqlDmlExample
,其演示了所有上面提到过的DML操作的使用。
分布式关联是指SQL语句中通过关联子句组合了两个或者更多的分区表,如果这些表关联在分区列(关联键)上,该关联称为并置关联,否则称为非并置关联。
并置关联更高效,因为其可以高效地在集群节点间分布。
Ignite默认将每个关联查询都视为并置关联,并按照并置的模式执行。
警告
如果查询是非并置的,需要通过SqlFieldsQuery.setDistributedJoins(true)
来开启查询执行的非并置模式,否则查询的结果集会是不正确的。
警告
如果经常关联表,那么建议将表在同一个列(关联表的列)上进行分区。
非并置的关联仅适用于无法使用并置关联的场景。
下图解释了并置关联的执行过程,一个并置关联(Q
)会被发给存储与查询条件匹配的数据的所有节点,然后查询在每个节点的本地数据集上执行(E(Q)
),结果集(R
)会在查询的发起节点(客户端节点)聚合:
如果以非并置模式执行查询,则SQL引擎将在存储与查询条件匹配的数据的所有节点上本地执行查询。但是因为数据不是并置的,所以每个节点将通过发送广播或单播请求从其他节点拉取缺失的数据(本地不存在),下图描述了此过程:
如果关联是在主键或关联键上,则节点将发送单播请求,因为这时节点知道缺失数据的位置。否则节点将发送广播请求。出于性能原因,广播和单播请求都被汇总为批次。
通过设置JDBC/ODBC参数,或通过调用SqlFieldsQuery.setDistributedJoins(true)
使用SQL API,可以启用非并置查询执行模式。
警告
如果对复制表中的列使用非并置关联,则该列必须有索引。否则会抛出异常。
警告
支持SQL事务当前处于测试阶段,生产环境建议使用键-值事务。
配置为TRANSACTIONAL_SNAPSHOT
原子化模式的缓存支持SQL事务。TRANSACTIONAL_SNAPSHOT
模式是Ignite缓存的多版本并发控制(MVCC)实现,关于MVCC的更多信息以及当前的限制,请参见多版本并发控制章节的内容。
关于Ignite支持的事务语法,请参见事务章节的内容。
在缓存配置中使用TRANSACTIONAL_SNAPSHOT
原子化模式可以为缓存开启MVCC,如果使用CREATE TABLE
命令建表,可以在命令的WITH
子句中指定原子化模式参数。
CacheConfiguration cacheCfg = new CacheConfiguration<>();
cacheCfg.setName("myCache");
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
TRANSACTIONAL_SNAPSHOT
模式是缓存级的,因此不允许在一个事务中的缓存具有不同的原子化模式,如果要在一个事务中覆盖多张表,那么所有的相关表都要使用TRANSACTIONAL_SNAPSHOT
模式创建。
通过JDBC/ODBC连接参数,Ignite支持三种模式用于处理嵌套的SQL事务。
JDBC连接串示例:
jdbc:ignite:thin://127.0.0.1/?nestedTransactionsMode=COMMIT
当事务中发生了嵌套的事务,系统的行为取决于nestedTransactionsMode
参数:
ERROR
:如果遇到嵌套事务,会抛出错误并且包含的事务会回滚,这是默认的行为;COMMIT
:包含事务会被挂起,嵌套事务启动后如果遇到COMMIT语句会被提交。包含事务中的其余语句会作为隐式事务执行;IGNORE
:不要使用这个模式,嵌套事务的开始会被忽略,嵌套事务中的语句会作为包含事务的一部分执行,并且随着嵌套事务的提交而提交所有的变更,包含事务的剩余语句会作为隐式事务执行。Ignite的SQL引擎支持通过额外用Java编写的自定义SQL函数,来扩展ANSI-99规范定义的SQL函数集。
一个自定义SQL函数仅仅是一个加注了@QuerySqlFunction
注解的公共静态方法。
// Defining a custom SQL function.
public class MyFunctions {
@QuerySqlFunction
public static int sqr(int x) {
return x * x;
}
}
持有自定义SQL函数的类需要使用setSqlFunctionClasses(...)
方法在某个CacheConfiguration
中注册。
// Preparing a cache configuration.
CacheConfiguration cfg = new CacheConfiguration();
// Registering the class that contains custom SQL functions.
cfg.setSqlFunctionClasses(MyFunctions.class);
经过了上述配置的缓存部署之后,在SQL查询中就可以调用自定义函数了,如下所示:
// Preparing the query that uses customly defined 'sqr' function.
SqlFieldsQuery query = new SqlFieldsQuery(
"SELECT name FROM Blocks WHERE sqr(size) > 100");
// Executing the query.
cache.query(query).getAll();
类注册
在自定义SQL函数可能要执行的所有节点上,通过CacheConfiguration.setSqlFunctionClasses(...)
注册的类都需要添加到类路径中,否则在自定义函数执行时会抛出ClassNotFoundException
异常。
Ignite提供了JDBC驱动,可以通过标准的SQL语句处理分布式数据,比如从JDBC端直接进行SELECT
、INSERT
、UPDATE
和DELETE
。
目前,Ignite支持两种类型的驱动,轻量易用的JDBC Thin模式驱动以及以客户端节点形式与集群进行交互的JDBC客户端驱动。
JDBC Thin模式驱动是Ignite提供的默认轻量级驱动,要使用这种驱动,只需要将ignite-core-{version}.jar
加入应用的类路径即可。
驱动会接入集群的一个节点然后将所有的请求转发给它进行处理。节点会处理分布式的查询以及结果集的汇总,然后将结果集反馈给客户端应用。
JDBC连接串可以有两种模式:URL查询模式以及分号模式:
// URL query pattern
jdbc:ignite:thin://<hostAndPortRange0>[,<hostAndPortRange1>]...[,<hostAndPortRangeN>][/schema][?<params>]
hostAndPortRange := host[:port_from[..port_to]]
params := param1=value1[¶m2=value2]...[¶mN=valueN]
// Semicolon pattern
jdbc:ignite:thin://<hostAndPortRange0>[,<hostAndPortRange1>]...[,<hostAndPortRangeN>][;schema=<schema_name>][;param1=value1]...[;paramN=valueN]
host
:必需,它定义了要接入的集群节点主机地址;port_from
:打开连接的端口范围的起始点,如果忽略此参数默认为10800
;port_to
:可选,如果忽略此参数则等同于port_from
;schema
:要访问的模式名,默认是PUBLIC
,这个名字对应于SQL的ANSI-99标准,不加引号是大小写不敏感的,加引号是大小写敏感的。如果使用了分号模式,模式可以通过参数名schema
定义;<params>
:可选。驱动类名为org.apache.ignite.IgniteJdbcThinDriver
,比如,下面就是如何打开到集群节点的连接,监听地址为192.168.0.50:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open the JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");
如果通过bash接入则JDBC URL需要加引号
如果通过bash环境接入,则连接URL需要加" "
,比如:"jdbc:ignite:thin://[address]:[port];user=[username];password=[password]"
下表列出了JDBC连接串支持的所有参数:
属性名 | 描述 | 默认值 |
---|---|---|
user | SQL连接的用户名,如果服务端开启了认证则此参数为必需。关于如何开启认证和创建用户,可以分别参见认证和创建用户的文档。 | ignite |
password | SQL连接的密码,如果服务端开启了认证则此参数为必需。关于如何开启认证和创建用户,可以分别参见认证和创建用户的文档。 | ignite |
distributedJoins | 对于非并置数据是否使用分布式关联 | false |
enforceJoinOrder | 是否在查询中强制表的关联顺序,如果配置为true ,查询优化器在关联中不会对表进行重新排序。 | false |
collocated | 如果SQL语句包含按主键或关联键对结果集进行分组的GROUP BY子句,可以将此参数设置为true。当Ignite执行分布式查询时,会向单个集群节点发送子查询,如果事先知道待查询的数据是在同一个节点上并置在一起的,并且是按主键或关联键分组的,那么Ignite通过在参与查询的每个节点本地分组数据来实现显著的性能和网络优化。 | false |
replicatedOnly | 查询是否只包含复制表,这是一个潜在的可能提高性能的提示。 | false |
autoCloseServerCursor | 当拿到最后一个结果集时是否自动关闭服务端游标。开启之后,对ResultSet.close() 的调用就不需要网络访问,这样会改进性能。但是,如果服务端游标已经关闭,在调用ResultSet.getMetadata() 方法时会抛出异常,这时为什么默认值为false 的原因。 | false |
partitionAwareness | 启用分区感知模式,该模式中,驱动会尝试确定要查询的数据所在的节点,然后把请求发给这些节点。 | false |
partitionAwarenessSQLCacheSize | 驱动为优化而在本地保留的不同SQL查询数。当第一次执行查询时,驱动会接收正在查询的表的分区分布,并将其保存以备将来在本地使用。下次查询此表时,驱动使用该分区分布来确定要查询的数据的位置,以便将查询直接发送到正确的节点。当集群拓扑发生变更时,此包含SQL查询的本地存储将失效。此参数的最佳值应等于要执行的不同SQL查询的数量。 | 1000 |
partitionAwarenessPartitionDistributionsCacheSize | 表示分区分布的不同对象的数量,驱动在本地保留以进行优化。具体请参见partitionAwarenessSQLCacheSize 参数的说明。当集群拓扑发生变更时,持有分区分布对象的本地存储将失效。此参数的最佳值应等于要在查询中使用的不同表(缓存组)的数量。 | 1000 |
socketSendBuffer | 发送套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 | 0 |
socketReceiveBuffer | 接收套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 | 0 |
tcpNoDelay | 是否使用TCP_NODELAY 选项。 | true |
lazy | 查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端。对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 | false |
skipReducerOnUpdate | 开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总方),然后会准备一个更新值的批次发给远程节点。这个方式可能影响性能,如果一个DML操作需要移动大量数据时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |
关于和安全有关的参数,请参见使用SSL章节的介绍。
jdbc:ignite:thin://myHost
:接入myHost
,其它比如端口为10800
等都是默认值;jdbc:ignite:thin://myHost:11900
:接入myHost
,自定义端口为11900
,其它为默认值;jdbc:ignite:thin://myHost:11900;user=ignite;password=ignite
:接入myHost
,自定义端口为11900
,并且带有用于认证的用户凭据;jdbc:ignite:thin://myHost:11900;distributedJoins=true&autoCloseServerCursor=true
:接入myHost
,自定义端口为11900
,开启了分布式关联和autoCloseServerCursor
优化;jdbc:ignite:thin://myHost:11900/myschema;
:接入myHost
,自定义端口为11900
,模式为MYSCHEMA
;jdbc:ignite:thin://myHost:11900/"MySchema";lazy=false
:接入myHost
,自定义端口为11900
,模式为MySchema
(模式名区分大小写),并且禁用了查询的延迟执行。在连接串中配置多个连接端点也是可以的,这样如果连接中断会开启自动故障转移,JDBC驱动会从列表中随机选择一个地址接入。如果之前的连接中断,驱动会选择另一个地址直到连接恢复,如果所有的端点都不可达,JDBC会停止重连并且抛出异常。
下面的示例会显示如何通过连接串传递3个地址:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open the JDBC connection passing several connection endpoints.
Connection conn = DriverManager.getConnection(
"jdbc:ignite:thin://192.168.0.50:101,192.188.5.40:101, 192.168.10.230:101");
警告
分区感知是一个试验性特性,API和设计架构在正式发布之前可能会变更。
分区感知是一个可使JDBC驱动“感知”集群中分区分布的功能。它使得驱动可以选择持有待查询数据的节点,并将查询直接发送到那些节点(如果在驱动的配置中提供了节点的地址)。分区感知可以提高使用关联键的查询的平均性能。
没有分区感知时,JDBC驱动将连接到某个节点,然后所有查询都通过该节点执行。如果数据分布在其他节点上,则必须在集群内重新路由查询,这会增加一个额外的网络波动。分区感知通过将查询直接发送到正确的节点来消除该波动。
要使用分区感知功能,需要在连接属性中提供所有服务端节点的地址,驱动会将请求直接发送到存储查询所请求数据的节点。
警告
注意,当前需要在连接属性中提供所有服务端节点的地址,因为在打开连接后驱动不会自动加载它们。这意味着如果新的服务端节点加入集群,需要将节点的地址添加到连接属性中,然后重新连接驱动,否则驱动将无法直接向该节点发送请求。
要开启分区感知,需要将partitionAwareness=true
参数添加到连接串中,然后提供多个服务端节点的地址。
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
Connection conn = DriverManager
.getConnection("jdbc:ignite:thin://192.168.0.50,192.188.5.40,192.168.10.230?partitionAwareness=true");
提示
分区感知功能只能使用默认的关联函数。
为了接收和处理来自JDBC Thin驱动转发过来的请求,一个节点需要绑定到一个本地网络端口10800
,然后监听入站请求。
通过ClientConnectorConfiguration
,可以对参数进行修改:
IgniteConfiguration cfg = new IgniteConfiguration()
.setClientConnectorConfiguration(new ClientConnectorConfiguration());
其支持如下的参数:
参数名 | 描述 | 默认值 |
---|---|---|
host | 绑定的主机名或者IP地址,如果配置为null ,会使用localHost 。 | null |
port | 绑定的TCP端口,如果指定的端口已被占用,Ignite会使用portRange 属性来查找其它可用的端口。 | 10800 |
portRange | 定义尝试绑定的端口数量,比如,如果端口配置为10800 并且端口范围为100 ,Ignite会从10800开始,在[10800,10900]范围内查找可用端口。 | 100 |
maxOpenCursorsPerConnection | 每个连接打开的服务端游标的最大数量。 | 128 |
threadPoolSize | 线程池中负责请求处理的线程数量。 | max(8,CPU核数) |
socketSendBufferSize | TCP套接字发送缓冲区大小,如果配置为0 ,会使用操作系统默认值。 | 0 |
socketReceiveBufferSize | TCP套接字接收缓冲区大小,如果配置为0 ,会使用操作系统默认值。 | 0 |
tcpNoDelay | 是否使用TCP_NODELAY 选项。 | true |
idleTimeout | 客户端连接空闲超时时间。在空闲超过配置的超时时间后,客户端与服务端的连接会断开。如果该参数配置为0或者负值,空闲超时会被禁用。 | 0 |
isJdbcEnabled | 是否允许JDBC访问。 | true |
isThinClientEnabled | 是否允许瘦客户端访问。 | true |
sslEnabled | 如果开启SSL,只允许SSL客户端连接。一个节点只允许一种连接模式:SSL或普通,一个节点无法同时接收两种模式的客户端连接,但是这个参数集群中的各个节点可以不同。 | false |
useIgniteSslContextFactory | 在Ignite配置中是否使用SSL上下文工厂(具体可以看IgniteConfiguration.sslContextFactory )。 | true |
sslClientAuth | 是否需要客户端认证。 | false |
sslContextFactory | 提供节点侧SSL的Factory<SSLContext> 实现的类名。 | null |
JDBC Thin模式驱动并非线程安全
JDBC对象中的Connection
、Statement
和ResultSet
不是线程安全的。因此不能在多线程中使用一个JDBC连接的Statement和ResultSet。 JDBC Thin模式驱动防止并发,如果检测到了并发访问,那么会抛出SQLException
,消息为:Concurrent access to JDBC connection is not allowed [ownThread=<guard_owner_thread_name>,curThread=<current_thread_name>]",SQLSTATE="08006
。
JDBC Thin模式驱动可以使用SSL来保护与集群之间的通信,集群端和驱动端必须同时配置SSL,集群配置方面,请参见瘦客户端和JDBC/ODBC的SSL/TLS章节的介绍。
JDBC驱动中开启SSL,需要在连接串中传递sslMode=require
参数,并且提供密钥库和信任库参数:
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
String keyStore = "keystore/node.jks";
String keyStorePassword = "123456";
String trustStore = "keystore/trust.jks";
String trustStorePassword = "123456";
try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1?sslMode=require"
+ "&sslClientCertificateKeyStoreUrl=" + keyStore + "&sslClientCertificateKeyStorePassword="
+ keyStorePassword + "&sslTrustCertificateKeyStoreUrl=" + trustStore
+ "&sslTrustCertificateKeyStorePassword=" + trustStorePassword)) {
ResultSet rs = conn.createStatement().executeQuery("select 10");
rs.next();
System.out.println(rs.getInt(1));
} catch (Exception e) {
e.printStackTrace();
}
下表列出了和SSL/TLS连接有关的参数:
参数名 | 描述 | 默认值 |
---|---|---|
sslMode | 开启SSL连接。可用的模式为:1.require :在客户端开启SSL协议,只有SSL连接才可以接入。2.disable :在客户端禁用SSL协议,只支持普通连接。 | disable |
sslProtocol | 安全连接的协议名,如果未指定,会使用TLS协议。协议实现由JSSE提供:SSLv3 (SSL), TLSv1 (TLS), TLSv1.1, TLSv1.2 | TLS |
sslKeyAlgorithm | 用于创建密钥管理器的密钥管理器算法。注意多数情况使用默认值即可。算法实现由JSSE提供:PKIX (X509或SunPKIX), SunX509 | |
sslClientCertificateKeyStoreUrl | 客户端密钥存储库文件的url,这是个强制参数,因为没有密钥管理器SSL上下文无法初始化。如果sslMode 为require 并且未通过属性文件指定密钥存储库 URL,那么会使用JSSE属性javax.net.ssl.keyStore 的值。 | JSSE系统属性javax.net.ssl.keyStore 的值。 |
sslClientCertificateKeyStorePassword | 客户端密钥存储库密码。如果sslMode 为require 并且未通过属性文件指定密钥存储库密码,那么会使用JSSE属性javax.net.ssl.keyStorePassword 的值。 | JSSE属性javax.net.ssl.keyStorePassword 的值。 |
sslClientCertificateKeyStoreType | 用于上下文初始化的客户端密钥存储库类型。如果sslMode 为require 并且未通过属性文件指定密钥存储库类型,那么会使用JSSE属性javax.net.ssl.keyStoreType 的值。 | JSSE属性javax.net.ssl.keyStoreType 的值,如果属性未定义,默认值为JKS。 |
sslTrustCertificateKeyStoreUrl | truststore文件的URL。这是个可选参数,但是sslTrustCertificateKeyStoreUrl 和sslTrustAll 必须配置一个。如果sslMode 为require 并且未通过属性文件指定truststore文件URL,那么会使用JSSE属性javax.net.ssl.trustStore 的值。 | JSSE系统属性javax.net.ssl.trustStore 的值。 |
sslTrustCertificateKeyStorePassword | truststore密码。如果sslMode 为require 并且未通过属性文件指定truststore密码,那么会使用JSSE属性javax.net.ssl.trustStorePassword 的值。 | JSSE系统属性javax.net.ssl.trustStorePassword 的值。 |
sslTrustCertificateKeyStoreType | truststore类型。如果sslMode 为require 并且未通过属性文件指定truststore类型,那么会使用JSSE属性javax.net.ssl.trustStoreType 的值。 | JSSE系统属性javax.net.ssl.trustStoreType 的值。如果属性未定义,默认值为JKS。 |
sslTrustAll | 禁用服务端的证书验证。配置为true 信任任何服务端证书(撤销的、过期的或者自签名的SSL证书)。注意,如果不能完全信任网络(比如公共互联网),不要在生产中启用该选项。 | false |
sslFactory | Factory<SSLSocketFactory> 的自定义实现的类名,如果sslMode 为require 并且指定了该工厂类,自定义的工厂会替换JSSE的默认值,这时其它的SSL属性也会被忽略。 | null |
默认实现基于JSSE,并且需要处理两个Java密钥库文件。
sslClientCertificateKeyStoreUrl
:客户端认证密钥库文件,其持有客户端的密钥和证书;sslTrustCertificateKeyStoreUrl
:可信证书密钥库文件,包含用于验证服务器证书的证书信息。信任库是可选参数,但是sslTrustCertificateKeyStoreUrl
或者sslTrustAll
必须配置两者之一。
使用`sslTrustAll`参数
如果生产环境位于不完全可信网络(尤其是公共互联网),不要开启此选项。
如果希望使用自己的实现或者通过某种方式配置SSLSocketFactory
,可以使用驱动的sslFactory
参数,这是一个包含Factory<SSLSocketFactory>
接口实现的类名字符串,该类对于JDBC驱动的类加载器必须可用。
DataSource
对象可用作部署对象,其可以通过JNDI命名服务按逻辑名定位。Ignite JDBC驱动的org.apache.ignite.IgniteJdbcThinDataSource
实现了JDBC的DataSource
接口,这样就可以使用DataSource
接口了。
除了通用的DataSource属性外,IgniteJdbcThinDataSource
还支持所有可以传递给JDBC连接字符串的Ignite特有属性。例如,distributedJoins
属性可以通过IgniteJdbcThinDataSource#setDistributedJoins()
方法进行调整。
具体请参见IgniteJdbcThinDataSource的javadoc。
要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection
对象:
// Open the JDBC connection via DriverManager.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");
或者:
// Or open connection via DataSource.
IgniteJdbcThinDataSource ids = new IgniteJdbcThinDataSource();
ids.setUrl("jdbc:ignite:thin://127.0.0.1");
ids.setDistributedJoins(true);
Connection conn = ids.getConnection();
之后就可以执行SELECT
SQL查询了:
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
stmt.setInt(1, 30);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String name = rs.getString("name");
int age = rs.getInt("age");
// ...
}
此外,可以使用DML语句对数据进行修改。
// Insert a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.execute();
// Merge a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.executeUpdate();
// Update a Person.
conn.createStatement().
executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");
conn.createStatement().execute("DELETE FROM Person WHERE age = 25");
Ignite的JDBC驱动可以通过SET STREAMING
命令对流化数据进行批量处理,具体可以看SET STREAMING的相关内容。
Ignite的JDBC驱动将错误码封装进了java.sql.SQLException
类,它简化了应用端的错误处理。可以使用java.sql.SQLException.getSQLState()
方法获取错误码,该方法会返回一个包含预定义ANSI SQLSTATE错误码的字符串:
PreparedStatement ps;
try {
ps = conn.prepareStatement("INSERT INTO Person(id, name, age) values (1, 'John', 'unparseableString')");
} catch (SQLException e) {
switch (e.getSQLState()) {
case "0700B":
System.out.println("Conversion failure");
break;
case "42000":
System.out.println("Parsing error");
break;
default:
System.out.println("Unprocessed error: " + e.getSQLState());
break;
}
}
下表中列出了Ignite目前支持的所有ANSI SQLSTATE错误码,未来这个列表可能还会扩展:
代码 | 描述 |
---|---|
0700B | 转换失败(比如,一个字符串表达式无法解析成数值或者日期) |
0700E | 无效的事务隔离级别 |
08001 | 驱动接入集群失败 |
08003 | 连接意外地处于关闭状态 |
08004 | 连接被集群拒绝 |
08006 | 通信中发生I/O错误 |
22004 | 不允许的空值 |
22023 | 不支持的参数类型 |
23000 | 违反了数据完整性约束 |
24000 | 无效的结果集状态 |
0A000 | 不支持的操作 |
40001 | 并发更新冲突,具体请参见并发更新章节的介绍。 |
42000 | 查询解析异常 |
50000 | Ignite内部错误,这个代码不是ANSI定义的,属于Ignite特有的错误,获取java.sql.SQLException 的错误信息可以了解更多的细节 |
JDBC客户端节点模式驱动使用客户端节点连接接入集群,这要求开发者提供一个完整的Spring XML配置作为JDBC连接串的一部分,然后拷贝下面所有的jar文件到应用或者SQL工具的类路径中:
{IGNITE_HOME}\libs
目录下的所有jar文件;{IGNITE_HOME}\ignite-indexing
和{IGNITE_HOME}\ignite-spring
目录下的所有jar文件;这个驱动很重,而且可能不支持Ignite的最新SQL特性,但是因为它底层使用客户端节点连接,它可以执行分布式查询,然后在应用端直接对结果进行汇总。
JDBC连接URL的规则如下:
jdbc:ignite:cfg://[<params>@]<config_url>
其中:
<config_url>
是必需的,表示指向Ignite客户端节点配置文件的任意合法URL,当驱动试图建立到集群的连接时,这个节点会在Ignite JDBC客户端节点驱动中启动;<params>
是可选的,格式如下:param1=value1:param2=value2:...:paramN=valueN
驱动类名为org.apache.ignite.IgniteJdbcDriver
,比如下面的代码,展示了如何打开一个到集群的JDBC连接:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");
安全连接
关于如何保护JDBC客户端驱动的更多信息,请参见高级安全的相关文档。
属性 | 描述 | 默认值 |
---|---|---|
cache | 缓存名,如果未定义会使用默认的缓存,区分大小写 | |
nodeId | 要执行的查询所在节点的Id,对于在本地查询是有用的 | |
local | 查询只在本地节点执行,这个参数和nodeId 参数都是通过指定节点来限制数据集 | false |
collocated | 优化标志,当Ignite执行一个分布式查询时,它会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和拓扑优化 | false |
distributedJoins | 可以在非并置的数据上使用分布式关联。 | false |
streaming | 通过INSERT 语句为本链接开启批量数据加载模式,具体可以参照后面的流模式 相关章节。 | false |
streamingAllowOverwrite | 通知Ignite对于重复的已有键,覆写它的值而不是忽略它们,具体可以参照后面的流模式 相关章节。 | false |
streamingFlushFrequency | 超时时间,毫秒,数据流处理器用于刷新数据,数据默认会在连接关闭时刷新,具体可以参照后面的流模式 相关章节。 | 0 |
streamingPerNodeBufferSize | 数据流处理器的每节点缓冲区大小,具体可以参照后面的流模式 相关章节。 | 1024 |
streamingPerNodeParallelOperations | 数据流处理器的每节点并行操作数。具体可以参照后面的流模式 相关章节。 | 16 |
transactionsAllowed | 目前已经支持了ACID事务,但是仅仅在键-值API层面,在SQL层面Ignite支持原子性,还不支持事务一致性,这意味着使用这个功能的时候驱动可能抛出Transactions are not supported 这样的异常。但是,有时需要使用事务语法(即使不需要事务语义),比如一些BI工具会一直强制事务行为,也需要将该参数配置为true 以避免异常。 | false |
multipleStatementsAllowed | JDBC驱动可以同时处理多个SQL语句并且返回多个ResultSet 对象,如果该参数为false,多个语句的查询会返回错误。 | false |
lazy | 查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 | false |
skipReducerOnUpdate | 开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |
使用JDBC驱动,可以以流模式(批处理模式)将数据注入Ignite集群。这时驱动会在内部实例化IgniteDataStreamer
然后将数据传给它。要激活这个模式,可以在JDBC连接串中增加streaming
参数并且设置为true
:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Opening connection in the streaming mode.
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true@file:///etc/config/ignite-jdbc.xml");
目前,流模式只支持INSERT操作,对于想更快地将数据预加载进缓存的场景非常有用。JDBC驱动定义了多个连接参数来影响流模式的行为,这些参数已经在上述的参数表中列出。
缓存名
确保在JDBC连接字符串中通过cache=
参数为流操作指定目标缓存。如果未指定缓存或缓存与流式DML语句中使用的表不匹配,则更新会被忽略。
这些参数几乎覆盖了IgniteDataStreamer
的所有常规配置,这样就可以根据需要更好地调整流处理器。关于如何配置流处理器可以参考流处理器的相关文档来了解更多的信息。
基于时间的刷新
默认情况下,当要么连接关闭,要么达到了streamingPerNodeBufferSize
,数据才会被刷新,如果希望按照时间的方式来刷新,那么可以调整streamingFlushFrequency
参数。
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Opening a connection in the streaming mode and time based flushing set.
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://streaming=true:streamingFlushFrequency=1000@file:///etc/config/ignite-jdbc.xml");
PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
// Adding the data.
for (int i = 1; i < 100000; i++) {
// Inserting a Person object with a Long key.
stmt.setInt(1, i);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.execute();
}
conn.close();
// Beyond this point, all data is guaranteed to be flushed into the cache.
要处理集群中的数据,需要使用下面的一种方式来创建一个JDBCConnection
对象:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");
之后就可以执行SELECT
SQL查询了:
// Query names of all people.
ResultSet rs = conn.createStatement().executeQuery("select name from Person");
while (rs.next()) {
String name = rs.getString(1);
}
// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
stmt.setInt(1, 30);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
String name = rs.getString("name");
int age = rs.getInt("age");
}
此外,可以使用DML语句对数据进行修改。
// Insert a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("INSERT INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.execute();
// Merge a Person with a Long key.
PreparedStatement stmt = conn.prepareStatement("MERGE INTO Person(_key, name, age) VALUES(CAST(? as BIGINT), ?, ?)");
stmt.setInt(1, 1);
stmt.setString(2, "John Smith");
stmt.setInt(3, 25);
stmt.executeUpdate();
// Update a Person.
conn.createStatement().
executeUpdate("UPDATE Person SET age = age + 1 WHERE age = 25");
conn.createStatement().execute("DELETE FROM Person WHERE age = 25");
Ignite包括一个ODBC驱动,可以通过标准SQL查询和原生ODBC API查询和修改存储于分布式缓存中的数据。
要了解ODBC的细节,可以参照ODBC开发者参考。
Ignite的ODBC驱动实现了ODBC API的3.0版。
Ignite的ODBC驱动在Windows中被视为一个动态库,在Linux中被视为一个共享对象,应用不会直接加载它,而是在必要时使用一个驱动加载器API来加载和卸载ODBC驱动。
Ignite的ODBC驱动在内部使用TCP来接入Ignite集群,集群范围的连接参数可以通过IgniteConfiguration.clientConnectorConfiguration
属性来配置:
IgniteConfiguration cfg = new IgniteConfiguration();
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration();
cfg.setClientConnectorConfiguration(clientConnectorCfg);
客户端连接器配置支持下面的参数:
属性 | 描述 | 默认值 |
---|---|---|
host | 绑定的主机名或者IP地址,如果为null ,会绑定localhost | null |
port | 绑定的TCP端口,如果指定的端口被占用,Ignite会使用portRange 属性寻找其它的可用端口。 | 10800 |
portRange | 定义尝试绑定的端口范围。比如port 配置为10800 并且portRange 为100 ,那么服务端会按照顺序去尝试绑定[10800, 10900] 范围内的端口,直到找到可用的端口。 | 100 |
maxOpenCursorsPerConnection | 单个连接可以同时打开的最大游标数。 | 128 |
threadPoolSize | 线程池中负责请求处理的线程数。 | MAX(8, CPU核数) |
socketSendBufferSize | TCP套接字发送缓冲区大小,如果配置为0,会使用系统默认值 | 0 |
socketReceiveBufferSize | TCP套接字接收缓冲区大小,如果配置为0,会使用系统默认值。 | 0 |
tcpNoDelay | 是否使用TCP_NODELAY 选项。 | true |
idleTimeout | 客户端连接的空闲超时时间。如果空闲时间超过配置的超时时间,客户端会自动断开与服务端的连接。如果该参数配置为0或者为负值,空闲超时会被禁用。 | 0 |
isOdbcEnabled | 是否允许通过ODBC访问。 | true |
isThinClientEnabled | 是否允许通过瘦客户端访问。 | true |
可以通过如下方式修改参数:
IgniteConfiguration cfg = new IgniteConfiguration();
...
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration();
clientConnectorCfg.setHost("127.0.0.1");
clientConnectorCfg.setPort(12345);
clientConnectorCfg.setPortRange(2);
clientConnectorCfg.setMaxOpenCursorsPerConnection(512);
clientConnectorCfg.setSocketSendBufferSize(65536);
clientConnectorCfg.setSocketReceiveBufferSize(131072);
clientConnectorCfg.setThreadPoolSize(4);
cfg.setClientConnectorConfiguration(clientConnectorCfg);
...
通过ClientListenerProcessor
从ODBC驱动端建立的连接也是可以配置的,关于如何从驱动端修改连接的配置,可以看这里。
Ignite ODBC驱动的当前实现仅在连接层提供了线程安全,这意味着如果没有额外的同步处理,多线程无法访问同一个连接。不过可以为每个线程创建独立的连接,然后同时使用。
Ignite的ODBC驱动官方在如下环境中进行了测试:
OS | Windows(XP及以上,32位和64位版本) Windows Server(2008及以上,32位和64位版本) Ubuntu(14.x和15.x,64位) |
---|---|
C++编译器 | MS Visual C++ (10.0及以上), g++ (4.4.0及以上) |
Visual Studio | 2010及以上 |
在Windows中,Ignite提供了预构建的32位和64位驱动的安装器,因此如果只是想在Windows中安装驱动,那么直接看下面的安装驱动章节就可以了。
对于Linux环境,安装之前还是需要进行构建,因此如果使用的是Linux或者使用Windows但是仍然想自己构建驱动,那么往下看。
Ignite的ODBC驱动的源代码随着Ignite版本一起发布,在使用之前可以自行构建。
因为ODBC驱动是用C++编写的,因此它是作为Ignite C++的一部分提供的,并且依赖于一些C++库,具体点说依赖于utils
和binary
Ignite库,这就意味着,在构建ODBC驱动本身之前,需要先构建它们。
这里假定使用的是二进制版本,如果使用的是源代码版本,那么需要将所有使用的%IGNITE_HOME%\platforms\cpp
替换为%IGNITE_HOME%\modules\platforms\cpp
。
如果要在Windows上构建ODBC驱动,需要MS Visual Studio 2010及以后的版本。一旦打开了Ignite方案%IGNITE_HOME%\platforms\cpp\project\vs\ignite.sln
(或者ignite_86.sln
,32位平台),在方案浏览器中点击odbc项目,然后选择“Build”,Visual Studio会自动地检测并且构建所有必要的依赖。
.sln文件的路径可能会有所不同,具体取决于是从源文件还是从二进制文件进行构建。如果在%IGNITE_HOME%\platforms\cpp\project\vs\
中找不到.sln文件,可以尝试在%IGNITE_HOME%\modules\platforms\cpp\project\vs\
中查找。
注意
如果使用VS 2015及以后的版本(MSVC14.0及以后),需要将legacy_stdio_definitions.lib
作为额外的库加入odbc
项目的链接器配置以构建项目。要在IDE中将库文件加入链接器,可以打开项目节点的上下文菜单,选择Properties
,然后在Project Properties
对话框中,选择Linker
,然后编辑Linker Input
,这时就可以将legacy_stdio_definitions.lib
加入分号分割的列表中。
构建过程完成之后,会生成ignite.odbc.dll
文件,对于64位版本,位于%IGNITE_HOME%\platforms\cpp\project\vs\x64\Release
中,对于32位版本,位于%IGNITE_HOME%\platforms\cpp\project\vs\Win32\Release
中。
注意
确认为系统使用相应的驱动(32位或64位)。
为了简化安装,构建完驱动之后可能想构建安装器,Ignite使用WiX工具包来生成ODBC的安装器,因此需要下载并安装WiX,记得一定要把Wix工具包的bin
目录加入PATH变量中。
一切就绪之后,打开终端然后定位到%IGNITE_HOME%\platforms\cpp\odbc\install
目录,按顺序执行如下的命令来构建安装器:
candle.exe ignite-odbc-amd64.wxs
light.exe -ext WixUIExtension ignite-odbc-amd64.wixobj
完成之后,目录中会出现ignite-odbc-amd64.msi
和ignite-odbc-x86.msi
文件,然后就可以使用它们进行安装了。
在一个基于Linux的操作系统中,如果要构建及使用Ignite ODBC驱动,需要安装选择的ODBC驱动管理器,Ignite ODBC驱动已经使用UnixODBC进行了测试。
环境要求
下面列出了几种流行发行版的安装说明:
sudo apt-get install -y build-essential cmake openjdk-11-jdk unixodbc-dev libssl-dev
提示
JDK只用于构建过程,并不会用于ODBC驱动。
构建ODBC驱动
${CPP_BUILD_DIR}
;/usr/local
),将其称为${CPP_INSTALL_DIR}
;cd ${CPP_BUILD_DIR}
cmake -DCMAKE_BUILD_TYPE=Release -DWITH_ODBC=ON ${IGNITE_HOME}/platforms/cpp -DCMAKE_INSTALL_PREFIX=${CPP_INSTALL_DIR}
make
sudo make install
构建过程完成后,可以通过如下命令找到ODBC驱动位于何处:
whereis libignite-odbc
路径很可能是:/usr/local/lib/libignite-odbc.so
。
要使用ODBC驱动,首先要在系统中进行注册,因此ODBC驱动管理器必须能找到它。
在32位的Windows上需要使用32位版本的驱动,而在64位的Windows上可以使用64位和32位版本的驱动,也可以在64位的Windows上同时安装32位和64位版本的驱动,这样32位和64位的应用都可以使用驱动。
使用安装器进行安装
注意
首先要安装微软的Microsoft Visual C++ 2010 Redistributable 32位或者64位包。
这是最简单的方式,也是建议的方式,只需要启动指定版本的安装器即可:
%IGNITE_HOME%\platforms\cpp\bin\odbc\ignite-odbc-x86.msi
%IGNITE_HOME%\platforms\cpp\bin\odbc\ignite-odbc-amd64.msi
手动安装
要在Windows上手动安装ODBC驱动,首先要为驱动在文件系统中选择一个目录,选择一个位置后就可以把驱动放在哪并且确保所有的驱动依赖可以被解析,也就是说,它们要么位于%PATH%
,要么和驱动DLL位于同一个目录。
之后,就需要使用%IGNITE_HOME%/platforms/cpp/odbc/install
目录下的安装脚本之一,注意,执行这些脚本很可能需要管理员权限。
install_x86 <absolute_path_to_32_bit_driver>
要在Linux上构建和安装ODBC驱动,首先需要安装ODBC驱动管理器,Ignite ODBC驱动已经和UnixODBC进行了测试。
如果已经构建完成并且执行了make install
命令,libignite-odbc.so
很可能会位于/usr/local/lib
,要在ODBC驱动管理器中安装ODBC驱动并且可以使用,需要按照如下的步骤进行操作:
ldd
命令像如下这样进行检查(假定ODBC驱动位于/usr/local/lib
):ldd /usr/local/lib/libignite-odbc.so
,如果存在到其它库的无法解析的链接,需要将这些库文件所在的目录添加到LD_LIBRARY_PATH
;$IGNITE_HOME/platforms/cpp/odbc/install/ignite-odbc-install.ini
文件,并且确保Apache Ignite
段的Driver
参数指向libignite-odbc.so
所在的位置;odbcinst -i -d -f $IGNITE_HOME/platforms/cpp/odbc/install/ignite-odbc-install.ini
,要执行这条命令,很可能需要root权限。到现在为止,Ignite的ODBC驱动已经安装好了并且可以用了,可以像其它ODBC驱动一样,连接、使用。
Ignite的ODBC驱动支持标准的连接串格式,下面是正常的语法:
connection-string ::= empty-string[;] | attribute[;] | attribute; connection-string
empty-string ::=
attribute ::= attribute-keyword=attribute-value | DRIVER=[{]attribute-value[}]
attribute-keyword ::= identifier
attribute-value ::= character-string
简单来说,连接串就是一个字符串,其中包含了用分号分割的参数。
Ignite的ODBC驱动可以使用一些连接串/DSN参数,所有的参数都是大小写不敏感的,因此ADDRESS
,Address
,address
都是有效的参数名,并且指向的是同一个参数。如果参数未指定,会使用默认值,其中的一个例外是ADDRESS
属性,如果未指定,会使用SERVER
和PORT
属性代替:
属性关键字 | 描述 | 默认值 |
---|---|---|
ADDRESS | 要连接的远程节点的地址,格式为:<host>[:<port>] 。比如:localhost, example.com:12345, 127.0.0.1, 192.168.3.80:5893,如果指定了这个属性,SERVER 和PORT 将会被忽略。 | |
SERVER | 要连接的节点地址,如果指定了ADDRESS 属性,本属性会被忽略。 | |
PORT | 节点的OdbcProcessor 监听的端口,如果指定了ADDRESS 属性,本属性会被忽略。 | 10800 |
USER | SQL连接的用户名。如果服务端开启了认证,该参数为必需。 | “” |
PASSWORD | SQL连接的密码。如果服务端开启了认证,该参数为必需。 | “” |
SCHEMA | 模式名。 | PUBLIC |
DSN | 要连接的DSN名 | |
PAGE_SIZE | 数据源的响应中返回的行数,默认值会适用于大多数场景,小些的值会导致获取数据变慢,大些的值会导致驱动的额外内存占用,以及获取下一页时的额外延迟。 | 1024 |
DISTRIBUTED_JOINS | 为在ODBC连接上执行的所有查询开启非并置的分布式关联特性。 | false |
ENFORCE_JOIN_ORDER | 强制SQL查询中表关联顺序,如果设置为true ,查询优化器在关联时就不会对表进行再排序。 | false |
PROTOCOL_VERSION | 使用的ODBC协议版本,目前支持如下的版本:2.1.0、2.1.5、2.3.0、2.3.2和2.5.0,因为向后兼容,也可以使用协议的早期版本。 | 2.3.0 |
REPLICATED_ONLY | 配置查询只在全复制的表上执行,这是个提示,用于更高效地执行。 | false |
COLLOCATED | 如果SQL语句包含按主键或关联键对结果集进行分组的GROUP BY子句,可以将此参数设置为true。当Ignite执行分布式查询时,会向单个集群节点发送子查询,如果事先知道待查询的数据是在同一个节点上并置在一起的,并且是按主键或关联键分组的,那么Ignite通过在参与查询的每个节点本地分组数据来实现显著的性能和网络优化。 | false |
LAZY | 查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 | false |
SKIP_REDUCER_ON_UPDATE | 开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |
SSL_MODE | 确定服务端是否需要SSL连接。可以根据需要使用require 或者disable 。 | |
SSL_KEY_FILE | 指定包含服务端SSL私钥的文件名。 | |
SSL_CERT_FILE | 指定包含SSL服务器证书的文件名。 | |
SSL_CA_FILE | 指定包含SSL服务器证书颁发机构(CA)的文件名。 |
下面的串,可以用于SQLDriverConnect
ODBC调用,来建立与Ignite节点的连接。
DRIVER={Apache Ignite};
ADDRESS=localhost:10800;
SCHEMA=somecachename;
USER=yourusername;
PASSWORD=yourpassword;
SSL_MODE=[require|disable];
SSL_KEY_FILE=<path_to_private_key>;
SSL_CERT_FILE=<path_to_client_certificate>;
SSL_CA_FILE=<path_to_trusted_certificates>
如果要使用DSN(数据源名)来进行连接,可以使用同样的参数。
要在Windows上配置DSN,需要使用一个叫做odbcad32
(32位x86系统)/odbcad64
(64位)的系统工具,这是一个ODBC数据源管理器。
安装DSN工具时,如果使用的是预构建的msi文件,一定要先安装Microsoft Visual C++ 2010(32位x86,或者64位x64)。
要启动这个工具,打开Control panel
->Administrative Tools
->数据源(ODBC)
,当ODBC数据源管理器启动后,选择Add...
->Apache Ignite
,然后以正确的方式配置DSN。
在Linux上配置DSN,需要找到odbc.ini
文件,这个文件的位置各个发行版有所不同,依赖于发行版使用的驱动管理器,比如,如果使用unixODBC
,那么可以执行如下的命令来输出系统级的ODBC相关信息:
odbcinst -j
使用SYSTEM DATA SOURCES
和USER DATA SOURCES
属性,可以定位odbc.ini
文件。
找到odbc.ini
文件之后,可以用任意编辑器打开它,然后像下面这样添加DSN片段:
[DSN Name]
description=<Insert your description here>
driver=Apache Ignite
<Other arguments here...>
本章会详细描述如何接入Ignite集群,如何使用ODBC驱动执行各种SQL查询。
在实现层,Ignite的ODBC驱动使用SQL字段查询来获取Ignite缓存中的数据,这意味着通过ODBC只可以访问这些集群配置中定义的字段。
另外,ODBC驱动支持DML,这意味着通过ODBC连接不仅仅可以读取数据,还可以修改数据。
提示
这里是完整的ODBC示例。
第一步,需要对集群节点进行配置,这个配置需要包含缓存的配置以及定义了QueryEntities
的属性。如果应用(当前场景是ODBC驱动)要通过SQL语句进行数据的查询和修改,QueryEntities
是必须的,或者,也可以使用DDL创建表。
SQLHENV env;
// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
SQLHDBC dbc;
// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";
// Connecting to Ignite Cluster.
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query1[] = "CREATE TABLE Person ( "
"id LONG PRIMARY KEY, "
"firstName VARCHAR, "
"lastName VARCHAR, "
"salary FLOAT) "
"WITH \"template=partitioned\"";
SQLExecDirect(stmt, query1, SQL_NTS);
SQLCHAR query2[] = "CREATE TABLE Organization ( "
"id LONG PRIMARY KEY, "
"name VARCHAR) "
"WITH \"template=partitioned\"";
SQLExecDirect(stmt, query2, SQL_NTS);
SQLCHAR query3[] = "CREATE INDEX idx_organization_name ON Organization (name)";
SQLExecDirect(stmt, query3, SQL_NTS);
从上述配置中可以看出,定义了两个缓存,包含了Person
和Organization
类型的数据,它们都列出了使用SQL可以读写的字段和索引。
配置好然后启动集群,就可以从ODBC驱动端接入了。如何做呢?准备一个有效的连接串然后连接时将其作为一个参数传递给ODBC驱动就可以了。
另外,也可以像下面这样使用一个预定义的DSN来接入。
SQLHENV env;
// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
SQLHDBC dbc;
// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";
// Connecting to Ignite Cluster.
SQLRETURN ret = SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);
if (!SQL_SUCCEEDED(ret))
{
SQLCHAR sqlstate[7] = { 0 };
SQLINTEGER nativeCode;
SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));
SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);
std::cerr << "Failed to connect to Apache Ignite: "
<< reinterpret_cast<char*>(sqlstate) << ": "
<< reinterpret_cast<char*>(errMsg) << ", "
<< "Native error code: " << nativeCode
<< std::endl;
// Releasing allocated handles.
SQLFreeHandle(SQL_HANDLE_DBC, dbc);
SQLFreeHandle(SQL_HANDLE_ENV, env);
return;
}
都准备好后,就可以使用ODBC API执行SQL查询了。
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] = "SELECT firstName, lastName, salary, Organization.name FROM Person "
"INNER JOIN \"Organization\".Organization ON Person.orgId = Organization.id";
SQLSMALLINT queryLen = static_cast<SQLSMALLINT>(sizeof(queryLen));
SQLRETURN ret = SQLExecDirect(stmt, query, queryLen);
if (!SQL_SUCCEEDED(ret))
{
SQLCHAR sqlstate[7] = { 0 };
SQLINTEGER nativeCode;
SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));
SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);
std::cerr << "Failed to perfrom SQL query upon Apache Ignite: "
<< reinterpret_cast<char*>(sqlstate) << ": "
<< reinterpret_cast<char*>(errMsg) << ", "
<< "Native error code: " << nativeCode
<< std::endl;
}
else
{
// Printing the result set.
struct OdbcStringBuffer
{
SQLCHAR buffer[BUFFER_SIZE];
SQLLEN resLen;
};
// Getting a number of columns in the result set.
SQLSMALLINT columnsCnt = 0;
SQLNumResultCols(stmt, &columnsCnt);
// Allocating buffers for columns.
std::vector<OdbcStringBuffer> columns(columnsCnt);
// Binding colums. For simplicity we are going to use only
// string buffers here.
for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
SQLBindCol(stmt, i + 1, SQL_C_CHAR, columns[i].buffer, BUFFER_SIZE, &columns[i].resLen);
// Fetching and printing data in a loop.
ret = SQLFetch(stmt);
while (SQL_SUCCEEDED(ret))
{
for (size_t i = 0; i < columns.size(); ++i)
std::cout << std::setw(16) << std::left << columns[i].buffer << " ";
std::cout << std::endl;
ret = SQLFetch(stmt);
}
}
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
列绑定
在上例中,所有的列都绑定到SQL_C_CHAR
,这意味着获取时所有的值都会被转换成字符串,这样做是为了简化,获取时进行值转换是非常慢的,因此默认的做法应该是与存储采用同样的方式进行获取。
要将新的数据插入集群,ODBC端可以使用INSERT
语句。
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] =
"INSERT INTO Person (id, orgId, firstName, lastName, resume, salary) "
"VALUES (?, ?, ?, ?, ?, ?)";
SQLPrepare(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));
// Binding columns.
int64_t key = 0;
int64_t orgId = 0;
char name[1024] = { 0 };
SQLLEN nameLen = SQL_NTS;
double salary = 0.0;
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(name), sizeof(name), name, 0, &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);
// Filling cache.
key = 1;
orgId = 1;
strncpy(name, "John", sizeof(name));
salary = 2200.0;
SQLExecute(stmt);
SQLMoreResults(stmt);
++key;
orgId = 1;
strncpy(name, "Jane", sizeof(name));
salary = 1300.0;
SQLExecute(stmt);
SQLMoreResults(stmt);
++key;
orgId = 2;
strncpy(name, "Richard", sizeof(name));
salary = 900.0;
SQLExecute(stmt);
SQLMoreResults(stmt);
++key;
orgId = 2;
strncpy(name, "Mary", sizeof(name));
salary = 2400.0;
SQLExecute(stmt);
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
下面,是不使用预编译语句插入Organization数据:
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query1[] = "INSERT INTO \"Organization\".Organization (id, name)
VALUES (1L, 'Some company')";
SQLExecDirect(stmt, query1, static_cast<SQLSMALLINT>(sizeof(query1)));
SQLFreeStmt(stmt, SQL_CLOSE);
SQLCHAR query2[] = "INSERT INTO \"Organization\".Organization (id, name)
VALUES (2L, 'Some other company')";
SQLExecDirect(stmt, query2, static_cast<SQLSMALLINT>(sizeof(query2)));
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
错误检查
为了简化,上面的代码没有进行错误检查,但是在生产环境中不要这样做。
下面使用UPDATE
语句更新存储在集群中的部分人员的工资信息:
void AdjustSalary(SQLHDBC dbc, int64_t key, double salary)
{
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] = "UPDATE Person SET salary=? WHERE id=?";
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT,
SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG,
SQL_BIGINT, 0, 0, &key, 0, 0);
SQLExecDirect(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}
...
AdjustSalary(dbc, 3, 1200.0);
AdjustSalary(dbc, 1, 2500.0);
最后,使用DELETE
语句删除部分记录:
void DeletePerson(SQLHDBC dbc, int64_t key)
{
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] = "DELETE FROM Person WHERE id=?";
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT,
0, 0, &key, 0, 0);
SQLExecDirect(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}
...
DeletePerson(dbc, 1);
DeletePerson(dbc, 4);
Ignite的ODBC驱动支持在DML语句中通过参数数组进行批处理。
还是使用上述插入数据的示例,但是只调用一次SQLExecute
:
SQLHSTMT stmt;
// Allocating a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] =
"INSERT INTO Person (id, orgId, firstName, lastName, resume, salary) "
"VALUES (?, ?, ?, ?, ?, ?)";
SQLPrepare(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));
// Binding columns.
int64_t key[4] = {0};
int64_t orgId[4] = {0};
char name[1024 * 4] = {0};
SQLLEN nameLen[4] = {0};
double salary[4] = {0};
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, 1024, 1024, name, 0, &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, salary, 0, 0);
// Filling cache.
key[0] = 1;
orgId[0] = 1;
strncpy(name, "John", 1023);
salary[0] = 2200.0;
nameLen[0] = SQL_NTS;
key[1] = 2;
orgId[1] = 1;
strncpy(name + 1024, "Jane", 1023);
salary[1] = 1300.0;
nameLen[1] = SQL_NTS;
key[2] = 3;
orgId[2] = 2;
strncpy(name + 1024 * 2, "Richard", 1023);
salary[2] = 900.0;
nameLen[2] = SQL_NTS;
key[3] = 4;
orgId[3] = 2;
strncpy(name + 1024 * 3, "Mary", 1023);
salary[3] = 2400.0;
nameLen[3] = SQL_NTS;
// Asking the driver to store the total number of processed argument sets
// in the following variable.
SQLULEN setsProcessed = 0;
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &setsProcessed, SQL_IS_POINTER);
// Setting the size of the arguments array. This is 4 in our case.
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMSET_SIZE, reinterpret_cast<SQLPOINTER>(4), 0);
// Executing the statement.
SQLExecute(stmt);
// Releasing the statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
注意
注意这种类型的批处理目前只支持INSERT、UPDATE、 DELETE、和MERGE语句,还不支持SELECT,data-at-execution功能也不支持通过参数数组进行批处理。
Ignite的ODBC驱动可以通过SET STREAMING
命令对流化数据进行批量处理,具体可以看SET STREAMING的相关内容。
注意
流处理模式中,参数数组和data-at-execution参数是不支持的。
ODBC定义了若干接口一致性级别,在本章中可以知道Ignite的ODBC驱动支持了哪些特性。
特性 | 支持程度 | 备注 |
---|---|---|
通过调用SQLAllocHandle 和SQLFreeHandle 来分配和释放所有处理器类型 | 是 | |
使用SQLFreeStmt 函数的所有形式 | 是 | |
通过调用SQLBindCol ,绑定列结果集 | 是 | |
通过调用SQLBindParameter 和SQLNumParams ,处理动态参数,包括参数数组,只针对输入方向, | 是 | |
指定绑定偏移量 | 是 | |
使用数据执行对话框,涉及SQLParamData 和SQLPutData 的调用 | 是 | |
管理游标和游标名 | 部分 | 实现了SQLCloseCursor ,Ignite不支持命名游标 |
通过调用SQLColAttribute ,SQLDescribeCol ,SQLNumResultCols 和SQLRowCount ,访问结果集的描述(元数据) | 是 | |
通过调用目录函数SQLColumns ,SQLGetTypeInfo ,SQLStatistics 和SQLStatistics 查询数据字典 | 部分 | 不支持SQLStatistics |
通过调用SQLConnect ,SQLDataSources ,SQLDisconnect 和SQLDriverConnect 管理数据源和连接,通过SQLDrivers 获取驱动的信息,不管支持ODBC那个级别。 | 是 | |
通过调用SQLExecDirect ,SQLExecute 和SQLPrepare 预编译和执行SQL语句。 | 是 | |
通过调用SQLFetch ,或者将FetchOrientation 参数设置为SQL_FETCH_NEXT 之后调用SQLFetchScroll ,获取一个结果集或者多行数据中的一行,只能向前 | 是 | |
通过调用SQLGetData ,获得一个未绑定的列 | 是 | |
通过调用SQLGetConnectAttr 、SQLGetEnvAttr 、SQLGetStmtAttr ,获取所有属性的当前值,或者通过调用SQLSetConnectAttr 、SQLSetEnvAttr 、SQLSetStmtAttr ,将所有属性赋为默认值,以及为某个属性赋为非默认值。 | 部分 | 并不支持所有属性 |
通过调用SQLCopyDesc 、SQLGetDescField 、SQLGetDescRec 、SQLSetDescField 、SQLSetDescRec ,操作描述符的某字段。 | 否 | |
通过调用SQLGetDiagField 、SQLGetDiagRec ,获得诊断信息。 | 是 | |
通过调用SQLGetFunctions 和SQLGetInfo ,检测驱动兼容性,以及通过调用SQLNativeSql ,在发送到数据源之前检测SQL语句中的任何文本代换的结果 | 是 | |
使用SQLEndTran 的语法提交一个事务,驱动的核心级别不需要支持真事务,因此,应用无法指定SQL_ROLLBACK 或者为SQL_ATTR_AUTOCOMMIT 连接属性指定SQL_AUTOCOMMIT_OFF | 是 | |
调用SQLCancel 取消数据执行对话框,以及多线程环境中,在另一个线程中取消ODBC函数的执行,核心级别的接口一致性不需要支持函数的异步执行,也不需要使用SQLCancel 取消一个ODBC函数的异步执行。平台和ODBC驱动都不需要多线程地同时自主活动,不过在多线程环境中,ODBC驱动必须是线程安全的,从应用来的请求的序列化是实现这个规范的一致的方式,即使它导致了一系列的性能问题。 | 否 | 当前的ODBC驱动实现不支持异步执行 |
通过调用SQLSpecialColumns 获得表的行标识符SQL_BEST_ROWID 。 | 部分 | 当前的实现总是返回空 |
特性 | 支持程度 | 备注 |
---|---|---|
指定数据库表和视图的模式(使用两部分命名)。 | 是 | |
ODBC函数调用的真正异步执行,在给定的连接上,适用的函数要么是全同步的,要么是全异步的。 | 否 | |
使用可滚动的游标,调用SQLFetchScroll 时使用FetchOrientation 参数而不是SQL_FETCH_NEXT ,可以在方法内访问结果集而不是只能向前。 | 否 | |
通过调用SQLPrimaryKeys 获得表的主键。 | 部分 | 目前返回空结果集。 |
使用存储过程,通过调用SQLProcedureColumns 和SQLProcedures ,使用ODBC的转义序列进行存储过程数据字典的查询以及存储过程的调用。 | 否 | |
通过调用SQLBrowseConnect ,通过交互式浏览可用的服务器接入一个数据源。 | 否 | |
使用ODBC函数而不是SQL语句来执行某个数据库操作:带有SQL_POSITION 和SQL_REFRESH 的SQLSetPos 。 | 否 | |
通过调用SQLMoreResults ,访问由批处理和存储过程生成的多结果集的内容。 | 是 | |
划定跨越多个ODBC函数的事务边界,获得真正的原子性以及在SQLEndTran 中指定SQL_ROLLBACK 的能力。 | 否 | Ignite SQL不支持事务 |
特性 | 支持程度 | 备注 |
---|---|---|
使用三部分命名的数据库表和视图。 | 否 | Ignite SQL不支持catalog。 |
通过调用SQLDescribeParam 描述动态参数。 | 是 | |
不仅仅使用输入参数,还使用输出参数以及输入/输出参数,还有存储过程的结果。 | 否 | Ignite SQL不支持输出参数。 |
使用书签,通过在第0列上调用SQLDescribeCol 和SQLColAttribute 获得书签;通过调用SQLFetchScroll 时将参数FetchOrientation 配置为SQL_FETCH_BOOKMARK ,在书签上进行获取;通过调用SQLBulkOperations 时将参数配置为SQL_UPDATE_BY_BOOKMARK 、SQL_DELETE_BY_BOOKMARK 、SQL_FETCH_BY_BOOKMARK 可以进行书签的更新、删除和获取操作。 | 否 | Ignite SQL不支持书签。 |
通过调用SQLColumnPrivileges 、SQLForeignKeys 、SQLTablePrivileges 获取数据字典的高级信息。 | 部分 | SQLForeignKeys 已经实现,但是返回空的结果集。 |
通过在SQLBulkOperations 中使用SQL_ADD 或者在SQLSetPos 中使用SQL_DELETE 或SQL_UPDATE ,使用ODBC函数而不是SQL语句执行额外的数据库操作。 | 否 | |
为某个语句开启ODBC函数的异步执行。 | 否 | |
通过调用SQLSpecialColumns 获得表的SQL_ROWVER 列标识符。 | 部分 | 已实现,但是返回空结果集。 |
为SQL_ATTR_CONCURRENCY 语句参数配置除了SQL_CONCUR_READ_ONLY 以外的至少一个值。 | 否 | |
登录请求以及SQL查询的超时功能(SQL_ATTR_LOGIN_TIMEOUT 和SQL_ATTR_QUERY_TIMEOUT )。 | 部分 | SQL_ATTR_QUERY_TIMEOUT 支持已实现,SQL_ATTR_LOGIN_TIMEOUT 还未实现。 |
修改默认隔离级别的功能,在隔离级别为序列化 时支持事务的功能。 | 否 | Ignite SQL不支持事务。 |
函数名 | 支持程度 | 一致性级别 |
---|---|---|
SQLAllocHandle | 是 | Core |
SQLBindCol | 是 | Core |
SQLBindParameter | 是 | Core |
SQLBrowseConnect | 否 | Level1 |
SQLBulkOperations | 否 | Level1 |
SQLCancel | 否 | Core |
SQLCloseCursor | 是 | Core |
SQLColAttribute | 是 | Core |
SQLColumnPrivileges | 否 | Level2 |
SQLColumns | 是 | Core |
SQLConnect | 是 | Core |
SQLCopyDesc | 否 | Core |
SQLDataSources | N/A | Core |
SQLDescribeCol | 是 | Core |
SQLDescribeParam | 是 | Level2 |
SQLDisconnect | 是 | Core |
SQLDriverConnect | 是 | Core |
SQLDrivers | N/A | Core |
SQLEndTran | 部分 | Core |
SQLExecDirect | 是 | Core |
SQLExecute | 是 | Core |
SQLFetch | 是 | Core |
SQLFetchScroll | 是 | Core |
SQLForeignKeys | 部分 | Level2 |
SQLFreeHandle | 是 | Core |
SQLFreeStmt | 是 | Core |
SQLGetConnectAttr | 部分 | Core |
SQLGetCursorName | 否 | Core |
SQLGetData | 是 | Core |
SQLGetDescField | 否 | Core |
SQLGetDescRec | 否 | Core |
SQLGetDiagField | 是 | Core |
SQLGetDiagRec | 是 | Core |
SQLGetEnvAttr | 部分 | Core |
SQLGetFunctions | 否 | Core |
SQLGetInfo | 是 | Core |
SQLGetStmtAttr | 部分 | Core |
SQLGetTypeInfo | 是 | Core |
SQLMoreResults | 是 | Level1 |
SQLNativeSql | 是 | Core |
SQLNumParams | 是 | Core |
SQLNumResultCols | 是 | Core |
SQLParamData | 是 | Core |
SQLPrepare | 是 | Core |
SQLPrimaryKeys | 部分 | Level1 |
SQLProcedureColumns | 否 | Level1 |
SQLProcedures | 否 | Level1 |
SQLPutData | 是 | Core |
SQLRowCount | 是 | Core |
SQLSetConnectAttr | 部分 | Core |
SQLSetCursorName | 否 | Core |
SQLSetDescField | 否 | Core |
SQLSetDescRec | 否 | Core |
SQLSetEnvAttr | 部分 | Core |
SQLSetPos | 否 | Level1 |
SQLSetStmtAttr | 部分 | Core |
SQLSpecialColumns | 部分 | Core |
SQLStatistics | 否 | Core |
SQLTablePrivileges | 否 | Level2 |
SQLTables | 是 | Core |
特性 | 支持程度 | 一致性级别 |
---|---|---|
SQL_ATTR_CONNECTION_POOLING | 否 | 可选 |
SQL_ATTR_CP_MATCH | 否 | 可选 |
SQL_ATTR_ODBC_VER | 是 | Core |
SQL_ATTR_OUTPUT_NTS | 是 | 可选 |
特性 | 支持程度 | 一致性级别 |
---|---|---|
SQL_ATTR_ACCESS_MODE | 否 | Core |
SQL_ATTR_ASYNC_ENABLE | 否 | Level1/Level2 |
SQL_ATTR_AUTO_IPD | 否 | Level2 |
SQL_ATTR_AUTOCOMMIT | 否 | Level1 |
SQL_ATTR_CONNECTION_DEAD | 是 | Level1 |
SQL_ATTR_CONNECTION_TIMEOUT | 是 | Level2 |
SQL_ATTR_CURRENT_CATALOG | 否 | Level2 |
SQL_ATTR_LOGIN_TIMEOUT | 否 | Level2 |
SQL_ATTR_ODBC_CURSORS | 否 | Core |
SQL_ATTR_PACKET_SIZE | 否 | Level2 |
否SQL_ATTR_QUIET_MODE | 否 | Core |
SQL否_ATTR_TRACE | 否 | Core |
SQL_AT否TR_TRACEFILE | 否 | Core |
SQL_AT否TR_TRANSLATE_LIB | 否 | Core |
SQL_ATTR_TRANSLATE_OPTION | 否 | Core |
SQL_ATTR_TXN_ISOLATION | 否 | Level1/Level2 |
特性 | 支持程度 | 一致性级别 |
---|---|---|
SQL_ATTR_APP_PARAM_DESC | 部分 | Core |
SQL_ATTR_APP_ROW_DESC | 部分 | Core |
SQL_ATTR_ASYNC_ENABLE | 否 | Level1/Level2 |
SQL_ATTR_CONCURRENCY | 否 | Level1/Level2 |
SQL_ATTR_CURSOR_SCROLLABLE | 否 | Level1 |
SQL_ATTR_CURSOR_SENSITIVITY | 否 | Level2 |
SQL_ATTR_CURSOR_TYPE | 否 | Level1/Level2 |
SQL_ATTR_ENABLE_AUTO_IPD | 否 | Level2 |
SQL_ATTR_FETCH_BOOKMARK_PTR | 否 | Level2 |
SQL_ATTR_IMP_PARAM_DESC | 部分 | Core |
SQL_ATTR_IMP_ROW_DESC | 部分 | Core |
SQL_ATTR_KEYSET_SIZE | 否 | Level2 |
SQL_ATTR_MAX_LENGTH | 否 | Level1 |
SQL_ATTR_MAX_ROWS | 否 | Level1 |
SQL_ATTR_METADATA_ID | 否 | Core |
SQL_ATTR_NOSCAN | 否 | Core |
SQL_ATTR_PARAM_BIND_OFFSET_PTR | 是 | Core |
SQL_ATTR_PARAM_BIND_TYPE | 否 | Core |
SQL_ATTR_PARAM_OPERATION_PTR | 否 | Core |
SQL_ATTR_PARAM_STATUS_PTR | 是 | Core |
SQL_ATTR_PARAMS_PROCESSED_PTR | 是 | Core |
SQL_ATTR_PARAMSET_SIZE | 是 | Core |
SQL_ATTR_QUERY_TIMEOUT | 是 | Level2 |
SQL_ATTR_RETRIEVE_DATA | 否 | Level1 |
SQL_ATTR_ROW_ARRAY_SIZE | 是 | Core |
SQL_ATTR_ROW_BIND_OFFSET_PTR | 是 | Core |
SQL_ATTR_ROW_BIND_TYPE | 是 | Core |
SQL_ATTR_ROW_NUMBER | 否 | Level1 |
SQL_ATTR_ROW_OPERATION_PTR | 否 | Level1 |
SQL_ATTR_ROW_STATUS_PTR | 是 | Core |
SQL_ATTR_ROWS_FETCHED_PTR | 是 | Core |
SQL_ATTR_SIMULATE_CURSOR | 否 | Level2 |
SQL_ATTR_USE_BOOKMARKS | 否 | Level2 |
特性 | 支持程度 | 一致性级别 |
---|---|---|
SQL_DESC_ALLOC_TYPE | 否 | Core |
SQL_DESC_ARRAY_SIZE | 否 | Core |
SQL_DESC_ARRAY_STATUS_PTR | 否 | Core/Level1 |
SQL_DESC_BIND_OFFSET_PTR | 否 | Core |
SQL_DESC_BIND_TYPE | 否 | Core |
SQL_DESC_COUNT | 否 | Core |
SQL_DESC_ROWS_PROCESSED_PTR | 否 | Core |
特性 | 支持程度 | 一致性级别 |
---|---|---|
SQL_DESC_AUTO_UNIQUE_VALUE | 否 | Level2 |
SQL_DESC_BASE_COLUMN_NAME | 否 | Core |
SQL_DESC_BASE_TABLE_NAME | 否 | Level1 |
SQL_DESC_CASE_SENSITIVE | 否 | Core |
SQL_DESC_CATALOG_NAME | 否 | Level2 |
SQL_DESC_CONCISE_TYPE | 否 | Core |
SQL_DESC_DATA_PTR | 否 | Core |
SQL_DESC_DATETIME_INTERVAL_CODE | 否 | Core |
SQL_DESC_DATETIME_INTERVAL_PRECISION | 否 | Core |
SQL_DESC_DISPLAY_SIZE | 否 | Core |
SQL_DESC_FIXED_PREC_SCALE | 否 | Core |
SQL_DESC_INDICATOR_PTR | 否 | Core |
SQL_DESC_LABEL | 否 | Level2 |
SQL_DESC_LENGTH | 否 | Core |
SQL_DESC_LITERAL_PREFIX | 否 | Core |
SQL_DESC_LITERAL_SUFFIX | 否 | Core |
SQL_DESC_LOCAL_TYPE_NAME | 否 | Core |
SQL_DESC_NAME | 否 | Core |
SQL_DESC_NULLABLE | 否 | Core |
SQL_DESC_OCTET_LENGTH | 否 | Core |
SQL_DESC_OCTET_LENGTH_PTR | 否 | Core |
SQL_DESC_PARAMETER_TYPE | 否 | Core/Level2 |
SQL_DESC_PRECISION | 否 | Core |
SQL_DESC_ROWVER | 否 | Level1 |
SQL_DESC_SCALE | 否 | Core |
SQL_DESC_SCHEMA_NAME | 否 | Level1 |
SQL_DESC_SEARCHABLE | 否 | Core |
SQL_DESC_TABLE_NAME | 否 | Level1 |
SQL_DESC_TYPE | 否 | Core |
SQL_DESC_TYPE_NAME | 否 | Core |
SQL_DESC_UNNAMED | 否 | Core |
SQL_DESC_UNSIGNED | 否 | Core |
SQL_DESC_UPDATABLE | 否 | Core |
下面是支持的SQL数据类型:
数据类型 | 是否支持 |
---|---|
SQL_CHAR | 是 |
SQL_VARCHAR | 是 |
SQL_LONGVARCHAR | 是 |
SQL_WCHAR | 否 |
SQL_WVARCHAR | 否 |
SQL_WLONGVARCHAR | 否 |
SQL_DECIMAL | 是 |
SQL_NUMERIC | 否 |
SQL_SMALLINT | 是 |
SQL_INTEGER | 是 |
SQL_REAL | 否 |
SQL_FLOAT | 是 |
SQL_DOUBLE | 是 |
SQL_BIT | 是 |
SQL_TINYINT | 是 |
SQL_BIGINT | 是 |
SQL_BINARY | 是 |
SQL_VARBINARY | 是 |
SQL_LONGVARBINARY | 是 |
SQL_TYPE_DATE | 是 |
SQL_TYPE_TIME | 是 |
SQL_TYPE_TIMESTAMP | 是 |
SQL_TYPE_UTCDATETIME | 否 |
SQL_TYPE_UTCTIME | 否 |
SQL_INTERVAL_MONTH | 否 |
SQL_INTERVAL_YEAR | 否 |
SQL_INTERVAL_YEAR_TO_MONTH | 否 |
SQL_INTERVAL_DAY | 否 |
SQL_INTERVAL_HOUR | 否 |
SQL_INTERVAL_MINUTE | 否 |
SQL_INTERVAL_SECOND | 否 |
SQL_INTERVAL_DAY_TO_HOUR | 否 |
SQL_INTERVAL_DAY_TO_MINUTE | 否 |
SQL_INTERVAL_DAY_TO_SECOND | 否 |
SQL_INTERVAL_HOUR_TO_MINUTE | 否 |
SQL_INTERVAL_HOUR_TO_SECOND | 否 |
SQL_INTERVAL_MINUTE_TO_SECOND | 否 |
SQL_GUID | 是 |
下面是支持的C数据类型:
数据类型 | 是否支持 |
---|---|
SQL_C_CHAR | 是 |
SQL_C_WCHAR | 是 |
SQL_C_SHORT | 是 |
SQL_C_SSHORT | 是 |
SQL_C_USHORT | 是 |
SQL_C_LONG | 是 |
SQL_C_SLONG | 是 |
SQL_C_ULONG | 是 |
SQL_C_FLOAT | 是 |
SQL_C_DOUBLE | 是 |
SQL_C_BIT | 是 |
SQL_C_TINYINT | 是 |
SQL_C_STINYINT | 是 |
SQL_C_UTINYINT | 是 |
SQL_C_BIGINT | 是 |
SQL_C_SBIGINT | 是 |
SQL_C_UBIGINT | 是 |
SQL_C_BINARY | 是 |
SQL_C_BOOKMARK | 否 |
SQL_C_VARBOOKMARK | 否 |
SQL_C_INTERVAL* (all interval types) | 否 |
SQL_C_TYPE_DATE | 是 |
SQL_C_TYPE_TIME | 是 |
SQL_C_TYPE_TIMESTAMP | 是 |
SQL_C_NUMERIC | 是 |
SQL_C_GUID | 是 |
支持如下的SQL数据类型(规范中列出):
SQL_CHAR
SQL_VARCHAR
SQL_LONGVARCHAR
SQL_SMALLINT
SQL_INTEGER
SQL_FLOAT
SQL_DOUBLE
SQL_BIT
SQL_TINYINT
SQL_BIGINT
SQL_BINARY
SQL_VARBINARY
SQL_LONGVARBINARY
SQL_GUID
SQL_DECIMAL
SQL_TYPE_DATE
SQL_TYPE_TIMESTAMP
SQL_TYPE_TIME
要获取错误码, 可以使用SQLGetDiagRec()
函数,它会返回一个ANSI SQL标准定义的错误码字符串,比如:
SQLHENV env;
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
SQLHDBC dbc;
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
SQLCHAR connectStr[] = "DRIVER={Apache Ignite};SERVER=localhost;PORT=10800;SCHEMA=Person;";
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, 0, 0, 0, SQL_DRIVER_COMPLETE);
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] = "SELECT firstName, lastName, resume, salary FROM Person";
SQLRETURN ret = SQLExecDirect(stmt, query, SQL_NTS);
if (ret != SQL_SUCCESS)
{
SQLCHAR sqlstate[7] = "";
SQLINTEGER nativeCode;
SQLCHAR message[1024];
SQLSMALLINT reallen = 0;
int i = 1;
ret = SQLGetDiagRec(SQL_HANDLE_STMT, stmt, i, sqlstate,
&nativeCode, message, sizeof(message), &reallen);
while (ret != SQL_NO_DATA)
{
std::cout << sqlstate << ": " << message;
++i;
ret = SQLGetDiagRec(SQL_HANDLE_STMT, stmt, i, sqlstate,
&nativeCode, message, sizeof(message), &reallen);
}
}
下表中列出了所有Ignite目前支持的错误码,该列表未来可能会扩展:
错误码 | 描述 |
---|---|
01S00 | 无效连接串属性 |
01S02 | 驱动程序不支持指定的值,并替换了一个类似的值 |
08001 | 驱动接入集群失败 |
08002 | 连接已经建立 |
08003 | 未知原因导致的连接处于关闭状态 |
08004 | 连接被集群踢出 |
08S01 | 连接失败 |
22026 | 字符串长度与数据执行对话框不匹配 |
23000 | 违反完整性约束(比如主键重复、主键为空等等) |
24000 | 无效的游标状态 |
42000 | 请求的语法错误 |
42S01 | 表已经存在 |
42S02 | 表不存在 |
42S11 | 索引已经存在 |
42S12 | 索引不存在 |
42S21 | 列已经存在 |
42S22 | 列不存在 |
HY000 | 一般性错误,具体看错误消息 |
HY001 | 内存分配错误 |
HY003 | 无效的应用缓冲区类型 |
HY004 | 无效的SQL数据类型 |
HY009 | 无效的空指针使用 |
HY010 | 函数调用顺序错误 |
HY090 | 无效的字符串和缓冲区长度(比如长度为负或者为0) |
HY092 | 可选类型超范围 |
HY097 | 列类型超范围 |
HY105 | 无效的参数类型 |
HY106 | 获取类型超范围 |
HYC00 | 特性未实现 |
IM001 | 函数不支持 |
警告
MVCC当前处于测试阶段。
配置为TRANSACTIONAL_SNAPSHOT
原子化模式的缓存,支持SQL事务以及键-值事务,并且为两种类型的事务开启了多版本并发控制(MVCC)。
多版本并发控制,是一种在多用户并发访问时,控制数据一致性的方法,MVCC实现了快照隔离保证,确保每个事务总是看到一致的数据快照。
每个事务在开始时会获得一个数据的一致性快照,并且只能查看和修改该快照中的数据。当事务更新一个条目时,Ignite验证其它事务没有更新该条目,并创建该条目的一个新的版本,新版本只有在事务成功提交时才对其它事务可见。如果该条目已被更新,当前事务会失败并抛出异常(关于如何处理更新冲突,请参见下面的并发更新章节的介绍)。
快照不是物理快照,而是由MVCC协调器生成的逻辑快照,MVCC协调器是协调集群中的事务活动的集群节点。协调器跟踪所有活动的事务,并在每个事务完成时得到通知。启用MVCC的缓存的所有操作都需要从协调器请求一个数据的快照。
要为缓存开启MVCC,需要在缓存配置中使用TRANSACTIONAL_SNAPSHOT
原子化模式,如果使用CREATE TABLE
命令创建表,则需要在该命令的WITH
子句中将原子化模式作为参数传进去。
CREATE TABLE Person WITH "ATOMICITY=TRANSACTIONAL_SNAPSHOT"
提示
TRANSACTIONAL_SNAPSHOT
模式只支持默认的并发模型(PESSIMISTIC
)和默认的隔离级别(REPEATABLE_READ
),具体可以看上面的并发模型和隔离级别章节。
在一个事务中,如果一个条目先被读取然后被更新,那么就有一种可能性,即另一个事务可能在两个操作之间切入然后首先更新该条目,这时,当第一个事务试图更新该条目时就会抛出异常,然后该事务会被标记为只能回滚
,这时开发者就需要进行事务重试。
那么怎么知道发生了冲突呢?
CacheException
异常(异常信息为Cannot serialize transaction due to write conflict (transaction is marked for rollback)
),并且Transaction.rollbackOnly
标志为true
;SQLSTATE:40001
错误代码。for(int i = 1; i <=5 ; i++) {
try (Transaction tx = Ignition.ignite().transactions().txStart()) {
System.out.println("attempt #" + i + ", value: " + cache.get(1));
try {
cache.put(1, "new value");
tx.commit();
System.out.println("attempt #" + i + " succeeded");
break;
} catch (CacheException e) {
if (!tx.isRollbackOnly()) {
// Transaction was not marked as "rollback only",
// so it's not a concurrent update issue.
// Process the exception here.
break;
}
}
}
}
TRANSACTIONAL_SNAPSHOT
模式是缓存级的,并且事务中涉及的缓存不允许有不同的原子化模式,因此,如果希望在一个SQL事务中覆盖多个表,则必须使用TRANSACTIONAL_SNAPSHOT
模式创建所有表。
通过JDBC/ODBC连接参数,Ignite支持三种模式来处理SQL的嵌套事务:
JDBC连接串:
jdbc:ignite:thin://127.0.0.1/?nestedTransactionsMode=COMMIT
当在一个事务中出现了嵌套事务,系统的行为依赖于nestedTransactionsMode
参数:
ERROR
:如果发生了嵌套事务,会抛出错误,包含事务会回滚,这是默认的行为;COMMIT
:包含事务会被提交,嵌套事务开始并且在遇到COMMIT
语句时会提交,包含事务中的其余部分会作为隐式事务执行;IGNORE
:不要使用这个模式,嵌套事务的开始会被忽略,嵌套事务中的语句会作为包含事务的一部分来执行,然后所有的变更会随着嵌套事务的提交而提交,包含事务中的其余部分会作为隐式事务执行。如果在开启了MVCC的缓存上使用持续查询,那么有些限制需要知道:
IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD
系统属性来修改该值。开启MVCC的缓存,下面的特性是不支持的,这些限制后续的版本可能会解决: