Hatena::ブログ(Diary)

CLOVER

2017-10-01

Apache Geode を組み込みな Memcached/Redis Server として使う

ちょっとした小ネタ的に。

Apache Geode は、 Memcached Server および Redis Server になれる機能を持っています。

How Gemcached Works | Geode Docs

Geode Redis Adapter | Geode Docs

それぞれ、 GemcachedとRedis Adapter という名前のようです。

Gemcached については、組み込みとして使うサンプルがドキュメント上にあるのですが、 Redis Adapter については
gfsh 上での操作しか記載されていません。

ですが、 Redis Adapter も同じように組み込みで使うことができるようなので、合わせて動かしてみることにしました。

準備

GemcachedもRedis Adapter も、 Apache Geodeのcore モジュールに含まれています。よって、基本的には core があれば OK … 。

        <dependency>
            <groupId>org.apache.geode</groupId>
            <artifactId>geode-core</artifactId>
            <version>1.2.1</version>
        </dependency>

と言いたいところですが、 Redis Adapter については optional な依存関係が必要になります。そちらについては、
後述することにしましょう。

Memcached および Redis にアクセスするためのライブラリとしては、それぞれ spymemcachedとLettuce を使用することにします。

        <dependency>
            <groupId>net.spy</groupId>
            <artifactId>spymemcached</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>

確認はテストコードでやります。 JUnitとAssertJ を依存関係に加えておきます。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>1.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.8.0</version>
            <scope>test</scope>
        </dependency>

では、書いていってみましょう。

Memcached(Gemcached)

まずは、テストコードの雛形から。
src/test/java/org/littlewings/geode/embedded/EmbeddedMemcachedServerTest.java

package org.littlewings.geode.embedded;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.BinaryConnectionFactory;
import net.spy.memcached.MemcachedClient;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.memcached.ValueWrapper;
import org.apache.geode.memcached.GemFireMemcachedServer;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class EmbeddedMemcachedServerTest {
    // ここに、テストを書く!
}

最初にも書きましたが、 Memcached Server へのアクセスには spymemcached を使用します。

GitHub - couchbase/spymemcached: A simple, asynchronous, single-threaded memcached client written in java.

Gemcached を組み込みとして使ったコード例は、こんな感じです。

    @Test
    public void gettingStarted() throws IOException, InterruptedException, ExecutionException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

        MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211));

        try {
            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            TimeUnit.SECONDS.sleep(5L);

            assertThat(client.get("key1")).isNull();

            client.set("key2", 3, "value2").get();  // Future
            assertThat(client.get("key2")).isEqualTo("value2");
            client.delete("key2").get();  // Future
            assertThat(client.get("key2")).isNull();
        } finally {
            client.shutdown();
            memcachedServer.shutdown();
        }
    }

GemFireMemcachedServer クラスのインスタンスを作成して、 GemFireMemcachedServer#start するだけ。

        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

GemFireMemcachedServer のコンストラクタは 2 つあり、リッスンポートのみの指定と、バインドするアドレスとポート、
Memcached で使うプロトコルを指定できるものがあります。
https://123
https://142

今回は、全部指定する形を取りました。

リッスンポートは、負の値を指定するとデフォルトとして 11212 ポートを取ります。
https://86

プロトコルは、デフォルトで ASCII です。
https://130

使い終わったら、 shutdown しましょう。

            memcachedServer.shutdown();

これが、基本的な使い方になります。

expire とかも効いていて、良さそうですね。

            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            TimeUnit.SECONDS.sleep(5L);

            assertThat(client.get("key1")).isNull();

            client.set("key2", 3, "value2").get();  // Future
            assertThat(client.get("key2")).isEqualTo("value2");
            client.delete("key2").get();  // Future
            assertThat(client.get("key2")).isNull();

spymemcachedのset の戻り値が Future になっていることを知らなくて、最初テストが不安定になってハマりました … 。

