Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ go_library(
"//pkg/util/set",
"//pkg/util/size",
"//pkg/util/slice",
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/stringutil",
Expand Down Expand Up @@ -368,6 +369,7 @@ go_test(
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
"//pkg/tici",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
Expand Down
26 changes: 20 additions & 6 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ func createTable(jobCtx *jobContext, job *model.Job, r autoid.Requirement, args
return tbInfo, errors.Trace(err)
}

if err := createTiCIIndexes(jobCtx, job.SchemaName, tbInfo); err != nil {
return tbInfo, errors.Trace(err)
}

return tbInfo, nil
default:
return tbInfo, dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tbInfo.State)
Expand Down Expand Up @@ -204,7 +200,7 @@ func handleAutoIncID(r autoid.Requirement, job *model.Job, tbInfo *model.TableIn
return nil
}

func createTiCIIndexes(jobCtx *jobContext, schemaName string, tblInfo *model.TableInfo) error {
func (w *worker) createTiCIIndexes(jobCtx *jobContext, job *model.Job, schemaName string, tblInfo *model.TableInfo) error {
if tblInfo == nil {
return nil
}
Expand All @@ -218,7 +214,15 @@ func createTiCIIndexes(jobCtx *jobContext, schemaName string, tblInfo *model.Tab
if !index.IsTiCIIndex() {
continue
}
if err := tici.CreateFulltextIndex(ctx, jobCtx.store, tblInfo, index, schemaName); err != nil {
var parserInfo *tici.ParserInfo
if index.FullTextInfo != nil {
info, err := w.buildTiCIFulltextParserInfo(jobCtx, job, index)
if err != nil {
return errors.Trace(err)
}
parserInfo = info
}
if err := tici.CreateFulltextIndex(ctx, jobCtx.store, tblInfo, index, schemaName, parserInfo); err != nil {
return err
}
}
Expand Down Expand Up @@ -283,6 +287,9 @@ func (w *worker) onCreateTable(jobCtx *jobContext, job *model.Job) (ver int64, _
if err != nil {
return ver, errors.Trace(err)
}
if err := w.createTiCIIndexes(jobCtx, job, job.SchemaName, tbInfo); err != nil {
return ver, errors.Trace(err)
}

ver, err = updateSchemaVersion(jobCtx, job)
if err != nil {
Expand Down Expand Up @@ -314,6 +321,9 @@ func (w *worker) createTableWithForeignKeys(jobCtx *jobContext, job *model.Job,
if err != nil {
return ver, errors.Trace(err)
}
if err := w.createTiCIIndexes(jobCtx, job, job.SchemaName, tbInfo); err != nil {
return ver, errors.Trace(err)
}
tbInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tbInfo, true)
if err != nil {
Expand Down Expand Up @@ -383,6 +393,10 @@ func (w *worker) onCreateTables(jobCtx *jobContext, job *model.Job) (int64, erro
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if err := w.createTiCIIndexes(jobCtx, stubJob, stubJob.SchemaName, tbInfo); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tableInfos = append(tableInfos, tbInfo)
}
}
Expand Down
138 changes: 138 additions & 0 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,9 @@ func (e *executor) createTableWithInfoJob(
SessionVars: make(map[string]string),
}
job.AddSystemVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
if err := e.captureFullTextIndexSysvarsToJobFromTableInfo(ctx, job, tbInfo); err != nil {
return nil, errors.Trace(err)
}
args := &model.CreateTableArgs{
TableInfo: tbInfo,
OnExistReplace: cfg.OnExist == OnExistReplace,
Expand Down Expand Up @@ -1231,6 +1234,11 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
SessionVars: make(map[string]string),
}
job.AddSystemVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
for _, info := range infos {
if err := e.captureFullTextIndexSysvarsToJobFromTableInfo(ctx, job, info); err != nil {
return errors.Trace(err)
}
}

var err error

Expand Down Expand Up @@ -4775,6 +4783,10 @@ func (e *executor) createFulltextIndex(ctx sessionctx.Context, ti ast.Ident, ind
job.Type = model.ActionAddFullTextIndex
indexPartSpecifications[0].Expr = nil

if err := e.captureFullTextIndexSysvarsToJob(ctx, job, indexOption); err != nil {
return errors.Trace(err)
}

args := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
IndexName: indexName,
Expand Down Expand Up @@ -5005,6 +5017,132 @@ func (e *executor) createColumnarIndex(ctx sessionctx.Context, ti ast.Ident, ind
return errors.Trace(err)
}

func (e *executor) captureFullTextIndexSysvarsToJob(sctx sessionctx.Context, job *model.Job, indexOption *ast.IndexOption) error {
parser := model.FullTextParserTypeStandardV1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are to use the original parser type from the old full-text index, addtional checks and changes might be needed to check the parser type itself, as the old full-text has a wilder scope of support compared to the one we are implementing at the moment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads-up. I did not change the parser-type compatibility logic in this PR; will follow up separately if/when we need to support carrying over legacy FULLTEXT parser settings.

if indexOption != nil && indexOption.ParserName.L != "" {
parser = model.GetFullTextParserTypeBySQLName(indexOption.ParserName.L)
}

sessVars := sctx.GetSessionVars()
getVar := func(name string) (string, error) {
val, err := sessVars.GetSessionOrGlobalSystemVar(context.Background(), name)
return val, errors.Trace(err)
}

maxTokenSize, err := getVar(vardef.InnodbFtMaxTokenSize)
if err != nil {
return err
}
minTokenSize, err := getVar(vardef.InnodbFtMinTokenSize)
if err != nil {
return err
}
ngramTokenSize, err := getVar(vardef.NgramTokenSize)
if err != nil {
return err
}
enableStopword, err := getVar(vardef.InnodbFtEnableStopword)
if err != nil {
return err
}
serverStopwordTable, err := getVar(vardef.InnodbFtServerStopwordTable)
if err != nil {
return err
}
userStopwordTable, err := getVar(vardef.InnodbFtUserStopwordTable)
if err != nil {
return err
}

// Validate token size constraints early.
if parser == model.FullTextParserTypeStandardV1 {
minVal, err := strconv.ParseInt(minTokenSize, 10, 64)
if err != nil {
return errors.Trace(err)
}
maxVal, err := strconv.ParseInt(maxTokenSize, 10, 64)
if err != nil {
return errors.Trace(err)
}
if minVal > maxVal {
return variable.ErrWrongValueForVar.GenWithStackByArgs(vardef.InnodbFtMinTokenSize, minTokenSize)
}
}

// Validate stopword table schema if it will be used by standard parser.
if parser == model.FullTextParserTypeStandardV1 && variable.TiDBOptOn(enableStopword) {
stopwordTable := strings.TrimSpace(userStopwordTable)
stopwordTableVar := vardef.InnodbFtUserStopwordTable
if stopwordTable == "" {
stopwordTable = strings.TrimSpace(serverStopwordTable)
stopwordTableVar = vardef.InnodbFtServerStopwordTable
}
if stopwordTable != "" {
dbName, tblName, ok := splitFullTextStopwordTableName(stopwordTable)
if !ok {
return variable.ErrWrongValueForVar.GenWithStackByArgs(stopwordTableVar, stopwordTable)
}
is := e.infoCache.GetLatest()
tbl, err := is.TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tblName))
if err != nil {
return errors.Trace(err)
}
tblInfo := tbl.Meta()
// TiDB row-store tables are treated as InnoDB-compatible. TiFlash-only tables are not.
if tblInfo.IsColumnar {
return dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table must be an InnoDB table")
}
if len(tblInfo.Columns) != 1 || tblInfo.Columns[0].Name.L != "value" || tblInfo.Columns[0].FieldType.GetType() != mysql.TypeVarchar {
return dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table must contain a single VARCHAR column named value")
}
}
}

job.AddSystemVars(vardef.InnodbFtMaxTokenSize, maxTokenSize)
job.AddSystemVars(vardef.InnodbFtMinTokenSize, minTokenSize)
job.AddSystemVars(vardef.NgramTokenSize, ngramTokenSize)
job.AddSystemVars(vardef.InnodbFtEnableStopword, enableStopword)
job.AddSystemVars(vardef.InnodbFtServerStopwordTable, serverStopwordTable)
job.AddSystemVars(vardef.InnodbFtUserStopwordTable, userStopwordTable)
return nil
}

func (e *executor) captureFullTextIndexSysvarsToJobFromTableInfo(sctx sessionctx.Context, job *model.Job, tblInfo *model.TableInfo) error {
if tblInfo == nil {
return nil
}
for _, index := range tblInfo.Indices {
if index == nil || index.FullTextInfo == nil {
continue
}
indexOption := &ast.IndexOption{ParserName: ast.NewCIStr(index.FullTextInfo.ParserType.SQLName())}
if err := e.captureFullTextIndexSysvarsToJob(sctx, job, indexOption); err != nil {
return errors.Trace(err)
}
}
return nil
}

func splitFullTextStopwordTableName(raw string) (dbName string, tblName string, ok bool) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", "", false
}
parts := strings.Split(raw, "/")
if len(parts) != 2 {
parts = strings.Split(raw, ".")
}
if len(parts) != 2 {
return "", "", false
}
dbName = strings.TrimSpace(parts[0])
tblName = strings.TrimSpace(parts[1])
if dbName == "" || tblName == "" {
return "", "", false
}
return dbName, tblName, true
}

func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
Expand Down
Loading