Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type MigrationContext struct {
AzureMySQL bool
AttemptInstantDDL bool

// MaxAuthFailures is the maximum number of authentication failures before aborting
// This prevents retry storms that can trigger firewall rules
MaxAuthFailures int

// SkipPortValidation allows skipping the port validation in `ValidateConnection`
// This is useful when connecting to a MySQL instance where the external port
// may not match the internal port.
Expand Down Expand Up @@ -360,6 +364,16 @@ func (this *MigrationContext) GetOldTableName() string {
return getSafeTableName(tableName, "del")
}

// GetGhostDatabaseName returns the database name for ghost/changelog tables
// If GhostDatabaseName is set (for separate schema), use it
// Otherwise, use the same database as the original table
func (this *MigrationContext) GetGhostDatabaseName() string {
if this.GhostDatabaseName != "" {
return this.GhostDatabaseName
}
return this.DatabaseName
}

// GetChangelogTableName generates the name of changelog table, based on original table name
// or a given table name.
func (this *MigrationContext) GetChangelogTableName() string {
Expand Down
87 changes: 79 additions & 8 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package binlog

import (
"fmt"
"strings"
"sync"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/pkg/errors"

"time"

Expand All @@ -28,6 +30,7 @@ type GoMySQLReader struct {
currentCoordinates mysql.BinlogCoordinates
currentCoordinatesMutex *sync.Mutex
LastAppliedRowsEventHint mysql.BinlogCoordinates
authFailureCount int
}

func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
Expand All @@ -52,6 +55,36 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
}
}

// handleAuthError processes authentication errors and applies circuit breaker logic
func (this *GoMySQLReader) handleAuthError(err error, context string) error {
if err == nil {
// Success case - reset counter if needed
if this.authFailureCount > 0 {
this.migrationContext.Log.Infof("%s successful, resetting auth failure count from %d to 0", context, this.authFailureCount)
this.authFailureCount = 0
}
return nil
}

// Check if this is an authentication error
if !this.isAuthenticationError(err) {
return err // Not an auth error, return as-is
}

// Authentication error - increment counter and check circuit breaker
this.authFailureCount++

if this.migrationContext.MaxAuthFailures > 0 && this.authFailureCount >= this.migrationContext.MaxAuthFailures {
return fmt.Errorf("authentication failed %d times (max: %d) during %s, aborting to prevent firewall blocking: %w",
this.authFailureCount, this.migrationContext.MaxAuthFailures, context, err)
}

this.migrationContext.Log.Errorf("Authentication failure #%d during %s (max: %d): %v",
this.authFailureCount, context, this.migrationContext.MaxAuthFailures, err)

return err
}

// ConnectBinlogStreamer
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
if coordinates.IsEmpty() {
Expand All @@ -66,7 +99,8 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
Pos: uint32(this.currentCoordinates.LogPos),
})

return err
// Handle the error (or success) with circuit breaker logic
return this.handleAuthError(err, "connection")
}

func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
Expand All @@ -79,7 +113,7 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
// StreamEvents
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
return fmt.Errorf("unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
}

if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
Expand All @@ -89,7 +123,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven

dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
return fmt.Errorf("unknown DML type: %s", ev.Header.EventType.String())
}
for i, row := range rowsEvent.Rows {
if dml == UpdateDML && i%2 == 1 {
Expand Down Expand Up @@ -133,14 +167,16 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if canStopStreaming() {
return nil
}
for {
if canStopStreaming() {
break
}
for !canStopStreaming() {
ev, err := this.binlogStreamer.GetEvent(context.Background())
if err != nil {
return err
// Handle authentication errors with circuit breaker
return this.handleAuthError(err, "streaming")
}

// Reset counter on successful event (using handleAuthError with nil)
this.handleAuthError(nil, "event retrieval")

func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
Expand Down Expand Up @@ -171,3 +207,38 @@ func (this *GoMySQLReader) Close() error {
this.binlogSyncer.Close()
return nil
}

// MySQL error codes for authentication failures
const (
ER_DBACCESS_DENIED_ERROR = 1044 // Access denied for user to database
ER_ACCESS_DENIED_ERROR = 1045 // Access denied for user (using password: YES/NO)
ER_HOST_NOT_ALLOWED = 1130 // Host is not allowed to connect
ER_ACCESS_DENIED_NO_PASSWORD = 1698 // Access denied (no password provided)
ER_ACCOUNT_HAS_BEEN_LOCKED = 3118 // Account has been locked
)

// isAuthenticationError checks if the error is an authentication failure
func (this *GoMySQLReader) isAuthenticationError(err error) bool {
if err == nil {
return false
}

// Check for MySQL protocol errors using proper type assertion
var myErr *gomysql.MyError
if errors.As(err, &myErr) {
switch myErr.Code {
case ER_ACCESS_DENIED_ERROR,
ER_DBACCESS_DENIED_ERROR,
ER_HOST_NOT_ALLOWED,
ER_ACCESS_DENIED_NO_PASSWORD,
ER_ACCOUNT_HAS_BEEN_LOCKED:
return true
}
}

// Fallback: Check error string for compatibility with errors
// that might not be properly typed (e.g., from proxy or older versions)
errStr := strings.ToLower(err.Error())
return strings.Contains(errStr, "access denied") ||
strings.Contains(errStr, "authentication failed")
}
Loading
Loading