Binary プロトコルでも、問題なく動作します。

    @Test
    public void gettingStartedAsBinary() throws IOException, InterruptedException, ExecutionException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.BINARY);
        memcachedServer.start();

        MemcachedClient client =
                new MemcachedClient(
                        new BinaryConnectionFactory(),
                        AddrUtil.getAddresses("localhost:11211")
                );

        try {
            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            TimeUnit.SECONDS.sleep(5L);

            assertThat(client.get("key1")).isNull();

            client.set("key2", 3, "value2").get();  // Future
            assertThat(client.get("key2")).isEqualTo("value2");
            client.delete("key2").get();  // Future
            assertThat(client.get("key2")).isNull();
        } finally {
            client.shutdown();
            memcachedServer.shutdown();
        }
    }

また、 Gemcached が使用する Region は、「 gemcached 」という名前となっています。
https://69

「gemcached」Region が存在しない場合は Partition Region として作成されるようになっています。
https://java#L144-L153

よって、「 gemcached」Region をカスタマイズしたい場合は、 cache.xml に「 gemcached」Region の定義をすることに
なるでしょう。

例えば、こんな感じに。

<?xml version="1.0" encoding="UTF-8"?>
<cache
        xmlns="http://geode.apache.org/schema/cache"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
        version="1.0">
    <region name="gemcached" refid="PARTITION_REDUNDANT"/>
</cache>

特に設定ファイルを与える方法がないので、「 cache.xml 」のファイル名を指定させるにはシステムプロパティを
使うことになりますねぇ … 。

「gemcached」Region に、データが入っていることを確認してみましょう。

    @Test
    public void underlyingRegion() throws IOException, InterruptedException, ExecutionException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

        MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211));

        try {
            client.set("key1", 3, "value1").get();  // Future
            assertThat(client.get("key1")).isEqualTo("value1");

            Cache cache = CacheFactory.getAnyInstance();
            Region<String, ValueWrapper> region = cache.getRegion("gemcached");
            assertThat(new String(region.get("key1").getValue(), StandardCharsets.UTF_8))
                    .isEqualTo("value1");
        } finally {
            client.shutdown();
            memcachedServer.shutdown();
        }
    }

確かに「 gemcached」Region にデータが入っているようです。

Memcached(Gemcached) については、こんな感じです。

Redis(Redis Adapter)

続いて、 Redis Adapter へ。

Redis Adapter を使用するには、依存関係として Spring Shell が必要になります。というわけで、依存関係を追加します。

        <!-- GeodeRedisServer only -->
        <dependency>
          <groupId>org.springframework.shell</groupId>
          <artifactId>spring-shell</artifactId>
          <version>1.2.0.RELEASE</version>
          <exclusions>
            <exclusion>
              <artifactId>cglib</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>asm</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>spring-aop</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>guava</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>aopalliance</artifactId>
              <groupId>*</groupId>
            </exclusion>
            <exclusion>
              <artifactId>spring-context-support</artifactId>
              <groupId>*</groupId>
            </exclusion>
          </exclusions>
        </dependency>

いろいろ exclude してあるのは、 Apache Geode 側の設定そのままに倣いました。
https://github.com/apache/geode/blob/develop/geode-core/build.gradle#L110-L118

テストコードの雛形としては、こんな感じに。
src/test/java/org/littlewings/geode/embedded/EmbeddedRedisServerTest.java

package org.littlewings.geode.embedded;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.redis.GeodeRedisServer;
import org.apache.geode.redis.internal.ByteArrayWrapper;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class EmbeddedRedisServerTest {
    // ここに、テストを書く!
}

Redis へのアクセスには、 Lettuce を使用します。

Lettuce

Redis Adapter を組み込みとして使ったコードは、こんな感じになります。

    @Test
    public void gettingStarted() throws InterruptedException {
        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");

            syncCommands.set("key2", "value2,", SetArgs.Builder.ex(3));
            TimeUnit.SECONDS.sleep(5L);
            assertThat(syncCommands.get("key2")).isNull();
        } finally {
            connection.close();
            redisClient.shutdown();
            redisServer.shutdown();
        }
    }

※StatefulRedisConnection のみ Closeable なのですが、他のクラスと合わせた形で書きました …

Gemcached と似た感じですね。 GeodeRedisServer というクラスを使用します。

        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

コンストラクタには、リッスンポートのみ、バインドするアドレスとポート、バインドするアドレスとポートに加えてログレベルを指定できる、
3 つのコンストラクタがあります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L301-L364

