Manifests
This page covers the repository structure, mx-validator source code, and Kubernetes manifests for the email relay.
Email relay series
- Email relay
- Architecture
- Manifests - You are here
- Flux integration
- Operations
Repository structure
The email relay uses two repositories: one for the custom mx-validator application and one for the Kubernetes deployment.
mx-validator repo
Contains the Go source code for the MX validation proxy.
your-org/mx-validator/
├── main.go # MX validator source
├── go.mod # Go module definition
├── Dockerfile # Multi-stage build
└── .gitlab-ci.yml # Kaniko build pipeline
email-relay repo
Contains the Kubernetes manifests for deployment.
your-org/email-relay/
├── .sops.yaml # SOPS encryption config
└── k8s/prod/
├── kustomization.yaml
├── 00-namespace.yaml
├── 10-secret-mailpit-auth.enc.yaml
├── 15-secret-smtp2graph-config.enc.yaml
├── 20-deployment.yaml # 3-container pod
├── 25-pvc-mailpit.yaml # Persistent storage for email logs
├── 30-service.yaml
├── 40-networkpolicy.yaml
├── 50-servicemonitor.yaml
└── 80-prometheusrule.yaml
MX validator code
main.go
The core SMTP proxy that validates MX records and optionally strips ICS calendar attachments before forwarding.
// mx-validator - SMTP proxy with MX record validation and ICS stripping
// Validates recipient domains have MX records before forwarding to upstream SMTP
// Optionally strips ICS calendar attachments for specific recipients (fixes Microsoft Graph ATTENDEE routing)
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"mime"
"mime/multipart"
"net"
"net/http"
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
	// Configuration, read from the environment exactly once at package
	// initialization (changing the environment afterwards has no effect).
	listenAddr         = getEnv("LISTEN_ADDR", ":25")         // client-facing SMTP listener
	upstreamHost       = getEnv("UPSTREAM_HOST", "127.0.0.1") // upstream SMTP host to proxy to
	upstreamPort       = getEnv("UPSTREAM_PORT", "1025")      // upstream SMTP port
	metricsAddr        = getEnv("METRICS_ADDR", ":9090")      // HTTP listener for /metrics and /healthz
	hostname           = getEnv("HOSTNAME", "mx-validator")   // NOTE(review): not referenced anywhere in this file — confirm it is used elsewhere or remove
	stripICSRecipients = getEnv("STRIP_ICS_RECIPIENTS", "")   // Comma-separated list

	// Prometheus metrics; all are registered in init().
	acceptedTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "mx_validator_accepted_total",
		Help: "Total number of emails accepted (valid MX)",
	})
	rejectedTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "mx_validator_rejected_total",
		Help: "Total number of emails rejected (no MX)",
	})
	connectionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "mx_validator_connections_total",
		Help: "Total number of SMTP connections",
	})
	activeConnections = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "mx_validator_active_connections",
		Help: "Number of active SMTP connections",
	})
	icsStrippedTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "mx_validator_ics_stripped_total",
		Help: "Total number of emails with ICS stripped",
	})

	// MX lookup cache (1 hour TTL), guarded by mxCacheLock. Entries are
	// never evicted, only overwritten on refresh.
	mxCache     = make(map[string]mxCacheEntry)
	mxCacheLock sync.RWMutex

	// Extracts a full address (group 1) and its domain (group 2) from SMTP
	// command arguments such as "RCPT TO:<user@example.com>"; the angle
	// brackets are optional.
	emailRegex = regexp.MustCompile(`[<]?([^<>@\s]+@([^<>@\s]+))[>]?`)

	// Lowercased recipient addresses whose messages get ICS stripping;
	// populated from STRIP_ICS_RECIPIENTS in init().
	stripICSSet = make(map[string]bool)
)
// mxCacheEntry is a cached MX-lookup verdict with an absolute expiry.
type mxCacheEntry struct {
	hasMX     bool      // domain has MX records (or the A-record fallback succeeded)
	expiresAt time.Time // entry is stale once time.Now() passes this instant
}
// getEnv returns the value of the environment variable key, or fallback
// when the variable is unset. A variable that is set to the empty string
// is returned as "" (not replaced by fallback).
func getEnv(key, fallback string) string {
	v, ok := os.LookupEnv(key)
	if !ok {
		return fallback
	}
	return v
}
// checkMX reports whether domain looks deliverable: it has at least one MX
// record or, per RFC 5321's implicit-MX rule, at least one address record.
// Verdicts — including negative ones — are cached for one hour.
func checkMX(domain string) bool {
	mxCacheLock.RLock()
	cached, ok := mxCache[domain]
	mxCacheLock.RUnlock()
	if ok && time.Now().Before(cached.expiresAt) {
		return cached.hasMX
	}

	deliverable := false
	if records, err := net.LookupMX(domain); err == nil && len(records) > 0 {
		deliverable = true
	} else if addrs, err := net.LookupHost(domain); err == nil && len(addrs) > 0 {
		// RFC 5321: a domain with no MX but an A/AAAA record is still a
		// valid mail destination.
		deliverable = true
	}

	mxCacheLock.Lock()
	mxCache[domain] = mxCacheEntry{
		hasMX:     deliverable,
		expiresAt: time.Now().Add(1 * time.Hour),
	}
	mxCacheLock.Unlock()
	return deliverable
}
// stripICSFromMessage removes ICS calendar parts from a MIME message,
// recursing into nested multiparts. A part is stripped when its media type
// is text/calendar or application/ics, or when its Content-Disposition
// mentions ".ics".
//
// Returns the (possibly rewritten) message, whether anything was removed,
// and an error — which is currently always nil: every parse failure falls
// back to returning the input unchanged, so malformed mail is forwarded
// as-is rather than dropped.
func stripICSFromMessage(data []byte) ([]byte, bool, error) {
	// Split headers from body; accept CRLF-CRLF or bare LF-LF separators.
	headerEnd := bytes.Index(data, []byte("\r\n\r\n"))
	if headerEnd == -1 {
		headerEnd = bytes.Index(data, []byte("\n\n"))
		if headerEnd == -1 {
			// No header/body separator: not parseable, pass through.
			return data, false, nil
		}
	}
	headers := string(data[:headerEnd])
	body := data[headerEnd:] // still starts with the separator; trimmed below

	// Collect the (possibly folded) Content-Type header line verbatim —
	// the boundary parameter is case-sensitive and must not be lowercased.
	contentType := ""
	for _, line := range strings.Split(headers, "\n") {
		if strings.HasPrefix(strings.ToLower(strings.TrimSpace(line)), "content-type:") {
			contentType = line
			continue
		}
		// RFC 5322 folding: continuation lines begin with space or tab.
		if contentType != "" && (strings.HasPrefix(line, " ") || strings.HasPrefix(line, "\t")) {
			contentType += line
		} else if contentType != "" {
			break
		}
	}
	if contentType == "" {
		return data, false, nil
	}
	// Isolate the header value after the first colon.
	// NOTE(review): for CRLF messages a folded value keeps embedded "\r"
	// characters; if mime.ParseMediaType rejects them we fall through to
	// the pass-through below — confirm folded Content-Type headers parse.
	headerValue := contentType
	colonIdx := strings.Index(contentType, ":")
	if colonIdx != -1 {
		headerValue = strings.TrimSpace(contentType[colonIdx+1:])
	}
	mediaType, params, err := mime.ParseMediaType(headerValue)
	if err != nil {
		return data, false, nil
	}
	mediaType = strings.ToLower(mediaType)

	// Entire message is a calendar: replace the whole body with a stub.
	if mediaType == "text/calendar" || mediaType == "application/ics" {
		return []byte(headers + "\r\n\r\nCalendar attachment removed.\r\n"), true, nil
	}
	// Non-multipart, non-calendar messages cannot contain ICS parts.
	if !strings.HasPrefix(mediaType, "multipart/") {
		return data, false, nil
	}
	boundary := params["boundary"]
	if boundary == "" {
		return data, false, nil
	}

	// Drop the header/body separator so the multipart reader starts at the
	// preamble / first boundary marker.
	bodyReader := bytes.NewReader(body)
	if bytes.HasPrefix(body, []byte("\r\n\r\n")) {
		bodyReader = bytes.NewReader(body[4:])
	} else if bytes.HasPrefix(body, []byte("\n\n")) {
		bodyReader = bytes.NewReader(body[2:])
	}
	mr := multipart.NewReader(bodyReader, boundary)
	var newParts [][]byte
	stripped := false
	for {
		part, err := mr.NextPart()
		if err == io.EOF {
			break
		}
		if err != nil {
			// Malformed part: give up and forward the original message.
			return data, false, nil
		}
		partContentType := part.Header.Get("Content-Type")
		partMediaType, partParams, _ := mime.ParseMediaType(partContentType)
		// Skip ICS parts matched by media type.
		if partMediaType == "text/calendar" || partMediaType == "application/ics" {
			stripped = true
			log.Printf("Stripping ICS part: %s", partContentType)
			part.Close()
			continue
		}
		// Skip .ics file attachments matched by filename in the
		// Content-Disposition header.
		contentDisp := part.Header.Get("Content-Disposition")
		if strings.Contains(strings.ToLower(contentDisp), ".ics") {
			stripped = true
			log.Printf("Stripping ICS attachment: %s", contentDisp)
			part.Close()
			continue
		}
		partData, err := io.ReadAll(part)
		if err != nil {
			part.Close()
			return data, false, nil
		}
		part.Close()
		// Recurse into nested multiparts (e.g. multipart/alternative inside
		// multipart/mixed) by reassembling the part as a standalone message
		// and re-running the stripper on it.
		if strings.HasPrefix(partMediaType, "multipart/") {
			nestedBoundary := partParams["boundary"]
			if nestedBoundary != "" {
				var nestedMsg bytes.Buffer
				for key, values := range part.Header {
					for _, value := range values {
						nestedMsg.WriteString(key + ": " + value + "\r\n")
					}
				}
				nestedMsg.WriteString("\r\n")
				nestedMsg.Write(partData)
				processedNested, nestedStripped, _ := stripICSFromMessage(nestedMsg.Bytes())
				if nestedStripped {
					stripped = true
					log.Printf("Stripped ICS from nested %s", partMediaType)
					// Keep only the rewritten body; the part's headers are
					// re-emitted from part.Header when the part is rebuilt.
					nestedHeaderEnd := bytes.Index(processedNested, []byte("\r\n\r\n"))
					if nestedHeaderEnd == -1 {
						nestedHeaderEnd = bytes.Index(processedNested, []byte("\n\n"))
					}
					if nestedHeaderEnd != -1 {
						partData = processedNested[nestedHeaderEnd:]
						partData = bytes.TrimPrefix(partData, []byte("\r\n\r\n"))
						partData = bytes.TrimPrefix(partData, []byte("\n\n"))
					}
				}
			}
		}
		// Re-emit the surviving part.
		// NOTE(review): ranging over the header map loses original header
		// order, and Go's multipart reader may have decoded (and removed)
		// a quoted-printable Content-Transfer-Encoding — verify downstream
		// consumers tolerate both.
		var partBuffer bytes.Buffer
		partBuffer.WriteString("--" + boundary + "\r\n")
		for key, values := range part.Header {
			for _, value := range values {
				partBuffer.WriteString(key + ": " + value + "\r\n")
			}
		}
		partBuffer.WriteString("\r\n")
		partBuffer.Write(partData)
		partBuffer.WriteString("\r\n")
		newParts = append(newParts, partBuffer.Bytes())
	}
	// Nothing removed: return the original bytes untouched, preserving
	// exact formatting, encodings, and any epilogue after the last part.
	if !stripped {
		return data, false, nil
	}
	// Reassemble: original headers + surviving parts + closing boundary.
	var result bytes.Buffer
	result.WriteString(headers)
	result.WriteString("\r\n\r\n")
	for _, part := range newParts {
		result.Write(part)
	}
	result.WriteString("--" + boundary + "--\r\n")
	return result.Bytes(), true, nil
}
// init registers all Prometheus collectors and builds the lowercase
// recipient set that controls whether ICS stripping applies to a message.
func init() {
	for _, collector := range []prometheus.Collector{
		acceptedTotal,
		rejectedTotal,
		connectionsTotal,
		activeConnections,
		icsStrippedTotal,
	} {
		prometheus.MustRegister(collector)
	}

	// Parse STRIP_ICS_RECIPIENTS (comma-separated) into a lookup set.
	if stripICSRecipients == "" {
		return
	}
	for _, raw := range strings.Split(stripICSRecipients, ",") {
		if addr := strings.ToLower(strings.TrimSpace(raw)); addr != "" {
			stripICSSet[addr] = true
		}
	}
	log.Printf("ICS stripping enabled for: %v", stripICSSet)
}
// main starts the metrics/health HTTP endpoint in the background, then
// runs the SMTP accept loop forever, one goroutine per connection.
func main() {
	// Metrics and health endpoint; failure to bind is fatal for the
	// whole process (log.Fatal exits).
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
			w.Write([]byte("ok"))
		})
		log.Printf("Metrics server listening on %s", metricsAddr)
		log.Fatal(http.ListenAndServe(metricsAddr, nil))
	}()

	ln, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalf("Failed to listen on %s: %v", listenAddr, err)
	}
	defer ln.Close()

	log.Printf("MX Validator listening on %s, upstream %s:%s",
		listenAddr, upstreamHost, upstreamPort)

	// Accept loop: transient accept errors are logged and skipped.
	for {
		conn, acceptErr := ln.Accept()
		if acceptErr != nil {
			log.Printf("Accept error: %v", acceptErr)
			continue
		}
		go handleConnection(conn)
	}
}
// handleConnection proxies one SMTP session between the client and the
// upstream server. RCPT TO addresses whose domain has neither MX nor
// address records are rejected locally with a 550 and never forwarded;
// when any recipient is listed in stripICSSet, the next DATA payload is
// buffered and ICS calendar parts are removed before forwarding.
//
// NOTE(review): the proxy assumes every upstream reply is a single line.
// Multi-line replies (e.g. the usual "250-" EHLO capability list) would
// desynchronize the command/response pairing — confirm the upstream
// replies single-line, or add continuation-line handling.
func handleConnection(clientConn net.Conn) {
	defer clientConn.Close()
	connectionsTotal.Inc()
	activeConnections.Inc()
	defer activeConnections.Dec()
	upstreamConn, err := net.Dial("tcp", upstreamHost+":"+upstreamPort)
	if err != nil {
		log.Printf("Upstream connect failed: %v", err)
		return
	}
	defer upstreamConn.Close()
	clientReader := bufio.NewReader(clientConn)
	upstreamReader := bufio.NewReader(upstreamConn)
	// Relay the upstream's 220 greeting to the client.
	greeting, _ := upstreamReader.ReadString('\n')
	clientConn.Write([]byte(greeting))
	// Per-message state, reset after each DATA exchange.
	// NOTE(review): currentRecipients is appended to and reset but never
	// otherwise read — dead state unless intended for future use.
	var stripICS bool
	var currentRecipients []string
	for {
		line, err := clientReader.ReadString('\n')
		if err != nil {
			break
		}
		upperLine := strings.ToUpper(strings.TrimSpace(line))
		// Validate RCPT TO before forwarding; rejected recipients are
		// answered locally with 550 and never reach the upstream, so the
		// upstream's recipient list stays consistent.
		if strings.HasPrefix(upperLine, "RCPT TO:") {
			matches := emailRegex.FindStringSubmatch(line)
			if len(matches) >= 3 {
				domain := strings.ToLower(matches[2])
				email := strings.ToLower(matches[1])
				if !checkMX(domain) {
					rejectedTotal.Inc()
					log.Printf("Rejected: no MX for %s", domain)
					clientConn.Write([]byte(
						"550 5.1.2 Bad destination - no MX record\r\n"))
					continue
				}
				acceptedTotal.Inc()
				currentRecipients = append(currentRecipients, email)
				if stripICSSet[email] {
					stripICS = true
					log.Printf("ICS stripping will be applied for %s", email)
				}
			}
		}
		// Forward the command and relay the (single-line) response.
		upstreamConn.Write([]byte(line))
		response, _ := upstreamReader.ReadString('\n')
		clientConn.Write([]byte(response))
		// After the upstream accepts DATA (354), relay the message body up
		// to the terminating "." line.
		if strings.HasPrefix(upperLine, "DATA") &&
			strings.HasPrefix(response, "354") {
			if stripICS {
				// Buffer the whole message so its MIME structure can be
				// rewritten before forwarding.
				// NOTE(review): buffered lines keep SMTP dot-stuffing
				// ("..") and the rewritten body is not re-stuffed before
				// being sent upstream — verify messages with leading-dot
				// lines survive this path intact.
				var msgBuffer bytes.Buffer
				for {
					dataLine, err := clientReader.ReadString('\n')
					if err != nil {
						break
					}
					if strings.TrimSpace(dataLine) == "." {
						break
					}
					msgBuffer.WriteString(dataLine)
				}
				modified, wasStripped, _ := stripICSFromMessage(msgBuffer.Bytes())
				if wasStripped {
					icsStrippedTotal.Inc()
					log.Printf("Stripped ICS from message")
				}
				upstreamConn.Write(modified)
				upstreamConn.Write([]byte("\r\n.\r\n"))
			} else {
				// Fast path: stream the body through unchanged, including
				// the final "." terminator line.
				for {
					dataLine, err := clientReader.ReadString('\n')
					if err != nil {
						break
					}
					upstreamConn.Write([]byte(dataLine))
					if strings.TrimSpace(dataLine) == "." {
						break
					}
				}
			}
			// Relay the upstream's final 250/5xx verdict for the message.
			finalResponse, _ := upstreamReader.ReadString('\n')
			clientConn.Write([]byte(finalResponse))
			// Reset for next message
			stripICS = false
			currentRecipients = nil
		}
		if strings.HasPrefix(upperLine, "QUIT") {
			break
		}
	}
}
Dockerfile
Multi-stage build with NET_BIND_SERVICE capability for port 25.
# Build stage
FROM golang:1.21-alpine AS builder
WORKDIR /app
# git is required so `go mod tidy` can fetch module dependencies
RUN apk add --no-cache git
# go.sum may not exist on a first build, hence the glob
COPY go.mod go.sum* ./
COPY *.go ./
# Resolves transitive deps (prometheus client) and generates go.sum
RUN go mod tidy
# Static binary (CGO off) — runs on plain alpine; -s -w strips debug info
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o mx-validator .

