Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/livepeer/starter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {

// flags
cfg.TestOrchAvail = fs.Bool("startupAvailabilityCheck", *cfg.TestOrchAvail, "Set to false to disable the startup Orchestrator availability check on the configured serviceAddr")
cfg.RemoteSigner = fs.Bool("remoteSigner", *cfg.RemoteSigner, "Set to true to run remote signer service")
cfg.RemoteSignerAddr = fs.String("remoteSignerAddr", *cfg.RemoteSignerAddr, "URL of remote signer service to use (e.g., http://localhost:8935). Gateway only.")

// Gateway metrics
cfg.KafkaBootstrapServers = fs.String("kafkaBootstrapServers", *cfg.KafkaBootstrapServers, "URL of Kafka Bootstrap Servers")
Expand Down
54 changes: 53 additions & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type LivepeerConfig struct {
OrchBlacklist *string
OrchMinLivepeerVersion *string
TestOrchAvail *bool
RemoteSigner *bool
RemoteSignerAddr *string
AIRunnerImage *string
AIRunnerImageOverrides *string
AIVerboseLogs *bool
Expand Down Expand Up @@ -300,6 +302,8 @@ func DefaultLivepeerConfig() LivepeerConfig {

// Flags
defaultTestOrchAvail := true
defaultRemoteSigner := false
defaultRemoteSignerAddr := ""

// Gateway logs
defaultKafkaBootstrapServers := ""
Expand Down Expand Up @@ -419,7 +423,9 @@ func DefaultLivepeerConfig() LivepeerConfig {
OrchMinLivepeerVersion: &defaultMinLivepeerVersion,

// Flags
TestOrchAvail: &defaultTestOrchAvail,
TestOrchAvail: &defaultTestOrchAvail,
RemoteSigner: &defaultRemoteSigner,
RemoteSignerAddr: &defaultRemoteSignerAddr,

// Gateway logs
KafkaBootstrapServers: &defaultKafkaBootstrapServers,
Expand Down Expand Up @@ -676,8 +682,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}

// Validate remote signer mode
if *cfg.RemoteSigner {
if *cfg.Network == "offchain" {
exit("Remote signer mode requires on-chain network")
}
}

if *cfg.Redeemer {
n.NodeType = core.RedeemerNode
} else if *cfg.RemoteSigner {
n.NodeType = core.RemoteSignerNode
} else if *cfg.Orchestrator {
n.NodeType = core.OrchestratorNode
if !*cfg.Transcoder {
Expand Down Expand Up @@ -1564,6 +1579,32 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

bcast := core.NewBroadcaster(n)

// Populate infoSig with remote signer if configured
if *cfg.RemoteSignerAddr != "" {
url, err := url.Parse(*cfg.RemoteSignerAddr)
if err != nil {
glog.Exit("Invalid remote signer addr: ", err)
}

glog.Info("Retrieving OrchestratorInfo fields from remote signer: ", url)
fields, err := server.GetOrchInfoSig(url)
if err != nil {
glog.Exit("Unable to query remote signer: ", err)
}
n.RemoteSignerAddr = url
n.RemoteEthAddr = ethcommon.BytesToAddress(fields.Address)
n.InfoSig = fields.Signature
glog.Info("Using Ethereum address from remote signer: ", n.RemoteEthAddr)
} else {
// Use local signing
infoSig, err := bcast.Sign([]byte(fmt.Sprintf("%v", bcast.Address().Hex())))
if err != nil {
glog.Exit("Unable to generate info sig: ", err)
}
n.InfoSig = infoSig
}

orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist)
if *cfg.OrchPerfStatsURL != "" && *cfg.Region != "" {
glog.Infof("Using Performance Stats, region=%s, URL=%s, minPerfScore=%v", *cfg.Region, *cfg.OrchPerfStatsURL, *cfg.MinPerfScore)
Expand Down Expand Up @@ -1790,6 +1831,17 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}()
}

// Start remote signer server if in remote signer mode
if n.NodeType == core.RemoteSignerNode {
go func() {
glog.Info("Starting remote signer server on ", *cfg.HttpAddr)
err := server.StartRemoteSignerServer(s, *cfg.HttpAddr)
if err != nil {
exit("Error starting remote signer server: err=%q", err)
}
}()
}

go func() {
if core.OrchestratorNode != n.NodeType {
return
Expand Down
1 change: 1 addition & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NodeStatus struct {
type Broadcaster interface {
Address() ethcommon.Address
Sign([]byte) ([]byte, error)
OrchInfoSig() []byte
ExtraNodes() int
}

Expand Down
14 changes: 13 additions & 1 deletion core/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@ func (bcast *broadcaster) Sign(msg []byte) ([]byte, error) {
return bcast.node.Eth.Sign(crypto.Keccak256(msg))
}
func (bcast *broadcaster) Address() ethcommon.Address {
if bcast.node == nil || bcast.node.Eth == nil {
if bcast.node == nil {
return ethcommon.Address{}
}
if (bcast.node.RemoteEthAddr != ethcommon.Address{}) {
return bcast.node.RemoteEthAddr
}
if bcast.node.Eth == nil {
return ethcommon.Address{}
}
return bcast.node.Eth.Account().Address
}
func (bcast *broadcaster) OrchInfoSig() []byte {
if bcast == nil || bcast.node == nil {
return nil
}
return bcast.node.InfoSig
}
func (bcast *broadcaster) ExtraNodes() int {
if bcast == nil || bcast.node == nil {
return 0
Expand Down
9 changes: 9 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
lpmon "github.com/livepeer/go-livepeer/monitor"

ethcommon "github.com/ethereum/go-ethereum/common"
)

var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable")
Expand All @@ -48,6 +50,7 @@ const (
TranscoderNode
RedeemerNode
AIWorkerNode
RemoteSignerNode
)

var nodeTypeStrs = map[NodeType]string{
Expand All @@ -57,6 +60,7 @@ var nodeTypeStrs = map[NodeType]string{
TranscoderNode: "transcoder",
RedeemerNode: "redeemer",
AIWorkerNode: "aiworker",
RemoteSignerNode: "remotesigner",
}

func (t NodeType) String() string {
Expand Down Expand Up @@ -144,6 +148,11 @@ type LivepeerNode struct {
Sender pm.Sender
ExtraNodes int

// Gateway fields for remote signers
RemoteSignerAddr *url.URL
RemoteEthAddr ethcommon.Address // eth address of the remote signer
InfoSig []byte // sig over eth address for the OrchestratorInfo request

// Thread safety for config fields
mu sync.RWMutex
StorageConfigs map[string]*transcodeConfig
Expand Down
1 change: 1 addition & 0 deletions discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type stubBroadcaster struct{}
func (s *stubBroadcaster) Sign(msg []byte) ([]byte, error) { return []byte{}, nil }
func (s *stubBroadcaster) Address() ethcommon.Address { return ethcommon.Address{} }
func (s *stubBroadcaster) ExtraNodes() int { return 0 }
func (s *stubBroadcaster) OrchInfoSig() []byte { return nil }

func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) {
assert := assert.New(t)
Expand Down
141 changes: 141 additions & 0 deletions server/remote_signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package server

import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/core"
)

// SignOrchestratorInfo handles signing GetOrchestratorInfo requests for multiple orchestrators
func (ls *LivepeerServer) SignOrchestratorInfo(w http.ResponseWriter, r *http.Request) {
ctx := clog.AddVal(r.Context(), "request_id", string(core.RandomManifestID()))
remoteAddr := getRemoteAddr(r)
clog.Info(ctx, "Orch info signature request", "ip", remoteAddr)

// Get the broadcaster (signer)
// In remote signer mode, we may not have an OrchestratorPool, so create a broadcaster directly
gw := core.NewBroadcaster(ls.LivepeerNode)

// Create empty params for signing
params := GetOrchestratorInfoParams{}

// Generate the request (this creates the signature)
req, err := genOrchestratorReq(gw, params)
if err != nil {
clog.Errorf(ctx, "Failed to generate request: err=%q", err)
respondJsonError(ctx, w, err, http.StatusInternalServerError)
return
}

// Extract signature and format as hex
var (
signature = "0x" + hex.EncodeToString(req.Sig)
address = gw.Address().String()
)

results := map[string]string{
"address": address,
"signature": signature,
}

// Return JSON response
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(results)
}

// StartRemoteSignerServer starts the HTTP server for remote signer mode
func StartRemoteSignerServer(ls *LivepeerServer, bind string) error {
// Register the remote signer endpoint
ls.HTTPMux.Handle("POST /sign-orchestrator-info", http.HandlerFunc(ls.SignOrchestratorInfo))

// Start the HTTP server
glog.Info("Starting Remote Signer server on ", bind)
gw := core.NewBroadcaster(ls.LivepeerNode)
sig, err := gw.Sign([]byte(fmt.Sprintf("%v", gw.Address().Hex())))
if err != nil {
return err
}
ls.LivepeerNode.InfoSig = sig
srv := http.Server{
Addr: bind,
Handler: ls.HTTPMux,
IdleTimeout: HTTPIdleTimeout,
}
return srv.ListenAndServe()
}

// HexBytes represents a byte slice that marshals/unmarshals as hex with 0x prefix
type HexBytes []byte

func (h HexBytes) MarshalJSON() ([]byte, error) {
hexStr := "0x" + hex.EncodeToString(h)
return json.Marshal(hexStr)
}

func (h *HexBytes) UnmarshalJSON(data []byte) error {
var hexStr string
if err := json.Unmarshal(data, &hexStr); err != nil {
return err
}

// Remove 0x prefix if present
if len(hexStr) >= 2 && hexStr[:2] == "0x" {
hexStr = hexStr[2:]
}

// Decode hex string to bytes
decoded, err := hex.DecodeString(hexStr)
if err != nil {
return fmt.Errorf("invalid hex string: %v", err)
}

*h = decoded
return nil
}

// OrchInfoSigResponse represents the response from the remote signer
type OrchInfoSigResponse struct {
Address HexBytes `json:"address"`
Signature HexBytes `json:"signature"`
}

// Calls the remote signer service to get a signature for GetOrchInfo
func GetOrchInfoSig(remoteSignerHost *url.URL) (*OrchInfoSigResponse, error) {

url := remoteSignerHost.ResolveReference(&url.URL{Path: "/sign-orchestrator-info"})

// Create HTTP client with timeout
client := &http.Client{
Timeout: 30 * time.Second,
}

// Make the request
resp, err := client.Post(url.String(), "application/json", nil)
if err != nil {
return nil, fmt.Errorf("failed to call remote signer: %w", err)
}
defer resp.Body.Close()

// Check response status
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("remote signer returned status %d: %s", resp.StatusCode, string(body))
}

// Parse response
var signerResp OrchInfoSigResponse
if err := json.NewDecoder(resp.Body).Decode(&signerResp); err != nil {
return nil, fmt.Errorf("failed to parse remote signer response: %w", err)
}

return &signerResp, nil
}
6 changes: 1 addition & 5 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,7 @@ func startOrchestratorClient(ctx context.Context, uri *url.URL) (net.Orchestrato
}

func genOrchestratorReq(b common.Broadcaster, params GetOrchestratorInfoParams) (*net.OrchestratorRequest, error) {
sig, err := b.Sign([]byte(fmt.Sprintf("%v", b.Address().Hex())))
if err != nil {
return nil, err
}
return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: sig, Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil
return &net.OrchestratorRequest{Address: b.Address().Bytes(), Sig: b.OrchInfoSig(), Capabilities: params.Caps, IgnoreCapacityCheck: params.IgnoreCapacityCheck}, nil
}

func genEndSessionRequest(sess *BroadcastSession) (*net.EndTranscodingSessionRequest, error) {
Expand Down
Loading