Saving custom data into Log Analytics from a Go application

In a recent hobby project, I needed a database to store my data in. The use case was to store data produced by users in an append-only fashion, with no real editing capabilities required. I decided to learn more about Log Analytics, which seemingly fit the bill.

Of course you could use anything for this purpose, but I had recently worked with Azure Workbooks, which I've described in my previous posts here, here and here, and thought I could be lazy and use them again later to build visualizations on top of the data.

Creating the resources with Bicep

So first we need to create a Log Analytics workspace. This is very simple. Full code can be found here.

I often like to output all relevant data from my Bicep modules.

// lawbase.bicep
param logAnalyticsName string
param location string

resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2022-10-01' = {
  name: logAnalyticsName
  location: location
  properties: {
    sku: {
      name: 'PerGB2018'
    }
    retentionInDays: 60
  }
}

output logAnalyticsName string = logAnalytics.name
output logAnalyticsId string = logAnalytics.id
output logAnalyticsCustomerId string = logAnalytics.properties.customerId

Next, we need to create four things. Full code

Structure for our custom data, and a custom table in Log Analytics:
I did not end up having to change my data schema, but theoretically this is where you could make those changes.

A TimeGenerated column is always needed. My data also has a timestamp value built in, and I later derive the TimeGenerated value from it in the rule. The reason I can't use TimeGenerated directly in my data is that (while I cannot find the link now) the ingestion endpoint does not accept TimeGenerated values that are more than a couple of days in the past, and I needed to generate test data going further back than that. If you try to pass in an older TimeGenerated value, the upload will not fail; the endpoint just silently replaces the value with a current one. Kinda dumb.

@description('Short name for the table, used for the stream name and table name. Should not contain the _CL ending. The template will handle that.')
param painTableShortName string = 'PainDescriptions'
var realTableName = '${painTableShortName}_CL' // Always needs to end in _CL

var tableSchema = [
  {
    name: 'timestamp'
    type: 'datetime'
  }
  {
    name: 'level'
    type: 'int'
  }
  {
    name: 'locationId'
    type: 'int'
  }
  {
    name: 'sideId'
    type: 'int'
  }
  {
    name: 'description'
    type: 'string'
  }
  {
    name: 'numbness'
    type: 'boolean'
  }
  {
    name: 'numbnessDescription'
    type: 'string'
  }
  {
    name: 'locationName'
    type: 'string'
  }
  {
    name: 'sideName'
    type: 'string'
  }
  {
    name: 'userName'
    type: 'string'
  }
]

resource customTable 'Microsoft.OperationalInsights/workspaces/tables@2022-10-01' = {
  name: realTableName
  parent: logAnalytics
  properties: {
    plan: 'Analytics'
    retentionInDays: 730
    totalRetentionInDays: 2556
    schema: {
      name: realTableName
      columns: union(tableSchema, [ {
            name: 'TimeGenerated'
            type: 'datetime'
          } ])
    }
  }
}

A Data Collection Endpoint:
A Data Collection Endpoint is the target your application will send data to.

// data.bicep
param dataCollectionEndpointName string
param location string

resource dataCollectionEndpoint 'Microsoft.Insights/dataCollectionEndpoints@2022-06-01' = {
  name: dataCollectionEndpointName
  location: location
  properties: {
    networkAcls: {
      publicNetworkAccess: 'Enabled'
    }
  }
}

// I'm also saving the endpoint URL to a key vault (keyVault refers to an existing Key Vault resource declared elsewhere in the full module). You'll need this value for the app.
resource dataCollectionEndpointSecret 'Microsoft.KeyVault/vaults/secrets@2023-02-01' = {
  parent: keyVault
  name: 'dataCollectionEndpoint'
  properties: {
    value: dataCollectionEndpoint.properties.logsIngestion.endpoint
  }
}

A Data Collection Rule:
A Data Collection Rule specifies which stream to read the data from, identified by the custom table name. The rule also specifies the target Log Analytics workspace and a KQL transformation clause. If your data does not need to change at all, you could just use "source", but since my data does not initially have the TimeGenerated field, I need to extend it from the timestamp column of my data.

param dataCollectionRuleName string
var dataCollectionStreamName = 'Custom-${customTable.name}' // stream names for custom tables must start with the Custom- prefix

resource dataCollectionRule 'Microsoft.Insights/dataCollectionRules@2022-06-01' = {
  name: dataCollectionRuleName
  location: location
  properties: {
    destinations: {
      logAnalytics: [
        {
          workspaceResourceId: logAnalytics.id
          name: guid(logAnalytics.id)
        }
      ]
    }
    dataCollectionEndpointId: dataCollectionEndpoint.id
    dataFlows: [
      {
        streams: [
          dataCollectionStreamName
        ]
        destinations: [
          guid(logAnalytics.id)
        ]
        outputStream: dataCollectionStreamName
        transformKql: 'source | extend TimeGenerated = timestamp'
      }
    ]
    streamDeclarations: {
      '${dataCollectionStreamName}': {
        columns: tableSchema
      }
    }
  }
}

// The rule has an immutable ID that you will need when sending data to it
resource dataCollectionRuleIdSecret 'Microsoft.KeyVault/vaults/secrets@2023-02-01' = {
  parent: keyVault
  name: 'dataCollectionRuleId'
  properties: {
    value: dataCollectionRule.properties.immutableId
  }
}

// You'll also need the stream name
resource dataCollectionStreamNameSecret 'Microsoft.KeyVault/vaults/secrets@2023-02-01' = {
  parent: keyVault
  name: 'dataCollectionStreamName'
  properties: {
    value: dataCollectionStreamName
  }
}

And permissions to publish logs to the endpoint:
My Azure AD account is a member of this group. You could arguably also just pass in your own object ID.

// data.bicep
param developerGroupObjectId string

resource dataCollectionRulePublisherGroup 'Microsoft.Authorization/roleAssignments@2020-04-01-preview' = {
  name: guid(developerGroupObjectId, dataCollectionEndpoint.id)
  scope: dataCollectionRule
  properties: {
    principalId: developerGroupObjectId
    // Monitoring Metrics Publisher
    roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '3913510d-42f4-4e42-8a64-420c390055eb')
  }
}

After creating these resources, it took quite a while, maybe around 15-30 minutes, before the ingestion endpoint actually started accepting data.

Publishing data from the Go application

Now that we have that set up, we should set the corresponding values in our environment variables and load them to create a new client.

export DATA_COLLECTION_ENDPOINT=myendpoint
export DATA_COLLECTION_RULE_ID=myruleid
export DATA_COLLECTION_STREAM_NAME=mystreamname

// main.go
client, err := database.NewLogAnalyticsClient(
    os.Getenv("DATA_COLLECTION_ENDPOINT"),
    os.Getenv("DATA_COLLECTION_RULE_ID"),
    os.Getenv("DATA_COLLECTION_STREAM_NAME"))
if err != nil {
    log.Fatalf("unable to create Log Analytics client: %v", err)
}

Install the required packages.

go get -u github.com/Azure/azure-sdk-for-go/sdk/azcore
go get -u github.com/Azure/azure-sdk-for-go/sdk/azidentity
go get -u github.com/Azure/azure-sdk-for-go/sdk/monitor/azingest

Create the client.
Full code

// loganalytics.go
package database

import (
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/monitor/azingest"
)

type LogAnalyticsClient struct {
	client     AzureClient // small interface over *azingest.Client, sketched below
	ruleId     string
	streamName string
}

func NewLogAnalyticsClient(endpoint, ruleId, streamName string, opts ...LogAnalyticsClientOption) (*LogAnalyticsClient, error) {
	options := &LogAnalyticsClientOptions{}
	for _, opt := range opts {
		opt(options)
	}

	var cred azcore.TokenCredential
	var err error
	if options.CustomCredential != nil {
		cred = options.CustomCredential
	} else {
		cred, err = getCredential()
		if err != nil {
			return nil, fmt.Errorf("unable to get credential: %w", err)
		}
	}

	var client AzureClient
	if options.CustomClient != nil {
		client = options.CustomClient
	} else {
		azClient, err := azingest.NewClient(endpoint, cred, nil)
		if err != nil {
			return nil, fmt.Errorf("unable to create client: %w", err)
		}
		client = azClient
	}

	return &LogAnalyticsClient{
		client:     client,
		ruleId:     ruleId,
		streamName: streamName,
	}, nil
}

type LogAnalyticsClientOptions struct {
	CustomCredential azcore.TokenCredential
	CustomClient     AzureClient
}

type LogAnalyticsClientOption func(*LogAnalyticsClientOptions)
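
// The option constructors themselves aren't shown in this post. In the full
// code they are just small setters over LogAnalyticsClientOptions; a sketch
// (the With* names are my assumption) looks like this:

func WithCustomCredential(cred azcore.TokenCredential) LogAnalyticsClientOption {
	return func(o *LogAnalyticsClientOptions) { o.CustomCredential = cred }
}

func WithCustomClient(client AzureClient) LogAnalyticsClientOption {
	return func(o *LogAnalyticsClientOptions) { o.CustomClient = client }
}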

func getCredential() (azcore.TokenCredential, error) {
	var cred azcore.TokenCredential
	var err error

	userAssignedId := os.Getenv("AZURE_CLIENT_ID")
	if userAssignedId != "" {
		cred, err = azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{
			ID: azidentity.ClientID(userAssignedId),
		})
		if err != nil {
			return nil, fmt.Errorf("unable to get managed identity credential: %w", err)
		}
	} else {
		cred, err = azidentity.NewDefaultAzureCredential(nil)
		if err != nil {
			return nil, fmt.Errorf("unable to get default credential: %w", err)
		}
	}
	return cred, nil
}
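
The AzureClient type referenced above is a small interface over the azingest client, so that a mock can be swapped in for tests. Its real definition is in the full code; a minimal sketch could look like the one below. The Upload signature mirrors azingest.Client.Upload as I understand it, so double-check the option and response type names against the SDK version you're using (this also needs the context package imported).

// AzureClient is the part of *azingest.Client this package actually uses.
// Sketch only; the options/response types must match your azingest version.
type AzureClient interface {
	Upload(ctx context.Context, ruleID string, streamName string, logs []byte, options *azingest.UploadOptions) (azingest.UploadResponse, error)
}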

Lastly, we just need to save the data by calling the client's Upload method. I have custom PainDescription and PainDescriptionLogEntry structs that I'm using here.

In a production application you should of course use a context with a timeout that matches your requirements instead of context.Background(); there's a small sketch of that after the code below.

type PainDescription struct {
	Timestamp           time.Time `json:"timestamp,omitempty"`
	Level               int       `json:"level"`
	LocationId          int       `json:"locationId"`
	SideId              int       `json:"sideId"`
	Description         string    `json:"description"`
	Numbness            bool      `json:"numbness"`
	NumbnessDescription string    `json:"numbnessDescription,omitempty"`
}

type PainDescriptionLogEntry struct {
	PainDescription
	LocationName string `json:"locationName"`
	SideName     string `json:"sideName"`
	UserName     string `json:"userName"`
}

func (lac *LogAnalyticsClient) SavePainDescriptionsToLogAnalytics(pd []models.PainDescriptionLogEntry) error {
	logs, err := json.Marshal(pd)
	if err != nil {
		return fmt.Errorf("unable to marshal pain descriptions: %w", err)
	}

	_, err = lac.client.Upload(context.Background(), lac.ruleId, lac.streamName, logs, nil)
	if err != nil {
		return fmt.Errorf("unable to upload logs: %w", err)
	}

	return nil
}
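
As a sketch of the timeout point: a context-aware variant of the method (not part of the repository, just illustrating the idea) would take the context from the caller, and the call site would create it with a deadline. The time and log imports, and the function names, are my own assumptions here.

// A hypothetical context-aware variant of the save method.
func (lac *LogAnalyticsClient) SavePainDescriptionsContext(ctx context.Context, pd []models.PainDescriptionLogEntry) error {
	logs, err := json.Marshal(pd)
	if err != nil {
		return fmt.Errorf("unable to marshal pain descriptions: %w", err)
	}

	if _, err := lac.client.Upload(ctx, lac.ruleId, lac.streamName, logs, nil); err != nil {
		return fmt.Errorf("unable to upload logs: %w", err)
	}
	return nil
}

// Example call site: give the whole upload a deadline that matches your requirements.
func saveWithDeadline(client *LogAnalyticsClient, entries []models.PainDescriptionLogEntry) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return client.SavePainDescriptionsContext(ctx, entries)
}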

And that's it! You can also find a test data generator for my app's requirements here as an example.
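
That generator is specific to my schema, but the core idea is simple: spread the timestamp values out into the past (which is exactly why TimeGenerated is derived from timestamp in the rule) and upload everything in one batch. A rough sketch with made-up field values, assuming math/rand and time are imported:

// Generates n test entries with timestamps spread over the last 90 days.
// All field values are arbitrary placeholders.
func generateTestEntries(n int) []models.PainDescriptionLogEntry {
	entries := make([]models.PainDescriptionLogEntry, 0, n)
	for i := 0; i < n; i++ {
		entries = append(entries, models.PainDescriptionLogEntry{
			PainDescription: models.PainDescription{
				// Ends up as TimeGenerated via the DCR transform.
				Timestamp:   time.Now().UTC().AddDate(0, 0, -rand.Intn(90)),
				Level:       rand.Intn(10) + 1,
				LocationId:  rand.Intn(5),
				SideId:      rand.Intn(2),
				Description: "generated test entry",
				Numbness:    rand.Intn(2) == 0,
			},
			LocationName: "Lower back",
			SideName:     "Left",
			UserName:     "test-user",
		})
	}
	return entries
}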