# Runtime stage
FROM alpine:3.19
# Install libcap for setcap (allows binding to port 25 without root)
RUN apk add --no-cache ca-certificates libcap && \
    adduser -D -u 1000 appuser
WORKDIR /app
COPY --from=builder /app/mx-validator .
# Grant NET_BIND_SERVICE capability for port 25
RUN setcap 'cap_net_bind_service=+ep' ./mx-validator
USER appuser
EXPOSE 25 9090
# busybox wget ships with alpine; probes the Go metrics server's /healthz
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD wget --no-verbose --tries=1 --spider http://localhost:9090/healthz || exit 1
ENTRYPOINT ["./mx-validator"]
go.mod
// Module for the mx-validator SMTP proxy. go.sum and the transitive
// prometheus dependencies are generated by `go mod tidy` during the
// container build (see Dockerfile).
module mx-validator

go 1.21

require github.com/prometheus/client_golang v1.18.0
Kubernetes manifests
Deployment
Three containers in a single pod communicating via localhost.
# k8s/prod/20-deployment.yaml
# Single-replica pod with three containers chained over localhost:
#   client -> mx-validator (:25) -> mailpit (:1025) -> smtp2graph (:2525) -> Graph API
apiVersion: apps/v1
kind: Deployment
metadata:
  name: email-relay
  namespace: email-relay
