项目作者: khezen

项目描述 :
Apache AVRO for go
高级语言: Go
项目地址: git://github.com/khezen/avro.git
创建时间: 2019-04-07T12:22:46Z
项目社区:https://github.com/khezen/avro

开源协议:MIT License

下载


avro

Build Status codecov
Go Report Card

The purpose of this package is to facilitate use of AVRO with go strong typing.

Features

github.com/khezen/avro

GoDoc

github.com/khezen/avro/sqlavro

GoDoc

github.com/khezen/avro/redshiftavro

GoDoc

What is AVRO

Apache AVRO is a data serialization system which relies on JSON schemas.

It provides:

  • Rich data structures
  • A compact, fast, binary data format
  • A container file, to store persistent data
  • Remote procedure call (RPC)

AVRO binary encoded data comes together with its schema and therefore is fully self-describing.

When AVRO data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small.

When AVRO data is stored in a file, its schema is stored with it, so that files may be processed later by any program. If the program reading the data expects a different schema this can be easily resolved, since both schemas are present.

Examples

Schema Marshal/Unmarshal

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/khezen/avro"
  6. )
  7. func main() {
  8. schemaBytes := []byte(
  9. `{
  10. "type": "record",
  11. "namespace": "test",
  12. "name": "LongList",
  13. "aliases": [
  14. "LinkedLongs"
  15. ],
  16. "doc": "linked list of 64 bits integers",
  17. "fields": [
  18. {
  19. "name": "value",
  20. "type": "long"
  21. },
  22. {
  23. "name": "next",
  24. "type": [
  25. "null",
  26. "LongList"
  27. ]
  28. }
  29. ]
  30. }`,
  31. )
  32. // Unmarshal JSON bytes to Schema interface
  33. var anySchema avro.AnySchema
  34. err := json.Unmarshal(schemaBytes, &anySchema)
  35. if err != nil {
  36. panic(err)
  37. }
  38. schema := anySchema.Schema()
  39. // Marshal Schema interface to JSON bytes
  40. schemaBytes, err = json.Marshal(schema)
  41. if err != nil {
  42. panic(err)
  43. }
  44. fmt.Println(string(schemaBytes))
  45. }
  1. {
  2. "type": "record",
  3. "namespace": "test",
  4. "name": "LongList",
  5. "aliases": [
  6. "LinkedLongs"
  7. ],
  8. "doc": "linked list of 64 bits integers",
  9. "fields": [
  10. {
  11. "name": "value",
  12. "type": "long"
  13. },
  14. {
  15. "name": "next",
  16. "type": [
  17. "null",
  18. "LongList"
  19. ]
  20. }
  21. ]
  22. }

Convert SQL Table to AVRO Schema

  1. package main
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/khezen/avro/sqlavro"
  7. )
  8. func main() {
  9. db, err := sql.Open("mysql", "root@/blog")
  10. if err != nil {
  11. panic(err)
  12. }
  13. defer db.Close()
  14. _, err = db.Exec(
  15. `CREATE TABLE posts(
  16. ID INT NOT NULL,
  17. title VARCHAR(128) NOT NULL,
  18. body LONGBLOB NOT NULL,
  19. content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
  20. post_date DATETIME NOT NULL,
  21. update_date DATETIME,
  22. reading_time_minutes DECIMAL(3,1),
  23. PRIMARY KEY(ID)
  24. )`,
  25. )
  26. if err != nil {
  27. panic(err)
  28. }
  29. schemas, err := sqlavro.SQLDatabase2AVRO(db, "blog")
  30. if err != nil {
  31. panic(err)
  32. }
  33. schemasBytes, err := json.Marshal(schemas)
  34. if err != nil {
  35. panic(err)
  36. }
  37. fmt.Println(string(schemasBytes))
  38. }
  1. [
  2. {
  3. "type": "record",
  4. "namespace": "blog",
  5. "name": "posts",
  6. "fields": [
  7. {
  8. "name": "ID",
  9. "type": "int"
  10. },
  11. {
  12. "name": "title",
  13. "type": "string"
  14. },
  15. {
  16. "name": "body",
  17. "type": "bytes"
  18. },
  19. {
  20. "name": "content_type",
  21. "type": [
  22. "string",
  23. "null"
  24. ],
  25. "default": "text/markdown; charset=UTF-8"
  26. },
  27. {
  28. "name": "post_date",
  29. "type": {
  30. "type": "int",
  31. "doc":"datetime",
  32. "logicalType": "timestamp"
  33. }
  34. },
  35. {
  36. "name": "update_date",
  37. "type": [
  38. "null",
  39. {
  40. "type": "int",
  41. "doc":"datetime",
  42. "logicalType": "timestamp"
  43. }
  44. ]
  45. },
  46. {
  47. "name": "reading_time_minutes",
  48. "type": [
  49. "null",
  50. {
  51. "type": "bytes",
  52. "logicalType": "decimal",
  53. "precision": 3,
  54. "scale": 1
  55. }
  56. ]
  57. }
  58. ]
  59. }
  60. ]

