C++ 列式内存布局数据存储格式 Arrow
创始人
2024-11-21 03:35:54
0

Apache Arrow 优点 :
    高性能数据处理: Arrow 使用列式内存布局,这特别适合于数据分析和查询操作,因为它允许对数据进行高效批量处理,减少CPU缓存未命中,从而提升处理速度。
    零拷贝数据共享: Arrow 允许不同系统和进程之间直接共享内存中的数据而无需复制,这对于提高数据密集型应用的效率至关重要,减少了内存使用和CPU开销。
    跨平台兼容性: Arrow 是一个跨语言开发平台,支持C++, Java, Python等多种编程语言,促进了不同软件组件间的互操作性。
    标准化数据格式: 定义了一套统一的数据格式规范,使得数据可以在不同系统间无缝传递,降低了数据转换的成本和复杂性。
    优化大数据处理: 特别是在与大数据框架(如Spark、Pandas)集成时,Arrow 可显著加速数据加载、处理和分析的速度,例如,与PySpark集成后数据处理速度提升高达53倍。
    集成广泛: 被众多数据处理工具和库采用,如Pandas、Parquet、Drill、Spark等,形成了强大的生态系统。
Apache Arrow 缺点 :
    内存消耗: 列式存储相对于行式存储可能需要更多的内存,尤其是在处理稀疏数据或宽表时,因为每一列都需要分配连续的内存空间。
    不适合所有场景: 对于需要频繁随机访问记录或更新操作的场景,Arrow 的列式存储可能不如传统的行式存储高效。
    学习曲线: 对于新用户来说,理解和掌握Arrow的数据结构和API可能需要一定时间,尤其是当他们习惯于使用其他数据处理模型时。
    生态成熟度: 虽然Arrow的生态系统正在快速发展,但在某些特定领域或小众技术栈中,相关支持和工具可能不够丰富或成熟。
    实现复杂性: 对于开发者来说,实现Arrow的高效利用可能涉及到复杂的内存管理和优化策略,这在某些情况下可能会增加开发难度。

 #define ARROW_COMPUTE  #include  #include "arrow/pretty_print.h" #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include   template  using numbuildT = arrow::NumericBuilder;   struct ArrowUtil {     static arrow::Status read_csv(char const* file_name, std::shared_ptr& tb);     static arrow::Status read_ipc(char const* file_name, std::shared_ptr& tb);     static arrow::Status read_parquet(char const* file_name, std::shared_ptr& tb);     static arrow::Status read_json(char const* file_name, std::shared_ptr& tb);      static arrow::Status write_ipc(arrow::Table const& tb, char const* file_name);     static arrow::Status write_parquet(arrow::Table const& tb, char const* file_name);      template      inline static std::shared_ptr chunked_array_to_array(std::shared_ptr const& array_a) {         buildT int64_builder;         int64_builder.Resize(array_a->length());         std::vector int64_values;         int64_values.reserve(array_a->length());         for (int i = 0; i < array_a->num_chunks(); ++i) {             auto inner_arr = array_a->chunk(i);             auto int_a = std::static_pointer_cast(inner_arr);             for (int j = 0; j < int_a->length(); ++j) {                 int64_values.push_back(int_a->Value(j));             }         }          int64_builder.AppendValues(int64_values);         std::shared_ptr array_a_res;         int64_builder.Finish(&array_a_res);         return array_a_res;     }       template      inline static std::vector chunked_array_to_vector(std::shared_ptr const& array_a) {         std::vector int64_values;         int64_values.reserve(array_a->length());         for (int i = 0; i < array_a->num_chunks(); ++i) {             auto inner_arr = array_a->chunk(i);             auto int_a = std::static_pointer_cast(inner_arr);             for (int j = 0; j < int_a->length(); ++j) {                 int64_values.push_back(int_a->Value(j));             }         }         return int64_values;     }      inline static std::vector chunked_array_to_str_vector(std::shared_ptr const& array_a) {         std::vector int64_values;         int64_values.reserve(array_a->length());         for (int i = 0; i < array_a->num_chunks(); ++i) {             auto inner_arr = array_a->chunk(i);             auto int_a = std::static_pointer_cast(inner_arr);             for (int j = 0; j < int_a->length(); ++j) {                 int64_values.push_back(int_a->Value(j).data());             }         }         return int64_values;     }       inline static std::shared_ptr chunked_array_to_str_array(std::shared_ptr const& array_a) {         arrow::StringBuilder int64_builder;         int64_builder.Resize(array_a->length());         std::vector int64_values;         int64_values.reserve(array_a->length());         for (int i = 0; i < array_a->num_chunks(); ++i) {             auto inner_arr = array_a->chunk(i);             auto int_a = std::static_pointer_cast(inner_arr);             for (int j = 0; j < int_a->length(); ++j) {                 int64_values.push_back(int_a->Value(j).data());             }         }         int64_builder.AppendValues(int64_values);         std::shared_ptr array_a_res;         int64_builder.Finish(&array_a_res);         return array_a_res;     } };   arrow::Status ArrowUtil::read_csv(char const* file_name, std::shared_ptr& tb) {     ARROW_ASSIGN_OR_RAISE(auto input_file,         arrow::io::ReadableFile::Open(file_name));     ARROW_ASSIGN_OR_RAISE(auto csv_reader,         arrow::csv::TableReader::Make(             arrow::io::default_io_context(), input_file,             arrow::csv::ReadOptions::Defaults(),             arrow::csv::ParseOptions::Defaults(),             arrow::csv::ConvertOptions::Defaults()));     ARROW_ASSIGN_OR_RAISE(auto table, csv_reader->Read());     tb = table;     return arrow::Status::OK(); }  arrow::Status ArrowUtil::read_ipc(char const* file_name, std::shared_ptr& tb) {     ARROW_ASSIGN_OR_RAISE(auto input_file,         arrow::io::ReadableFile::Open(file_name));      ARROW_ASSIGN_OR_RAISE(auto ipc_reader, arrow::ipc::RecordBatchFileReader::Open(input_file));      std::vector> batches;     batches.reserve(ipc_reader->num_record_batches());      for (int i = 0; i < ipc_reader->num_record_batches(); ++i) {         ARROW_ASSIGN_OR_RAISE(auto a_record, ipc_reader->ReadRecordBatch(i));         batches.emplace_back(std::move(a_record));     }      arrow::Table::FromRecordBatches(ipc_reader->schema(), std::move(batches)).Value(&tb);      return arrow::Status::OK(); }  arrow::Status ArrowUtil::read_parquet(char const* file_name, std::shared_ptr& tb) {     std::shared_ptr infile;     PARQUET_ASSIGN_OR_THROW(infile,         arrow::io::ReadableFile::Open(file_name,             arrow::default_memory_pool()));      std::unique_ptr reader;     PARQUET_THROW_NOT_OK(         parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));     std::shared_ptr table;     PARQUET_THROW_NOT_OK(reader->ReadTable(&table));     tb = table;     return arrow::Status::OK(); }  arrow::Status ArrowUtil::read_json(char const* file_name, std::shared_ptr& tb) {     std::shared_ptr infile;     PARQUET_ASSIGN_OR_THROW(infile,         arrow::io::ReadableFile::Open(file_name,             arrow::default_memory_pool()));      ARROW_ASSIGN_OR_RAISE(auto reader, arrow::json::TableReader::Make(arrow::default_memory_pool(), infile, arrow::json::ReadOptions::Defaults(), arrow::json::ParseOptions::Defaults()));      ARROW_ASSIGN_OR_RAISE(auto res_tb, reader->Read());     tb = res_tb;     return arrow::Status::OK(); }  arrow::Status ArrowUtil::write_ipc(arrow::Table const& tb, char const* file_name) {      ARROW_ASSIGN_OR_RAISE(auto output_file,         arrow::io::FileOutputStream::Open(file_name));     ARROW_ASSIGN_OR_RAISE(auto batch_writer,         arrow::ipc::MakeFileWriter(output_file, tb.schema()));     ARROW_RETURN_NOT_OK(batch_writer->WriteTable(tb));     ARROW_RETURN_NOT_OK(batch_writer->Close());      return arrow::Status::OK(); }  arrow::Status ArrowUtil::write_parquet(arrow::Table const& tb, char const* file_name) {     std::shared_ptr outfile;     PARQUET_ASSIGN_OR_THROW(         outfile, arrow::io::FileOutputStream::Open(file_name));     // The last argument to the function call is the size of the RowGroup in     // the parquet file. Normally you would choose this to be rather large but     // for the example, we use a small value to have multiple RowGroups.     PARQUET_THROW_NOT_OK(         parquet::arrow::WriteTable(tb, arrow::default_memory_pool(), outfile, 3));     return arrow::Status::OK(); }    void testReadCSV() {     // 读取CSV文件     char const* csv_path = "./test.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(csv_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     assert(tb_.num_rows() == 2); }  void testWriteIpc() {     // 读取CSV文件并写入IPC文件     char const* csv_path = "./test.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(csv_path, tb);     auto const& tb_ = *tb;      char const* write_csv_path = "./test_dst.arrow";     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto write_res = ArrowUtil::write_ipc(tb_, write_csv_path);     assert(write_res == arrow::Status::OK()); }  void testReadIPC() {     // 读取Arrow IPC 文件     char const* ipc_path = "./test_dst.arrow";     std::shared_ptr tb;     ArrowUtil::read_ipc(ipc_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     assert(tb_.num_rows() == 2); }   void testWriteParquet() {     // 写入Parquet文件     char const* csv_path = "./test.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(csv_path, tb);     auto const& tb_ = *tb;      char const* write_parquet_path = "./test_dst.parquet";     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto write_res = ArrowUtil::write_parquet(tb_, write_parquet_path);     assert(write_res == arrow::Status::OK()); }   void testReadParquet() {     // 读取 Parquet     char const* parquet_path = "./test_dst.parquet";     std::shared_ptr tb;     ArrowUtil::read_parquet(parquet_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     assert(tb_.num_rows() == 2); }  void testReadJson() {     // 读取Json文件     char const* json_path = "./test.json";     std::shared_ptr tb;     ArrowUtil::read_json(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     assert(tb_.num_rows() == 2); }  void testComputeGreater() {     // 比较两列 int 值中 int1 > int2的值, greater函数     char const* json_path = "./comp_gt.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto array_a = tb_.GetColumnByName("int1");     auto array_b = tb_.GetColumnByName("int2");      auto array_a_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_a);     auto array_b_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_b);      auto compared_datum = arrow::compute::CallFunction("greater", { array_a_res, array_b_res });     auto array_a_gt_b_compute = compared_datum->make_array();      arrow::PrettyPrint(*array_a_gt_b_compute, {}, &std::cerr);      auto schema =         arrow::schema({ arrow::field("int1", arrow::int64()), arrow::field("int2", arrow::int64()),                        arrow::field("a>b? (arrow)", arrow::boolean()) });      std::shared_ptr my_table = arrow::Table::Make(         schema, { array_a_res, array_b_res, array_a_gt_b_compute }, tb_.num_rows());      arrow::PrettyPrint(*my_table, {}, &std::cerr); }  void testComputeMinMax() {     // 计算int1列的最大值和最小值     char const* json_path = "./comp_gt.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto array_a = tb_.GetColumnByName("int1");     auto array_a_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_a);      arrow::compute::ScalarAggregateOptions scalar_aggregate_options;     scalar_aggregate_options.skip_nulls = false;      auto min_max = arrow::compute::CallFunction("min_max", { array_a_res }, &scalar_aggregate_options);      // Unpack struct scalar result (a two-field {"min", "max"} scalar)     auto min_value = min_max->scalar_as().value[0];     auto max_value = min_max->scalar_as().value[1];      assert(min_value->ToString() == "1");     assert(max_value->ToString() == "8"); }  #define GTEST_TEST(a, b) void a##_##b() #define ASSERT_EQ(a, b) assert(a == b)  GTEST_TEST(RWTests, ComputeMean) {     // 计算int1列的平均值     char const* json_path = "../data/comp_gt.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto array_a = tb_.GetColumnByName("int1");     auto array_a_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_a);      arrow::compute::ScalarAggregateOptions scalar_aggregate_options;     scalar_aggregate_options.skip_nulls = false;      auto mean = arrow::compute::CallFunction("mean", { array_a_res }, &scalar_aggregate_options);      auto const& mean_value = mean->scalar_as();      ASSERT_EQ(mean_value.ToString(), "4.5"); }  GTEST_TEST(RWTests, ComputeAdd) {     // 将第一列的值加3     char const* json_path = "../data/comp_gt.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto array_a = tb_.GetColumnByName("int1");     auto array_a_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_a);      arrow::compute::ScalarAggregateOptions scalar_aggregate_options;     scalar_aggregate_options.skip_nulls = false;      std::shared_ptr increment = std::make_shared(3);      auto add = arrow::compute::CallFunction("add", { array_a_res, increment }, &scalar_aggregate_options);     std::shared_ptr incremented_array = add->array_as();     arrow::PrettyPrint(*incremented_array, {}, &std::cerr); }   GTEST_TEST(RWTests, ComputeAddArray) {     // int1和int2两列相加     char const* json_path = "../data/comp_gt.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto array_a = tb_.GetColumnByName("int1");     auto array_a_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_a);      auto array_b = tb_.GetColumnByName("int2");     auto array_b_res = ArrowUtil::chunked_array_to_array, arrow::Int64Array>(array_b);      arrow::compute::ScalarAggregateOptions scalar_aggregate_options;     scalar_aggregate_options.skip_nulls = false;      auto add = arrow::compute::CallFunction("add", { array_a_res, array_b_res }, &scalar_aggregate_options);     std::shared_ptr incremented_array = add->array_as();     arrow::PrettyPrint(*incremented_array, {}, &std::cerr); }  GTEST_TEST(RWTests, ComputeStringEqual) {     // 比较s1和s2两列是否相等     char const* json_path = "../data/comp_s_eq.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);      auto array_a = tb_.GetColumnByName("s1");     auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a);      auto array_b = tb_.GetColumnByName("s2");     auto array_b_res = ArrowUtil::chunked_array_to_str_array(array_b);      arrow::compute::ScalarAggregateOptions scalar_aggregate_options;     scalar_aggregate_options.skip_nulls = false;      auto eq_ = arrow::compute::CallFunction("equal", { array_a_res, array_b_res }, &scalar_aggregate_options);     std::shared_ptr eq_array = eq_->array_as();     arrow::PrettyPrint(*eq_array, {}, &std::cerr); }  GTEST_TEST(RWTests, ComputeCustom) {     // 自己写算法逐个比较相等      char const* json_path = "../data/comp_s_eq.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto arr1 = tb_.GetColumnByName("s1");     auto arr2 = tb_.GetColumnByName("s2");     auto v1 = ArrowUtil::chunked_array_to_str_vector(arr1);     auto v2 = ArrowUtil::chunked_array_to_str_vector(arr2);     for (std::size_t i = 0; i < v1.size(); ++i) {         if (v1[i] != v2[i]) {             std::cerr << v1[i] << "!=" << v2[i] << "\n";         }     } }  GTEST_TEST(RWTests, ComputeCustomDbl) {     // 自己写算法比较double值     char const* json_path = "../data/custom_dbl.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto arr1 = tb_.GetColumnByName("dbl1");     auto arr2 = tb_.GetColumnByName("dbl2");     auto v1 = ArrowUtil::chunked_array_to_vector(arr1);     auto v2 = ArrowUtil::chunked_array_to_vector(arr2);     for (std::size_t i = 0; i < v1.size(); ++i) {         if (v1[i] != v2[i]) {             std::cerr << v1[i] << "!=" << v2[i] << "\n";         }     } }  GTEST_TEST(RWTests, ComputeEqualDbl) {     // 使用equal函数比较double值     char const* json_path = "../data/custom_dbl.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);     auto arr1 = tb_.GetColumnByName("dbl1");     auto arr2 = tb_.GetColumnByName("dbl2");      auto dbl_arr1 = ArrowUtil::chunked_array_to_array, arrow::DoubleArray>(arr1);     auto dbl_arr2 = ArrowUtil::chunked_array_to_array, arrow::DoubleArray>(arr2);      arrow::compute::ScalarAggregateOptions scalar_aggregate_options;     scalar_aggregate_options.skip_nulls = false;      auto eq_ = arrow::compute::CallFunction("equal", { dbl_arr1, dbl_arr2 }, &scalar_aggregate_options);     std::shared_ptr eq_array = eq_->array_as();     arrow::PrettyPrint(*eq_array, {}, &std::cerr); }  GTEST_TEST(RWTests, StrStartsWith) {     // 计算s1列以是否以 Zha开头的值     char const* json_path = "../data/comp_s_eq.csv";     std::shared_ptr tb;     ArrowUtil::read_csv(json_path, tb);     auto const& tb_ = *tb;     arrow::PrettyPrint(tb_, {}, &std::cerr);      auto array_a = tb_.GetColumnByName("s1");     auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a);      arrow::compute::MatchSubstringOptions options("Zha");      auto eq_ = arrow::compute::CallFunction("starts_with", { array_a_res }, &options);     std::shared_ptr eq_array = eq_->array_as();     arrow::PrettyPrint(*eq_array, {}, &std::cerr); }    using arrow::Int32Builder; using arrow::Int64Builder; using arrow::DoubleBuilder; using arrow::StringBuilder;  struct row_data {     int32_t col1;     int64_t col2;     double col3;     std::string col4; };//行结构  #define EXIT_ON_FAILURE(expr)                      \   do {                                             \     arrow::Status status_ = (expr);                \     if (!status_.ok()) {                           \       std::cerr << status_.message() << std::endl; \       return EXIT_FAILURE;                         \     }                                              \   } while (0);  arrow::Status CreateTable(const std::vector& rows, std::shared_ptr* table) {      //使用arrow::jemalloc::MemoryPool::default_pool()构建器更有效,因为这可以适当增加底层内存区域的大小.     arrow::MemoryPool* pool = arrow::default_memory_pool();      Int32Builder col1_builder(pool);     Int64Builder col2_builder(pool);     DoubleBuilder col3_builder(pool);     StringBuilder col4_builder(pool);      //现在我们可以循环我们现有的数据,并将其插入到构建器中。这里的' Append '调用可能会失败(例如,我们无法分配足够的额外内存)。因此我们需要检查它们的返回值。     for (const row_data& row : rows) {         ARROW_RETURN_NOT_OK(col1_builder.Append(row.col1));         ARROW_RETURN_NOT_OK(col2_builder.Append(row.col2));         ARROW_RETURN_NOT_OK(col3_builder.Append(row.col3));         ARROW_RETURN_NOT_OK(col4_builder.Append(row.col4));     }      //添加空值,末尾值的元素为空     ARROW_RETURN_NOT_OK(col1_builder.AppendNull());     ARROW_RETURN_NOT_OK(col2_builder.AppendNull());     ARROW_RETURN_NOT_OK(col3_builder.AppendNull());     ARROW_RETURN_NOT_OK(col4_builder.AppendNull());      std::shared_ptr col1_array;     ARROW_RETURN_NOT_OK(col1_builder.Finish(&col1_array));     std::shared_ptr col2_array;     ARROW_RETURN_NOT_OK(col2_builder.Finish(&col2_array));     std::shared_ptr col3_array;     ARROW_RETURN_NOT_OK(col3_builder.Finish(&col3_array));     std::shared_ptr col4_array;     ARROW_RETURN_NOT_OK(col4_builder.Finish(&col4_array));      std::vector> schema_vector = {             arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float64()),             arrow::field("col4", arrow::utf8()) };      auto schema = std::make_shared(schema_vector);      //最终的' table '变量是我们可以传递给其他可以使用Apache Arrow内存结构的函数的变量。这个对象拥有所有引用数据的所有权,     //因此一旦我们离开构建表及其底层数组的函数的作用域,就不必关心未定义的引用。     *table = arrow::Table::Make(schema, { col1_array, col2_array, col3_array,col4_array });      return arrow::Status::OK(); }  arrow::Status TableToVector(const std::shared_ptr& table,     std::vector* rows) {     //检查表结构是否一致     std::vector> schema_vector = {             arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float64()),             arrow::field("col4", arrow::utf8()) };     auto expected_schema = std::make_shared(schema_vector);      if (!expected_schema->Equals(*table->schema())) {         // The table doesn't have the expected schema thus we cannot directly         // convert it to our target representation.         return arrow::Status::Invalid("Schemas are not matching!");     }      //获取对应列数据指针     auto col1s =         std::static_pointer_cast(table->column(0)->chunk(0));     auto col2s =         std::static_pointer_cast(table->column(1)->chunk(0));     auto col3s =         std::static_pointer_cast(table->column(2)->chunk(0));     auto col4s =         std::static_pointer_cast(table->column(3)->chunk(0));      for (int64_t i = 0; i < table->num_rows(); i++) {         if (col1s->IsNull(i)) {             assert(i == 3);//第四行为null         }         else {             int32_t col1 = col1s->Value(i);             int64_t col2 = col2s->Value(i);             double col3 = col3s->Value(i);             std::string col4 = col4s->GetString(i);             rows->push_back({ col1, col2, col3,col4 });         }     }      return arrow::Status::OK(); }  // 行数组和列数组相互转换 int testTableConvertSTL() {     //行数组     std::vector rows = {             {1, 11,1.0, "John"}, {2, 22,2.0, "Tom"}, {3,33, 3.0,"Susan"} };      std::shared_ptr table;     EXIT_ON_FAILURE(CreateTable(rows, &table));      std::vector expected_rows;     EXIT_ON_FAILURE(TableToVector(table, &expected_rows));     std::cout << expected_rows.size() << std::endl;     assert(rows.size() == expected_rows.size());     return 0; }  void test() {     // 构建一个int8数组     arrow::Int8Builder builder;     arrow::Int16Builder int16builder;     int8_t days_raw[5] = { 1, 12, 17, 23, 28 };     int8_t months_raw[5] = { 1, 3, 5, 7, 1 };     int16_t years_raw[5] = { 1990, 2000, 1995, 2000, 1995 };     builder.AppendValues(days_raw, 5);     std::shared_ptr days = builder.Finish().MoveValueUnsafe();         builder.AppendValues(months_raw, 5);     std::shared_ptr months = builder.Finish().MoveValueUnsafe();         int16builder.AppendValues(years_raw, 5);     std::shared_ptr years = int16builder.Finish().MoveValueUnsafe();      // Schema 自定义table     // Now, we want a RecordBatch, which has columns and labels for said columns.     // This gets us to the 2d data structures we want in Arrow.     // These are defined by schema, which have fields -- here we get both those object types     // ready.     std::shared_ptr field_day, field_month, field_year;     std::shared_ptr schema;      // Every field needs its name and data type.     field_day = arrow::field("Day", arrow::int8());     field_month = arrow::field("Month", arrow::int8());     field_year = arrow::field("Year", arrow::int16());      // The schema can be built from a vector of fields, and we do so here.     schema = arrow::schema({ field_day, field_month, field_year });      // 打印     // With the schema and Arrays full of data, we can make our RecordBatch! Here,     // each column is internally contiguous. This is in opposition to Tables, which we'll     // see next.     std::shared_ptr rbatch;     // The RecordBatch needs the schema, length for columns, which all must match,     // and the actual data itself.     rbatch = arrow::RecordBatch::Make(schema, days->length(), { days, months, years });     std::cout << rbatch->ToString();     /*     Day:   [         1,         12,         17,         23,         28       ]     Month:   [         1,         3,         5,         7,         1       ]     Year:   [         1990,         2000,         1995,         2000,         1995       ]     */      // stl vector容器     arrow::ArrayVector day_vecs{days};     std::shared_ptr day_chunks =         std::make_shared(day_vecs);     testTableConvertSTL();      testReadCSV();     /*     col1: string     col2: string     col3: string     ----     col1:       [         [           "val1",           "val1"         ]       ]     col2:       [         [           "val2",           "val2"         ]       ]     col3:       [         [           "val3",           "val3"         ]     ]     */     testWriteIpc();     testReadIPC();     //testComputeGreater();     //testComputeMinMax(); } 