spec:
  replicas: 1
  selector:
    matchLabels:
      app: email-relay
  template:
    metadata:
      labels:
        app: email-relay
    spec:
      imagePullSecrets:
        - name: mx-validator-registry
      containers:
        # MX Validator - validates recipient domains, strips ICS for specific recipients
        - name: mx-validator
          image: registry.example.local/your-org/mx-validator:prod-YYYYMMDD.N # {"$imagepolicy": "flux-system:mx-validator-prod-policy"}
          securityContext:
            capabilities:
              add:
                - NET_BIND_SERVICE # bind port 25 as non-root (binary also carries the setcap bit)
          ports:
            - containerPort: 25
              name: smtp-ingress
            - containerPort: 9090
              name: mx-metrics
          env:
            - name: UPSTREAM_HOST
              value: "127.0.0.1"
            - name: UPSTREAM_PORT
              value: "1025" # mailpit's SMTP port in the same pod
            # Strip ICS from emails to these recipients (fixes Microsoft Graph ATTENDEE routing)
            - name: STRIP_ICS_RECIPIENTS
              value: "organizer@yourdomain.com" # Comma-separated list of organizer emails
          resources:
            requests:
              cpu: "25m"
              memory: "32Mi"
            limits:
              cpu: "100m"
              memory: "64Mi"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 9090
            initialDelaySeconds: 5
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /healthz
              port: 9090
            initialDelaySeconds: 5
            periodSeconds: 5
        # Mailpit - logs emails, relays to smtp2graph
        - name: mailpit
          image: axllent/mailpit:v1.28.3
          ports:
            - containerPort: 1025
              name: smtp
            - containerPort: 8025
              name: http
            - containerPort: 9091
              name: metrics
          env:
            - name: MP_ENABLE_PROMETHEUS
              value: "0.0.0.0:9091"
            - name: MP_SMTP_RELAY_HOST
              value: "localhost"
            - name: MP_SMTP_RELAY_PORT
              value: "2525" # smtp2graph's receive port in the same pod
            - name: MP_SMTP_RELAY_ALL
              value: "true"
            - name: MP_SMTP_AUTH_ACCEPT_ANY
              value: "true"
            - name: MP_MAX_MESSAGES
              value: "500"
            - name: MP_DATABASE
              value: "/data/mailpit.db" # persisted on the PVC below
          volumeMounts:
            - name: mailpit-data
              mountPath: /data
          resources:
            requests:
              cpu: "50m"
              memory: "128Mi"
            limits:
              cpu: "200m"
              memory: "256Mi"
          livenessProbe:
            httpGet:
              path: /livez
              port: 8025
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8025
            initialDelaySeconds: 5
            periodSeconds: 10
        # smtp2graph - sends via Microsoft 365 Graph API
        - name: smtp2graph
          image: smtp2graph/smtp2graph:v1.1.4
          workingDir: /data
          ports:
            - containerPort: 2525
              name: smtp-internal
          volumeMounts:
            - name: smtp2graph-config
              mountPath: /data/config.yml
              subPath: config.yml
              readOnly: true
            # Same emptyDir mounted at two writable paths
            - name: workdir
              mountPath: /data/logs
            - name: workdir
              mountPath: /data/mailroot
          resources:
            requests:
              cpu: "25m"
              memory: "64Mi"
            limits:
              cpu: "100m"
              memory: "128Mi"
          livenessProbe:
            tcpSocket:
              port: 2525
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            tcpSocket:
              port: 2525
            initialDelaySeconds: 5
            periodSeconds: 10
      volumes:
        - name: mailpit-data
          persistentVolumeClaim:
            claimName: mailpit-data
        - name: smtp2graph-config
          secret:
            secretName: smtp2graph-config
        # FIX: this volume was previously named "smtp2graph-data", but the
        # smtp2graph container mounts a volume named "workdir" — the pod
        # spec was invalid (volumeMounts referenced an undefined volume).
        - name: workdir
          emptyDir: {}
