diff --git a/docs/services/nats.md b/docs/services/nats.md new file mode 100644 index 00000000..a69ab782 --- /dev/null +++ b/docs/services/nats.md @@ -0,0 +1,49 @@ +# Nats + +## Parameters + +This notification service is capable of sending simple messages via Nats. + +* Url - Nats server URL, e.g. `nats://nats:4222` +* Headers - optional, additional headers to be sent with the message +* User - optional, Nats user for authentication used in combination with password +* Password - optional, Nats password for authentication used in combination with user +* Nkey - optional, Nats key for authentication + +## Example + +Resource Annotation: +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nginx-deployment + annotations: + notifications.argoproj.io/subscribe.on-deployment-ready.nats: "mytopic" +``` + +* ConfigMap +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: argocd-notifications-cm +data: + service.nats: | + url: "nats://nats:4222" + headers: + my-header: "my-value" + +template.deployment-ready: | + message: | + Deployment {{.obj.metadata.name}} is ready! + + trigger.on-deployment-ready: | + - when: any(obj.status.conditions, {.type == 'Available' && .status == 'True'}) + send: [deployment-ready] + - oncePer: obj.metadata.annotations["generation"] + +``` + + + diff --git a/go.mod b/go.mod index d746a196..f12d4574 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,9 @@ require ( github.com/google/uuid v1.6.0 github.com/gregdel/pushover v1.3.1 github.com/hashicorp/go-retryablehttp v0.7.7 + github.com/nats-io/nats-server/v2 v2.11.4 + github.com/nats-io/nats.go v1.43.0 + github.com/nats-io/nkeys v0.4.11 github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.23 github.com/prometheus/client_golang v1.21.0 github.com/sirupsen/logrus v1.9.3 @@ -27,7 +30,7 @@ require ( github.com/spf13/cast v1.7.1 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 - golang.org/x/time v0.10.0 + golang.org/x/time v0.11.0 gomodules.xyz/notify v0.1.1 google.golang.org/api v0.223.0 gopkg.in/yaml.v3 v3.0.1 @@ -71,6 +74,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-querystring v1.1.0 // indirect + github.com/google/go-tpm v0.9.5 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect @@ -81,12 +85,16 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/minio/highwayhash v1.0.3 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.7.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -101,12 +109,12 @@ require ( go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.38.0 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/oauth2 v0.26.0 // indirect - golang.org/x/sys v0.30.0 // indirect - golang.org/x/term v0.29.0 // indirect - golang.org/x/text v0.22.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/term v0.32.0 // indirect + golang.org/x/text v0.25.0 // indirect gomodules.xyz/envconfig v1.3.1-0.20190308184047-426f31af0d45 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250219182151-9fdb1cabc7b2 // indirect google.golang.org/grpc v1.70.0 // indirect diff --git a/go.sum b/go.sum index 73b5c01b..6dfc35fe 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/PagerDuty/go-pagerduty v1.8.0 h1:MTFqTffIcAervB83U7Bx6HERzLbyaSPL/+ox github.com/PagerDuty/go-pagerduty v1.8.0/go.mod h1:nzIeAqyFSJAFkjWKvMzug0JtwDg+V+UoCWjFrfFH5mI= github.com/RocketChat/Rocket.Chat.Go.SDK v0.0.0-20240116134246-a8cbe886bab0 h1:ztLQGVQsey3BjCoh0TvHc/iKTQmkio2OmsIxhuu+EeY= github.com/RocketChat/Rocket.Chat.Go.SDK v0.0.0-20240116134246-a8cbe886bab0/go.mod h1:rjP7sIipbZcagro/6TCk6X0ZeFT2eyudH5+fve/cbBA= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/appscode/go v0.0.0-20191119085241-0887d8ec2ecc/go.mod h1:OawnOmAL4ZX3YaPdN+8HTNwBveT1jMsqP74moa9XUbE= github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= @@ -136,6 +138,8 @@ github.com/google/go-github/v69 v69.2.0/go.mod h1:xne4jymxLR6Uj9b7J7PyTpkMYstEMM github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU= +github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -184,8 +188,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -207,6 +211,8 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -220,6 +226,16 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= +github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.11.4 h1:oQhvy6He6ER926sGqIKBKuYHH4BGnUQCNb0Y5Qa+M54= +github.com/nats-io/nats-server/v2 v2.11.4/go.mod h1:jFnKKwbNeq6IfLHq+OMnl7vrFRihQ/MkhRbiWfjLdjU= +github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug= +github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nlopes/slack v0.5.0/go.mod h1:jVI4BBK3lSktibKahxBF74txcK2vyvkza1z/+rRnVAM= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -315,8 +331,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190422183909-d864b10871cd/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -339,8 +355,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -356,18 +372,19 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU= -golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s= +golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= +golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= -golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= -golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/services/nats.go b/pkg/services/nats.go new file mode 100644 index 00000000..78934661 --- /dev/null +++ b/pkg/services/nats.go @@ -0,0 +1,84 @@ +package services + +import ( + "fmt" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" +) + +type NatsNotification struct{} + +type NatsOptions struct { + // Url is the NATS server URL to connect to + // e.g. nats://nats.nats.svc.cluster.local:4222 + Url string `json:"url"` + // Headers is an optional map of headers to include in the NATS message + Headers map[string]string `json:"headers,omitempty"` + // NKey is optional for nkey authentication + NKey string `json:"nkey,omitempty"` + // Username and Password are optional for basic auth + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` +} + +func NewNatsService(opts NatsOptions, defaultConnectionOpts ...nats.Option) NotificationService { + return natsService{opts: opts, defaultConnectionOpts: defaultConnectionOpts} +} + +type natsService struct { + opts NatsOptions + // defaultConnectionOpts are additional options to pass to nats.Connect + defaultConnectionOpts []nats.Option +} + +// Send implements NotificationService. +func (n natsService) Send(notification Notification, dest Destination) error { + var options []nats.Option + options = append(options, n.defaultConnectionOpts...) + + if n.opts.NKey != "" { + // create nkey key pair from seed + keyPair, err := nkeys.FromSeed([]byte(n.opts.NKey)) + if err != nil { + return fmt.Errorf("failed to create NKey from seed: %w", err) + } + + // get the public nkey + publicKey, err := keyPair.PublicKey() + if err != nil { + return fmt.Errorf("failed to get public NKey: %w", err) + } + + // add nkey authentication option + options = append(options, nats.Nkey(publicKey, func(nonce []byte) ([]byte, error) { + return keyPair.Sign(nonce) + })) + } + + if n.opts.Username != "" && n.opts.Password != "" { + options = append(options, nats.UserInfo(n.opts.Username, n.opts.Password)) + } + + conn, err := nats.Connect(n.opts.Url, options...) + if err != nil { + return fmt.Errorf("failed to connect to NATS server: %w", err) + } + defer conn.Close() + + msg := nats.NewMsg(dest.Recipient) + msg.Data = []byte(notification.Message) + + if len(n.opts.Headers) > 0 { + msg.Header = make(nats.Header) + for key, value := range n.opts.Headers { + msg.Header.Set(key, value) + } + } + + err = conn.PublishMsg(msg) + if err != nil { + return fmt.Errorf("failed to publish message to NATS: %w", err) + } + return nil +} diff --git a/pkg/services/nats_test.go b/pkg/services/nats_test.go new file mode 100644 index 00000000..ba0d806e --- /dev/null +++ b/pkg/services/nats_test.go @@ -0,0 +1,167 @@ +package services + +import ( + "testing" + "time" + + natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" + "github.com/stretchr/testify/assert" +) + +func TestNatsService_Send(t *testing.T) { + t.Run("basic successful send without authentication", func(t *testing.T) { + server, err := natsserver.NewServer(&natsserver.Options{ + ServerName: "test-nats-server", + DontListen: true, // no tcp socket + }) + if err != nil { + t.Error(err) + } + defer server.Shutdown() + + server.Start() + + if !server.ReadyForConnections(5 * time.Second) { + t.Error("NATS server not ready for connections") + } + + nc, err := nats.Connect(server.ClientURL(), nats.InProcessServer(server)) + if err != nil { + t.Error("failed to connect to NATS server:", err) + } + sub, err := nc.SubscribeSync("test-topic") + if err != nil { + t.Error("failed to subscribe to test-topic:", err) + } + + natsService := NewNatsService(NatsOptions{ + Url: server.ClientURL(), + Headers: map[string]string{"foo": "bar"}, + }, nats.InProcessServer(server)) + + err = natsService.Send(Notification{Message: "message"}, Destination{Service: "nats", Recipient: "test-topic"}) + if err != nil { + t.Error("Failed to send message:", err) + } + + receivedMsg, err := sub.NextMsg(time.Second) + if err != nil { + t.Error("Failed to receive sent message:", err) + } + + assert.NotNil(t, receivedMsg) + assert.Equal(t, "message", string(receivedMsg.Data), "Received message does not match sent message") + assert.Equal(t, "bar", receivedMsg.Header.Get("foo"), "Received message header does not match expected value") + }) + + t.Run("basic successful send with username and password", func(t *testing.T) { + server, err := natsserver.NewServer(&natsserver.Options{ + ServerName: "test-nats-server", + DontListen: true, // no tcp socket + Users: []*natsserver.User{ + { + Username: "testuser", + Password: "testpassword", + }, + }, + }) + if err != nil { + t.Error(err) + } + defer server.Shutdown() + + server.Start() + + if !server.ReadyForConnections(5 * time.Second) { + t.Error("NATS server not ready for connections") + } + + nc, err := nats.Connect(server.ClientURL(), nats.InProcessServer(server), nats.UserInfo("testuser", "testpassword")) + if err != nil { + t.Error("failed to connect to NATS server:", err) + } + sub, err := nc.SubscribeSync("test-topic") + if err != nil { + t.Error("failed to subscribe to test-topic:", err) + } + + natsService := NewNatsService(NatsOptions{ + Url: server.ClientURL(), + Headers: map[string]string{"foo": "bar"}, + Username: "testuser", + Password: "testpassword", + }, nats.InProcessServer(server)) + + err = natsService.Send(Notification{Message: "message"}, Destination{Service: "nats", Recipient: "test-topic"}) + if err != nil { + t.Error("Failed to send message:", err) + } + + receivedMsg, err := sub.NextMsg(time.Second) + if err != nil { + t.Error("Failed to receive sent message:", err) + } + + assert.NotNil(t, receivedMsg) + assert.Equal(t, "message", string(receivedMsg.Data), "Received message does not match sent message") + assert.Equal(t, "bar", receivedMsg.Header.Get("foo"), "Received message header does not match expected value") + }) + + t.Run("basic successful send with nkey authentication", func(t *testing.T) { + // Generate NKey pair + kp, err := nkeys.CreatePair(nkeys.PrefixByteUser) + assert.NoError(t, err) + defer kp.Wipe() + + publicKey, err := kp.PublicKey() + assert.NoError(t, err) + + seed, err := kp.Seed() + assert.NoError(t, err) + + // Start NATS server with NKey-based authorization + server, err := natsserver.NewServer(&natsserver.Options{ + ServerName: "test-nats-server-nkey", + DontListen: true, + Nkeys: []*natsserver.NkeyUser{{ + Nkey: publicKey, + }}, + }) + assert.NoError(t, err) + defer server.Shutdown() + + server.Start() + assert.True(t, server.ReadyForConnections(5*time.Second), "NATS server not ready") + + // Connect and subscribe using NKey auth + nc, err := nats.Connect(server.ClientURL(), + nats.InProcessServer(server), + nats.Nkey(publicKey, func(nonce []byte) ([]byte, error) { + return kp.Sign(nonce) + }), + ) + assert.NoError(t, err) + + sub, err := nc.SubscribeSync("test-topic") + assert.NoError(t, err) + + // Use service with seed + natsService := NewNatsService(NatsOptions{ + Url: server.ClientURL(), + Headers: map[string]string{"foo": "bar"}, + NKey: string(seed), + }, nats.InProcessServer(server)) + + err = natsService.Send(Notification{Message: "message"}, Destination{Service: "nats", Recipient: "test-topic"}) + assert.NoError(t, err) + + receivedMsg, err := sub.NextMsg(time.Second) + assert.NoError(t, err) + + assert.NotNil(t, receivedMsg) + assert.Equal(t, "message", string(receivedMsg.Data)) + assert.Equal(t, "bar", receivedMsg.Header.Get("foo")) + }) +} diff --git a/pkg/services/services.go b/pkg/services/services.go index ba6376aa..a1444b6c 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -223,6 +223,12 @@ func NewService(serviceType string, optsData []byte) (NotificationService, error return nil, err } return NewWebexService(opts), nil + case "nats": + var opts NatsOptions + if err := yaml.Unmarshal(optsData, &opts); err != nil { + return nil, err + } + return NewNatsService(opts), nil default: return nil, fmt.Errorf("service type '%s' is not supported", serviceType) }