DynamoDB - query

クエリの操作 - Amazon DynamoDB には、 "A single Query will only return a result set that fits within the 1 MB size limit." 「1 つの Query は、1 MB のサイズ制限の範囲内の結果セットだけを返します。」と書かれています。

query を実行して Result の getItems で結果セットを取得する場合と、withSelect("COUNT") を使用して件数のみを取得する(getItems を使わない)場合とで、どのような違いがあるのか気になり、DynamoDB ローカルと東京リージョンの DynamoDB で同じコードを実行して試してみました。

その結果、withSelect("COUNT") の場合でも、結果セットを取得する場合でも同じ件数で分割されて返されることがわかりました。

試したコードを下に記載します。なお、DynamoDB ローカルと AWS 上の DynamoDB では、分割される件数に差異がありました。

package jp.pigumer.dynamodb

import java.util.UUID

import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.dynamodbv2.model._

import scala.util.Try

/**
  * One row of test data together with its DynamoDB document form.
  *
  * @param hashKey   value stored under the "HashKey" primary-key attribute
  * @param rangeKey  value stored under the "RangeKey" primary-key attribute
  * @param rangeKey2 value stored under the "RangeKey2" string attribute
  */
case class Data(hashKey: String,
                rangeKey: String,
                rangeKey2: String) {

  /** Document representation of this row, built once when the instance is created. */
  val item: Item = {
    val keyed = new Item().withPrimaryKey("HashKey", hashKey, "RangeKey", rangeKey)
    keyed.withString("RangeKey2", rangeKey2)
  }
}

/**
  * Helpers that create, populate and delete the "TEST1" table used by the
  * query experiments.
  */
object TestTable {

  /** "HashKey" value shared by every item written by [[saveTestData]]. */
  val hashKeyValue: String = "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"

  // Single definition of the names that were previously repeated as literals.
  private val TableName = "TEST1"
  private val IndexName = "GlobalIndex"

  // ("RangeKey2" value, number of items) in the order the items are written.
  private val SeedBatches: Seq[(String, Int)] = Seq(
    "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" -> 1000,
    "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" -> 40000,
    "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" -> 10000
  )

  /** Declares a string-typed ("S") attribute called `name`. */
  private def stringAttribute(name: String): AttributeDefinition =
    new AttributeDefinition()
      .withAttributeName(name)
      .withAttributeType("S")

  /** Key-schema entry for `name`; `keyType` is "HASH" or "RANGE". */
  private def keyElement(name: String, keyType: String): KeySchemaElement =
    new KeySchemaElement()
      .withAttributeName(name)
      .withKeyType(keyType)

  /** Throughput of one read unit and one write unit, used for table and index alike. */
  private def minimalThroughput: ProvisionedThroughput =
    new ProvisionedThroughput()
      .withReadCapacityUnits(1L)
      .withWriteCapacityUnits(1L)

  /**
    * Issues the delete request for the test table.
    *
    * @param dynamoDB document-API handle used to reach the table
    * @return the result returned by the delete call
    */
  def deleteTable(dynamoDB: DynamoDB): DeleteTableResult =
    dynamoDB.getTable(TableName).delete()

  /**
    * Creates the test table with a HashKey/RangeKey primary key and a global
    * secondary index keyed on HashKey/RangeKey2 that projects all attributes,
    * waits until the table is active, then prints its description.
    *
    * @param dynamoDB document-API handle used to create the table
    */
  def createTable(dynamoDB: DynamoDB): Unit = {
    val globalIndex = new GlobalSecondaryIndex()
      .withIndexName(IndexName)
      .withKeySchema(
        keyElement("HashKey", "HASH"),
        keyElement("RangeKey2", "RANGE")
      )
      .withProjection(new Projection().withProjectionType("ALL"))
      .withProvisionedThroughput(minimalThroughput)

    val request = new CreateTableRequest()
      .withTableName(TableName)
      .withAttributeDefinitions(
        stringAttribute("HashKey"),
        stringAttribute("RangeKey"),
        stringAttribute("RangeKey2")
      )
      .withKeySchema(
        keyElement("HashKey", "HASH"),
        keyElement("RangeKey", "RANGE")
      )
      .withProvisionedThroughput(minimalThroughput)
      .withGlobalSecondaryIndexes(globalIndex)

    val result: TableDescription = dynamoDB.createTable(request).waitForActive()
    println(result)
  }