PersistentVolumeClaim
Persistent storage for Mailpit email logs. Survives pod restarts.
# k8s/prod/25-pvc-mailpit.yaml
# Backs Mailpit's database (MP_DATABASE=/data/mailpit.db) so the email log
# survives pod restarts. ReadWriteOnce suffices: the Deployment runs 1 replica.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mailpit-data
  namespace: email-relay
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: nfs-client # assumes an nfs-client provisioner exists in the cluster — TODO confirm
  resources:
    requests:
      storage: 1Gi
Service
Exposes all ports for internal access and Prometheus scraping.
# k8s/prod/30-service.yaml
# Exposes SMTP ingress, the Mailpit UI, and both metrics endpoints.
# Port names must match the ServiceMonitor endpoint references.
apiVersion: v1
kind: Service
metadata:
  name: email-relay
  namespace: email-relay
  labels:
    app: email-relay
spec:
  selector:
    app: email-relay
  ports:
    # mx-validator SMTP listener
    - name: smtp
      port: 25
      targetPort: 25
    # Mailpit web UI / API
    - name: http
      port: 8025
      targetPort: 8025
    # Mailpit Prometheus metrics
    - name: mailpit-metrics
      port: 9091
      targetPort: 9091
    # mx-validator Prometheus metrics (/metrics and /healthz)
    - name: mx-metrics
      port: 9090
      targetPort: 9090