Query records from SQL into AVRO or CSV binary

  1. package main
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "io/ioutil"
  6. "time"
  7. "github.com/khezen/avro"
  8. "github.com/khezen/avro/sqlavro"
  9. )
  10. func main() {
  11. db, err := sql.Open("mysql", "root@/blog")
  12. if err != nil {
  13. panic(err)
  14. }
  15. defer db.Close()
  16. _, err = db.Exec(
  17. `CREATE TABLE posts(
  18. ID INT NOT NULL,
  19. title VARCHAR(128) NOT NULL,
  20. body LONGBLOB NOT NULL,
  21. content_type VARCHAR(128) DEFAULT 'text/markdown; charset=UTF-8',
  22. post_date DATETIME NOT NULL,
  23. update_date DATETIME,
  24. reading_time_minutes DECIMAL(3,1),
  25. PRIMARY KEY(ID)
  26. )`,
  27. )
  28. if err != nil {
  29. panic(err)
  30. }
  31. _, err = db.Exec(
  32. // statement
  33. `INSERT INTO posts(ID,title,body,content_type,post_date,update_date,reading_time_minutes)
  34. VALUES (?,?,?,?,?,?,?)`,
  35. // values
  36. 42,
  37. "lorem ispum",
  38. []byte(`Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.`),
  39. "text/markdown; charset=UTF-8",
  40. "2009-04-10 00:00:00",
  41. "2009-04-10 00:00:00",
  42. "4.2",
  43. )
  44. if err != nil {
  45. panic(err)
  46. }
  47. schema, err := sqlavro.SQLTable2AVRO(db, "blog", "posts")
  48. if err != nil {
  49. panic(err)
  50. }
  51. limit := 1000
  52. order := avro.Ascending
  53. from, err := time.Parse("2006-02-01 15:04:05", "2009-04-10 00:00:00")
  54. if err != nil {
  55. panic(err)
  56. }
  57. avroBytes, updatedCriteria, err := sqlavro.Query(sqlavro.QueryConfig{
  58. DB: db,
  59. DBName: "blog",
  60. Schema: schema,
  61. Limit: limit,
  62. Criteria: []sqlavro.Criterion{
  63. *sqlavro.NewCriterionDateTime("post_date", &from, order),
  64. },
  65. Output: "avro",
  66. })
  67. if err != nil {
  68. panic(err)
  69. }
  70. err = ioutil.WriteFile("/tmp/blog_posts.avro", avroBytes, 0644)
  71. if err != nil {
  72. panic(err)
  73. }
  74. fmt.Println(updatedCriteria)
  75. }

Notes

  • When record fields contains aliases, the first alias is used in the query instead of the field name.

Types

