Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cmd/config/config_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func init() {
viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_NO_OWNER")
viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_NO_PRIVILEGES")
viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDED_SECURITY_LABELS")
viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_INCLUDE_OBJECT_TYPES")
viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDE_OBJECT_TYPES")
viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_DISABLE_PROGRESS_TRACKING")

viper.BindEnv("PGSTREAM_POSTGRES_WRITER_TARGET_URL")
Expand All @@ -86,6 +88,8 @@ func init() {
viper.BindEnv("PGSTREAM_POSTGRES_WRITER_BACKOFF_MAX_RETRIES")
viper.BindEnv("PGSTREAM_POSTGRES_WRITER_DISABLE_RETRIES")
viper.BindEnv("PGSTREAM_POSTGRES_WRITER_IGNORE_DDL")
viper.BindEnv("PGSTREAM_POSTGRES_WRITER_INCLUDE_DDL_OBJECT_TYPES")
viper.BindEnv("PGSTREAM_POSTGRES_WRITER_EXCLUDE_DDL_OBJECT_TYPES")

viper.BindEnv("PGSTREAM_KAFKA_READER_SERVERS")
viper.BindEnv("PGSTREAM_KAFKA_WRITER_SERVERS")
Expand Down Expand Up @@ -313,6 +317,8 @@ func parseSchemaSnapshotConfig(pgurl string) (*snapshotbuilder.SchemaSnapshotCon
NoOwner: viper.GetBool("PGSTREAM_POSTGRES_SNAPSHOT_NO_OWNER"),
NoPrivileges: viper.GetBool("PGSTREAM_POSTGRES_SNAPSHOT_NO_PRIVILEGES"),
ExcludedSecurityLabels: viper.GetStringSlice("PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDED_SECURITY_LABELS"),
IncludeObjectTypes: viper.GetStringSlice("PGSTREAM_POSTGRES_SNAPSHOT_INCLUDE_OBJECT_TYPES"),
ExcludeObjectTypes: viper.GetStringSlice("PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDE_OBJECT_TYPES"),
},
}, nil
}
Expand Down Expand Up @@ -479,11 +485,13 @@ func parsePostgresProcessorConfig() *stream.PostgresProcessorConfig {
ConvergenceThreshold: viper.GetFloat64("PGSTREAM_POSTGRES_WRITER_BATCH_AUTO_TUNE_CONVERGENCE_THRESHOLD"),
},
},
DisableTriggers: viper.GetBool("PGSTREAM_POSTGRES_WRITER_DISABLE_TRIGGERS"),
OnConflictAction: viper.GetString("PGSTREAM_POSTGRES_WRITER_ON_CONFLICT_ACTION"),
BulkIngestEnabled: bulkIngestEnabled,
RetryPolicy: parseBackoffConfig("PGSTREAM_POSTGRES_WRITER"),
IgnoreDDL: viper.GetBool("PGSTREAM_POSTGRES_WRITER_IGNORE_DDL"),
DisableTriggers: viper.GetBool("PGSTREAM_POSTGRES_WRITER_DISABLE_TRIGGERS"),
OnConflictAction: viper.GetString("PGSTREAM_POSTGRES_WRITER_ON_CONFLICT_ACTION"),
BulkIngestEnabled: bulkIngestEnabled,
RetryPolicy: parseBackoffConfig("PGSTREAM_POSTGRES_WRITER"),
IgnoreDDL: viper.GetBool("PGSTREAM_POSTGRES_WRITER_IGNORE_DDL"),
IncludeDDLObjectTypes: viper.GetStringSlice("PGSTREAM_POSTGRES_WRITER_INCLUDE_DDL_OBJECT_TYPES"),
ExcludeDDLObjectTypes: viper.GetStringSlice("PGSTREAM_POSTGRES_WRITER_EXCLUDE_DDL_OBJECT_TYPES"),
},
}