NetworkPolicy
Restricts access to only necessary namespaces.
# k8s/prod/40-networkpolicy.yaml
# Ingress-only policy; egress is left unrestricted (the pod needs DNS for
# MX lookups and outbound HTTPS for the Graph API).
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: email-relay-ingress
  namespace: email-relay
spec:
  podSelector:
    matchLabels:
      app: email-relay
  policyTypes:
    - Ingress
  ingress:
    # SMTP from any namespace (for app access). No `from` clause: this rule
    # deliberately admits port-25 traffic from all pods in the cluster.
    - ports:
        - port: 25
          protocol: TCP
    # Metrics from monitoring namespace
    - from:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: monitoring
      ports:
        - port: 8025
        - port: 9091
        - port: 9090
    # Web UI from ingress-nginx (optional)
    - from:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: ingress-nginx
      ports:
        - port: 8025
ServiceMonitor
Prometheus scrape configuration for both mx-validator and Mailpit metrics.
# k8s/prod/50-servicemonitor.yaml
# Scrapes both metrics endpoints through the email-relay Service. The
# `release` label must match the Prometheus operator's serviceMonitorSelector.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: email-relay
  namespace: email-relay
  labels:
    release: kube-prometheus-stack
spec:
  selector:
    matchLabels:
      app: email-relay
  endpoints:
    # mx-validator metrics (Service port 9090)
    - port: mx-metrics
      interval: 30s
      path: /metrics
    # Mailpit metrics (Service port 9091)
    - port: mailpit-metrics
      interval: 30s
      path: /metrics
