Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions rust/perspective-python/perspective/tests/table/test_leaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,54 @@ def test_table_delete_with_view(self, sentinel):
tbl.delete()
mem2 = process.memory_info().rss
assert (mem2 - mem) < 2000000


class TestExpressionVocab(object):
# Regression tests for the `t_expression_vocab` leak fixed in PR #3175.
# Pre-fix, updates against a view with a string-producing expression
# interned each result into a new 4KB vocab page once the prior page
# filled, and never deduplicated against older pages. RSS grew without
# bound. The fix adds cross-page deduplication; these tests assert the
# steady state is flat.

def test_string_expression_update_no_leak(self):
long_literal = "X" * 256
tbl = Table({"x": "integer", "c": "string"}, index="x")
view = tbl.view(expressions={"e": 'concat("c", \'' + long_literal + "')"})

for _ in range(100):
tbl.update([{"x": 1, "c": "value"}])

process = psutil.Process(os.getpid())
mem = process.memory_info().rss
for _ in range(5000):
tbl.update([{"x": 1, "c": "value"}])
mem2 = process.memory_info().rss

view.delete()
tbl.delete()

assert (mem2 - mem) < 2000000

def test_string_expression_update_bounded_vocab_no_leak(self):
# Cycles through a small fixed set of distinct values. Exercises the
# cross-page `string_exists` lookup the fix relies on: once the
# vocabulary is fully populated, subsequent interns must resolve to
# existing pointers rather than allocating new ones.
values = ["alpha", "bravo", "charlie", "delta", "echo"]
tbl = Table({"x": "integer", "c": "string"}, index="x")
view = tbl.view(expressions={"e": 'upper("c")'})

for i in range(100):
tbl.update([{"x": 1, "c": values[i % len(values)]}])

process = psutil.Process(os.getpid())
mem = process.memory_info().rss
for i in range(5000):
tbl.update([{"x": 1, "c": values[i % len(values)]}])
mem2 = process.memory_info().rss

view.delete()
tbl.delete()

assert (mem2 - mem) < 2000000
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
namespace perspective {

t_expression_vocab::t_expression_vocab() {
// Allocate 4096 bytes per page
m_max_vocab_size = 64 * 64;
// Pre-reserve each page to its full size so that `intern()` never
// triggers a realloc inside `t_vocab`; previously-returned `const char*`
// pointers must remain valid for the lifetime of the vocab.
m_max_vocab_size = 16 * 64 * 64;

// Always start with one vocab
allocate_new_vocab();
Expand All @@ -25,14 +27,22 @@ t_expression_vocab::t_expression_vocab() {
const char*
t_expression_vocab::intern(const char* str) {
std::size_t bytelength = strlen(str);
std::size_t hash = boost::hash_range(str, str + bytelength);
t_uindex existing_idx;

for (auto& current_vocab : m_vocabs) {
if (current_vocab.string_exists(str, hash, existing_idx)) {
return current_vocab.unintern_c(existing_idx);
}
}

if (m_current_vocab_size + bytelength + 1 > m_max_vocab_size) {
allocate_new_vocab();
}

t_vocab& current_vocab = m_vocabs.front();
m_current_vocab_size += bytelength + 1;
t_vocab& current_vocab = m_vocabs[0];
t_uindex interned_idx = current_vocab.get_interned(str);
t_uindex interned_idx = current_vocab.get_interned(str, bytelength, hash);
return current_vocab.unintern_c(interned_idx);
}

Expand Down Expand Up @@ -63,7 +73,7 @@ void
t_expression_vocab::allocate_new_vocab() {
t_vocab vocab;
vocab.init(false);
vocab.reserve(m_max_vocab_size, 64);
vocab.reserve(m_max_vocab_size, m_max_vocab_size / 64);
m_vocabs.insert(m_vocabs.begin(), std::move(vocab));
m_current_vocab_size = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,12 +877,6 @@ t_stree::update_shape_from_static(const t_dtree_ctx& ctx) {
if (iter == m_nodes->get<by_pidx_hash>().end()) {
// create node and enqueue
sptidx = genidx();
t_uindex aggsize = m_aggregates->size();
if (sptidx == aggsize) {
double scale = 1.3;
t_uindex new_size = scale * aggsize;
m_aggregates->extend(new_size);
}

t_uindex dst_ridx = gen_aggidx();

Expand Down Expand Up @@ -1105,7 +1099,7 @@ t_stree::gen_aggidx() {
t_uindex rval = m_cur_aggidx;
++m_cur_aggidx;
if (rval >= cur_cap) {
double nrows = ceil(.3 * double(rval));
double nrows = ceil(1.3 * double(rval));
m_aggregates->extend(static_cast<t_uindex>(nrows));
}

Expand Down
26 changes: 24 additions & 2 deletions rust/perspective-server/cpp/perspective/src/cpp/vocab.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,40 @@ t_vocab::string_exists(const char* c, t_uindex& interned) const {
return true;
}

bool
t_vocab::string_exists(
const char* c, std::size_t hash, t_uindex& interned
) const {
auto iter = m_map.find(c, hash);

if (iter == m_map.end()) {
return false;
}

interned = iter->second;
return true;
}

t_uindex
t_vocab::get_interned(const char* s) {
std::size_t bytelength = strlen(s);
return get_interned(s, bytelength, boost::hash_range(s, s + bytelength));
}

t_uindex
t_vocab::get_interned(
const char* s, std::size_t bytelength, std::size_t hash
) {
#ifdef PSP_COLUMN_VERIFY
PSP_VERBOSE_ASSERT(s != 0, "Null string");
#endif

t_sidxmap::iterator iter = m_map.find(s);
t_sidxmap::iterator iter = m_map.find(s, hash);

t_uindex idx;
t_uindex bidx;
t_uindex eidx;
t_uindex len = strlen(s) + 1;
t_uindex len = bytelength + 1;

if (iter == m_map.end()) {
idx = genidx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,15 @@ class PERSPECTIVE_EXPORT t_vocab {

t_uindex get_interned(const std::string& s);
t_uindex get_interned(const char* s);
t_uindex
get_interned(const char* s, std::size_t bytelength, std::size_t hash);
void copy_vocabulary(const t_vocab& other);
const char* unintern_c(t_uindex idx) const;

bool string_exists(const char* c, t_uindex& interned) const;
bool string_exists(
const char* c, std::size_t hash, t_uindex& interned
) const;

void reserve(size_t total_string_size, size_t string_count);

Expand Down
Loading