add messaging support (kafka/nats) with platform.toml

This commit is contained in:
stackops
2026-04-09 17:19:55 +03:00
parent 9314e1f7ba
commit 185019c99f
6 changed files with 168 additions and 20 deletions

View File

@@ -35,6 +35,15 @@ func main() {
}
d := deployer.New(s)
if platformPath := getEnv("PLATFORM_CONFIG", ""); platformPath != "" {
data, err := os.ReadFile(platformPath)
if err != nil {
log.Printf("warning: could not read platform config %s: %v", platformPath, err)
} else {
d.SetPlatformConfig(data)
log.Printf("loaded platform config from %s", platformPath)
}
}
h := handler.New(s, d)
mux := http.NewServeMux()

View File

@@ -14,13 +14,19 @@ import (
)
// Deployer runs deployments, recording progress and logs through the
// SQLite store. An optional platform.toml payload can be attached via
// SetPlatformConfig to resolve shared messaging infrastructure.
type Deployer struct {
// NOTE(review): this field line appears twice in the rendered diff —
// presumably one is the removed line and one the added one with the
// +/- markers stripped; confirm against the real source, since a
// duplicate field name would not compile.
store *store.SQLiteStore
store *store.SQLiteStore
platformConfig []byte // raw platform.toml content, optional
}
// New returns a Deployer that persists deployment state in s.
// The returned Deployer has no platform config; call SetPlatformConfig
// to attach one before Run if messaging resolution is needed.
func New(s *store.SQLiteStore) *Deployer {
return &Deployer{store: s}
}
// SetPlatformConfig sets the platform.toml content for messaging resolution.
// The bytes are copied so that later mutation of data by the caller cannot
// change the deployer's view of the config (the original stored the caller's
// slice directly, aliasing its backing array).
func (d *Deployer) SetPlatformConfig(data []byte) {
	d.platformConfig = append([]byte(nil), data...)
}
func (d *Deployer) Run(ctx context.Context, deployID, projectID, env string, stackfileContent []byte) {
log := func(line string) { _ = d.store.AppendLog(deployID, line) }
fail := func(msg string) {
@@ -36,7 +42,11 @@ func (d *Deployer) Run(ctx context.Context, deployID, projectID, env string, sta
})
log("→ converting stackfile.toml")
result, err := convert.FromBytes(stackfileContent, env)
var opts []convert.Options
if len(d.platformConfig) > 0 {
opts = append(opts, convert.Options{PlatformConfig: d.platformConfig})
}
result, err := convert.FromBytes(stackfileContent, env, opts...)
if err != nil {
fail(fmt.Sprintf("convert: %v", err))
return

View File

@@ -14,6 +14,7 @@ import (
func main() {
input := flag.String("f", "stackfile.toml", "path to stackfile.toml")
env := flag.String("env", "", "deployment environment override (e.g. staging, prod)")
platform := flag.String("platform", "", "path to platform.toml (shared infra config)")
output := flag.String("o", "-", "output file path (- for stdout)")
flag.Parse()
@@ -26,7 +27,15 @@ func main() {
sf = merger.Apply(sf, *env)
}
out, err := generator.Generate(sf)
var pc *schema.PlatformConfig
if *platform != "" {
pc, err = schema.ParsePlatform(*platform)
if err != nil {
log.Fatalf("parse platform: %v", err)
}
}
out, err := generator.Generate(sf, pc)
if err != nil {
log.Fatalf("generate: %v", err)
}

View File

@@ -12,6 +12,18 @@ import (
"github.com/stackops/toml-converter/internal/schema"
)
// Result holds the generated YAML and metadata extracted from the stackfile.
type Result struct {
YAML []byte // rendered KubeVela Application manifest
AppName string // app name taken from the stackfile, for labeling/lookup
}
// Options configures the conversion.
//
// NOTE(review): Env here appears unused — FromBytes receives env as an
// explicit parameter and only reads Options.PlatformConfig; confirm whether
// the field is intended for a future API or should be removed.
type Options struct {
Env string // environment override (e.g. "staging", "prod")
PlatformConfig []byte // optional platform.toml content for messaging resolution
}
// FromFile parses a stackfile at the given path, applies an optional env
// override, and returns the KubeVela Application YAML bytes.
func FromFile(path, env string) ([]byte, error) {
@@ -25,14 +37,8 @@ func FromFile(path, env string) ([]byte, error) {
return generator.Generate(sf)
}
// Result holds the generated YAML and metadata extracted from the stackfile.
type Result struct {
YAML []byte
AppName string
}
// FromBytes is like FromFile but accepts the raw TOML content.
func FromBytes(content []byte, env string) (*Result, error) {
// FromBytes converts raw TOML content with optional options.
func FromBytes(content []byte, env string, opts ...Options) (*Result, error) {
tmp, err := os.CreateTemp("", "stackfile-*.toml")
if err != nil {
return nil, fmt.Errorf("create temp: %w", err)
@@ -50,7 +56,16 @@ func FromBytes(content []byte, env string) (*Result, error) {
if env != "" {
sf = merger.Apply(sf, env)
}
yaml, err := generator.Generate(sf)
var pc *schema.PlatformConfig
if len(opts) > 0 && len(opts[0].PlatformConfig) > 0 {
pc, err = schema.ParsePlatformBytes(opts[0].PlatformConfig)
if err != nil {
return nil, fmt.Errorf("parse platform config: %w", err)
}
}
yaml, err := generator.Generate(sf, pc)
if err != nil {
return nil, fmt.Errorf("generate: %w", err)
}

View File

@@ -123,8 +123,13 @@ type workflowStep struct {
}
// Generate converts a fully-merged Stackfile into a KubeVela Application YAML.
func Generate(sf *schema.Stackfile) ([]byte, error) {
components, workflow := buildComponents(sf)
// platformCfg is optional — pass nil if no platform.toml is available.
func Generate(sf *schema.Stackfile, platformCfg ...*schema.PlatformConfig) ([]byte, error) {
var pc *schema.PlatformConfig
if len(platformCfg) > 0 {
pc = platformCfg[0]
}
components, workflow := buildComponents(sf, pc)
app := application{
APIVersion: "core.oam.dev/v1beta1",
@@ -141,7 +146,7 @@ func Generate(sf *schema.Stackfile) ([]byte, error) {
// buildComponents returns the full component list and an optional workflow.
// Infra components (postgres, redis) are prepended so the workflow deploys
// them before the webservice.
func buildComponents(sf *schema.Stackfile) ([]component, *workflowSpec) {
func buildComponents(sf *schema.Stackfile, pc *schema.PlatformConfig) ([]component, *workflowSpec) {
var components []component
var steps []workflowStep
@@ -188,7 +193,7 @@ func buildComponents(sf *schema.Stackfile) ([]component, *workflowSpec) {
Image: sf.App.Image,
Cmd: splitCommand(sf.Migrations.Command),
Restart: "Never",
Env: buildEnvVars(sf),
Env: buildEnvVars(sf, pc),
},
})
steps = append(steps, workflowStep{
@@ -201,7 +206,7 @@ func buildComponents(sf *schema.Stackfile) ([]component, *workflowSpec) {
components = append(components, component{
Name: sf.App.Name,
Type: "webservice",
Properties: buildWebserviceProps(sf),
Properties: buildWebserviceProps(sf, pc),
Traits: buildTraits(sf),
})
steps = append(steps, workflowStep{
@@ -219,13 +224,13 @@ func buildComponents(sf *schema.Stackfile) ([]component, *workflowSpec) {
return components, wf
}
func buildWebserviceProps(sf *schema.Stackfile) compProps {
func buildWebserviceProps(sf *schema.Stackfile, pc *schema.PlatformConfig) compProps {
props := compProps{
Image: sf.App.Image,
Ports: []portSpec{{Port: sf.App.Port, Expose: true}},
CPU: sf.Resources.CPULimit,
Memory: sf.Resources.MemoryLimit,
Env: buildEnvVars(sf),
Env: buildEnvVars(sf, pc),
}
if sf.Health.Liveness != "" {
props.LivenessProbe = &probe{
@@ -240,7 +245,7 @@ func buildWebserviceProps(sf *schema.Stackfile) compProps {
return props
}
func buildEnvVars(sf *schema.Stackfile) []envVar {
func buildEnvVars(sf *schema.Stackfile, pc *schema.PlatformConfig) []envVar {
var envs []envVar
// Plain vars — sorted for stable output.
@@ -323,6 +328,43 @@ func buildEnvVars(sf *schema.Stackfile) []envVar {
})
}
// Messaging — resolve Kafka/NATS instances from platform config.
if pc != nil {
// Kafka instances
kafkaNames := make([]string, 0, len(sf.Messaging.Kafka))
for name := range sf.Messaging.Kafka {
kafkaNames = append(kafkaNames, name)
}
sort.Strings(kafkaNames)
for _, name := range kafkaNames {
ref := sf.Messaging.Kafka[name]
prefix := fmt.Sprintf("KAFKA_%s", strings.ToUpper(strings.ReplaceAll(name, "-", "_")))
if platKafka, ok := pc.Kafka[name]; ok {
envs = append(envs, envVar{Name: prefix + "_BROKERS", Value: platKafka.Brokers})
}
if len(ref.Topics) > 0 {
envs = append(envs, envVar{Name: prefix + "_TOPICS", Value: strings.Join(ref.Topics, ",")})
}
}
// NATS instances
natsNames := make([]string, 0, len(sf.Messaging.Nats))
for name := range sf.Messaging.Nats {
natsNames = append(natsNames, name)
}
sort.Strings(natsNames)
for _, name := range natsNames {
ref := sf.Messaging.Nats[name]
prefix := fmt.Sprintf("NATS_%s", strings.ToUpper(strings.ReplaceAll(name, "-", "_")))
if platNats, ok := pc.Nats[name]; ok {
envs = append(envs, envVar{Name: prefix + "_URL", Value: platNats.URL})
}
if len(ref.Subjects) > 0 {
envs = append(envs, envVar{Name: prefix + "_SUBJECTS", Value: strings.Join(ref.Subjects, ",")})
}
}
}
return envs
}

View File

@@ -9,6 +9,7 @@ type Stackfile struct {
Ingress IngressConfig `toml:"ingress"`
Health HealthConfig `toml:"health"`
Migrations MigrationsConfig `toml:"migrations"`
Messaging MessagingConfig `toml:"messaging"`
Vars map[string]string `toml:"vars"`
Secrets map[string]SecretRef `toml:"secrets"`
Dependencies Dependencies `toml:"dependencies"`
@@ -16,6 +17,68 @@ type Stackfile struct {
Env map[string]EnvOverride `toml:"env"`
}
// MessagingConfig declares which message brokers the service needs.
// Keys are named instances defined in platform.toml.
//
// Example:
//
// [messaging.kafka.main]
// topics = ["orders.created", "payments.completed"]
//
// [messaging.nats.default]
// subjects = ["notifications.>"]
type MessagingConfig struct {
Kafka map[string]KafkaRef `toml:"kafka"`
Nats map[string]NatsRef `toml:"nats"`
}
// KafkaRef lists the topics a service uses on a named Kafka instance;
// the broker address itself comes from the matching platform.toml entry.
type KafkaRef struct {
Topics []string `toml:"topics"`
}
// NatsRef lists the subjects a service uses on a named NATS instance;
// the server URL itself comes from the matching platform.toml entry.
type NatsRef struct {
Subjects []string `toml:"subjects"`
}
// PlatformConfig is a cluster-level config (platform.toml) that describes
// shared infrastructure. Stored as a ConfigMap, read by the API at startup.
//
// Example:
//
// [kafka.main]
// brokers = "kafka.shared.svc:9092"
//
// [nats.default]
// url = "nats://nats.shared.svc:4222"
type PlatformConfig struct {
Kafka map[string]KafkaPlatform `toml:"kafka"`
Nats map[string]NatsPlatform `toml:"nats"`
}
// KafkaPlatform describes one shared Kafka instance.
type KafkaPlatform struct {
Brokers string `toml:"brokers"` // comma-separated bootstrap broker addresses
}
// NatsPlatform describes one shared NATS instance.
type NatsPlatform struct {
URL string `toml:"url"` // nats:// connection URL
}
// ParsePlatform reads and decodes a platform.toml file at path.
func ParsePlatform(path string) (*PlatformConfig, error) {
	pc := new(PlatformConfig)
	if _, err := toml.DecodeFile(path, pc); err != nil {
		return nil, err
	}
	return pc, nil
}
// ParsePlatformBytes decodes raw platform.toml content.
func ParsePlatformBytes(data []byte) (*PlatformConfig, error) {
	pc := new(PlatformConfig)
	if _, err := toml.Decode(string(data), pc); err != nil {
		return nil, err
	}
	return pc, nil
}
// MigrationsConfig describes a one-shot Job that runs DB migrations
// before the application is deployed. Uses the same image as [app].
type MigrationsConfig struct {