今回は、バインドするアドレスとポートを指定。そして、 GeodeRedisServer#start を呼び出します。

        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

ポートについては負の値を指定すると、デフォルトのポートとして 6379 を取ります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L156

利用が終わったら、 GeodeRedisServer#shutdown を呼び出します。

            redisServer.shutdown();

アクセス自体は、 Lettuce を使って確認。こちらも expire などは良さそうですね。

        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");

            syncCommands.set("key2", "value2,", SetArgs.Builder.ex(3));
            TimeUnit.SECONDS.sleep(5L);
            assertThat(syncCommands.get("key2")).isNull();
        } finally {
            connection.close();
            redisClient.shutdown();
            redisServer.shutdown();
        }

GeodeRedisServer に保存されたデータは、こちらも内部的には Region として保持されますが、いくつか種類があります。

とりあえず、 String として保存される Region を確認してみましょう。

    @Test
    public void underlyging() throws InterruptedException {
        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");

            syncCommands.set("key2", "value2", SetArgs.Builder.ex(3));
            assertThat(syncCommands.get("key2")).isEqualTo("value2");

            Cache cache = CacheFactory.getAnyInstance();
            Region<ByteArrayWrapper, ByteArrayWrapper> stringsRegion = cache.getRegion("ReDiS_StRiNgS");
            assertThat(stringsRegion.get(new ByteArrayWrapper("key1".getBytes(StandardCharsets.UTF_8))))
                    .isEqualTo(new ByteArrayWrapper("value1".getBytes(StandardCharsets.UTF_8)));
            assertThat(stringsRegion.get(new ByteArrayWrapper("key2".getBytes(StandardCharsets.UTF_8))))
                    .isEqualTo(new ByteArrayWrapper("value2".getBytes(StandardCharsets.UTF_8)));
        } finally {
            connection.close();
            redisClient.shutdown();
            redisServer.shutdown();
        }
    }

「ReDiS_StRiNgS 」という Region に、データが保存されています。

RegionはString、HyperLogLogs 、メタデータの 3 種類を使用します。

StringとHyperLogLogs 用の Region は、デフォルトで Partition Region となります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L270

変更したい場合は、システムプロパティ「 gemfireredis.regiontype 」で指定するか、あらかじめ Region を定義しておきましょう。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L425-L434

メタデータ用の Region は、デフォルトで Replicate Region となります。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L435-L442

こちらも、変更したい場合はあらかじめ Region を定義しておきましょう。

それぞれの Region の名前は、けっこうすごいことになっていますが … ソースコードを見ましょう。
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L223-L239

Gemcached と同様、 GeodeRedisServerにCacheXML 自体を指定する手段はなさそうなので、 CacheXML 名を変えたい場合はシステムプロパティを
使うことになりますね。

注意点

とまあ、こんな感じに使えそうな Redis Adapter ですが、注意点もあるようです。

How the Redis Adapter Works

サポートしている Redis のコマンドは、 GeodeRedisServerのJavadoc を確認しましょう。

GeodeRedisServer (Apache Geode 1.4.0)

とはいえ、ドキュメントに

The Geode Redis Adapter supports all Redis commands for each of the Redis data structures. (See the Javadocs for the GemFireRedisServer class for a detailed list.)

http://geode.apache.org/docs/guide/12/tools_modules/redis_adapter.html#how-the-redis-adapter-wo

とあるので、ふつうに使う分には困らないのではないかと。

ただ、いくつか Redis と異なる振る舞いをするケースがあるようです。

SetとWATCH/UNWATCH 以外については、 Node をまたがった場合に Redis と動きが異なる感じになるみたいですね。
このあたりが許容できるのであれば、 Redis AdapterをRedis の代わりとして使えるのでしょう。

一緒に使う

