Skip to content

Commit 6dd111c

Browse files
authored
GEOMESA-3565 FSDS - Add step to datetime partition scheme (#3525)
1 parent 9ac21ac commit 6dd111c

9 files changed

Lines changed: 121 additions & 36 deletions

File tree

docs/user/filesystem/partition_schemes.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The following options are supported:
2626

2727
* ``attribute`` - The name of a ``Date``\ -type attribute from the SimpleFeatureType to use. If not specified, the default
2828
date attribute is used.
29+
* ``step`` - The number of time units (hours, days, etc.) to include in each partition.
2930

3031
Spatial Schemes
3132
---------------

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/AttributeScheme.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,7 @@ object AttributeScheme {
141141
if (opts.name != Name) { None } else {
142142
val attribute = opts.getSingle("attribute").orNull
143143
require(attribute != null, s"Attribute scheme requires an attribute to be specified with 'attribute=<attribute>'")
144-
val index = sft.indexOf(attribute)
145-
require(index != -1, s"Attribute '$attribute' does not exist in schema '${sft.getTypeName}'")
144+
val index = attributeIndex(sft, attribute)
146145
val binding = sft.getDescriptor(index).getType.getBinding
147146
require(AttributeIndexKey.encodable(binding), s"Invalid type binding '${binding.getName}' of attribute '$attribute'")
148147

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/DateTimeScheme.scala

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,16 @@ import java.time.temporal.ChronoUnit
2222
import java.time.{Instant, ZoneOffset, ZonedDateTime}
2323
import java.util.{Date, Locale}
2424

25-
case class DateTimeScheme(
26-
dtg: String,
27-
dtgIndex: Int,
28-
unit: ChronoUnit,
29-
) extends PartitionScheme {
25+
case class DateTimeScheme(dtg: String, dtgIndex: Int, unit: ChronoUnit, step: Int = 1) extends PartitionScheme {
3026

3127
import FilterHelper.ff
3228

3329
private val encoder = LexiTypeEncoders.integerEncoder()
3430

35-
override val name: String = s"${unit.name().toLowerCase(Locale.US)}:attribute=$dtg"
31+
override val name: String = {
32+
val stepOpt = if (step == 1) { "" } else { s":step=$step"}
33+
s"${unit.name().toLowerCase(Locale.US)}:attribute=$dtg$stepOpt"
34+
}
3635

3736
override def getPartition(feature: SimpleFeature): PartitionKey = {
3837
val instant = feature.getAttribute(dtgIndex).asInstanceOf[Date].toInstant
@@ -65,16 +64,13 @@ case class DateTimeScheme(
6564
}
6665

6766
override def getCoveringFilter(partition: PartitionKey): Filter = {
68-
val offset = encoder.decode(partition.value)
67+
val offset = encoder.decode(partition.value) * step
6968
val start = DateTimeScheme.Epoch.plus(offset.longValue(), unit)
70-
val end = ff.literal(DateParsing.format(start.plus(1, unit)))
69+
val end = ff.literal(DateParsing.format(start.plus(step, unit)))
7170
ff.and(ff.greaterOrEqual(ff.property(dtg), ff.literal(DateParsing.format(start))), ff.less(ff.property(dtg), end))
7271
}
7372

74-
private def toPartition(dt: ZonedDateTime): Int = {
75-
require(!dt.isBefore(DateTimeScheme.Epoch), s"Date exceeds minimum indexable value (${DateTimeScheme.Epoch}): $dt")
76-
unit.between(DateTimeScheme.Epoch, dt).toInt
77-
}
73+
private def toPartition(dt: ZonedDateTime): Int = unit.between(DateTimeScheme.Epoch, dt).toInt / step
7874

7975
private def getBounds(filter: Filter): Option[Seq[Bounds[ZonedDateTime]]] = {
8076
val bounds = FilterHelper.extractIntervals(filter, dtg)
@@ -107,7 +103,6 @@ object DateTimeScheme {
107103

108104
private val UnboundedUpper = "zzz"
109105

110-
// TODO allow for 2 hours, etc
111106
class DateTimePartitionSchemeFactory extends PartitionSchemeFactory {
112107
override def load(sft: SimpleFeatureType, scheme: String): Option[PartitionScheme] = {
113108
val opts = SchemeOpts(scheme)
@@ -122,10 +117,9 @@ object DateTimeScheme {
122117
unit.map { u =>
123118
val dtg = opts.getSingle("attribute").orElse(sft.getDtgField).orNull
124119
require(dtg != null, s"Date scheme requires an attribute to be specified with 'attribute=<attribute>'")
125-
val index = sft.indexOf(dtg)
126-
require(index != -1, s"Attribute '$dtg' does not exist in schema '${sft.getTypeName}'")
127-
require(classOf[Date].isAssignableFrom(sft.getDescriptor(index).getType.getBinding), s"Attribute '$dtg' is not a date")
128-
DateTimeScheme(dtg, index, u)
120+
val index = attributeIndex(sft, dtg, Some(classOf[Date]))
121+
val step = opts.getSingle("step").map(_.toInt).getOrElse(1)
122+
DateTimeScheme(dtg, index, u, step)
129123
}
130124
}
131125
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/HierarchicalDateTimeScheme.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,7 @@ object HierarchicalDateTimeScheme extends PartitionSchemeFactory {
219219
lazy val dtg = opts.opts.get(Config.DtgAttribute).orElse(sft.getDtgField).getOrElse {
220220
throw new IllegalArgumentException(s"DateTime scheme requires valid attribute '${Config.DtgAttribute}'")
221221
}
222-
lazy val dtgIndex = Some(sft.indexOf(dtg)).filter(_ != -1).getOrElse {
223-
throw new IllegalArgumentException(s"Attribute '$dtg' does not exist in feature type ${sft.getTypeName}")
224-
}
222+
lazy val dtgIndex = attributeIndex(sft, dtg, Some(classOf[Date]))
225223

226224
if (opts.name == Name) {
227225
val unit = opts.opts.get(Config.StepUnitOpt).map(c => ChronoUnit.valueOf(c.toUpperCase)).getOrElse {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/SpatialScheme.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionKey
1616
import org.locationtech.geomesa.fs.storage.api.{PartitionScheme, PartitionSchemeFactory}
1717
import org.locationtech.geomesa.utils.geotools.GeometryUtils
1818
import org.locationtech.geomesa.zorder.sfcurve.IndexRange
19+
import org.locationtech.jts.geom.Geometry
1920

2021
import java.util.regex.Pattern
22+
import scala.reflect.ClassTag
2123

2224
abstract class SpatialScheme(id: String, bits: Int, geom: String) extends PartitionScheme {
2325

@@ -69,7 +71,7 @@ object SpatialScheme {
6971

7072
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
7173

72-
abstract class SpatialPartitionSchemeFactory(name: String) extends PartitionSchemeFactory {
74+
abstract class SpatialPartitionSchemeFactory[T <: Geometry : ClassTag](name: String) extends PartitionSchemeFactory {
7375

7476
private val namePattern: Pattern = Pattern.compile(s"$name-([0-9]+)bits?:?")
7577

@@ -80,8 +82,7 @@ object SpatialScheme {
8082
def build(resolution: Short): Option[PartitionScheme] = {
8183
val geom = opts.getSingle("attribute").orElse(Option(sft.getGeomField)).orNull
8284
require(geom != null, s"Spatial schemes requires an attribute to be specified with 'attribute=<attribute>'")
83-
val index = sft.indexOf(geom)
84-
require(index != -1, s"Attribute '$geom' does not exist in schema '${sft.getTypeName}'")
85+
val index = attributeIndex(sft, geom, Some(implicitly[ClassTag[T]].runtimeClass))
8586
Some(buildPartitionScheme(resolution, geom, index))
8687
}
8788

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/XZ2Scheme.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ object XZ2Scheme {
4141

4242
val Name = "xz2"
4343

44-
class XZ2PartitionSchemeFactory extends SpatialPartitionSchemeFactory(Name) {
44+
class XZ2PartitionSchemeFactory extends SpatialPartitionSchemeFactory[Geometry](Name) {
4545
override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme =
4646
XZ2Scheme(bits, geom, geomIndex)
4747
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/Z2Scheme.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ object Z2Scheme {
5656

5757
val Name = "z2"
5858

59-
class Z2PartitionSchemeFactory extends SpatialPartitionSchemeFactory(Name) {
59+
class Z2PartitionSchemeFactory extends SpatialPartitionSchemeFactory[Point](Name) {
6060
override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme =
6161
Z2Scheme(bits, geom, geomIndex)
6262
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/partitions/package.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.locationtech.geomesa.fs.storage.common
1010

11+
import org.geotools.api.feature.simple.SimpleFeatureType
12+
1113
import java.nio.charset.StandardCharsets
1214
import java.util.Locale
1315

@@ -17,6 +19,15 @@ package object partitions {
1719
// note: we use 1 instead of 0 b/c 0 is not a valid char in postgres so breaks jdbc metadata filtering
1820
val ZeroChar = new String(Array[Byte](1), StandardCharsets.UTF_8)
1921

22+
private[partitions] def attributeIndex(sft: SimpleFeatureType, name: String, binding: Option[Class[_]] = None): Int = {
23+
val index = sft.indexOf(name)
24+
require(index != -1, s"Attribute '$name' does not exist in schema '${sft.getTypeName}'")
25+
binding.foreach { b =>
26+
require(b.isAssignableFrom(sft.getDescriptor(index).getType.getBinding), s"Attribute '$name' is not a ${b.getSimpleName}")
27+
}
28+
index
29+
}
30+
2031
case class SchemeOpts(name: String, opts: Map[String, String], multiOpts: Map[String, Seq[String]]) {
2132
def getSingle(k: String): Option[String] = {
2233
if (multiOpts.contains(k)) {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/partitions/DateTimeSchemeTest.scala

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,22 @@ class DateTimeSchemeTest extends Specification {
3636

3737
"DateTimeScheme" should {
3838

39+
"partition based on hours" >> {
40+
val ps = DateTimeScheme("dtg", 0, ChronoUnit.HOURS)
41+
val partition = ps.getPartition(sf)
42+
partition.value mustEqual "80064c8a"
43+
val hours = AttributeIndexKey.decode("integer", partition.value).asInstanceOf[Int]
44+
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusHours(hours) mustEqual truncate(date, ChronoUnit.HOURS)
45+
}
46+
47+
"partition based on hours with step" >> {
48+
val ps = DateTimeScheme("dtg", 0, ChronoUnit.HOURS, step = 2)
49+
val partition = ps.getPartition(sf)
50+
partition.value mustEqual "80032645"
51+
val hours = AttributeIndexKey.decode("integer", partition.value).asInstanceOf[Int] * 2
52+
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusHours(hours) mustEqual truncate(date, ChronoUnit.HOURS, 2)
53+
}
54+
3955
"partition based on days" >> {
4056
val ps = DateTimeScheme("dtg", 0, ChronoUnit.DAYS)
4157
val partition = ps.getPartition(sf)
@@ -44,12 +60,12 @@ class DateTimeSchemeTest extends Specification {
4460
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusDays(days) mustEqual truncate(date, ChronoUnit.DAYS)
4561
}
4662

47-
"partition based on hours" >> {
48-
val ps = DateTimeScheme("dtg", 0, ChronoUnit.HOURS)
63+
"partition based on days with step" >> {
64+
val ps = DateTimeScheme("dtg", 0, ChronoUnit.DAYS, step = 2)
4965
val partition = ps.getPartition(sf)
50-
partition.value mustEqual "80064c8a"
51-
val hours = AttributeIndexKey.decode("integer", partition.value).asInstanceOf[Int]
52-
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusHours(hours) mustEqual truncate(date, ChronoUnit.HOURS)
66+
partition.value mustEqual "80002198"
67+
val days = AttributeIndexKey.decode("integer", partition.value).asInstanceOf[Int] * 2
68+
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusDays(days) mustEqual truncate(date, ChronoUnit.DAYS, 2)
5369
}
5470

5571
"partition based on week" >> {
@@ -61,13 +77,29 @@ class DateTimeSchemeTest extends Specification {
6177
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusWeeks(weeks) mustEqual truncate(date, ChronoUnit.WEEKS)
6278
}
6379

80+
"partition based on week with step" >> {
81+
val ps = PartitionSchemeFactory.load(sft, "weekly:step=2")
82+
ps must beAnInstanceOf[DateTimeScheme]
83+
val partition = ps.getPartition(sf)
84+
partition.value mustEqual "800004cc"
85+
val weeks = AttributeIndexKey.decode("integer", partition.value).asInstanceOf[Int] * 2
86+
ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC).plusWeeks(weeks) mustEqual truncate(date, ChronoUnit.WEEKS, 2)
87+
}
88+
6489
"enumerate partition ranges" >> {
6590
val ps = DateTimeScheme("dtg", 0, ChronoUnit.HOURS)
6691
val partitions = ps.getPartitionsForFilter(ECQL.toFilter("dtg >= '2017-02-03T10:15:00Z' AND dtg < '2017-02-03T11:18:00Z'"))
6792
partitions must beSome
6893
partitions.get.map(_.value) mustEqual Seq("80064c8a", "80064c8b")
6994
}
7095

96+
"enumerate partition ranges with step" >> {
97+
val ps = DateTimeScheme("dtg", 0, ChronoUnit.HOURS, step = 2)
98+
val partitions = ps.getPartitionsForFilter(ECQL.toFilter("dtg >= '2017-02-03T10:15:00Z' AND dtg < '2017-02-03T13:18:00Z'"))
99+
partitions must beSome
100+
partitions.get.map(_.value) mustEqual Seq("80032645", "80032646")
101+
}
102+
71103
"calculate covering filters for partitions" >> {
72104
foreach(Seq(ChronoUnit.HOURS, ChronoUnit.DAYS, ChronoUnit.WEEKS, ChronoUnit.MONTHS, ChronoUnit.YEARS)) { unit =>
73105
val ps = DateTimeScheme("dtg", 0, unit)
@@ -82,6 +114,20 @@ class DateTimeSchemeTest extends Specification {
82114
}
83115
}
84116

117+
"calculate covering filters for partitions with step" >> {
118+
foreach(Seq(ChronoUnit.HOURS, ChronoUnit.DAYS, ChronoUnit.WEEKS, ChronoUnit.MONTHS, ChronoUnit.YEARS)) { unit =>
119+
val ps = DateTimeScheme("dtg", 0, unit, step = 2)
120+
val partition = ps.getPartition(sf)
121+
val covering = ps.getCoveringFilter(partition)
122+
val expected = {
123+
val start = truncate(date, unit, 2)
124+
val end = start.plus(2, unit)
125+
ECQL.toFilter(s"dtg >= '${DateParsing.format(start)}' AND dtg < '${DateParsing.format(end)}'")
126+
}
127+
decomposeAnd(covering) must containTheSameElementsAs(decomposeAnd(expected))
128+
}
129+
}
130+
85131
"calculate intersecting partitions for filters" >> {
86132
foreach(Seq(ChronoUnit.HOURS, ChronoUnit.DAYS, ChronoUnit.WEEKS, ChronoUnit.MONTHS, ChronoUnit.YEARS)) { unit =>
87133
val ps = DateTimeScheme("dtg", 0, unit)
@@ -96,8 +142,21 @@ class DateTimeSchemeTest extends Specification {
96142
}
97143
}
98144

145+
"calculate intersecting partitions for filters with step" >> {
146+
foreach(Seq(ChronoUnit.HOURS, ChronoUnit.DAYS, ChronoUnit.WEEKS, ChronoUnit.MONTHS, ChronoUnit.YEARS)) { unit =>
147+
val ps = DateTimeScheme("dtg", 0, unit, step = 2)
148+
val partition = ps.getPartition(sf)
149+
val expectedEndPartition = java.lang.Long.toHexString(java.lang.Long.parseLong(partition.value, 16) + 1)
150+
val start = truncate(date, unit, 2)
151+
val end = start.plus(2, unit)
152+
val filter = ECQL.toFilter(s"dtg >= '${DateParsing.format(start)}' AND dtg < '${DateParsing.format(end)}'")
153+
val partitions = ps.getRangesForFilter(filter).orNull
154+
partitions must not(beNull)
155+
partitions mustEqual Seq(PartitionRange(ps.name, partition.value, expectedEndPartition))
156+
}
157+
}
158+
99159
"handle edge boundaries" >> {
100-
// note: these will change when we fix simplified filters
101160
val dtScheme = DateTimeScheme("dtg", 0, ChronoUnit.DAYS)
102161
val startpoint = dtScheme.getPartition(ScalaSimpleFeature.create(sft, "1", "2017-01-02T00:00:00.000Z", "POINT (10 10)"))
103162
val endpoint = dtScheme.getPartition(ScalaSimpleFeature.create(sft, "1", "2017-01-04T00:00:00.000Z", "POINT (10 10)"))
@@ -108,7 +167,26 @@ class DateTimeSchemeTest extends Specification {
108167
twoDays.get.head.contains(startpoint.value) must beTrue
109168
twoDays.get.head.contains(endpoint.value) must beFalse
110169

111-
val inclusive = ECQL.toFilter("dtg >= '2017-01-02T00:00:00.000Z' and dtg <= '2017-01-04T00:00:00.001Z'")
170+
val inclusive = ECQL.toFilter("dtg >= '2017-01-02T00:00:00.000Z' and dtg <= '2017-01-04T00:00:00.000Z'")
171+
val threeDays = dtScheme.getRangesForFilter(inclusive)
172+
threeDays must beSome
173+
threeDays.get must haveSize(1)
174+
threeDays.get.head.contains(startpoint.value) must beTrue
175+
threeDays.get.head.contains(endpoint.value) must beTrue
176+
}
177+
178+
"handle edge boundaries with step" >> {
179+
val dtScheme = DateTimeScheme("dtg", 0, ChronoUnit.DAYS, step = 2)
180+
val startpoint = dtScheme.getPartition(ScalaSimpleFeature.create(sft, "1", "2017-01-02T00:00:00.000Z", "POINT (10 10)"))
181+
val endpoint = dtScheme.getPartition(ScalaSimpleFeature.create(sft, "1", "2017-01-04T00:00:00.000Z", "POINT (10 10)"))
182+
val exclusive = ECQL.toFilter("dtg > '2017-01-02T00:00:00.000Z' and dtg < '2017-01-04T00:00:00.000Z'")
183+
val twoDays = dtScheme.getRangesForFilter(exclusive)
184+
twoDays must beSome
185+
twoDays.get must haveSize(1)
186+
twoDays.get.head.contains(startpoint.value) must beTrue
187+
twoDays.get.head.contains(endpoint.value) must beFalse
188+
189+
val inclusive = ECQL.toFilter("dtg >= '2017-01-02T00:00:00.000Z' and dtg <= '2017-01-04T00:00:00.000Z'")
112190
val threeDays = dtScheme.getRangesForFilter(inclusive)
113191
threeDays must beSome
114192
threeDays.get must haveSize(1)
@@ -117,10 +195,13 @@ class DateTimeSchemeTest extends Specification {
117195
}
118196
}
119197

120-
private def truncate(date: ZonedDateTime, unit: ChronoUnit): ZonedDateTime = {
121-
unit match {
198+
private def truncate(date: ZonedDateTime, unit: ChronoUnit, step: Int = 1): ZonedDateTime = {
199+
val base = unit match {
122200
case ChronoUnit.HOURS | ChronoUnit.DAYS => date.truncatedTo(unit)
123201
case _ => epoch.plus(unit.between(epoch, date), unit)
124202
}
203+
if (step == 1) { base } else {
204+
base.minus(unit.between(epoch, base).toInt % step, unit)
205+
}
125206
}
126207
}

0 commit comments

Comments
 (0)