Compute Functions — Apache Arrow v17.0.0

GitHub - apache/arrow: Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing


创作不易,小小的支持一下吧!

相关内容

热门资讯

一分钟了解"wpk透... 一分钟了解"wpk透视辅助(WPK存在)外挂透明挂辅助测试",介绍教程(2020已更新)(哔哩哔哩)...
二分钟透视辅助!wpk辅助神器... 《wpk辅助软件透明挂》是一款多人竞技的wpk辅助辅助透视游戏,你将微扑克对手来到同一个战场,为至高...
推荐一款!wepokEr辅助透... 推荐一款!wepokEr辅助透视插件,wpk辅助是有辅助,透明教程(有挂来袭)-哔哩哔哩是一款可以让...
五分钟最新版!wepoker免... 五分钟最新版!wepoker免费透视脚本(透视器),wepoKEr原来真的是有挂,大神讲解(有挂分析...
8分钟代打!今日长牌有外挂,太... 8分钟代打!今日长牌有外挂,太难了外挂透明挂辅助挂,详细教程(有挂同胞);今日长牌有外挂简单的灵活控...
技术分享!(博雅大连棋牌)外挂... 技术分享!(博雅大连棋牌)外挂透视辅助助手!(辅助挂)详细教程(2022已更新)(哔哩哔哩);超受欢...
推荐一款!wePOker透视辅... 您好,wePOker这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩家在这...
1分钟辅助挂!pokernow... 您好,微扑克这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩家在这款游戏中...
五分钟私人局!wepoker免... 五分钟私人局!wepoker免费钻石(辅助挂),wEPOKEr有挂确实有辅助挂,实用技巧(有挂了解)...
玩家必备教程"wpk... 您好,Wepoke总结这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩家在...