  /**
    * Writes 51,000 items that all share [[hashKeyValue]] and have a random
    * UUID as "RangeKey": 1,000 with RangeKey2 = "AAAA…", then 40,000 with
    * "CCCC…", then 10,000 with "BBBB…".
    *
    * Any exception raised by a put propagates to the caller unchanged; the
    * previous `Try { ... }.get` wrapper only rethrew the same exception.
    *
    * @param dynamoDB document-API handle used to reach the table
    */
  def saveTestData(dynamoDB: DynamoDB): Unit = {
    val table = dynamoDB.getTable(TableName)
    for {
      (rangeKey2, count) <- SeedBatches
      _ <- 1 to count
    } table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, rangeKey2).item)
  }

}
package jp.pigumer.dynamodb

import java.util.logging.Logger

import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest, QueryResult}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import org.specs2.mutable.Specification
import org.specs2.specification.BeforeAfterAll

import scala.collection.JavaConverters._

/**
  * Runs Query requests against the "GlobalIndex" index of the "TEST1" table
  * and checks how many items each returned page contains, both for a
  * count-only query and for a query that returns the items themselves.
  *
  * The table is created and filled before the examples run and deleted
  * afterwards.
  */
class DynamoDBSpec extends Specification with BeforeAfterAll {

  val logger: Logger = Logger.getLogger(this.getClass.getName)

  // Client bound to the endpoint http://localhost:8000 with signing region ap-northeast-1.
  lazy val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder
    .standard()
    .withEndpointConfiguration(
      new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "ap-northeast-1"))
    .build()
  lazy val dynamoDB: DynamoDB = new DynamoDB(client)

  /**
    * Lazily pages through a query, one `QueryResult` per page.
    *
    * The first call to `next()` sends `request` as given; each later call
    * sets the previous page's last evaluated key as the exclusive start key
    * on the same (mutated) `request` object. The iterator is exhausted once
    * a page comes back without a last evaluated key.
    *
    * @param request the query to run; its exclusive start key is overwritten while paging
    * @return an iterator over the pages of the query
    */
  private def query(request: QueryRequest): Iterator[QueryResult] =
    new Iterator[QueryResult] {
      // Most recently fetched page; None until the first request has been sent.
      private var lastPage: Option[QueryResult] = None

      override def hasNext: Boolean =
        lastPage.forall(page => page.getLastEvaluatedKey != null)

      override def next(): QueryResult = {
        // Without this guard a call after exhaustion would pass a null start
        // key and silently restart the query from the first page.
        if (!hasNext) throw new NoSuchElementException("no more query pages")
        val page = lastPage.fold(client.query(request)) { previous =>
          client.query(request.withExclusiveStartKey(previous.getLastEvaluatedKey()))
        }
        lastPage = Some(page)
        page
      }
    }

  /**
    * Builds a query on "GlobalIndex" for items whose HashKey equals
    * `TestTable.hashKeyValue` and whose RangeKey2 equals `rangeKey2`.
    */
  private def indexQuery(rangeKey2: String): QueryRequest =
    new QueryRequest()
      .withTableName("TEST1")
      .withIndexName("GlobalIndex")
      .withKeyConditionExpression("#H = :H and #R = :R")
      .withExpressionAttributeNames(Map(
        "#H" -> "HashKey",
        "#R" -> "RangeKey2").asJava)
      .withExpressionAttributeValues(Map(
        ":H" -> new AttributeValue().withS(TestTable.hashKeyValue),
        ":R" -> new AttributeValue().withS(rangeKey2)).asJava)

  "Test" should {
    "Count" in {
      logger.info("start")
      val request = indexQuery("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB").withSelect("COUNT")

      val pages = query(request).toVector
      logger.info(s"list size: ${pages.length}")

      // NOTE(review): the expected page sizes below are fixed numbers;
      // presumably they depend on the backend being queried - confirm before
      // pointing the client at a different endpoint.
      val result = pages(0)
      logger.info(s"completed: ${result.getCount}")
      result.getCount must_== 8457

      val result2 = pages(1)
      logger.info(s"completed2: ${result2.getCount}")
      result2.getCount must_== 1543
    }

    "Page" in {
      logger.info("start")
      val queryRequest = indexQuery("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC")
      val pages = query(queryRequest).toVector
      val s = pages(0).getItems().size
      logger.info(s"size: $s")
      s must_== 8457
    }
  }

  override def beforeAll: Unit = {
    TestTable.createTable(dynamoDB)
    TestTable.saveTestData(dynamoDB)
  }

  override def afterAll: Unit = {
    TestTable.deleteTable(dynamoDB)
  }
}