PrometheusRule
Alert on high rejection rates or relay failures.
# k8s/prod/80-prometheusrule.yaml
# Alerting rules for the relay. The `release` label must match the
# Prometheus operator's ruleSelector.
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: email-relay-alerts
  namespace: email-relay
  labels:
    release: kube-prometheus-stack
spec:
  groups:
    - name: email-relay
      rules:
        - alert: MXValidatorHighRejectionRate
          # rate() yields a per-second rate, so this fires at more than
          # 1 rejection/second (~60/min) sustained over 5 minutes.
          expr: rate(mx_validator_rejected_total[5m]) > 1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High email rejection rate"
            # FIX: description previously said ">1 email/min", which did not
            # match the per-second expression above.
            description: "MX validator rejecting >1 email/sec (5m average) - possible spam attempt"
        - alert: EmailRelayDown
          expr: up{job="email-relay"} == 0
          for: 2m
          labels:
            severity: critical
          annotations:
            summary: "Email relay is down"
            description: "Email relay pod is not responding to health checks"
        - alert: MailpitHighMessageCount
          # Mailpit prunes at MP_MAX_MESSAGES=500; exceeding 1000 suggests
          # pruning is not working or the limit was raised.
          expr: mailpit_messages > 1000
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Mailpit message count high"
            description: "Over 1000 messages stored in Mailpit - consider cleanup"