で、ここまで書いていると、 GemcachedとRedis Adapter は別々に使うことになるというか、同時に使用できない ? というような気もしますが
( 両方とも、 Server#start しますし ) 、そんなことはなさそうです。

両方合わせて使ってみました。
src/test/java/org/littlewings/geode/embedded/EmbeddedMemcachedRedisServerTest.java

package org.littlewings.geode.embedded;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import net.spy.memcached.MemcachedClient;
import org.apache.geode.memcached.GemFireMemcachedServer;
import org.apache.geode.redis.GeodeRedisServer;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class EmbeddedMemcachedRedisServerTest {
    @Test
    public void mixedServer() throws IOException, ExecutionException, InterruptedException {
        GemFireMemcachedServer memcachedServer =
                new GemFireMemcachedServer("localhost", 11211, GemFireMemcachedServer.Protocol.ASCII);
        memcachedServer.start();

        GeodeRedisServer redisServer = new GeodeRedisServer("localhost", 6379);
        redisServer.start();

        MemcachedClient memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211));
        RedisClient redisClient = RedisClient.create("redis://@localhost:6379/0");
        StatefulRedisConnection<String, String> connection = redisClient.connect();

        try {
            memcachedClient.set("key1", 3, "value1").get();  // Future
            assertThat(memcachedClient.get("key1")).isEqualTo("value1");

            RedisCommands<String, String> syncCommands = connection.sync();

            syncCommands.set("key1", "value1");
            assertThat(syncCommands.get("key1")).isEqualTo("value1");
        } finally {
            redisClient.shutdown();
            memcachedClient.shutdown();
            redisClient.shutdown();
            memcachedServer.shutdown();
        }
    }
}

どちらの Server も、 Apache GeodeのCache がなければ作成し、あればそちらを使用するように実装されているからです。
https://167-L171
https://github.com/apache/geode/blob/develop/geode-core/src/main/java/org/apache/geode/redis/GeodeRedisServer.java#L401-L412

Apache Geode 自体は、ひとつの JavaVM 上で複数の Cache を作成することは許容していません。

なので、こういう実装になっていればうまくいきますよね、と。

まとめ

Apache Geode が提供する、 Memcached および RedisのServer としての機能を、 Java アプリケーションに組み込んで使ってみました。

内部のコードも少し追いつつ、各クライアントライブラリで確認できたので、まあ良しとしましょう。

こうやって組み込んで使えるのは便利ですが、ちょっと依存ライブラリが多いのが難点でしょうかねぇ … 。

## 「mvn depencency:tree」のApache Geode+Spring Shellの部分の結果
[INFO] +- org.apache.geode:geode-core:jar:1.2.1:compile
[INFO] |  +- com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] |  +- org.jgroups:jgroups:jar:3.6.10.Final:compile
[INFO] |  +- antlr:antlr:jar:2.7.7:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.8.6:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.8.6:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.8.6:compile
[INFO] |  +- commons-io:commons-io:jar:2.5:compile
[INFO] |  +- commons-lang:commons-lang:jar:2.6:compile
[INFO] |  +- it.unimi.dsi:fastutil:jar:7.1.0:compile
[INFO] |  +- javax.resource:javax.resource-api:jar:1.7:compile
[INFO] |  |  \- javax.transaction:javax.transaction-api:jar:1.2:compile
[INFO] |  +- net.java.dev.jna:jna:jar:4.0.0:compile
[INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:compile
[INFO] |  +- org.apache.logging.log4j:log4j-api:jar:2.7:compile
[INFO] |  +- org.apache.logging.log4j:log4j-core:jar:2.7:compile
[INFO] |  +- org.apache.shiro:shiro-core:jar:1.3.2:compile
[INFO] |  |  \- org.slf4j:slf4j-api:jar:1.6.4:compile
[INFO] |  +- commons-beanutils:commons-beanutils:jar:1.9.3:compile
[INFO] |  |  +- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] |  +- io.github.lukehutch:fast-classpath-scanner:jar:2.0.11:compile
[INFO] |  +- org.apache.geode:geode-common:jar:1.2.1:compile
[INFO] |  \- org.apache.geode:geode-json:jar:1.2.1:compile
[INFO] +- org.springframework.shell:spring-shell:jar:1.2.0.RELEASE:compile
[INFO] |  +- jline:jline:jar:2.12:compile
[INFO] |  \- org.springframework:spring-core:jar:4.2.4.RELEASE:compile

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/Kazuhira/20171001/1506866676