@@ -36,6 +36,7 @@ import (
3636 "github.com/kubeflow/pipelines/backend/src/common/util"
3737 k8sapi "github.com/kubeflow/pipelines/backend/src/crd/kubernetes/v2beta1"
3838 "github.com/minio/minio-go/v7"
39+ "github.com/minio/minio-go/v7/pkg/credentials"
3940 "gorm.io/driver/mysql"
4041 "gorm.io/driver/postgres"
4142 "gorm.io/gorm"
@@ -966,6 +967,18 @@ func initBlobObjectStore(ctx context.Context, initConnectionTimeout time.Duratio
966967 glog .Fatalf ("Failed to open blob storage bucket: %v" , err )
967968 }
968969
970+ // For MinIO, ensure the bucket exists (create if it doesn't)
971+ // This is needed because MinIO doesn't auto-create buckets like SeaweedFS does
972+ if bucketName != "" && config .SessionInfo != nil {
973+ switch config .SessionInfo .Provider {
974+ case "minio" , "s3" :
975+ // Try to create bucket using the existing MinIO client
976+ if err := ensureMinioBucketExists (ctx , config , bucketName , k8sClient ); err != nil {
977+ glog .Warningf ("Failed to ensure MinIO bucket exists (may already exist): %v" , err )
978+ }
979+ }
980+ }
981+
969982 glog .Infof ("Successfully initialized blob storage for bucket: %s" , bucketName )
970983 return storage .NewBlobObjectStore (bucket , pipelinePath )
971984}
@@ -1153,6 +1166,66 @@ func createMinioBucket(ctx context.Context, minioClient *minio.Client, bucketNam
11531166 glog .Infof ("Successfully created bucket %s\n " , bucketName )
11541167}
11551168
1169+ // ensureMinioBucketExists creates a MinIO bucket if it doesn't exist, using configuration
1170+ func ensureMinioBucketExists (ctx context.Context , config * objectstore.Config , bucketName string , k8sClient kubernetes.Interface ) error {
1171+ // Create MinIO client using the same configuration as the blob storage
1172+ host := common .GetStringConfigWithDefault ("ObjectStoreConfig.Host" , "" )
1173+ port := common .GetStringConfigWithDefault ("ObjectStoreConfig.Port" , "" )
1174+ secure := common .GetBoolConfigWithDefault ("ObjectStoreConfig.Secure" , false )
1175+ region := common .GetStringConfigWithDefault ("ObjectStoreConfig.Region" , "" )
1176+ accessKey := common .GetStringConfigWithDefault ("ObjectStoreConfig.AccessKey" , "" )
1177+ secretKey := common .GetStringConfigWithDefault ("ObjectStoreConfig.SecretAccessKey" , "" )
1178+
1179+ // Default region for MinIO
1180+ if region == "" {
1181+ region = "us-east-1"
1182+ }
1183+
1184+ // Try to get credentials from Kubernetes secret if not in env
1185+ if k8sClient != nil && accessKey == "" && secretKey == "" {
1186+ secretNamespace := common .GetPodNamespace ()
1187+ if secretNamespace == "" {
1188+ secretNamespace = "kubeflow"
1189+ }
1190+
1191+ secret , err := k8sClient .CoreV1 ().Secrets (secretNamespace ).Get (ctx , "mlpipeline-minio-artifact" , metav1.GetOptions {})
1192+ if err == nil {
1193+ if accessKeyBytes , ok := secret .Data ["accesskey" ]; ok {
1194+ accessKey = string (accessKeyBytes )
1195+ }
1196+ if secretKeyBytes , ok := secret .Data ["secretkey" ]; ok {
1197+ secretKey = string (secretKeyBytes )
1198+ }
1199+ }
1200+ }
1201+
1202+ if accessKey == "" || secretKey == "" {
1203+ return fmt .Errorf ("MinIO credentials not available" )
1204+ }
1205+
1206+ // Build MinIO endpoint
1207+ endpoint := host
1208+ if port != "" {
1209+ endpoint = fmt .Sprintf ("%s:%s" , host , port )
1210+ }
1211+ if endpoint == "" {
1212+ return fmt .Errorf ("MinIO host not configured" )
1213+ }
1214+
1215+ // Create MinIO client
1216+ minioClient , err := minio .New (endpoint , & minio.Options {
1217+ Creds : credentials .NewStaticV4 (accessKey , secretKey , "" ),
1218+ Secure : secure ,
1219+ })
1220+ if err != nil {
1221+ return fmt .Errorf ("failed to create MinIO client: %w" , err )
1222+ }
1223+
1224+ // Use the existing createMinioBucket function
1225+ createMinioBucket (ctx , minioClient , bucketName , region )
1226+ return nil
1227+ }
1228+
11561229func initLogArchive () (logArchive archive.LogArchiveInterface ) {
11571230 logFileName := common .GetStringConfigWithDefault (archiveLogFileName , "" )
11581231 logPathPrefix := common .GetStringConfigWithDefault (archiveLogPathPrefix , "" )
0 commit comments