Skip to content

Commit fc7ecd1

Browse files
committed
started json_cursor functionality; some fixes to json-parser functionality
1 parent 6e0c2a0 commit fc7ecd1

8 files changed

Lines changed: 171 additions & 306 deletions

File tree

libconfluo/confluo/atomic_multilog.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "conf/configuration_params.h"
2323
#include "container/data_log.h"
2424
#include "container/cursor/record_cursors.h"
25+
#include "container/cursor/json_cursors.h"
2526
#include "container/cursor/alert_cursor.h"
2627
#include "container/monolog/monolog.h"
2728
#include "container/radix_tree.h"
@@ -252,7 +253,7 @@ class atomic_multilog {
252253
* @param json_data The json-formatted data to be stored
253254
* @return The offset of where the data is located
254255
*/
255-
size_t append_json(std::string json_data);
256+
size_t append_json(const std::string &json_data);
256257

257258
// TODO: Add a std::tuple based variant
258259
// TODO: Add a JSON based variant
@@ -333,6 +334,13 @@ class atomic_multilog {
333334
*/
334335
std::unique_ptr<record_cursor> execute_filter(const std::string &expr) const;
335336

337+
/**
338+
* Executes the filter expression
339+
* @param expr The filter expression
340+
* @return The result of applying the filter to the atomic multilog
341+
*/
342+
// std::unique_ptr<json_cursor> execute_filter_json(const std::string &expr) const;
343+
336344
// TODO: Add tests
337345
/**
338346
* Executes an aggregate
@@ -355,6 +363,17 @@ class atomic_multilog {
355363
uint64_t begin_ms,
356364
uint64_t end_ms) const;
357365

366+
/**
367+
* Queries an existing filter
368+
* @param filter_name Name of the filter
369+
* @param begin_ms Beginning of time-range in ms
370+
* @param end_ms End of time-range in ms
371+
* @return A stream containing the results of the filter
372+
*/
373+
// std::unique_ptr<json_cursor> query_filter_json(const std::string &filter_name,
374+
// uint64_t begin_ms,
375+
// uint64_t end_ms) const;
376+
358377
/**
359378
* Queries an existing filter
360379
* @param filter_name The name of the filter
@@ -366,6 +385,17 @@ class atomic_multilog {
366385
std::unique_ptr<record_cursor> query_filter(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms,
367386
const std::string &additional_filter_expr) const;
368387

388+
/**
389+
* Queries an existing filter
390+
* @param filter_name The name of the filter
391+
* @param begin_ms Beginning of time-range in ms
392+
* @param end_ms End of time-range in ms
393+
* @param additional_filter_expr Additional filter expression
394+
* @return A stream containing the results of the filter
395+
*/
396+
// std::unique_ptr<json_cursor> query_filter_json(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms,
397+
// const std::string &additional_filter_expr) const;
398+
369399
/**
370400
* Query a stored aggregate.
371401
* @param aggregate_name The name of the aggregate
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#ifndef CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_
#define CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_

#include <unordered_set>

#include "batched_cursor.h"
#include "offset_cursors.h"
#include "schema/record.h"
#include "parser/expression_compiler.h"
#include "container/data_log.h"

namespace confluo {

/** A cursor whose batches hold JSON-formatted record strings. */
typedef batched_cursor<std::string> json_cursor;

/**
 * A JSON cursor that turns records into JSON-formatted strings.
 */
class aggregated_json_cursor : public json_cursor {
 public:
  /**
   * Constructs a JSON cursor over the given record offsets.
   *
   * @param o_cursor The offset cursor (ownership is taken)
   * @param dlog The data log pointer
   * @param schema The schema
   * @param cexpr The filter expression
   * @param batch_size The number of records in the batch
   */
  aggregated_json_cursor(std::unique_ptr<offset_cursor> o_cursor,
                         const data_log *dlog,
                         const schema_t *schema,
                         const parser::compiled_expression &cexpr,
                         size_t batch_size = 64);

  /**
   * Loads the next batch from the cursor.
   *
   * @return The size of the batch
   */
  virtual size_t load_next_batch() override;

 private:
  std::unique_ptr<offset_cursor> o_cursor_;   // source of record offsets
  const data_log *dlog_;                      // non-owning data log handle
  const schema_t *schema_;                    // non-owning schema handle
  // NOTE(review): stored by reference — the caller must keep the compiled
  // expression alive for the lifetime of this cursor; confirm at call sites.
  const parser::compiled_expression &cexpr_;
};

}

#endif /* CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_ */

libconfluo/confluo/schema/schema.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,13 @@ class schema_t {
169169
*
170170
* @return A pointer to the record data
171171
*/
172-
void *json_string_to_data(const std::string json_record) const;
172+
void *json_string_to_data(const std::string &json_record) const;
173173

174174
/**
175175
* Converts the records into a pointer to the record data
176176
*
177+
* @param ret The json string that is filled up with the data
177178
* @param record The records used for conversion
178-
*
179-
* @return A pointer to the record data
180179
*/
181180
void data_to_json_string(std::string &ret, const void *data) const;
182181

libconfluo/src/atomic_multilog.cc

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ size_t atomic_multilog::append(void *data) {
228228
return offset;
229229
}
230230

231-
size_t atomic_multilog::append_json(std::string json_record) {
231+
size_t atomic_multilog::append_json(const std::string &json_record) {
232232
void *buf = schema_.json_string_to_data(json_record);
233233
size_t off = append(buf);
234234
delete[] reinterpret_cast<uint8_t *>(buf);
@@ -299,6 +299,15 @@ std::unique_ptr<record_cursor> atomic_multilog::execute_filter(const std::string
299299
return plan.execute(version);
300300
}
301301

302+
//std::unique_ptr<json_cursor> atomic_multilog::execute_filter_json(const std::string &expr) const {
303+
// uint64_t version = rt_.get();
304+
// auto t = parser::parse_expression(expr);
305+
// auto cexpr = parser::compile_expression(t, schema_);
306+
// query_plan plan = planner_.plan(cexpr);
307+
// return plan.execute(version);
308+
// // TODO: figure out return for this
309+
//}
310+
302311
numeric atomic_multilog::execute_aggregate(const std::string &aggregate_expr, const std::string &filter_expr) {
303312
auto pa = parser::parse_aggregate(aggregate_expr);
304313
aggregator agg = aggregate_manager::get_aggregator(pa.agg);
@@ -331,6 +340,27 @@ std::unique_ptr<record_cursor> atomic_multilog::query_filter(const std::string &
331340
parser::compiled_expression()));
332341
}
333342

343+
//std::unique_ptr<json_cursor> atomic_multilog::query_filter_json(const std::string &filter_name,
344+
// uint64_t begin_ms,
345+
// uint64_t end_ms) const {
346+
// filter_id_t filter_id;
347+
// if (filter_map_.get(filter_name, filter_id) == -1) {
348+
// throw invalid_operation_exception(
349+
// "Filter " + filter_name + " does not exist.");
350+
// }
351+
//
352+
// filter::range_result res = filters_.at(filter_id)->lookup_range(begin_ms,
353+
// end_ms);
354+
// uint64_t version = rt_.get();
355+
// std::unique_ptr<offset_cursor> o_cursor(
356+
// new offset_iterator_cursor<filter::range_result::iterator>(res.begin(),
357+
// res.end(),
358+
// version));
359+
// return std::unique_ptr<json_cursor>(
360+
// new json_cursor(std::move(o_cursor), &data_log_, &schema_,
361+
// parser::compiled_expression()));
362+
//}
363+
334364
std::unique_ptr<record_cursor> atomic_multilog::query_filter(const std::string &filter_name,
335365
uint64_t begin_ms,
336366
uint64_t end_ms,
@@ -350,6 +380,25 @@ std::unique_ptr<record_cursor> atomic_multilog::query_filter(const std::string &
350380
return std::unique_ptr<record_cursor>(new filter_record_cursor(std::move(o_cursor), &data_log_, &schema_, e));
351381
}
352382

383+
//std::unique_ptr<json_cursor> atomic_multilog::query_filter_json(const std::string &filter_name,
384+
// uint64_t begin_ms,
385+
// uint64_t end_ms,
386+
// const std::string &additional_filter_expr) const {
387+
// auto t = parser::parse_expression(additional_filter_expr);
388+
// auto e = parser::compile_expression(t, schema_);
389+
// filter_id_t filter_id;
390+
// if (filter_map_.get(filter_name, filter_id) == -1) {
391+
// throw invalid_operation_exception(
392+
// "Filter " + filter_name + " does not exist.");
393+
// }
394+
//
395+
// filter::range_result res = filters_.at(filter_id)->lookup_range(begin_ms, end_ms);
396+
// uint64_t version = rt_.get();
397+
// std::unique_ptr<offset_cursor> o_cursor(
398+
// new offset_iterator_cursor<filter::range_result::iterator>(res.begin(), res.end(), version));
399+
// return std::unique_ptr<json_cursor>(new json_cursor(std::move(o_cursor), &data_log_, &schema_, e));
400+
//}
401+
353402
numeric atomic_multilog::get_aggregate(const std::string &aggregate_name, uint64_t begin_ms, uint64_t end_ms) {
354403
aggregate_id_t aggregate_id;
355404
if (aggregate_map_.get(aggregate_name, aggregate_id) == -1) {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#include "container/cursor/json_cursors.h"
2+
3+
namespace confluo {
4+
5+
/**
 * Constructs a JSON cursor over the given record offsets.
 *
 * @param o_cursor The offset cursor (ownership is taken)
 * @param dlog The data log pointer
 * @param schema The schema
 * @param cexpr The filter expression
 * @param batch_size The number of records in the batch
 */
aggregated_json_cursor::aggregated_json_cursor(std::unique_ptr<offset_cursor> o_cursor,
                                               const data_log *dlog,
                                               const schema_t *schema,
                                               const parser::compiled_expression &cexpr,
                                               size_t batch_size)
    : json_cursor(batch_size),
      o_cursor_(std::move(o_cursor)),
      dlog_(dlog),
      schema_(schema),
      cexpr_(cexpr) {
  // init() comes from the batched_cursor base — presumably primes the first
  // batch; confirm against batched_cursor.h.
  init();
}
17+
18+
size_t aggregated_json_cursor::load_next_batch() {
19+
// TODO add functionality to make into a json_string
20+
size_t i = 0;
21+
for (; i < current_batch_.size() && o_cursor_->has_more();
22+
++i, o_cursor_->advance()) {
23+
uint64_t o = o_cursor_->get();
24+
read_only_data_log_ptr ptr;
25+
dlog_->cptr(o, ptr);
26+
if (!cexpr_.test(current_batch_[i] = schema_->apply(o, ptr))) {
27+
i--;
28+
}
29+
}
30+
return i;
31+
}
32+
33+
}

libconfluo/src/schema/schema.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ std::string schema_t::to_string() const {
9494
return str;
9595
}
9696

97-
void *schema_t::json_string_to_data(const std::string json_record) const {
97+
void *schema_t::json_string_to_data(const std::string &json_record) const {
9898
// need to convert json_data into a record vector
9999
std::stringstream ss;
100100
// putting the json data into a stream

librpc/src/rpc_server.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,11 @@ void rpc_service_handler::read(std::string &_return, int64_t id, const int64_t o
178178
_return.assign(data, size);
179179
}
180180
void rpc_service_handler::read_json(std::string &_return, int64_t id, const int64_t offset, const int64_t nrecords) {
181+
if (nrecords > 1) {
182+
THROW(unsupported_exception, "Reading more than 1 JSON records at a time is unsupported!");
183+
}
181184
atomic_multilog *mlog = store_->get_atomic_multilog(id);
182185
_return = mlog->read_json((uint64_t) offset);
183-
// TODO: put in functionality for nrecords to be read
184186
}
185187
void rpc_service_handler::query_aggregate(std::string &_return,
186188
int64_t id,

0 commit comments

Comments
 (0)