(Original) Reading and Writing Parquet Files in Go with Generics, Based on github.com/apache/arrow-go

Date: 2025-03-17 18:42

Category: Golang

Tags: Parquet, generics, arrow-go, original

Reading and writing Parquet files in Go, with generics support

Based on github.com/apache/arrow-go
go get github.com/apache/arrow-go/v18

  1. Define the generic interface - ParquetGeneric
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"path"
	"sync"
	"syscall"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/compress"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// ParquetGeneric is the generic constraint a row type must satisfy.
type ParquetGeneric interface {
	Fields() []arrow.Field                                  // Arrow fields describing the columns
	Schema() *arrow.Schema                                  // Arrow schema built from those fields
	BuildRecord(builder *array.RecordBuilder)               // append this row's values to the record builder
	ParseRecord(column arrow.Array, colIdx int, rowIdx int) // copy one cell of a column into this row
}
  2. Implementation - ParquetReader
type ParquetReader[T ParquetGeneric] struct {
	filename      string
	fileReader    *file.Reader // underlying Parquet file reader, closed after reading
	reader        pqarrow.RecordReader
	readBatchSize int      // batch size for reads
	newT          func() T // factory that creates a new T instance
}

func (s *ParquetReader[T]) initReader() {
	fileReader, err := file.OpenParquetFile(s.filename, false)
	if err != nil {
		panic(err)
	}
	s.fileReader = fileReader
	// default memory allocator, wrapped so leaks can be detected
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	// enable parallel column reading
	reader, err := pqarrow.NewFileReader(fileReader, pqarrow.ArrowReadProperties{BatchSize: int64(s.readBatchSize), Parallel: true}, mem)
	if err != nil {
		panic(err)
	}
	// nil, nil selects all columns and all row groups
	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
	if err != nil {
		panic(err)
	}
	s.reader = rr
}
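
A note the post does not spell out: the second and third arguments of GetRecordReader select column indices and row groups, and nil means "all of them". A hedged sketch with made-up indices, for the case where only part of the file is needed:

// read only the first two columns of the first row group (illustrative indices)
rr, err := reader.GetRecordReader(context.Background(), []int{0, 1}, []int{0})
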

func (s *ParquetReader[T]) ReadParquet() []T {
	// initialize the reader
	s.initReader()
	// release resources once reading is done
	defer s.fileReader.Close()
	defer s.reader.Release()
	// read all records
	items := make([]T, 0)
	for {
		// read one batch at a time
		records := s.ReadBatchRecords()
		if records == nil {
			break
		}
		items = append(items, records...)
	}
	return items
}

func (s *ParquetReader[T]) ReadBatchRecords() []T {
	if !s.reader.Next() {
		return nil
	}
	rec := s.reader.Record()
	rec.Retain()
	defer rec.Release()

	// number of rows and columns in this batch
	numRows, numCols := int(rec.NumRows()), int(rec.NumCols())

	// create one T instance per row
	records := make([]T, numRows)
	for rowIdx := 0; rowIdx < numRows; rowIdx++ {
		records[rowIdx] = s.newT()
	}

	// parse column by column into the row instances
	for colIdx := 0; colIdx < numCols; colIdx++ {
		column := rec.Column(colIdx)
		for rowIdx := 0; rowIdx < numRows; rowIdx++ {
			records[rowIdx].ParseRecord(column, colIdx, rowIdx)
		}
	}

	return records
}
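
The constructor that the demo in section 4 calls, NewParquetReader, is not shown in the post; a minimal version inferred from that call site could look like this:

func NewParquetReader[T ParquetGeneric](filename string, readBatchSize int, newT func() T) *ParquetReader[T] {
	return &ParquetReader[T]{
		filename:      filename,
		readBatchSize: readBatchSize,
		newT:          newT,
	}
}
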
  3. Implementation - ParquetWriter

func NewParquetWriter[T ParquetGeneric](filename string, fields []arrow.Field, writeBatchSize int, compression compress.Compression) *ParquetWriter[T] {
	return &ParquetWriter[T]{
		filename:       filename,
		fields:         fields,
		writeBatchSize: writeBatchSize,
		compression:    compression,
		schema:         arrow.NewSchema(fields, nil),
		ch:             make(chan T, writeBatchSize*3),
	}
}

type ParquetWriter[T ParquetGeneric] struct {
	filename       string
	fs             *os.File
	compression    compress.Compression // recommended: compress.Codecs.Snappy (fast, with a reasonable compression ratio)
	schema         *arrow.Schema
	fields         []arrow.Field
	writer         *pqarrow.FileWriter
	writeBatchSize int // batch size for writes
	ch             chan T
	wg             sync.WaitGroup
	closeOnce      sync.Once // guards Close against being called from both the caller and quitSignal
}

func (s *ParquetWriter[T]) Run() {
	s.initWriter()
	s.wg.Add(1) // register the background writer goroutine with the wait group
	go func() {
		defer s.wg.Done()
		s.writeParquet()
	}()
	go s.quitSignal()
}

func (s *ParquetWriter[T]) initWriter() {
	_ = os.MkdirAll(path.Dir(s.filename), 0777)
	// create the Parquet file
	fs, err := os.Create(s.filename)
	if err != nil {
		fmt.Printf("failed to create file: %v\n", err)
		return
	}
	s.fs = fs
	// compression settings
	props := parquet.NewWriterProperties(parquet.WithCompression(s.compression)) // e.g. compress.Codecs.Snappy
	// create the Parquet writer
	writer, err := pqarrow.NewFileWriter(s.schema, s.fs, props, pqarrow.DefaultWriterProps())
	if err != nil {
		fmt.Printf("failed to create Parquet writer: %v\n", err)
		return
	}
	s.writer = writer
}

func (s *ParquetWriter[T]) AddRecord(item T) {
	s.ch <- item
}

func (s *ParquetWriter[T]) writeParquet() {
	items := make([]T, 0)
	// accumulate rows from the channel and flush them in batches
	for item := range s.ch {
		items = append(items, item)
		if len(items) >= s.writeBatchSize {
			s.writeBatchRecords(items)
			items = make([]T, 0)
		}
	}
	// flush whatever is left once the channel is closed
	if len(items) > 0 {
		s.writeBatchRecords(items)
	}
}

func (s *ParquetWriter[T]) writeBatchRecords(items []T) {
	builder := array.NewRecordBuilder(memory.DefaultAllocator, s.schema)
	defer builder.Release()
	for _, item := range items {
		item.BuildRecord(builder)
	}
	rec := builder.NewRecord()
	defer rec.Release() // make sure the record is always released
	if err := s.writer.Write(rec); err != nil {
		fmt.Printf("failed to write record batch: %v\n", err)
		return
	}
	fmt.Println("rows written:", len(items))
}

func (s *ParquetWriter[T]) quitSignal() {
	// listen for Ctrl+C / quit signals and close the writer on exit,
	// so buffered data is flushed before the process terminates
	ch := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-ch
	// close the writer
	fmt.Println("received shutdown signal, exiting...")
	s.Close()
}

func (s *ParquetWriter[T]) Close() {
	// Close may be reached from both the caller and quitSignal; run it only once
	s.closeOnce.Do(func() {
		close(s.ch)
		s.wg.Wait() // wait for writeParquet to drain the channel
		if s.writer != nil {
			if err := s.writer.Close(); err != nil {
				fmt.Printf("failed to close writer %v: %v\n", s.filename, err)
			}
		}
		if s.fs != nil {
			if err := s.fs.Close(); err != nil {
				fmt.Printf("failed to close file %v: %v\n", s.filename, err)
			}
		}
	})
}

  4. Example - Demo

func main() {
	writeParquet()
	readParquet()
}

type ApiLog struct {
	RequestIp string `json:"request_ip" bson:"request_ip" orc:"request_ip"`
	Date      int    `json:"date" bson:"date" orc:"date"`
	Hour      int    `json:"hour" bson:"hour" orc:"hour"`
	Start     int64  `json:"start" bson:"start" orc:"start"`
	End       int64  `json:"end" bson:"end" orc:"end"`
	Spent     int64  `json:"spent" bson:"spent" orc:"spent"`
	Method    string `json:"method" bson:"method" orc:"method"`
	Url       string `json:"url" bson:"url" orc:"url"`
	Req       string `json:"req" bson:"req" orc:"req"`
	Body      string `json:"body" bson:"body" orc:"body"`
	Error     string `json:"error" bson:"error" orc:"error"`
}

var ApiLogFields = []arrow.Field{
	{Name: "request_ip", Type: arrow.BinaryTypes.String},
	{Name: "date", Type: arrow.PrimitiveTypes.Int32},
	{Name: "hour", Type: arrow.PrimitiveTypes.Int32},
	{Name: "start", Type: arrow.PrimitiveTypes.Int64},
	{Name: "end", Type: arrow.PrimitiveTypes.Int64},
	{Name: "spent", Type: arrow.PrimitiveTypes.Int64},
	{Name: "method", Type: arrow.BinaryTypes.String},
	{Name: "url", Type: arrow.BinaryTypes.String},
	{Name: "req", Type: arrow.BinaryTypes.String},
	{Name: "body", Type: arrow.BinaryTypes.String},
	{Name: "error", Type: arrow.BinaryTypes.String},
}

func (s *ApiLog) Schema() *arrow.Schema {
	return arrow.NewSchema(ApiLogFields, nil)
}

func (s *ApiLog) Fields() []arrow.Field {
	return ApiLogFields
}

func (s *ApiLog) BuildRecord(builder *array.RecordBuilder) {
	for i := 0; i < len(ApiLogFields); i++ {
		switch ApiLogFields[i].Name {
		case "request_ip":
			builder.Field(i).(*array.StringBuilder).Append(s.RequestIp)
		case "date":
			builder.Field(i).(*array.Int32Builder).Append(int32(s.Date))
		case "hour":
			builder.Field(i).(*array.Int32Builder).Append(int32(s.Hour))
		case "start":
			builder.Field(i).(*array.Int64Builder).Append(s.Start)
		case "end":
			builder.Field(i).(*array.Int64Builder).Append(s.End)
		case "spent":
			builder.Field(i).(*array.Int64Builder).Append(s.Spent)
		case "method":
			builder.Field(i).(*array.StringBuilder).Append(s.Method)
		case "url":
			builder.Field(i).(*array.StringBuilder).Append(s.Url)
		case "req":
			builder.Field(i).(*array.StringBuilder).Append(s.Req)
		case "body":
			builder.Field(i).(*array.StringBuilder).Append(s.Body)
		case "error":
			builder.Field(i).(*array.StringBuilder).Append(s.Error)
		}
	}
}

func (s *ApiLog) ParseRecord(column arrow.Array, colIdx int, rowIdx int) {
	fieldName := ApiLogFields[colIdx].Name
	switch fieldName {
	case "request_ip":
		s.RequestIp = column.(*array.String).Value(rowIdx)
	case "date":
		s.Date = int(column.(*array.Int32).Value(rowIdx))
	case "hour":
		s.Hour = int(column.(*array.Int32).Value(rowIdx))
	case "start":
		s.Start = column.(*array.Int64).Value(rowIdx)
	case "end":
		s.End = column.(*array.Int64).Value(rowIdx)
	case "spent":
		s.Spent = column.(*array.Int64).Value(rowIdx)
	case "method":
		s.Method = column.(*array.String).Value(rowIdx)
	case "url":
		s.Url = column.(*array.String).Value(rowIdx)
	case "req":
		s.Req = column.(*array.String).Value(rowIdx)
	case "body":
		s.Body = column.(*array.String).Value(rowIdx)
	case "error":
		s.Error = column.(*array.String).Value(rowIdx)
	}
}
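
Every column above is written as a plain, non-null value. If a field were declared with Nullable: true in ApiLogFields, both interface methods would also have to deal with nulls. A minimal sketch of the extra handling (an addition to the post, using the error column purely as an illustration):

// in BuildRecord: append a null instead of an empty string when the value is absent
if s.Error == "" {
	builder.Field(i).(*array.StringBuilder).AppendNull()
} else {
	builder.Field(i).(*array.StringBuilder).Append(s.Error)
}

// in ParseRecord: check for null before reading the cell
if !column.IsNull(rowIdx) {
	s.Error = column.(*array.String).Value(rowIdx)
}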

func writeParquet() {
	w := NewParquetWriter[*ApiLog]("test.parquet", ApiLogFields, 10000, compress.Codecs.Snappy)
	w.Run()
	// generate 100,000 log entries to simulate a larger data set
	total := 100000
	ts := time.Now()
	for i := 0; i < total; i++ {
		// fill one row and queue it for the background writer
		apiLog := ApiLog{
			RequestIp: "",
			Date:      20230101,
			Hour:      12,
			Start:     1672531200000,
			End:       1672531201000,
			Spent:     1000,
			Method:    "GET",
			Url:       "/api/v1/data",
			Req:       fmt.Sprintf("req%d", i),
			Body:      fmt.Sprintf("body%d", i),
			Error:     "",
		}
		w.AddRecord(&apiLog)
	}
	time.Sleep(2 * time.Second)
	w.Close()
	fmt.Println("rows written:", total, "write took:", time.Since(ts))
}

func readParquet() {
	ts := time.Now()
	records := NewParquetReader[*ApiLog]("test.parquet", 10000, func() *ApiLog {
		return new(ApiLog)
	}).ReadParquet()
	fmt.Println("rows read:", len(records), "read took:", time.Since(ts))
}
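
For a quick sanity check of the output file, the low-level parquet/file package can read the file metadata directly. The helper below is a small sketch (inspectParquet is my own name, not part of the post):

func inspectParquet(filename string) {
	// open the file read-only and print basic metadata
	r, err := file.OpenParquetFile(filename, false)
	if err != nil {
		panic(err)
	}
	defer r.Close()
	fmt.Println("row groups:", r.NumRowGroups(), "rows:", r.NumRows())
}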