Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2972c4d
Update store link
May 30, 2018
ff3cca0
fix for SNAP-2365
May 30, 2018
608bf3d
[SNAP-2366] row buffer fault-in, forced rollover, merge small batches
May 31, 2018
cd21c4d
update store link
May 31, 2018
00f6cda
Some optimizations and fixed few issues
Jun 4, 2018
a1dadd9
minor change
Jun 4, 2018
cf1eb84
Fixing precheckin failure in SNAP-2365 and and adding similar fix for…
Jun 5, 2018
215c6e3
Merge remote-tracking branch 'origin/master' into SNAP-2366
Jun 6, 2018
7f69db0
Merge remote-tracking branch 'origin/SNAP-2365' into SNAP-2366
Jun 6, 2018
9595f20
Merge remote-tracking branch 'origin/master' into SNAP-2366
Jun 8, 2018
88045eb
Merge remote-tracking branch 'origin/master' into SNAP-2366
Jun 28, 2018
c858273
coarse container locking to fix rollover/merge when running in parall…
Jun 28, 2018
0ea9eb9
temp
Jun 28, 2018
dbdbcb7
Merge remote-tracking branch 'origin/master' into SNAP-2366
Jul 10, 2018
c824cd6
minor formatting change
Jul 10, 2018
bf26063
fixes
Aug 2, 2018
f423108
fix a ClassCast
Aug 2, 2018
59588c0
Merge remote-tracking branch 'origin/master' into SNAP-2366
Aug 2, 2018
6ec25d0
fixing putInto which clears context prematurely
Aug 3, 2018
4905afc
fix COMMIT call
Aug 3, 2018
fa3f639
Merge remote-tracking branch 'origin/master' into SNAP-2366
Aug 3, 2018
778bb4b
update store link
Aug 3, 2018
647d243
updates and fixes
Aug 6, 2018
86f23d9
Merge remote-tracking branch 'origin/master' into SNAP-2366
Aug 10, 2018
94c45d6
fix build issues after master merge
Aug 10, 2018
4fb08bf
Merge remote-tracking branch 'origin/master' into SNAP-2366
Aug 31, 2018
e7960a9
minor cleanups
Aug 31, 2018
f2be757
Merge remote-tracking branch 'origin/master' into SNAP-2366
Nov 2, 2018
872bd8b
Merge remote-tracking branch 'origin/master' into SNAP-2366
Nov 2, 2018
ce7fac9
minor updates to tests
Nov 2, 2018
08a6956
Merge remote-tracking branch 'origin/master' into SNAP-2366
Dec 22, 2018
772c4af
Merge remote-tracking branch 'origin/master' into SNAP-2366
Dec 29, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ class SplitSnappyClusterDUnitTest(s: String)
// added in SNAP-2012
StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = true
try {
ColumnUpdateDeleteTests.testBasicUpdate(session)
ColumnUpdateDeleteTests.testDeltaStats(session)
ColumnUpdateDeleteTests.testBasicDelete(session)
ColumnUpdateDeleteTests.testSNAP1925(session)
ColumnUpdateDeleteTests.testSNAP1926(session)
ColumnUpdateDeleteTests.testConcurrentOps(session)
ColumnUpdateDeleteTests.testSNAP2124(session)
ColumnUpdateDeleteTests.testBasicUpdate(session, redundancy = 1)
ColumnUpdateDeleteTests.testDeltaStats(session, redundancy = 1)
ColumnUpdateDeleteTests.testBasicDelete(session, redundancy = 1)
ColumnUpdateDeleteTests.testSNAP1925(session, redundancy = 1)
ColumnUpdateDeleteTests.testSNAP1926(session, redundancy = 1)
ColumnUpdateDeleteTests.testConcurrentOps(session, redundancy = 1)
ColumnUpdateDeleteTests.testSNAP2124(session, redundancy = 1)
} finally {
StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ import org.apache.spark.sql.execution.columnar.impl.ColumnFormatRelation
class SnappyStorageEvictor extends Logging {

private def getAllRegionList(offHeap: Boolean,
hasOffHeap: Boolean): ArrayBuffer[LocalRegion] = {
hasOffHeap: Boolean, onlyRowBuffers: Boolean): ArrayBuffer[LocalRegion] = {
val cache = GemFireCacheImpl.getExisting
val allRegionList = new ArrayBuffer[LocalRegion]()
val irm: InternalResourceManager = cache.getResourceManager
for (listener <- irm.getResourceListeners(
SnappyStorageEvictor.resourceType).asScala) listener match {
case pr: PartitionedRegion =>
if (includePartitionedRegion(pr, offHeap, hasOffHeap)) {
if (includePartitionedRegion(pr, offHeap, hasOffHeap, onlyRowBuffers)) {
allRegionList ++= pr.getDataStore.getAllLocalBucketRegions.asScala
}
// no off-heap local regions yet in SnappyData
case lr: LocalRegion =>
if (!offHeap && includeLocalRegion(lr)) {
if (!offHeap && !onlyRowBuffers && includeLocalRegion(lr)) {
allRegionList += lr
}
case _ =>
Expand All @@ -64,7 +64,8 @@ class SnappyStorageEvictor extends Logging {
}

@throws(classOf[Exception])
def evictRegionData(bytesRequired: Long, offHeap: Boolean): Long = {
def evictRegionData(bytesRequired: Long, offHeap: Boolean,
onlyRowBuffers: Boolean = false): Long = {
val cache = GemFireCacheImpl.getInstance()
if (cache eq null) return 0L

Expand All @@ -76,7 +77,7 @@ class SnappyStorageEvictor extends Logging {
val stats = cache.getCachePerfStats
stats.incEvictorJobsStarted()
var totalBytesEvicted: Long = 0
val regionSet = Random.shuffle(getAllRegionList(offHeap, hasOffHeap))
val regionSet = Random.shuffle(getAllRegionList(offHeap, hasOffHeap, onlyRowBuffers))
val start = CachePerfStats.getStatTime
try {
while (regionSet.nonEmpty) {
Expand Down Expand Up @@ -119,19 +120,26 @@ class SnappyStorageEvictor extends Logging {
}
stats.incEvictorJobsCompleted()
}
totalBytesEvicted
// evict row-buffers as the last resort
if (!onlyRowBuffers && !offHeap && totalBytesEvicted < bytesRequired) {
totalBytesEvicted + evictRegionData(bytesRequired - totalBytesEvicted,
offHeap, onlyRowBuffers = true)
} else totalBytesEvicted
}

protected def includePartitionedRegion(region: PartitionedRegion,
offHeap: Boolean, hasOffHeap: Boolean): Boolean = {
val hasLRU = (region.getEvictionAttributes.getAlgorithm.isLRUHeap
&& (region.getDataStore != null)
&& !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer())
offHeap: Boolean, hasOffHeap: Boolean, onlyRowBuffers: Boolean): Boolean = {
var hasLRU = (region.getEvictionAttributes.getAlgorithm.isLRUHeap
&& (region.getDataStore != null)
&& !region.getAttributes.getEnableOffHeapMemory)
val isRowBuffer = region.isRowBuffer
if (onlyRowBuffers) return hasLRU && isRowBuffer

hasLRU &&= !isRowBuffer
if (hasOffHeap) {
// when off-heap is enabled then all column tables use off-heap
val regionPath = Misc.getFullTableNameFromRegionPath(region.getFullPath)
if (offHeap) hasLRU && ColumnFormatRelation.isColumnTable(regionPath)
else hasLRU && !ColumnFormatRelation.isColumnTable(regionPath)
hasLRU && offHeap == ColumnFormatRelation.isColumnTable(
Misc.getFullTableNameFromRegionPath(region.getFullPath))
} else {
assert(!offHeap,
"unexpected invocation for hasOffHeap=false and offHeap=true")
Expand All @@ -141,7 +149,7 @@ class SnappyStorageEvictor extends Logging {

protected def includeLocalRegion(region: LocalRegion): Boolean = {
(region.getEvictionAttributes.getAlgorithm.isLRUHeap
&& !region.getAttributes.getEnableOffHeapMemory)
&& !region.getAttributes.getEnableOffHeapMemory)
}
}

Expand Down
10 changes: 9 additions & 1 deletion cluster/src/test/scala/io/snappydata/QueryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,19 @@ class QueryTest extends SnappyFunSuite {

val query = "select k, v from t1 inner join t2 where t1.id = t2.k order by k, v"
val df = session.sql(query)
val result1 = df.collect().mkString(" ")
var result1 = df.collect().mkString(" ")
val result2 = spark.sql(query).collect().mkString(" ")
if (result1 != result2) {
fail(s"Expected result: $result2\nGot: $result1")
}

// force run stats so that small batches have been merged repeatedly
SnappyEmbeddedTableStatsProviderService.publishColumnTableRowCountStats()
Thread.sleep(10000)
result1 = df.collect().mkString(" ")
if (result1 != result2) {
fail(s"Expected result: $result2\nGot: $result1")
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ trait SplitClusterDUnitTestBase extends Logging {
// using random bucket assignment for cases like SNAP-2175
StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = true
try {
ColumnUpdateDeleteTests.testBasicUpdate(session)
ColumnUpdateDeleteTests.testDeltaStats(session)
ColumnUpdateDeleteTests.testBasicDelete(session)
ColumnUpdateDeleteTests.testSNAP1925(session)
ColumnUpdateDeleteTests.testSNAP1926(session)
ColumnUpdateDeleteTests.testConcurrentOps(session)
ColumnUpdateDeleteTests.testSNAP2124(session)
ColumnUpdateDeleteTests.testBasicUpdate(session, redundancy = 1)
ColumnUpdateDeleteTests.testDeltaStats(session, redundancy = 1)
ColumnUpdateDeleteTests.testBasicDelete(session, redundancy = 1)
ColumnUpdateDeleteTests.testSNAP1925(session, redundancy = 1)
ColumnUpdateDeleteTests.testSNAP1926(session, redundancy = 1)
ColumnUpdateDeleteTests.testConcurrentOps(session, redundancy = 1)
ColumnUpdateDeleteTests.testSNAP2124(session, redundancy = 1)
} finally {
StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = false
}
Expand Down
Loading