Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,22 @@ jobs:
- name: Collect nohup logs
if: failure()
run: cat nohup.out || true

test-python-sdk:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cd sdks/fs-python
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest
- name: Run Python SDK tests
run: |
cd sdks/fs-python
pytest

4 changes: 4 additions & 0 deletions sdks/fs-python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
venv
*.egg-info
**/__pycache__
build
190 changes: 190 additions & 0 deletions sdks/fs-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# FunctionStream Python SDK

FunctionStream SDK is a powerful Python library for building and deploying serverless functions that process messages from Apache Pulsar. It provides a simple yet flexible framework for creating event-driven applications with robust error handling, metrics collection, and resource management.

## Features

- **Easy Function Development**: Simple API for creating serverless functions
- **Message Processing**: Built-in support for Apache Pulsar message processing
- **Metrics Collection**: Automatic collection of performance metrics
- **Resource Management**: Efficient handling of connections and resources
- **Graceful Shutdown**: Proper cleanup of resources during shutdown
- **Configuration Management**: Flexible configuration through YAML files
- **Schema Validation**: Input and output schema validation
- **Error Handling**: Comprehensive error handling and logging

## Installation

```bash
pip install fs-sdk
```

## Quick Start

1. Create a function that processes messages:

```python
from fs_sdk import FSFunction

async def my_process_function(request_data: dict) -> dict:
# Process the request data
result = process_data(request_data)
return {"result": result}

# Initialize and run the function
function = FSFunction(
process_funcs={
'my_module': my_process_function
}
)

await function.start()
```

2. Create a configuration file (`config.yaml`):

```yaml
pulsar:
service_url: "pulsar://localhost:6650"
authPlugin: "" # Optional
authParams: "" # Optional

module: "my_module"
subscriptionName: "my-subscription"

requestSource:
- pulsar:
topic: "input-topic"

sink:
pulsar:
topic: "output-topic"
```

3. Define your function package (`package.yaml`):

```yaml
name: my_function
type: pulsar
modules:
my_module:
name: my_process
description: "Process incoming messages"
inputSchema:
type: object
properties:
data:
type: string
required:
- data
outputSchema:
type: object
properties:
result:
type: string
```

## Core Components

### FSFunction

The main class for creating serverless functions. It handles:
- Message consumption and processing
- Response generation
- Resource management
- Metrics collection
- Error handling

### Configuration

The SDK uses YAML configuration files to define:
- Pulsar connection settings
- Module selection
- Topic subscriptions
- Input/output topics
- Custom configuration parameters

### Metrics

Built-in metrics collection for:
- Request processing time
- Success/failure rates
- Message throughput
- Resource utilization

## Examples

Check out the `examples` directory for complete examples:

- `string_function.py`: A simple string processing function
- `test_string_function.py`: Test client for the string function
- `config.yaml`: Example configuration
- `package.yaml`: Example package definition

## Best Practices

1. **Error Handling**
- Always handle exceptions in your process functions
- Use proper logging for debugging
- Implement graceful shutdown

2. **Resource Management**
- Close resources properly
- Use context managers when possible
- Monitor resource usage

3. **Configuration**
- Use environment variables for sensitive data
- Validate configuration values
- Document configuration options

4. **Testing**
- Write unit tests for your functions
- Test error scenarios
- Validate input/output schemas

## Development

### Prerequisites

- Python 3.7+
- Apache Pulsar
- pip

### Setup Development Environment

```bash
# Create virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# or
.\venv\Scripts\activate # Windows

# Install dependencies
pip install -r requirements.txt

# Install the package in development mode
pip install -e .
```

### Running Tests

```bash
pytest
```

## Contributing

1. Fork the repository
2. Create a feature branch
3. Commit your changes
4. Push to the branch
5. Create a Pull Request

## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Support

For support, please open an issue in the GitHub repository or contact the maintainers.
33 changes: 33 additions & 0 deletions sdks/fs-python/examples/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# FunctionStream Configuration File
# This configuration file defines the settings for the string processing function example.

pulsar:
service_url: "pulsar://localhost:6650" # Required: URL of the Pulsar broker
authPlugin: "" # Optional: Authentication plugin class name
authParams: "" # Optional: Authentication parameters

module: "string" # Required: Name of the module to use for processing

# Optional: List of source topics to consume from
# Note: Either sources or requestSource must be specified
sources:
- pulsar: # SourceSpec structure with pulsar configuration
topic: "topic-a" # Topic name for regular message consumption

# Required: Name of the subscription for the consumer
subscriptionName: "test-sub"

# Optional: List of request source topics
requestSource:
- pulsar: # SourceSpec structure with pulsar configuration
topic: "string-topic" # Topic name for request messages

# Optional: Output sink configuration
sink:
pulsar: # SinkSpec structure with pulsar configuration
topic: "output" # Topic name for output messages

# Optional: Additional configuration parameters
config:
- test: "Hello from config" # Example configuration value
- test2: "Another config value" # Another example configuration value
28 changes: 28 additions & 0 deletions sdks/fs-python/examples/package.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# FunctionStream Package Configuration
# This file defines the package metadata and function specifications for deployment.

# Package name and type
name: my_function # Name of the function package
type: pulsar # Type of message broker to use

# Module definitions
modules:
string: # Module name
name: string_process # Function name
description: "Appends an exclamation mark to the input string" # Function description

# Input schema definition
inputSchema:
type: object
properties:
text: # Input parameter
type: string # Parameter type
required:
- text # Required parameter

# Output schema definition
outputSchema:
type: object
properties:
result: # Output parameter
type: string # Parameter type
85 changes: 85 additions & 0 deletions sdks/fs-python/examples/string_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
String Processing Function Example

This module demonstrates a simple string processing function that appends an exclamation mark
to the input text. It serves as a basic example of how to create and run a FunctionStream
serverless function.

The function:
1. Receives a request containing a text field
2. Appends an exclamation mark to the text
3. Returns the modified text in a response

This example shows the basic structure of a FunctionStream function, including:
- Function definition and implementation
- FSFunction initialization
- Service startup and graceful shutdown
- Error handling
"""

import asyncio
import sys
import os

from fs_sdk import FSFunction

async def string_process_function(request_data: dict) -> dict:
"""
Process a string by appending an exclamation mark.

This function demonstrates a simple string transformation that can be used
as a building block for more complex text processing pipelines.

Args:
request_data (dict): Request data containing a 'text' field with the input string

Returns:
dict: Response containing the processed string with an exclamation mark appended

Example:
Input: {"text": "Hello"}
Output: {"result": "Hello!"}
"""
# Extract the input text from the request data
text = request_data.get('text', '')

# Append an exclamation mark to the text
result = f"{text}!"

# Log the result for debugging purposes
print(f"Result: {result}")

return {"result": result}

async def main():
"""
Main function to initialize and run the string processing service.

This function:
1. Creates an FSFunction instance with the string processing function
2. Starts the service
3. Handles graceful shutdown and error cases
"""
# Initialize the FunctionStream function with our string processor
function = FSFunction(
process_funcs={
'string': string_process_function
}
)

try:
print("Starting string processing function service...")
await function.start()
except asyncio.CancelledError:
print("\nInitiating graceful shutdown...")
except Exception as e:
print(f"\nAn error occurred: {e}")
finally:
await function.close()

if __name__ == "__main__":
try:
# Run the main function in an asyncio event loop
asyncio.run(main())
except KeyboardInterrupt:
print("\nService stopped")
Loading
Loading