Secrets
smtp2graph config (SOPS encrypted)
# k8s/prod/15-secret-smtp2graph-config.enc.yaml (before encryption)
# Encrypt with SOPS before committing; the repo's .sops.yaml selects the keys.
apiVersion: v1
kind: Secret
metadata:
  name: smtp2graph-config
  namespace: email-relay
type: Opaque
stringData:
  # Mounted at /data/config.yml in the smtp2graph container via subPath.
  # Replace the <...> placeholders with the Azure app registration values.
  config.yml: |
    mode: full
    send:
      appReg:
        tenant: <your-azure-tenant-id>
        id: <your-azure-client-id>
        secret: <your-azure-client-secret>
      retryLimit: 3
      retryInterval: 5
    receive:
      port: 2525
      banner: "Email Relay"
      maxSize: 25m
      ipWhitelist:
        - 127.0.0.1
      allowedFrom:
        - you@yourdomain.com
Kustomization
# k8s/prod/kustomization.yaml
# Numeric prefixes document intended apply order. The *.enc.yaml secrets are
# SOPS-encrypted and must be decrypted before apply (e.g. by Flux).
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
  - 00-namespace.yaml
  - 10-secret-mailpit-auth.enc.yaml
  - 15-secret-smtp2graph-config.enc.yaml
  - 20-deployment.yaml
  - 25-pvc-mailpit.yaml
  - 30-service.yaml
  - 40-networkpolicy.yaml
  - 50-servicemonitor.yaml
  - 80-prometheusrule.yaml