Expand Down
34 changes: 21 additions & 13 deletions cmd/config/config_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ type PgDumpPgRestoreConfig struct {
NoPrivileges bool `mapstructure:"no_privileges" yaml:"no_privileges"`
DumpFile string `mapstructure:"dump_file" yaml:"dump_file"`
ExcludedSecurityLabels []string `mapstructure:"excluded_security_labels" yaml:"excluded_security_labels"`
IncludeObjectTypes []string `mapstructure:"include_object_types" yaml:"include_object_types"`
ExcludeObjectTypes []string `mapstructure:"exclude_object_types" yaml:"exclude_object_types"`
}

type ReplicationConfig struct {
Expand Down Expand Up @@ -163,13 +165,15 @@ type ConstantBackoffConfig struct {
}

type PostgresTargetConfig struct {
URL string `mapstructure:"url" yaml:"url"`
Batch *BatchConfig `mapstructure:"batch" yaml:"batch"`
BulkIngest *BulkIngestConfig `mapstructure:"bulk_ingest" yaml:"bulk_ingest"`
DisableTriggers bool `mapstructure:"disable_triggers" yaml:"disable_triggers"`
OnConflictAction string `mapstructure:"on_conflict_action" yaml:"on_conflict_action"`
RetryPolicy BackoffConfig `mapstructure:"retry_policy" yaml:"retry_policy"`
IgnoreDDL bool `mapstructure:"ignore_ddl" yaml:"ignore_ddl"`
URL string `mapstructure:"url" yaml:"url"`
Batch *BatchConfig `mapstructure:"batch" yaml:"batch"`
BulkIngest *BulkIngestConfig `mapstructure:"bulk_ingest" yaml:"bulk_ingest"`
DisableTriggers bool `mapstructure:"disable_triggers" yaml:"disable_triggers"`
OnConflictAction string `mapstructure:"on_conflict_action" yaml:"on_conflict_action"`
RetryPolicy BackoffConfig `mapstructure:"retry_policy" yaml:"retry_policy"`
IgnoreDDL bool `mapstructure:"ignore_ddl" yaml:"ignore_ddl"`
IncludeDDLObjectTypes []string `mapstructure:"include_ddl_object_types" yaml:"include_ddl_object_types"`
ExcludeDDLObjectTypes []string `mapstructure:"exclude_ddl_object_types" yaml:"exclude_ddl_object_types"`
}

type KafkaTargetConfig struct {
Expand Down Expand Up @@ -551,6 +555,8 @@ func (c *YAMLConfig) parseSchemaSnapshotConfig() (*snapshotbuilder.SchemaSnapsho
streamSchemaCfg.DumpRestore.NoPrivileges = schemaSnapshotCfg.PgDumpPgRestore.NoPrivileges
streamSchemaCfg.DumpRestore.DumpDebugFile = schemaSnapshotCfg.PgDumpPgRestore.DumpFile
streamSchemaCfg.DumpRestore.ExcludedSecurityLabels = schemaSnapshotCfg.PgDumpPgRestore.ExcludedSecurityLabels
streamSchemaCfg.DumpRestore.IncludeObjectTypes = schemaSnapshotCfg.PgDumpPgRestore.IncludeObjectTypes
streamSchemaCfg.DumpRestore.ExcludeObjectTypes = schemaSnapshotCfg.PgDumpPgRestore.ExcludeObjectTypes

var err error
streamSchemaCfg.DumpRestore.RolesSnapshotMode, err = getRolesSnapshotMode(schemaSnapshotCfg.PgDumpPgRestore.RolesSnapshotMode)
Expand Down Expand Up @@ -609,12 +615,14 @@ func (c *YAMLConfig) parsePostgresProcessorConfig() *stream.PostgresProcessorCon

cfg := &stream.PostgresProcessorConfig{
BatchWriter: postgres.Config{
URL: c.Target.Postgres.URL,
BatchConfig: c.Target.Postgres.Batch.parseBatchConfig(),
DisableTriggers: c.Target.Postgres.DisableTriggers,
OnConflictAction: c.Target.Postgres.OnConflictAction,
RetryPolicy: c.Target.Postgres.RetryPolicy.parseBackoffConfig(),
IgnoreDDL: c.Target.Postgres.IgnoreDDL,
URL: c.Target.Postgres.URL,
BatchConfig: c.Target.Postgres.Batch.parseBatchConfig(),
DisableTriggers: c.Target.Postgres.DisableTriggers,
OnConflictAction: c.Target.Postgres.OnConflictAction,
RetryPolicy: c.Target.Postgres.RetryPolicy.parseBackoffConfig(),
IgnoreDDL: c.Target.Postgres.IgnoreDDL,
IncludeDDLObjectTypes: c.Target.Postgres.IncludeDDLObjectTypes,
ExcludeDDLObjectTypes: c.Target.Postgres.ExcludeDDLObjectTypes,
},
}

Expand Down
9 changes: 9 additions & 0 deletions config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ source:
roles_snapshot_mode: # enabled by default. Can be set to disabled to disable roles snapshotting, or can be set to no_passwords to exclude role passwords
exclude_security_labels: ["anon"] # list of providers whose security labels will be excluded from the snapshot. Wildcard supported.
dump_file: pg_dump.sql # name of the file where the contents of the schema pg_dump command and output will be written for debugging purposes.
# Granular object type filtering for schema snapshots. Only one of include_object_types or exclude_object_types can be set.
# Available categories: tables, sequences, types, indexes, constraints, functions, views, materialized_views, triggers, event_triggers, policies, rules, comments, extensions, collations, text_search
# include_object_types: ["tables", "sequences", "types"] # only include these object types in the schema snapshot
# exclude_object_types: ["functions", "views", "triggers"] # exclude these object types from the schema snapshot
replication: # when mode is replication or snapshot_and_replication
replication_slot: "pgstream_mydatabase_slot"
plugin:
Expand Down Expand Up @@ -75,6 +79,11 @@ target:
schema_log_store_url: "postgresql://user:password@localhost:5432/mydatabase" # url to the postgres database where the schema log is stored to be used when performing schema change diffs
disable_triggers: false # whether to disable triggers on the target database. Defaults to false
on_conflict_action: "nothing" # options are update, nothing or error. Defaults to error
ignore_ddl: false # whether to skip all DDL replication. Defaults to false
# Selective DDL object type filtering for replication. Only one of include_ddl_object_types or exclude_ddl_object_types can be set. Ignored if ignore_ddl is true.
# Available categories: tables, sequences, types, indexes, constraints, functions, views, materialized_views, triggers, event_triggers, policies, rules, extensions, collations, text_search
# include_ddl_object_types: ["tables", "sequences", "types"] # only replicate DDL for these object types
# exclude_ddl_object_types: ["functions", "views", "triggers"] # skip DDL replication for these object types
bulk_ingest:
enabled: true # whether to enable bulk ingest on the target postgres, using COPY FROM (supported for insert only workloads)
kafka:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// SPDX-License-Identifier: Apache-2.0

package pgdumprestore

import (
"bufio"
"bytes"
"fmt"
"regexp"
"strings"
)

// objectTypeCategories maps user-facing category names (the values accepted
// by the include_object_types / exclude_object_types configuration) to the
// pg_dump TOC "Type" values they cover. A single category may span several
// TOC types, e.g. "types" covers both TYPE and DOMAIN entries.
var objectTypeCategories = map[string][]string{
	"tables":             {"TABLE", "DEFAULT"},
	"sequences":          {"SEQUENCE", "SEQUENCE OWNED BY"},
	"types":              {"TYPE", "DOMAIN"},
	"indexes":            {"INDEX"},
	"constraints":        {"CONSTRAINT", "FK CONSTRAINT"},
	"functions":          {"FUNCTION", "AGGREGATE", "PROCEDURE"},
	"views":              {"VIEW"},
	"materialized_views": {"MATERIALIZED VIEW"},
	"triggers":           {"TRIGGER"},
	"event_triggers":     {"EVENT TRIGGER"},
	"policies":           {"POLICY", "ROW SECURITY"},
	"rules":              {"RULE"},
	"comments":           {"COMMENT"},
	"extensions":         {"EXTENSION"},
	"collations":         {"COLLATION"},
	"text_search":        {"TEXT SEARCH CONFIGURATION", "TEXT SEARCH DICTIONARY", "TEXT SEARCH PARSER", "TEXT SEARCH TEMPLATE"},
}

// tocHeaderRegex matches pg_dump TOC comment headers of the form:
//
//	-- Name: my_func; Type: FUNCTION; Schema: public; Owner: postgres
//
// capturing the value of the "Type" field.
var tocHeaderRegex = regexp.MustCompile(`^--\s*Name:.*;\s*Type:\s*([^;]+)\s*;`)

// parseTOCHeader extracts the object Type value from a pg_dump TOC comment
// header line. It returns the trimmed type string and true when the line is
// a TOC header, or ("", false) for any other line.
func parseTOCHeader(line string) (string, bool) {
	m := tocHeaderRegex.FindStringSubmatch(line)
	if m == nil {
		return "", false
	}
	return strings.TrimSpace(m[1]), true
}

// objectTypeFilter determines which pg_dump object types should be excluded
// based on user-specified include or exclude category lists. A nil
// *objectTypeFilter is valid and means "no filtering": every method treats a
// nil receiver as filter-disabled.
type objectTypeFilter struct {
	// excludedTypes holds the pg_dump TOC "Type" values to drop.
	excludedTypes map[string]struct{}
	// excludedCategories tracks which user-facing categories are excluded for
	// higher-level checks (e.g., skipping sequence dump step).
	excludedCategories map[string]struct{}
}

// newObjectTypeFilter builds an objectTypeFilter from include/exclude
// category lists. At most one of the two lists may be non-empty; when
// neither is set it returns (nil, nil), meaning no filtering at all.
// Unknown category names yield an error.
func newObjectTypeFilter(include, exclude []string) (*objectTypeFilter, error) {
	switch {
	case len(include) > 0 && len(exclude) > 0:
		return nil, fmt.Errorf("include_object_types and exclude_object_types cannot both be set")
	case len(include) == 0 && len(exclude) == 0:
		return nil, nil
	}

	filter := &objectTypeFilter{
		excludedTypes:      make(map[string]struct{}),
		excludedCategories: make(map[string]struct{}),
	}

	// excludeCategory marks a known category, and every pg_dump TOC type it
	// covers, as excluded.
	excludeCategory := func(category string) {
		filter.excludedCategories[category] = struct{}{}
		for _, pgType := range objectTypeCategories[category] {
			filter.excludedTypes[pgType] = struct{}{}
		}
	}

	if len(include) > 0 {
		// Include mode: validate the requested categories first, then exclude
		// every category that was NOT requested.
		wanted := make(map[string]struct{}, len(include))
		for _, category := range include {
			if _, known := objectTypeCategories[category]; !known {
				return nil, fmt.Errorf("unknown object type category: %q", category)
			}
			wanted[category] = struct{}{}
		}
		for category := range objectTypeCategories {
			if _, keep := wanted[category]; !keep {
				excludeCategory(category)
			}
		}
		return filter, nil
	}

	// Exclude mode: validate and exclude exactly the listed categories.
	for _, category := range exclude {
		if _, known := objectTypeCategories[category]; !known {
			return nil, fmt.Errorf("unknown object type category: %q", category)
		}
		excludeCategory(category)
	}
	return filter, nil
}

// isExcluded reports whether the given pg_dump Type value should be dropped
// from the dump. SCHEMA entries are never excluded (they are required for
// namespace resolution), and a nil filter excludes nothing.
func (f *objectTypeFilter) isExcluded(pgdumpType string) bool {
	switch {
	case f == nil:
		return false
	case pgdumpType == "SCHEMA":
		// SCHEMA is always kept.
		return false
	}
	_, found := f.excludedTypes[pgdumpType]
	return found
}

// isCategoryExcluded reports whether the given user-facing category name is
// excluded by this filter. A nil filter excludes no categories.
func (f *objectTypeFilter) isCategoryExcluded(category string) bool {
	if f == nil {
		return false
	}
	_, found := f.excludedCategories[category]
	return found
}

// cleanupStatementPrefixes maps SQL cleanup statement prefixes (from pg_dump
// --clean --if-exists output) to the object type category they belong to.
// Used to drop cleanup statements for excluded categories. Map iteration
// order does not matter for matching: no prefix here is a string prefix of
// another (e.g. "DROP VIEW" never matches a "DROP MATERIALIZED VIEW" line).
var cleanupStatementPrefixes = map[string]string{
	"DROP POLICY":            "policies",
	"DROP TRIGGER":           "triggers",
	"DROP RULE":              "rules",
	"DROP INDEX":             "indexes",
	"DROP FUNCTION":          "functions",
	"DROP AGGREGATE":         "functions",
	"DROP PROCEDURE":         "functions",
	"DROP VIEW":              "views",
	"DROP MATERIALIZED VIEW": "materialized_views",
	"DROP TEXT SEARCH":       "text_search",
	"DROP COLLATION":         "collations",
	"DROP EXTENSION":         "extensions",
	"DROP EVENT TRIGGER":     "event_triggers",
	"DROP SEQUENCE":          "sequences",
	"DROP TABLE":             "tables",
	"DROP TYPE":              "types",
	"DROP DOMAIN":            "types",
	// "schemas" is a sentinel: DROP SCHEMA lines are never skipped (see
	// shouldSkipCleanupLine).
	"DROP SCHEMA": "schemas",
	"COMMENT ON":  "comments",
}

// filterCleanupDump removes lines from a cleanup dump that belong to excluded
// object type categories. This prevents errors like "relation does not exist"
// when DROP POLICY/TRIGGER/RULE statements reference tables that don't yet
// exist on the target. A nil filter returns the dump unchanged.
//
// Lines are read with bufio.Reader.ReadString rather than bufio.Scanner:
// Scanner's default 64KiB token limit would silently abort the scan on an
// over-long line (its error was never checked), dropping the remainder of
// the dump. ReadString has no line-length limit.
func (f *objectTypeFilter) filterCleanupDump(cleanupDump []byte) []byte {
	if f == nil {
		return cleanupDump
	}

	reader := bufio.NewReader(bytes.NewReader(cleanupDump))
	var filtered strings.Builder
	filtered.Grow(len(cleanupDump))
	for {
		line, err := reader.ReadString('\n')
		// Normalize line endings the way bufio.ScanLines did: strip the
		// trailing "\n" and any preceding "\r" before prefix matching.
		trimmed := strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r")
		// ReadString returns io.EOF together with any final unterminated
		// line; only the genuinely empty trailing read is dropped, so a
		// missing final newline is still normalized to one, as before.
		if trimmed != "" || err == nil {
			if !f.shouldSkipCleanupLine(trimmed) {
				filtered.WriteString(trimmed)
				filtered.WriteString("\n")
			}
		}
		if err != nil {
			// bytes.Reader only ever returns io.EOF here.
			return []byte(filtered.String())
		}
	}
}

// shouldSkipCleanupLine reports whether a cleanup dump line must be dropped
// because it belongs to an excluded object type category. DROP SCHEMA lines
// are always kept, and a nil filter skips nothing.
func (f *objectTypeFilter) shouldSkipCleanupLine(line string) bool {
	if f == nil {
		return false
	}
	for prefix, category := range cleanupStatementPrefixes {
		if !strings.HasPrefix(line, prefix) {
			continue
		}
		if category == "schemas" {
			// Schemas are never excluded.
			return false
		}
		return f.isCategoryExcluded(category)
	}
	return false
}
Loading
Loading