Skip to content

Commit 8d140e5

Browse files
committed
feat: Bring back Impala support
- Changes the Impala driver from the abandoned bippio/go-impala to the maintained sclgo/impala-go. - Moves Impala from the "bad" to the "most" driver set - Add MetaReader for Impala - Implements copy for Impala, extending the generic CopyWithInsert with an option to work without transactions - for DBs that don't support them - Implements IsPasswordErr The PR targets release-19, not master, so that the CI confirms no regressions caused by the change in drivers.CopyWithInsert. (CI on master is broken). The commit shouldn't create issues when it is cherry-picked to master afterwards.
1 parent e6ddf2a commit 8d140e5

6 files changed

Lines changed: 226 additions & 97 deletions

File tree

drivers/drivers.go

Lines changed: 73 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -526,88 +526,100 @@ func Copy(ctx context.Context, u *dburl.URL, stdout, stderr func() io.Writer, ro
526526
return d.Copy(ctx, db, rows, table)
527527
}
528528

529-
// CopyWithInsert builds a copy handler based on insert.
529+
// CopyWithInsert builds a typical copy handler based on insert.
530530
func CopyWithInsert(placeholder func(int) string) func(ctx context.Context, db *sql.DB, rows *sql.Rows, table string) (int64, error) {
531531
if placeholder == nil {
532532
placeholder = func(n int) string { return fmt.Sprintf("$%d", n) }
533533
}
534534
return func(ctx context.Context, db *sql.DB, rows *sql.Rows, table string) (int64, error) {
535-
columns, err := rows.Columns()
536-
if err != nil {
537-
return 0, fmt.Errorf("failed to fetch source rows columns: %w", err)
538-
}
539-
clen := len(columns)
540-
query := table
541-
if !strings.HasPrefix(strings.ToLower(query), "insert into") {
542-
leftParen := strings.IndexRune(table, '(')
543-
if leftParen == -1 {
544-
colRows, err := db.QueryContext(ctx, "SELECT * FROM "+table+" WHERE 1=0")
545-
if err != nil {
546-
return 0, fmt.Errorf("failed to execute query to determine target table columns: %w", err)
547-
}
548-
columns, err := colRows.Columns()
549-
_ = colRows.Close()
550-
if err != nil {
551-
return 0, fmt.Errorf("failed to fetch target table columns: %w", err)
552-
}
553-
table += "(" + strings.Join(columns, ", ") + ")"
535+
return FlexibleCopyWithInsert(ctx, db, rows, table, placeholder, true)
536+
}
537+
}
538+
539+
func FlexibleCopyWithInsert(ctx context.Context, db *sql.DB, rows *sql.Rows, table string, placeholder func(int) string, withTransaction bool) (int64, error) {
540+
columns, err := rows.Columns()
541+
if err != nil {
542+
return 0, fmt.Errorf("failed to fetch source rows columns: %w", err)
543+
}
544+
clen := len(columns)
545+
query := table
546+
if !strings.HasPrefix(strings.ToLower(query), "insert into") {
547+
leftParen := strings.IndexRune(table, '(')
548+
if leftParen == -1 {
549+
colRows, err := db.QueryContext(ctx, "SELECT * FROM "+table+" WHERE 1=0")
550+
if err != nil {
551+
return 0, fmt.Errorf("failed to execute query to determine target table columns: %w", err)
554552
}
555-
// TODO if the db supports multiple rows per insert, create batches of 100 rows
556-
placeholders := make([]string, clen)
557-
for i := 0; i < clen; i++ {
558-
placeholders[i] = placeholder(i + 1)
553+
columns, err := colRows.Columns()
554+
_ = colRows.Close()
555+
if err != nil {
556+
return 0, fmt.Errorf("failed to fetch target table columns: %w", err)
559557
}
560-
query = "INSERT INTO " + table + " VALUES (" + strings.Join(placeholders, ", ") + ")"
558+
table += "(" + strings.Join(columns, ", ") + ")"
559+
}
560+
// TODO if the db supports multiple rows per insert, create batches of 100 rows
561+
placeholders := make([]string, clen)
562+
for i := 0; i < clen; i++ {
563+
placeholders[i] = placeholder(i + 1)
561564
}
562-
tx, err := db.BeginTx(ctx, nil)
565+
query = "INSERT INTO " + table + " VALUES (" + strings.Join(placeholders, ", ") + ")"
566+
}
567+
var stmt *sql.Stmt
568+
var tx *sql.Tx
569+
if withTransaction {
570+
tx, err = db.BeginTx(ctx, nil)
563571
if err != nil {
564572
return 0, fmt.Errorf("failed to begin transaction: %w", err)
565573
}
566-
stmt, err := tx.PrepareContext(ctx, query)
574+
stmt, err = tx.PrepareContext(ctx, query)
575+
} else {
576+
stmt, err = db.PrepareContext(ctx, query)
577+
}
578+
if err != nil {
579+
return 0, fmt.Errorf("failed to prepare insert query: %w", err)
580+
}
581+
defer stmt.Close()
582+
columnTypes, err := rows.ColumnTypes()
583+
if err != nil {
584+
return 0, fmt.Errorf("failed to fetch source column types: %w", err)
585+
}
586+
values := make([]interface{}, clen)
587+
valueRefs := make([]reflect.Value, clen)
588+
actuals := make([]interface{}, clen)
589+
for i := 0; i < len(columnTypes); i++ {
590+
valueRefs[i] = reflect.New(columnTypes[i].ScanType())
591+
values[i] = valueRefs[i].Interface()
592+
}
593+
var n int64
594+
for rows.Next() {
595+
err = rows.Scan(values...)
567596
if err != nil {
568-
return 0, fmt.Errorf("failed to prepare insert query: %w", err)
597+
return n, fmt.Errorf("failed to scan row: %w", err)
569598
}
570-
defer stmt.Close()
571-
columnTypes, err := rows.ColumnTypes()
572-
if err != nil {
573-
return 0, fmt.Errorf("failed to fetch source column types: %w", err)
599+
//We can't use values... in Exec() below, because some drivers
600+
//don't accept pointer to an argument instead of the arg itself.
601+
for i := range values {
602+
actuals[i] = valueRefs[i].Elem().Interface()
574603
}
575-
values := make([]interface{}, clen)
576-
valueRefs := make([]reflect.Value, clen)
577-
actuals := make([]interface{}, clen)
578-
for i := 0; i < len(columnTypes); i++ {
579-
valueRefs[i] = reflect.New(columnTypes[i].ScanType())
580-
values[i] = valueRefs[i].Interface()
604+
res, err := stmt.ExecContext(ctx, actuals...)
605+
if err != nil {
606+
return n, fmt.Errorf("failed to exec insert: %w", err)
581607
}
582-
var n int64
583-
for rows.Next() {
584-
err = rows.Scan(values...)
585-
if err != nil {
586-
return n, fmt.Errorf("failed to scan row: %w", err)
587-
}
588-
//We can't use values... in Exec() below, because some drivers
589-
//don't accept pointer to an argument instead of the arg itself.
590-
for i := range values {
591-
actuals[i] = valueRefs[i].Elem().Interface()
592-
}
593-
res, err := stmt.ExecContext(ctx, actuals...)
594-
if err != nil {
595-
return n, fmt.Errorf("failed to exec insert: %w", err)
596-
}
597-
rn, err := res.RowsAffected()
598-
if err != nil {
599-
return n, fmt.Errorf("failed to check rows affected: %w", err)
600-
}
601-
n += rn
608+
rn, err := res.RowsAffected()
609+
if err != nil {
610+
return n, fmt.Errorf("failed to check rows affected: %w", err)
602611
}
603-
// TODO if using batches, flush the last batch,
604-
// TODO prepare another statement and count remaining rows
612+
n += rn
613+
}
614+
// TODO if using batches, flush the last batch,
615+
// TODO prepare another statement and count remaining rows
616+
if tx != nil {
605617
err = tx.Commit()
606618
if err != nil {
607619
return n, fmt.Errorf("failed to commit transaction: %w", err)
608620
}
609-
return n, rows.Err()
610621
}
622+
return n, rows.Err()
611623
}
612624

613625
func init() {

drivers/impala/impala.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,30 @@
11
// Package impala defines and registers usql's Apache Impala driver.
22
//
33
// See: https://github.com/bippio/go-impala
4-
// Group: bad
54
package impala
65

76
import (
8-
_ "github.com/bippio/go-impala" // DRIVER
7+
"context"
8+
"database/sql"
9+
"errors"
10+
11+
"github.com/sclgo/impala-go" // DRIVER
912
"github.com/xo/usql/drivers"
13+
meta "github.com/xo/usql/drivers/metadata/impala"
1014
)
1115

1216
func init() {
13-
drivers.Register("impala", drivers.Driver{})
17+
drivers.Register("impala", drivers.Driver{
18+
NewMetadataReader: meta.New,
19+
Copy: func(ctx context.Context, db *sql.DB, rows *sql.Rows, table string) (int64, error) {
20+
placeholder := func(int) string {
21+
return "?"
22+
}
23+
return drivers.FlexibleCopyWithInsert(ctx, db, rows, table, placeholder, false)
24+
},
25+
IsPasswordErr: func(err error) bool {
26+
var authError *impala.AuthError
27+
return errors.As(err, &authError)
28+
},
29+
})
1430
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package impala
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
7+
"github.com/xo/usql/drivers"
8+
"github.com/xo/usql/drivers/metadata"
9+
10+
driver "github.com/sclgo/impala-go"
11+
)
12+
13+
type MetaReader struct {
14+
meta *driver.Metadata
15+
}
16+
17+
func (r MetaReader) Columns(filter metadata.Filter) (*metadata.ColumnSet, error) {
18+
columnIds, err := r.meta.GetColumns(context.Background(), filter.Schema, filter.Parent, filter.Name)
19+
if err != nil {
20+
return nil, err
21+
}
22+
columns := make([]metadata.Column, len(columnIds))
23+
for i, columnId := range columnIds {
24+
columns[i] = metadata.Column{
25+
Schema: columnId.Schema,
26+
Table: columnId.TableName,
27+
Name: columnId.ColumnName,
28+
}
29+
}
30+
return metadata.NewColumnSet(columns), nil
31+
}
32+
33+
func (r MetaReader) Schemas(filter metadata.Filter) (*metadata.SchemaSet, error) {
34+
schemaNames, err := r.meta.GetSchemas(context.Background(), filter.Name)
35+
if err != nil {
36+
return nil, err
37+
}
38+
schemas := make([]metadata.Schema, len(schemaNames))
39+
for i, name := range schemaNames {
40+
schemas[i] = metadata.Schema{
41+
Schema: name,
42+
}
43+
}
44+
return metadata.NewSchemaSet(schemas), nil
45+
}
46+
47+
func (r MetaReader) Tables(filter metadata.Filter) (*metadata.TableSet, error) {
48+
tableIds, err := r.meta.GetTables(context.Background(), filter.Schema, filter.Name)
49+
if err != nil {
50+
return nil, err
51+
}
52+
tables := make([]metadata.Table, len(tableIds))
53+
for i, table := range tableIds {
54+
tables[i] = metadata.Table{
55+
Schema: table.Schema,
56+
Name: table.Name,
57+
Type: table.Type,
58+
}
59+
}
60+
return metadata.NewTableSet(tables), nil
61+
}
62+
63+
var (
64+
_ metadata.SchemaReader = (*MetaReader)(nil)
65+
_ metadata.TableReader = (*MetaReader)(nil)
66+
_ metadata.ColumnReader = (*MetaReader)(nil)
67+
)
68+
69+
func New(db drivers.DB, _ ...metadata.ReaderOption) metadata.Reader {
70+
if sqlDb, ok := db.(*sql.DB); ok {
71+
return &MetaReader{
72+
meta: driver.NewMetadata(sqlDb),
73+
}
74+
} else {
75+
return struct{}{} // reader with no capabilities
76+
}
77+
}

go.mod

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ require (
1616
github.com/amsokol/ignite-go-client v0.12.2
1717
github.com/apache/arrow/go/v17 v17.0.0
1818
github.com/apache/calcite-avatica-go/v5 v5.4.0
19-
github.com/bippio/go-impala v2.1.0+incompatible
2019
github.com/btnguyen2k/gocosmos v1.1.0
2120
github.com/btnguyen2k/godynamo v1.3.0
2221
github.com/chaisql/chai v0.16.1-0.20240218103834-23e406360fd2
@@ -50,6 +49,7 @@ require (
5049
github.com/ory/dockertest/v3 v3.11.0
5150
github.com/prestodb/presto-go-client v0.0.0-20240426182841-905ac40a1783
5251
github.com/proullon/ramsql v0.1.4
52+
github.com/sclgo/impala-go v1.1.0
5353
github.com/sijms/go-ora/v2 v2.8.24
5454
github.com/snowflakedb/gosnowflake v1.13.2
5555
github.com/spf13/cobra v1.9.1
@@ -84,15 +84,15 @@ require (
8484
cloud.google.com/go/longrunning v0.6.6 // indirect
8585
cloud.google.com/go/monitoring v1.24.1 // indirect
8686
cloud.google.com/go/spanner v1.78.0 // indirect
87-
dario.cat/mergo v1.0.0 // indirect
87+
dario.cat/mergo v1.0.1 // indirect
8888
filippo.io/edwards25519 v1.1.0 // indirect
8989
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
9090
github.com/99designs/keyring v1.2.2 // indirect
9191
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0 // indirect
9292
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.2 // indirect
9393
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.0 // indirect
9494
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 // indirect
95-
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
95+
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
9696
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
9797
github.com/BurntSushi/toml v1.5.0 // indirect
9898
github.com/ClickHouse/ch-go v0.65.1 // indirect
@@ -236,7 +236,7 @@ require (
236236
github.com/kr/pretty v0.3.1 // indirect
237237
github.com/kr/text v0.2.0 // indirect
238238
github.com/kylelemons/godebug v1.1.0 // indirect
239-
github.com/magiconair/properties v1.8.7 // indirect
239+
github.com/magiconair/properties v1.8.9 // indirect
240240
github.com/mattn/go-colorable v0.1.14 // indirect
241241
github.com/mattn/go-runewidth v0.0.16 // indirect
242242
github.com/mattn/go-sixel v0.0.5 // indirect
@@ -249,17 +249,17 @@ require (
249249
github.com/mithrandie/ternary v1.1.1 // indirect
250250
github.com/moby/docker-image-spec v1.3.1 // indirect
251251
github.com/moby/patternmatcher v0.6.0 // indirect
252-
github.com/moby/sys/sequential v0.5.0 // indirect
253-
github.com/moby/sys/user v0.1.0 // indirect
252+
github.com/moby/sys/sequential v0.6.0 // indirect
253+
github.com/moby/sys/user v0.3.0 // indirect
254254
github.com/moby/sys/userns v0.1.0 // indirect
255-
github.com/moby/term v0.5.0 // indirect
255+
github.com/moby/term v0.5.2 // indirect
256256
github.com/mtibben/percent v0.2.1 // indirect
257257
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
258258
github.com/nakagami/chacha20 v0.1.0 // indirect
259259
github.com/nathan-fiscaletti/consolesize-go v0.0.0-20220204101620-317176b6684d // indirect
260260
github.com/ncruces/go-strftime v0.1.9 // indirect
261261
github.com/opencontainers/go-digest v1.0.0 // indirect
262-
github.com/opencontainers/image-spec v1.1.0 // indirect
262+
github.com/opencontainers/image-spec v1.1.1 // indirect
263263
github.com/opencontainers/runc v1.1.13 // indirect
264264
github.com/paulmach/orb v0.11.1 // indirect
265265
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
@@ -278,6 +278,7 @@ require (
278278
github.com/rs/zerolog v1.34.0 // indirect
279279
github.com/sagikazarmark/locafero v0.6.0 // indirect
280280
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
281+
github.com/samber/lo v1.49.1 // indirect
281282
github.com/segmentio/asm v1.2.0 // indirect
282283
github.com/shopspring/decimal v1.4.0 // indirect
283284
github.com/sirupsen/logrus v1.9.3 // indirect

0 commit comments

Comments
 (0)