(Original) Reading and Writing Parquet Files in Go (with Generics), Based on github.com/apache/arrow-go
Reading and writing Parquet files in Go, with generics support
Based on github.com/apache/arrow-go
go get github.com/apache/arrow/go/v18
- Define the generic interface - ParquetGeneric
import (
"context"
"fmt"
"github.com/apache/arrow/go/v18/arrow"
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/apache/arrow/go/v18/parquet"
"github.com/apache/arrow/go/v18/parquet/compress"
"github.com/apache/arrow/go/v18/parquet/file"
"github.com/apache/arrow/go/v18/parquet/pqarrow"
"os"
"os/signal"
"path"
"sync"
"syscall"
)
// ParquetGeneric defines the generic interface every row type must implement
type ParquetGeneric interface {
Fields() []arrow.Field // Arrow field definitions for the type
Schema() *arrow.Schema // Arrow schema built from those fields
BuildRecord(builder *array.RecordBuilder) // append this instance's values to the column builders
ParseRecord(column arrow.Array, colIdx int, rowIdx int) // fill one struct field from the given column and row
}
- Implementation - ParquetReader
type ParquetReader[T ParquetGeneric] struct {
filename string
reader pqarrow.RecordReader
readBatchSize int // batch size for reads
newT func() T // factory that creates a new T instance
}
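The demo at the end calls NewParquetReader, which is not included in the snippets above; a minimal constructor consistent with the struct fields and with that call site (a sketch, with the parameter order inferred from the demo) could look like this:
func NewParquetReader[T ParquetGeneric](filename string, readBatchSize int, newT func() T) *ParquetReader[T] {
return &ParquetReader[T]{
filename: filename,
readBatchSize: readBatchSize,
newT: newT,
}
}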
func (s *ParquetReader[T]) initReader() {
fileReader, err := file.OpenParquetFile(s.filename, false)
if err != nil {
panic(err)
}
// wrap the default memory allocator in a checked allocator
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
// enable parallel reads
reader, err := pqarrow.NewFileReader(fileReader, pqarrow.ArrowReadProperties{BatchSize: int64(s.readBatchSize), Parallel: true}, mem)
if err != nil {
panic(err)
}
rr, err := reader.GetRecordReader(context.Background(), nil, nil)
if err != nil {
panic(err)
}
s.reader = rr
}
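// ReadParquet reads the entire file into memory and returns all rows as a slice of T.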
func (s *ParquetReader[T]) ReadParquet() []T {
// initialize the reader
s.initReader()
// release resources when reading is done
defer s.reader.Release()
// read all data
items := make([]T, 0)
for {
// read one batch
records := s.ReadBatchRecords()
if records == nil {
break
}
items = append(items, records...)
}
return items
}
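// ReadBatchRecords reads the next record batch and maps each row into a new T via ParseRecord; it returns nil once the reader is exhausted.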
func (s *ParquetReader[T]) ReadBatchRecords() []T {
if !s.reader.Next() {
return nil
}
rec := s.reader.Record()
rec.Retain()
defer rec.Release()
// row and column counts
numRows, numCols := int(rec.NumRows()), int(rec.NumCols())
// create a T instance for each row
records := make([]T, numRows)
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
records[rowIdx] = s.newT() // create a new T instance
}
// parse each column into the matching row instances
for colIdx := 0; colIdx < numCols; colIdx++ {
column := rec.Column(colIdx)
for rowIdx := 0; rowIdx < numRows; rowIdx++ {
records[rowIdx].ParseRecord(column, colIdx, rowIdx)
}
}
return records
}
- Implementation - ParquetWriter
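// NewParquetWriter creates a channel-backed Parquet writer for type T; records queued via AddRecord are flushed in batches of writeBatchSize.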
func NewParquetWriter[T ParquetGeneric](filename string, fields []arrow.Field, writeBatchSize int, compression compress.Compression) *ParquetWriter[T] {
return &ParquetWriter[T]{
filename: filename,
fields: fields,
writeBatchSize: writeBatchSize,
compression: compression,
schema: arrow.NewSchema(fields, nil),
ch: make(chan T, writeBatchSize*3),
}
}
type ParquetWriter[T ParquetGeneric] struct {
filename string
fs *os.File
compression compress.Compression // recommended: compress.Codecs.Snappy (fast, reasonable compression ratio)
schema *arrow.Schema
fields []arrow.Field
writer *pqarrow.FileWriter
writeBatchSize int // batch size for writes
ch chan T
wg sync.WaitGroup
}
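// Run starts the background goroutine that drains the channel and writes batches, and installs the exit-signal handler.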
func (s *ParquetWriter[T]) Run() {
s.initWriter()
s.wg.Add(1) // register one task with the wait group
go func() {
defer s.wg.Done()
s.writeParquet()
}()
go s.quitSignal()
}
func (s *ParquetWriter[T]) initWriter() {
_ = os.MkdirAll(path.Dir(s.filename), 0777)
// create the Parquet file
fs, err := os.Create(s.filename)
if err != nil {
fmt.Printf("failed to create file: %v\n", err)
return
}
s.fs = fs
// compression settings
props := parquet.NewWriterProperties(parquet.WithCompression(s.compression)) // e.g. compress.Codecs.Snappy
// create the Parquet writer
writer, err := pqarrow.NewFileWriter(s.schema, s.fs, props, pqarrow.DefaultWriterProps())
if err != nil {
fmt.Printf("failed to create Parquet writer: %v\n", err)
return
}
s.writer = writer
}
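// AddRecord queues one item for writing; it blocks when the channel buffer (writeBatchSize*3) is full.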
func (s *ParquetWriter[T]) AddRecord(item T) {
s.ch <- item
}
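// writeParquet drains the channel and flushes a record batch every writeBatchSize items, plus a final partial batch once the channel is closed.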
func (s *ParquetWriter[T]) writeParquet() {
items := make([]T, 0)
// write in batches
for item := range s.ch {
items = append(items, item)
if len(items) >= s.writeBatchSize {
s.writeBatchRecords(items)
items = make([]T, 0)
}
}
// flush any remaining items
if len(items) > 0 {
s.writeBatchRecords(items)
}
}
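// writeBatchRecords builds one Arrow record from the batch of items and writes it to the Parquet file.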
func (s *ParquetWriter[T]) writeBatchRecords(items []T) {
builder := array.NewRecordBuilder(memory.DefaultAllocator, s.schema)
defer builder.Release() // release the builder's buffers
for _, item := range items {
item.BuildRecord(builder)
}
rec := builder.NewRecord()
defer rec.Release() // ensure rec is always released
if err := s.writer.Write(rec); err != nil {
fmt.Printf("failed to write record batch: %v\n", err)
return
}
fmt.Println("wrote records:", len(items))
}
func (s *ParquetWriter[T]) quitSignal() {
// listen for Ctrl+C / termination signals
// close the writer on exit so all buffered data gets flushed
ch := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-ch
// close the writer
fmt.Println("received exit signal, shutting down...")
s.Close()
}
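// Close stops accepting new records, waits for pending writes to finish, then closes the Parquet writer and the underlying file.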
func (s *ParquetWriter[T]) Close() {
close(s.ch)
s.wg.Wait() // wait for writeParquet to finish
if s.writer != nil {
if err := s.writer.Close(); err != nil {
fmt.Printf("关闭 writer 失败: %v,%v\n", s.filename, err)
}
}
if s.fs != nil {
if err := s.fs.Close(); err != nil {
fmt.Printf("关闭文件失败: %v,%v\n", s.filename, err)
}
}
}
- Example - Demo
func main() {
writeParquet()
readParquet()
}
type ApiLog struct {
RequestIp string `json:"request_ip" bson:"request_ip" orc:"request_ip"`
Date int `json:"date" bson:"date" orc:"date"`
Hour int `json:"hour" bson:"hour" orc:"hour"`
Start int64 `json:"start" bson:"start" orc:"start"`
End int64 `json:"end" bson:"end" orc:"end"`
Spent int64 `json:"spent" bson:"spent" orc:"spent"`
Method string `json:"method" bson:"method" orc:"method"`
Url string `json:"url" bson:"url" orc:"url"`
Req string `json:"req" bson:"req" orc:"req"`
Body string `json:"body" bson:"body" orc:"body"`
Error string `json:"error" bson:"error" orc:"error"`
}
var ApiLogFields = []arrow.Field{
{Name: "request_ip", Type: arrow.BinaryTypes.String},
{Name: "date", Type: arrow.PrimitiveTypes.Int32},
{Name: "hour", Type: arrow.PrimitiveTypes.Int32},
{Name: "start", Type: arrow.PrimitiveTypes.Int64},
{Name: "end", Type: arrow.PrimitiveTypes.Int64},
{Name: "spent", Type: arrow.PrimitiveTypes.Int64},
{Name: "method", Type: arrow.BinaryTypes.String},
{Name: "url", Type: arrow.BinaryTypes.String},
{Name: "req", Type: arrow.BinaryTypes.String},
{Name: "body", Type: arrow.BinaryTypes.String},
{Name: "error", Type: arrow.BinaryTypes.String},
}
func (s *ApiLog) Schema() *arrow.Schema {
return arrow.NewSchema(ApiLogFields, nil)
}
func (s *ApiLog) Fields() []arrow.Field {
return ApiLogFields
}
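// BuildRecord appends this ApiLog's values to the corresponding column builders, keyed by field name.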
func (s *ApiLog) BuildRecord(builder *array.RecordBuilder) {
for i := 0; i < len(ApiLogFields); i++ {
switch ApiLogFields[i].Name {
case "request_ip":
builder.Field(i).(*array.StringBuilder).Append(s.RequestIp)
case "date":
builder.Field(i).(*array.Int32Builder).Append(int32(s.Date))
case "hour":
builder.Field(i).(*array.Int32Builder).Append(int32(s.Hour))
case "start":
builder.Field(i).(*array.Int64Builder).Append(s.Start)
case "end":
builder.Field(i).(*array.Int64Builder).Append(s.End)
case "spent":
builder.Field(i).(*array.Int64Builder).Append(s.Spent)
case "method":
builder.Field(i).(*array.StringBuilder).Append(s.Method)
case "url":
builder.Field(i).(*array.StringBuilder).Append(s.Url)
case "req":
builder.Field(i).(*array.StringBuilder).Append(s.Req)
case "body":
builder.Field(i).(*array.StringBuilder).Append(s.Body)
case "error":
builder.Field(i).(*array.StringBuilder).Append(s.Error)
}
}
}
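// ParseRecord fills the struct field corresponding to colIdx from the given column at rowIdx.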
func (s *ApiLog) ParseRecord(column arrow.Array, colIdx int, rowIdx int) {
fieldName := ApiLogFields[colIdx].Name
switch fieldName {
case "request_ip":
s.RequestIp = column.(*array.String).Value(rowIdx)
case "date":
s.Date = int(column.(*array.Int32).Value(rowIdx))
case "hour":
s.Hour = int(column.(*array.Int32).Value(rowIdx))
case "start":
s.Start = column.(*array.Int64).Value(rowIdx)
case "end":
s.End = column.(*array.Int64).Value(rowIdx)
case "spent":
s.Spent = column.(*array.Int64).Value(rowIdx)
case "method":
s.Method = column.(*array.String).Value(rowIdx)
case "url":
s.Url = column.(*array.String).Value(rowIdx)
case "req":
s.Req = column.(*array.String).Value(rowIdx)
case "body":
s.Body = column.(*array.String).Value(rowIdx)
case "error":
s.Error = column.(*array.String).Value(rowIdx)
}
}
func writeParquet() {
w := NewParquetWriter[*ApiLog]("test.parquet", ApiLogFields, 10000, compress.Codecs.Snappy)
w.Run()
// generate 100,000 log entries (to simulate a large volume of data)
total := 100000
ts := time.Now()
for i := 0; i < total; i++ {
// populate a log entry
apiLog := ApiLog{
RequestIp: "",
Date: 20230101,
Hour: 12,
Start: 1672531200000,
End: 1672531201000,
Spent: 1000,
Method: "GET",
Url: "/api/v1/data",
Req: fmt.Sprintf("req%d", i),
Body: fmt.Sprintf("body%d", i),
Error: "",
}
w.AddRecord(&apiLog)
}
time.Sleep(2 * time.Second)
w.Close()
fmt.Println("写入数据:", total, "写入数据耗时:", time.Since(ts))
return
}
func readParquet() {
ts := time.Now()
records := NewParquetReader[*ApiLog]("test.parquet", 10000, func() *ApiLog {
return new(ApiLog)
}).ReadParquet()
fmt.Println("读取到数据:", len(records), "读取数据耗时:", time.Since(ts))
return
}