"""APIClient - Agent — OpenAPI / Swagger spec parser. Parses OpenAPI 3.x and Swagger 2.0 specs (JSON or YAML) directly, without needing AI tokens. """ import json import re def _try_yaml(text: str) -> dict | None: try: import yaml return yaml.safe_load(text) except Exception: return None def _try_json(text: str) -> dict | None: try: return json.loads(text) except Exception: return None def detect_spec(text: str) -> dict | None: """Try to parse text as OpenAPI/Swagger JSON or YAML. Returns raw dict or None.""" data = _try_json(text) or _try_yaml(text) if not isinstance(data, dict): return None if "openapi" in data or "swagger" in data: return data return None def parse_spec(data: dict) -> dict: """ Parse an OpenAPI 3.x or Swagger 2.0 spec dict into EKIKA's internal format: { "collection_name": str, "base_url": str, "auth_type": str, "endpoints": [...], "environment_variables": {...} } """ version = str(data.get("openapi", data.get("swagger", "2"))) is_v3 = version.startswith("3") # Collection name info = data.get("info", {}) collection_name = info.get("title", "Imported API") # Base URL if is_v3: servers = data.get("servers", []) base_url = servers[0].get("url", "") if servers else "" else: host = data.get("host", "") schemes = data.get("schemes", ["https"]) base_p = data.get("basePath", "/") base_url = f"{schemes[0]}://{host}{base_p}" if host else "" # Clean trailing slash base_url = base_url.rstrip("/") # Auth detection security_schemes = {} if is_v3: security_schemes = data.get("components", {}).get("securitySchemes", {}) else: security_schemes = data.get("securityDefinitions", {}) auth_type = "none" for scheme in security_schemes.values(): t = scheme.get("type", "").lower() if t in ("http", "bearer") or scheme.get("scheme", "").lower() == "bearer": auth_type = "bearer" break if t == "apikey": auth_type = "apikey" break if t in ("basic", "http") and scheme.get("scheme", "").lower() == "basic": auth_type = "basic" break # Endpoints endpoints = [] paths = data.get("paths", {}) for path, path_item in paths.items(): if not isinstance(path_item, dict): continue for method in ("get", "post", "put", "patch", "delete", "head", "options"): op = path_item.get(method) if not isinstance(op, dict): continue name = op.get("summary") or op.get("operationId") or f"{method.upper()} {path}" description = op.get("description", "") # Headers headers: dict = {} # Query params params: dict = {} body_example = "" content_type = "application/json" body_type = "raw" # Parameters for param in op.get("parameters", []): if not isinstance(param, dict): continue p_in = param.get("in", "") p_name = param.get("name", "") if p_in == "query": params[p_name] = param.get("example", "") elif p_in == "header": headers[p_name] = param.get("example", "") # Request body (OpenAPI 3) if is_v3 and "requestBody" in op: rb = op["requestBody"] content = rb.get("content", {}) if "application/json" in content: schema = content["application/json"].get("schema", {}) body_example = _schema_to_example_str(schema) content_type = "application/json" elif "application/x-www-form-urlencoded" in content: body_type = "form-urlencoded" content_type = "" elif content: first_ct = next(iter(content)) content_type = first_ct # Request body (Swagger 2) if not is_v3: consumes = op.get("consumes", data.get("consumes", ["application/json"])) for param in op.get("parameters", []): if param.get("in") == "body": schema = param.get("schema", {}) body_example = _schema_to_example_str(schema) if consumes: content_type = consumes[0] # Add auth header hint if auth_type == "bearer": headers.setdefault("Authorization", "Bearer {{token}}") elif auth_type == "apikey": headers.setdefault("X-API-Key", "{{api_key}}") # Basic test script test_script = ( f"pm.test('Status OK', lambda: pm.response.to_have_status(200))\n" f"pm.test('Has body', lambda: expect(pm.response.text).to_be_truthy())" ) endpoints.append({ "name": name, "method": method.upper(), "path": path, "description": description, "headers": headers, "params": params, "body": body_example, "body_type": body_type, "content_type": content_type, "test_script": test_script, }) # Environment variables env_vars: dict = {} if base_url: env_vars["base_url"] = base_url if auth_type == "bearer": env_vars["token"] = "" elif auth_type == "apikey": env_vars["api_key"] = "" elif auth_type == "basic": env_vars["username"] = "" env_vars["password"] = "" return { "collection_name": collection_name, "base_url": base_url, "auth_type": auth_type, "endpoints": endpoints, "environment_variables": env_vars, } def _schema_to_example_str(schema: dict) -> str: """Generate a compact JSON example string from an OpenAPI schema.""" try: example = _schema_to_example(schema) return json.dumps(example, indent=2, ensure_ascii=False) except Exception: return "" def _schema_to_example(schema: dict, depth: int = 0) -> object: if depth > 5: return {} if not isinstance(schema, dict): return {} # Use provided example first if "example" in schema: return schema["example"] if "default" in schema: return schema["default"] t = schema.get("type", "object") if t == "object" or "properties" in schema: result = {} for k, v in schema.get("properties", {}).items(): result[k] = _schema_to_example(v, depth + 1) return result if t == "array": items = schema.get("items", {}) return [_schema_to_example(items, depth + 1)] if t == "string": fmt = schema.get("format", "") if fmt == "date-time": return "2024-01-01T00:00:00Z" if fmt == "date": return "2024-01-01" if fmt == "email": return "user@example.com" if fmt == "uuid": return "00000000-0000-0000-0000-000000000000" return schema.get("enum", ["string"])[0] if t == "integer": return 0 if t == "number": return 0.0 if t == "boolean": return True return {}