Avro Go SQL
null nil NULL
bytes []byte BLOB,MEDIUMBLOB,LONGBLOB
fixed []byte CHAR,NCHAR
string,enum string VARCHAR, NVARCHAR,TEXT,TINYTEXT,MEDIUMTEXT,LONGTEXT,ENUM,SET
float float32 FLOAT
double float64 DOUBLE
long int64 BIGINT
int int32 TINYINT,SMALLINT,INT,YEAR
decimal *big.Rat DECIMAL
time int32 TIME
timestamp int32 TIMESTAMP,DATETIME
date time.Time DATE
array []interface{} N/A
map,record map[string]interface{} N/A
union see below any type nullable

Because of encoding rules for Avro unions, when an union’s value is
null, a simple Go nil is returned. However when an union’s value
is non-nil, a Go map[string]interface{} with a single key is
returned for the union. The map’s single key is the Avro type name and
its value is the datum’s value.

Produce Redshift create statement from AVRO schema

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/khezen/avro"
  6. "github.com/khezen/avro/redshiftavro"
  7. )
  8. func main() {
  9. schemaBytes := []byte(`
  10. {
  11. "type": "record",
  12. "namespace": "blog",
  13. "name": "posts",
  14. "fields": [
  15. {
  16. "name": "ID",
  17. "type": "int"
  18. },
  19. {
  20. "name": "title",
  21. "type": "string"
  22. },
  23. {
  24. "name": "body",
  25. "type": "bytes"
  26. },
  27. {
  28. "name": "content_type",
  29. "type": [
  30. "string",
  31. "null"
  32. ],
  33. "default": "text/markdown; charset=UTF-8"
  34. },
  35. {
  36. "name": "post_date",
  37. "type": {
  38. "type": "int",
  39. "doc":"datetime",
  40. "logicalType": "timestamp"
  41. }
  42. },
  43. {
  44. "name": "update_date",
  45. "type": [
  46. "null",
  47. {
  48. "type": "int",
  49. "doc":"datetime",
  50. "logicalType": "timestamp"
  51. }
  52. ]
  53. },
  54. {
  55. "name": "reading_time_minutes",
  56. "type": [
  57. "null",
  58. {
  59. "type": "bytes",
  60. "logicalType": "decimal",
  61. "precision": 3,
  62. "scale": 1
  63. }
  64. ]
  65. }
  66. ]
  67. }`)
  68. var anySchema avro.AnySchema
  69. err := json.Unmarshal(schemaBytes, &anySchema)
  70. if err != nil {
  71. panic(err)
  72. }
  73. schema := anySchema.Schema().(*avro.RecordSchema)
  74. cfg := redshiftavro.CreateConfig{
  75. Schema: *schema,
  76. SortKeys: []string{"post_date", "title"},
  77. IfNotExists: true,
  78. }
  79. statement, err := redshiftavro.CreateTableStatement(cfg)
  80. if err != nil {
  81. panic(err)
  82. }
  83. fmt.Println(statement)
  84. }
  1. CREATE TABLE IF NOT EXISTS posts(
  2. ID INTEGER ENCODE LZO NOT NULL,
  3. title VARCHAR(65535) ENCODE RAW NOT NULL,
  4. body VARCHAR(65535) ENCODE ZSTD NOT NULL,
  5. content_type VARCHAR(65535) ENCODE ZSTD NULL,
  6. post_date TIMESTAMP WITHOUT TIME ZONE ENCODE RAW NOT NULL,
  7. update_date TIMESTAMP WITHOUT TIME ZONE ENCODE LZO NULL,
  8. reading_time_minutes DECIMAL(3,1) ENCODE RAW NULL
  9. )
  10. SORTKEY(
  11. post_date,
  12. title
  13. )

Issues

If you have any problems or questions, please ask for help through a GitHub issue.

Contributions

Help is always welcome! For example, documentation (like the text you are reading now) can always use improvement. There’s always code that can be improved. If you ever see something you think should be fixed, you should own it. If you have no idea what to start on, you can browse the issues labeled with help wanted.

As a potential contributor, your changes and ideas are welcome at any hour of the day or night, weekdays, weekends, and holidays. Please do not ever hesitate to ask a question or send a pull request.

Code of conduct.